ntex_rt/
builder.rs

1use std::{future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
/// Builder for a ntex runtime system.
///
/// Configure the system through the setter methods, then call
/// `Builder::build` or `Builder::build_with` to obtain a `SystemRunner`,
/// which is used to run the system's event loop.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Name of the System. Defaults to "ntex" if unset.
    name: String,
    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
    stop_on_panic: bool,
    /// Stack size, in bytes, for new threads. Defaults to 0.
    // NOTE(review): 0 presumably means "use the platform default" — confirm.
    stack_size: usize,
    /// Arbiters ping interval in milliseconds. Defaults to 1000; zero disables pings.
    ping_interval: usize,
    /// Maximum number of threads in the inner thread pool. Defaults to 256.
    pool_limit: usize,
    /// Receive timeout of the inner thread pool's threads. Defaults to 60 seconds.
    pool_recv_timeout: time::Duration,
}
28
29impl Builder {
30    pub(super) fn new() -> Self {
31        Builder {
32            name: "ntex".into(),
33            stop_on_panic: false,
34            stack_size: 0,
35            ping_interval: 1000,
36            pool_limit: 256,
37            pool_recv_timeout: time::Duration::from_secs(60),
38        }
39    }
40
41    /// Sets the name of the System.
42    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
43        self.name = name.as_ref().into();
44        self
45    }
46
47    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
48    /// uncaught panic is thrown from a worker thread.
49    ///
50    /// Defaults to false.
51    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
52        self.stop_on_panic = stop_on_panic;
53        self
54    }
55
56    /// Sets the size of the stack (in bytes) for the new thread.
57    pub fn stack_size(mut self, size: usize) -> Self {
58        self.stack_size = size;
59        self
60    }
61
62    /// Sets ping interval for spawned arbiters.
63    ///
64    /// Interval is in milliseconds. By default 5000 milliseconds is set.
65    /// To disable pings set value to zero.
66    pub fn ping_interval(mut self, interval: usize) -> Self {
67        self.ping_interval = interval;
68        self
69    }
70
71    /// Set the thread number limit of the inner thread pool, if exists. The
72    /// default value is 256.
73    pub fn thread_pool_limit(mut self, value: usize) -> Self {
74        self.pool_limit = value;
75        self
76    }
77
78    /// Set the waiting timeout of the inner thread, if exists. The default is
79    /// 60 seconds.
80    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
81    where
82        time::Duration: From<T>,
83    {
84        self.pool_recv_timeout = timeout.into();
85        self
86    }
87
88    /// Create new System.
89    ///
90    /// This method panics if it can not create tokio runtime
91    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
92        let (stop_tx, stop) = oneshot::channel();
93        let (sys_sender, sys_receiver) = unbounded();
94
95        let config = SystemConfig {
96            runner: Arc::new(runner),
97            stack_size: self.stack_size,
98            stop_on_panic: self.stop_on_panic,
99        };
100
101        // thread pool
102        let pool = ThreadPool::new(self.pool_limit, self.pool_recv_timeout);
103
104        let (arb, controller) = Arbiter::new_system(self.name.clone());
105        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
106        let system = System::construct(sys_sender, arb, config.clone(), pool);
107
108        // system arbiter
109        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
110
111        // init system arbiter and run configuration method
112        SystemRunner {
113            stop,
114            support,
115            controller,
116            system,
117            config,
118            _t: PhantomData,
119        }
120    }
121
122    /// Create new System.
123    ///
124    /// This method panics if it can not create tokio runtime
125    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
126        let (stop_tx, stop) = oneshot::channel();
127        let (sys_sender, sys_receiver) = unbounded();
128
129        // thread pool
130        let pool = ThreadPool::new(self.pool_limit, self.pool_recv_timeout);
131
132        let (arb, controller) = Arbiter::new_system(self.name.clone());
133        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
134        let system = System::construct(sys_sender, arb, config.clone(), pool);
135
136        // system arbiter
137        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
138
139        // init system arbiter and run configuration method
140        SystemRunner {
141            stop,
142            support,
143            controller,
144            system,
145            config,
146            _t: PhantomData,
147        }
148    }
149}
150
/// Helper object that runs System's event loop
///
/// Produced by `Builder::build` / `Builder::build_with` and consumed by one of
/// its `run*` / `block_on` methods.
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
    /// Receives the exit code of the system; awaited by `SystemRunner::run`,
    /// where a non-zero code is reported as an error.
    stop: oneshot::Receiver<i32>,
    /// System support object; its `run()` future is spawned when the runner starts.
    support: SystemSupport,
    /// Controller of the system arbiter; its `run()` future is spawned when the runner starts.
    controller: ArbiterController,
    /// The system being driven; clones are handed out by `SystemRunner::system`.
    system: System,
    /// System configuration; its `block_on` is used to drive the event loop.
    config: SystemConfig,
    /// Marker only: `Rc<()>` is neither `Send` nor `Sync`, so neither is `SystemRunner`.
    // NOTE(review): presumably to keep the runner on the thread that built it — confirm.
    _t: PhantomData<Rc<()>>,
}
161
162impl SystemRunner {
163    /// Get current system.
164    pub fn system(&self) -> System {
165        self.system.clone()
166    }
167
168    /// This function will start event loop and will finish once the
169    /// `System::stop()` function is called.
170    pub fn run_until_stop(self) -> io::Result<()> {
171        self.run(|| Ok(()))
172    }
173
174    /// This function will start event loop and will finish once the
175    /// `System::stop()` function is called.
176    pub fn run<F>(self, f: F) -> io::Result<()>
177    where
178        F: FnOnce() -> io::Result<()> + 'static,
179    {
180        let SystemRunner {
181            controller,
182            stop,
183            support,
184            config,
185            ..
186        } = self;
187
188        // run loop
189        config.block_on(async move {
190            f()?;
191
192            let _ = crate::spawn(support.run());
193            let _ = crate::spawn(controller.run());
194            match stop.await {
195                Ok(code) => {
196                    if code != 0 {
197                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
198                    } else {
199                        Ok(())
200                    }
201                }
202                Err(_) => Err(io::Error::other("Closed")),
203            }
204        })
205    }
206
207    /// Execute a future and wait for result.
208    pub fn block_on<F, R>(self, fut: F) -> R
209    where
210        F: Future<Output = R> + 'static,
211        R: 'static,
212    {
213        let SystemRunner {
214            controller,
215            support,
216            config,
217            ..
218        } = self;
219
220        config.block_on(async move {
221            let _ = crate::spawn(support.run());
222            let _ = crate::spawn(controller.run());
223            fut.await
224        })
225    }
226
227    #[cfg(feature = "tokio")]
228    /// Execute a future and wait for result.
229    pub async fn run_local<F, R>(self, fut: F) -> R
230    where
231        F: Future<Output = R> + 'static,
232        R: 'static,
233    {
234        let SystemRunner {
235            controller,
236            support,
237            ..
238        } = self;
239
240        // run loop
241        tok_io::task::LocalSet::new()
242            .run_until(async move {
243                let _ = crate::spawn(support.run());
244                let _ = crate::spawn(controller.run());
245                fut.await
246            })
247            .await
248    }
249}