Skip to main content

ntex_rt/
builder.rs

1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Builder struct for a ntex runtime.
///
/// Configure it through the chained setter methods, then call
/// `Builder::build` (or `Builder::build_with`) to obtain a `SystemRunner`.
/// The runner is what actually starts the system: use `SystemRunner::run`
/// to run a function in its context, `SystemRunner::run_until_stop` to just
/// run the event loop, or `SystemRunner::block_on` to drive a future.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name of the System. Defaults to "ntex" if unset.
    name: String,
    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
    stop_on_panic: bool,
    /// Stack size, in bytes, for newly spawned threads. Initialized to 0 by
    /// `Builder::new` (presumably "use the platform default" - TODO confirm
    /// against the thread-spawning code).
    stack_size: usize,
    /// Arbiters ping interval, in milliseconds. Defaults to 1000; zero
    /// disables pings.
    ping_interval: usize,
    /// Whether signal handling is enabled. Defaults to false.
    signals: bool,
    /// Thread number limit of the inner thread pool. Defaults to 256.
    pool_limit: usize,
    /// Waiting timeout of the inner thread pool. Defaults to 60 seconds.
    pool_recv_timeout: time::Duration,
    /// Testing flag. Defaults to false; set by `Builder::testing`.
    testing: bool,
}
29
30impl Builder {
31    pub(super) fn new() -> Self {
32        Builder {
33            name: "ntex".into(),
34            stop_on_panic: false,
35            stack_size: 0,
36            ping_interval: 1000,
37            signals: false,
38            testing: false,
39            pool_limit: 256,
40            pool_recv_timeout: time::Duration::from_secs(60),
41        }
42    }
43
44    #[must_use]
45    /// Sets the name of the System.
46    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47        self.name = name.as_ref().into();
48        self
49    }
50
51    #[must_use]
52    /// Sets the option `stop_on_panic`
53    ///
54    /// It controls whether the System is stopped when an
55    /// uncaught panic is thrown from a worker thread.
56    ///
57    /// Defaults is set to false.
58    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59        self.stop_on_panic = stop_on_panic;
60        self
61    }
62
63    #[must_use]
64    /// Disable signal handling.
65    ///
66    /// By default signal handling is disabled.
67    pub fn disable_signals(mut self) -> Self {
68        self.signals = false;
69        self
70    }
71
72    #[must_use]
73    /// Enable signal handling.
74    ///
75    /// By default signal handling is enabled.
76    pub fn enable_signals(mut self) -> Self {
77        self.signals = true;
78        self
79    }
80
81    #[must_use]
82    /// Sets the size of the stack (in bytes) for the new thread.
83    pub fn stack_size(mut self, size: usize) -> Self {
84        self.stack_size = size;
85        self
86    }
87
88    #[must_use]
89    /// Sets ping interval for spawned arbiters.
90    ///
91    /// Interval is in milliseconds. By default 1000 milliseconds is set.
92    /// To disable pings set value to zero.
93    pub fn ping_interval(mut self, interval: usize) -> Self {
94        self.ping_interval = interval;
95        self
96    }
97
98    #[must_use]
99    /// Set the thread number limit of the inner thread pool, if exists. The
100    /// default value is 256.
101    pub fn thread_pool_limit(mut self, value: usize) -> Self {
102        self.pool_limit = value;
103        self
104    }
105
106    #[must_use]
107    /// Mark system as testing
108    pub fn testing(mut self) -> Self {
109        self.testing = true;
110        self.signals = false;
111        self
112    }
113
114    #[must_use]
115    /// Set the waiting timeout of the inner thread, if exists. The default is
116    /// 60 seconds.
117    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
118    where
119        time::Duration: From<T>,
120    {
121        self.pool_recv_timeout = timeout.into();
122        self
123    }
124
125    /// Create new System.
126    ///
127    /// This method panics if it can not create runtime
128    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
129        let config = SystemConfig {
130            name: self.name.clone(),
131            testing: self.testing,
132            stack_size: self.stack_size,
133            stop_on_panic: self.stop_on_panic,
134            ping_interval: self.ping_interval,
135            pool_limit: self.pool_limit,
136            pool_recv_timeout: self.pool_recv_timeout,
137            runner: Arc::new(runner),
138        };
139        self.build_with(config)
140    }
141
142    /// Create new System.
143    ///
144    /// This method panics if it can not create runtime
145    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
146        let runner = config.runner.clone();
147
148        // init system arbiter and run configuration method
149        SystemRunner {
150            config,
151            runner,
152            signals: self.signals,
153            _t: PhantomData,
154        }
155    }
156}
157
/// Helper object that runs System's event loop
///
/// Created by `Builder::build` / `Builder::build_with` and consumed by one
/// of its own `run`, `run_until_stop`, `block_on` or `run_local` methods.
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
    /// Configuration passed to `System::start` when the runner is executed.
    config: SystemConfig,
    /// Runner handed to `crate::driver::block_on`; `Builder::build_with`
    /// clones it from `config.runner`.
    runner: Arc<dyn Runner>,
    /// When true, `enable_signals` is called on the started system
    /// (by `run` and `block_on`).
    signals: bool,
    /// Zero-sized marker. `Rc` is neither `Send` nor `Sync`, so this field
    /// keeps `SystemRunner` from being either.
    _t: PhantomData<Rc<()>>,
}
166
167impl SystemRunner {
168    /// This function will start event loop and will finish once the
169    /// `System::stop()` function is called.
170    pub fn run_until_stop(self) -> io::Result<()> {
171        self.run(|| Ok(()))
172    }
173
174    /// This function will start event loop and will finish once the
175    /// `System::stop()` function is called.
176    pub fn run<F>(self, f: F) -> io::Result<()>
177    where
178        F: FnOnce() -> io::Result<()> + 'static,
179    {
180        log::info!("Starting {:?} system", self.config.name);
181
182        let SystemRunner {
183            config,
184            runner,
185            signals,
186            ..
187        } = self;
188
189        // run loop
190        crate::driver::block_on(runner.as_ref(), async move {
191            let (system, stop) = System::start(config);
192            if signals {
193                system.enable_signals();
194            }
195
196            f()?;
197
198            match stop.await {
199                Ok(code) => {
200                    if code != 0 {
201                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
202                    } else {
203                        Ok(())
204                    }
205                }
206                Err(_) => Err(io::Error::other("Closed")),
207            }
208        })
209    }
210
211    /// Execute a future and wait for result.
212    pub fn block_on<F, R>(self, fut: F) -> R
213    where
214        F: Future<Output = R> + 'static,
215        R: 'static,
216    {
217        let SystemRunner {
218            config,
219            runner,
220            signals,
221            ..
222        } = self;
223
224        crate::driver::block_on(runner.as_ref(), async move {
225            let (system, _) = System::start(config);
226            if signals {
227                system.enable_signals();
228            }
229
230            let loc = current_location();
231            ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
232            fut.await
233        })
234    }
235
236    #[cfg(feature = "tokio")]
237    /// Execute a future and wait for result.
238    pub async fn run_local<F, R>(self, fut: F) -> R
239    where
240        F: Future<Output = R> + 'static,
241        R: 'static,
242    {
243        let SystemRunner { config, .. } = self;
244
245        // run loop
246        tok_io::task::LocalSet::new()
247            .run_until(async move {
248                _ = System::start(config);
249
250                let loc = current_location();
251                ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
252                fut.await
253            })
254            .await
255    }
256}
257
/// Returns the source location from which this function was called.
///
/// Because the function is `#[track_caller]`, `Location::caller()` reports
/// the call site of `current_location` itself rather than a position
/// inside this function.
#[track_caller]
pub(crate) fn current_location() -> &'static panic::Location<'static> {
    panic::Location::caller()
}
262
263impl fmt::Debug for SystemRunner {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        f.debug_struct("SystemRunner")
266            .field("config", &self.config)
267            .finish()
268    }
269}