Skip to main content

ntex_rt/
builder.rs

1use std::{fmt, future::Future, io, marker::PhantomData, panic, rc::Rc, sync::Arc, time};
2
3use crate::driver::Runner;
4use crate::system::{System, SystemConfig};
5
/// Builder struct for a ntex runtime.
///
/// Adjust the settings through the chained setter methods, then call
/// [`Builder::build`] to obtain a [`SystemRunner`]. The runner is what
/// actually drives the system, e.g. via [`SystemRunner::run`] or
/// [`SystemRunner::block_on`].
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name of the System. `Builder::new` initialises it to "ntex".
    name: String,
    /// Whether the Arbiter stops the whole System on an uncaught panic.
    /// `Builder::new` initialises it to `false`.
    stop_on_panic: bool,
    /// Stack size, in bytes, for the new thread.
    stack_size: usize,
    /// Ping interval for arbiters, in milliseconds.
    ping_interval: usize,
    /// Thread number limit of the thread pool.
    pool_limit: usize,
    /// Whether the thread pool is bounded.
    pool_bounded: bool,
    /// Waiting (receive) timeout of the thread pool.
    pool_recv_timeout: time::Duration,
    /// Marks the system as a testing one.
    testing: bool,
}
29
30impl Builder {
31    pub(super) fn new() -> Self {
32        Builder {
33            name: "ntex".into(),
34            stop_on_panic: false,
35            stack_size: 0,
36            ping_interval: 1000,
37            testing: false,
38            pool_limit: 256,
39            pool_bounded: true,
40            pool_recv_timeout: time::Duration::from_secs(60),
41        }
42    }
43
44    #[must_use]
45    /// Sets the name of the System.
46    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47        self.name = name.as_ref().into();
48        self
49    }
50
51    #[must_use]
52    /// Sets the option `stop_on_panic`
53    ///
54    /// It controls whether the System is stopped when an
55    /// uncaught panic is thrown from a worker thread.
56    ///
57    /// Defaults is set to false.
58    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
59        self.stop_on_panic = stop_on_panic;
60        self
61    }
62
63    #[must_use]
64    /// Sets the size of the stack (in bytes) for the new thread.
65    pub fn stack_size(mut self, size: usize) -> Self {
66        self.stack_size = size;
67        self
68    }
69
70    #[must_use]
71    /// Sets ping interval for spawned arbiters.
72    ///
73    /// Interval is in milliseconds. By default 5000 milliseconds is set.
74    /// To disable pings set value to zero.
75    pub fn ping_interval(mut self, interval: usize) -> Self {
76        self.ping_interval = interval;
77        self
78    }
79
80    #[must_use]
81    /// Set the thread number limit of the inner thread pool, if exists. The
82    /// default value is 256.
83    pub fn thread_pool_limit(mut self, value: usize) -> Self {
84        self.pool_limit = value;
85        self
86    }
87
88    #[must_use]
89    /// Configures the thread pool to be unbounded.
90    pub fn thread_pool_unbounded(mut self) -> Self {
91        self.pool_bounded = false;
92        self
93    }
94
95    #[must_use]
96    /// Mark system as testing
97    pub fn testing(mut self) -> Self {
98        self.testing = true;
99        self
100    }
101
102    #[must_use]
103    /// Set the waiting timeout of the inner thread, if exists. The default is
104    /// 60 seconds.
105    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
106    where
107        time::Duration: From<T>,
108    {
109        self.pool_recv_timeout = timeout.into();
110        self
111    }
112
113    /// Create new System.
114    ///
115    /// This method panics if it can not create runtime
116    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
117        let config = SystemConfig {
118            name: self.name.clone(),
119            testing: self.testing,
120            stack_size: self.stack_size,
121            stop_on_panic: self.stop_on_panic,
122            ping_interval: self.ping_interval,
123            pool_limit: self.pool_limit,
124            pool_bounded: self.pool_bounded,
125            pool_recv_timeout: self.pool_recv_timeout,
126            runner: Arc::new(runner),
127        };
128        self.build_with(config)
129    }
130
131    /// Create new System.
132    ///
133    /// This method panics if it can not create runtime
134    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
135        let runner = config.runner.clone();
136        let system = System::construct(config);
137
138        // init system arbiter and run configuration method
139        SystemRunner {
140            system,
141            runner,
142            _t: PhantomData,
143        }
144    }
145}
146
147/// Helper object that runs System's event loop
148#[must_use = "SystemRunner must be run"]
149pub struct SystemRunner {
150    system: System,
151    runner: Arc<dyn Runner>,
152    _t: PhantomData<Rc<()>>,
153}
154
155impl SystemRunner {
156    /// Get current system.
157    pub fn system(&self) -> System {
158        self.system.clone()
159    }
160
161    /// This function will start event loop and will finish once the
162    /// `System::stop()` function is called.
163    pub fn run_until_stop(self) -> io::Result<()> {
164        self.run(|| Ok(()))
165    }
166
167    /// This function will start event loop and will finish once the
168    /// `System::stop()` function is called.
169    pub fn run<F>(self, f: F) -> io::Result<()>
170    where
171        F: FnOnce() -> io::Result<()> + 'static,
172    {
173        log::info!("Starting {:?} system", self.system.name());
174
175        let SystemRunner {
176            mut system, runner, ..
177        } = self;
178
179        // run loop
180        crate::driver::block_on(runner.as_ref(), async move {
181            let stop = system.start();
182
183            f()?;
184
185            match stop.await {
186                Ok(code) => {
187                    if code != 0 {
188                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
189                    } else {
190                        Ok(())
191                    }
192                }
193                Err(_) => Err(io::Error::other("Closed")),
194            }
195        })
196    }
197
198    /// Execute a future and wait for result.
199    pub fn block_on<F, R>(self, fut: F) -> R
200    where
201        F: Future<Output = R> + 'static,
202        R: 'static,
203    {
204        let SystemRunner {
205            mut system, runner, ..
206        } = self;
207
208        crate::driver::block_on(runner.as_ref(), async move {
209            let stop = system.start();
210            drop(stop);
211
212            let loc = current_location();
213            ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
214            fut.await
215        })
216    }
217
218    #[cfg(feature = "tokio")]
219    /// Execute a future and wait for result.
220    pub async fn run_local<F, R>(self, fut: F) -> R
221    where
222        F: Future<Output = R> + 'static,
223        R: 'static,
224    {
225        let SystemRunner { mut system, .. } = self;
226
227        // run loop
228        tok_io::task::LocalSet::new()
229            .run_until(async move {
230                let stop = system.start();
231                drop(stop);
232
233                let loc = current_location();
234                ntex_error::set_backtrace_start(loc.file(), loc.line() + 2);
235                fut.await
236            })
237            .await
238    }
239}
240
/// Returns the source location of the code that called this function.
///
/// The function is `#[track_caller]`, so `Location::caller()` resolves to
/// the call site of `current_location` rather than to this body.
#[track_caller]
pub(crate) fn current_location() -> &'static panic::Location<'static> {
    panic::Location::caller()
}
245
246impl fmt::Debug for SystemRunner {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        f.debug_struct("SystemRunner")
249            .field("system", &self.system)
250            .finish()
251    }
252}