// ntex_rt/builder.rs

1use std::{fmt, future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Configuration builder for a ntex runtime.
///
/// Adjust the settings through the chained setter methods, then call
/// `Builder::build` or `Builder::build_with` to obtain a `SystemRunner`
/// that drives the system.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name of the System. `Builder::new` initializes it to "ntex".
    name: String,
    /// Whether the whole System is stopped on an uncaught panic.
    /// `Builder::new` initializes it to `false`.
    stop_on_panic: bool,
    /// Stack size (in bytes) for newly spawned threads.
    stack_size: usize,
    /// Ping interval for arbiters, in milliseconds.
    ping_interval: usize,
    /// Thread number limit of the inner thread pool.
    pool_limit: usize,
    /// Waiting timeout of the inner thread pool.
    pool_recv_timeout: time::Duration,
    /// Marks the system as a testing system.
    testing: bool,
}
27
28impl Builder {
29    pub(super) fn new() -> Self {
30        Builder {
31            name: "ntex".into(),
32            stop_on_panic: false,
33            stack_size: 0,
34            ping_interval: 1000,
35            testing: false,
36            pool_limit: 256,
37            pool_recv_timeout: time::Duration::from_secs(60),
38        }
39    }
40
41    #[must_use]
42    /// Sets the name of the System.
43    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
44        self.name = name.as_ref().into();
45        self
46    }
47
48    #[must_use]
49    /// Sets the option `stop_on_panic`
50    ///
51    /// It controls whether the System is stopped when an
52    /// uncaught panic is thrown from a worker thread.
53    ///
54    /// Defaults is set to false.
55    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
56        self.stop_on_panic = stop_on_panic;
57        self
58    }
59
60    #[must_use]
61    /// Sets the size of the stack (in bytes) for the new thread.
62    pub fn stack_size(mut self, size: usize) -> Self {
63        self.stack_size = size;
64        self
65    }
66
67    #[must_use]
68    /// Sets ping interval for spawned arbiters.
69    ///
70    /// Interval is in milliseconds. By default 5000 milliseconds is set.
71    /// To disable pings set value to zero.
72    pub fn ping_interval(mut self, interval: usize) -> Self {
73        self.ping_interval = interval;
74        self
75    }
76
77    #[must_use]
78    /// Set the thread number limit of the inner thread pool, if exists. The
79    /// default value is 256.
80    pub fn thread_pool_limit(mut self, value: usize) -> Self {
81        self.pool_limit = value;
82        self
83    }
84
85    #[must_use]
86    /// Mark system as testing
87    pub fn testing(mut self) -> Self {
88        self.testing = true;
89        self
90    }
91
92    #[must_use]
93    /// Set the waiting timeout of the inner thread, if exists. The default is
94    /// 60 seconds.
95    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
96    where
97        time::Duration: From<T>,
98    {
99        self.pool_recv_timeout = timeout.into();
100        self
101    }
102
103    /// Create new System.
104    ///
105    /// This method panics if it can not create runtime
106    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
107        let config = SystemConfig {
108            name: self.name.clone(),
109            testing: self.testing,
110            stack_size: self.stack_size,
111            stop_on_panic: self.stop_on_panic,
112            ping_interval: self.ping_interval,
113            pool_limit: self.pool_limit,
114            pool_recv_timeout: self.pool_recv_timeout,
115            runner: Arc::new(runner),
116        };
117        self.build_with(config)
118    }
119
120    /// Create new System.
121    ///
122    /// This method panics if it can not create runtime
123    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
124        let runner = config.runner.clone();
125        let system = System::construct(config);
126
127        // init system arbiter and run configuration method
128        SystemRunner {
129            system,
130            runner,
131            _t: PhantomData,
132        }
133    }
134}
135
136/// Helper object that runs System's event loop
137#[must_use = "SystemRunner must be run"]
138pub struct SystemRunner {
139    system: System,
140    runner: Arc<dyn Runner>,
141    _t: PhantomData<Rc<()>>,
142}
143
144impl SystemRunner {
145    /// Get current system.
146    pub fn system(&self) -> System {
147        self.system.clone()
148    }
149
150    /// This function will start event loop and will finish once the
151    /// `System::stop()` function is called.
152    pub fn run_until_stop(self) -> io::Result<()> {
153        self.run(|| Ok(()))
154    }
155
156    /// This function will start event loop and will finish once the
157    /// `System::stop()` function is called.
158    pub fn run<F>(self, f: F) -> io::Result<()>
159    where
160        F: FnOnce() -> io::Result<()> + 'static,
161    {
162        log::info!("Starting {:?} system", self.system.name());
163
164        let SystemRunner {
165            mut system, runner, ..
166        } = self;
167
168        // run loop
169        crate::driver::block_on(runner.as_ref(), async move {
170            let stop = system.start();
171
172            f()?;
173
174            match stop.await {
175                Ok(code) => {
176                    if code != 0 {
177                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
178                    } else {
179                        Ok(())
180                    }
181                }
182                Err(_) => Err(io::Error::other("Closed")),
183            }
184        })
185    }
186
187    /// Execute a future and wait for result.
188    pub fn block_on<F, R>(self, fut: F) -> R
189    where
190        F: Future<Output = R> + 'static,
191        R: 'static,
192    {
193        let SystemRunner {
194            mut system, runner, ..
195        } = self;
196
197        crate::driver::block_on(runner.as_ref(), async move {
198            let stop = system.start();
199            drop(stop);
200
201            fut.await
202        })
203    }
204
205    #[cfg(feature = "tokio")]
206    /// Execute a future and wait for result.
207    pub async fn run_local<F, R>(self, fut: F) -> R
208    where
209        F: Future<Output = R> + 'static,
210        R: 'static,
211    {
212        let SystemRunner { mut system, .. } = self;
213
214        // run loop
215        tok_io::task::LocalSet::new()
216            .run_until(async move {
217                let stop = system.start();
218                drop(stop);
219
220                fut.await
221            })
222            .await
223    }
224}
225
226impl fmt::Debug for SystemRunner {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        f.debug_struct("SystemRunner")
229            .field("system", &self.system)
230            .finish()
231    }
232}