Skip to main content

ntex_rt/
builder.rs

1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
6#[derive(Debug, Clone)]
7/// Builder struct for a ntex runtime.
8///
9/// Either use `Builder::build` to create a system and start actors.
10/// Alternatively, use `Builder::run` to start the runtime and
11/// run a function in its context.
12pub struct Builder {
13    /// Name of the System. Defaults to "ntex" if unset.
14    name: String,
15    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
16    stop_on_panic: bool,
17    /// New thread stack size
18    stack_size: usize,
19    /// Arbiters ping interval
20    ping_interval: usize,
21    /// Disable signal handling
22    disable_signals: bool,
23    /// Thread pool config
24    pool_limit: usize,
25    pool_recv_timeout: time::Duration,
26    /// testing flag
27    testing: bool,
28}
29
30impl Builder {
31    pub(super) fn new() -> Self {
32        Builder {
33            name: "ntex".into(),
34            stop_on_panic: false,
35            stack_size: 0,
36            ping_interval: 1000,
37            disable_signals: false,
38            testing: false,
39            pool_limit: 256,
40            pool_recv_timeout: time::Duration::from_secs(60),
41        }
42    }
43
44    #[must_use]
45    /// Sets the name of the System.
46    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47        self.name = name.as_ref().into();
48        self
49    }
50
51    #[must_use]
52    /// Sets the option `stop_on_panic`
53    ///
54    /// It controls whether the System is stopped when an
55    /// uncaught panic is thrown from a worker thread.
56    ///
57    /// Defaults is set to false.
58    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59        self.stop_on_panic = stop_on_panic;
60        self
61    }
62
63    #[must_use]
64    /// Disable signal handling.
65    ///
66    /// By default signal handling is enabled.
67    pub fn disable_signals(mut self) -> Self {
68        self.disable_signals = true;
69        self
70    }
71
72    #[must_use]
73    /// Sets the size of the stack (in bytes) for the new thread.
74    pub fn stack_size(mut self, size: usize) -> Self {
75        self.stack_size = size;
76        self
77    }
78
79    #[must_use]
80    /// Sets ping interval for spawned arbiters.
81    ///
82    /// Interval is in milliseconds. By default 1000 milliseconds is set.
83    /// To disable pings set value to zero.
84    pub fn ping_interval(mut self, interval: usize) -> Self {
85        self.ping_interval = interval;
86        self
87    }
88
89    #[must_use]
90    /// Set the thread number limit of the inner thread pool, if exists. The
91    /// default value is 256.
92    pub fn thread_pool_limit(mut self, value: usize) -> Self {
93        self.pool_limit = value;
94        self
95    }
96
97    #[must_use]
98    /// Mark system as testing
99    pub fn testing(mut self) -> Self {
100        self.testing = true;
101        self.disable_signals = true;
102        self
103    }
104
105    #[must_use]
106    /// Set the waiting timeout of the inner thread, if exists. The default is
107    /// 60 seconds.
108    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
109    where
110        time::Duration: From<T>,
111    {
112        self.pool_recv_timeout = timeout.into();
113        self
114    }
115
116    /// Create new System.
117    ///
118    /// This method panics if it can not create runtime
119    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
120        let config = SystemConfig {
121            name: self.name.clone(),
122            testing: self.testing,
123            stack_size: self.stack_size,
124            stop_on_panic: self.stop_on_panic,
125            ping_interval: self.ping_interval,
126            disable_signals: self.disable_signals,
127            pool_limit: self.pool_limit,
128            pool_recv_timeout: self.pool_recv_timeout,
129            runner: Arc::new(runner),
130        };
131        self.build_with(config)
132    }
133
134    /// Create new System.
135    ///
136    /// This method panics if it can not create runtime
137    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
138        let runner = config.runner.clone();
139
140        // init system arbiter and run configuration method
141        SystemRunner {
142            config,
143            runner,
144            _t: PhantomData,
145        }
146    }
147}
148
149/// Helper object that runs System's event loop
150#[must_use = "SystemRunner must be run"]
151pub struct SystemRunner {
152    config: SystemConfig,
153    runner: Arc<dyn Runner>,
154    _t: PhantomData<Rc<()>>,
155}
156
157impl SystemRunner {
158    /// This function will start event loop and will finish once the
159    /// `System::stop()` function is called.
160    pub fn run_until_stop(self) -> io::Result<()> {
161        self.run(|| Ok(()))
162    }
163
164    /// This function will start event loop and will finish once the
165    /// `System::stop()` function is called.
166    pub fn run<F>(self, f: F) -> io::Result<()>
167    where
168        F: FnOnce() -> io::Result<()> + 'static,
169    {
170        log::info!("Starting {:?} system", self.config.name);
171
172        let SystemRunner { config, runner, .. } = self;
173
174        // run loop
175        crate::driver::block_on(runner.as_ref(), async move {
176            let (system, stop) = System::start(config);
177            crate::signals::start(&system);
178
179            f()?;
180
181            match stop.await {
182                Ok(code) => {
183                    if code != 0 {
184                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
185                    } else {
186                        Ok(())
187                    }
188                }
189                Err(_) => Err(io::Error::other("Closed")),
190            }
191        })
192    }
193
194    /// Execute a future and wait for result.
195    pub fn block_on<F, R>(self, fut: F) -> R
196    where
197        F: Future<Output = R> + 'static,
198        R: 'static,
199    {
200        let SystemRunner { config, runner, .. } = self;
201
202        crate::driver::block_on(runner.as_ref(), async move {
203            let (system, _) = System::start(config);
204            crate::signals::start(&system);
205
206            let loc = current_location();
207            ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
208            fut.await
209        })
210    }
211
212    #[cfg(feature = "tokio")]
213    /// Execute a future and wait for result.
214    pub async fn run_local<F, R>(self, fut: F) -> R
215    where
216        F: Future<Output = R> + 'static,
217        R: 'static,
218    {
219        let SystemRunner { config, .. } = self;
220
221        // run loop
222        tok_io::task::LocalSet::new()
223            .run_until(async move {
224                _ = System::start(config);
225
226                let loc = current_location();
227                ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
228                fut.await
229            })
230            .await
231    }
232}
233
234#[track_caller]
235pub(crate) fn current_location() -> &'static panic::Location<'static> {
236    panic::Location::caller()
237}
238
239impl fmt::Debug for SystemRunner {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        f.debug_struct("SystemRunner")
242            .field("config", &self.config)
243            .finish()
244    }
245}