Skip to main content

ntex_rt/
builder.rs

1use std::{future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
10#[derive(Debug, Clone)]
11/// Builder struct for a ntex runtime.
12///
13/// Either use `Builder::build` to create a system and start actors.
14/// Alternatively, use `Builder::run` to start the tokio runtime and
15/// run a function in its context.
16pub struct Builder {
17    /// Name of the System. Defaults to "ntex" if unset.
18    name: String,
19    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
20    stop_on_panic: bool,
21    /// New thread stack size
22    stack_size: usize,
23    /// Arbiters ping interval
24    ping_interval: usize,
25    /// Thread pool config
26    pool_limit: usize,
27    pool_recv_timeout: time::Duration,
28    /// testing flag
29    testing: bool,
30}
31
32impl Builder {
33    pub(super) fn new() -> Self {
34        Builder {
35            name: "ntex".into(),
36            stop_on_panic: false,
37            stack_size: 0,
38            ping_interval: 1000,
39            testing: false,
40            pool_limit: 256,
41            pool_recv_timeout: time::Duration::from_secs(60),
42        }
43    }
44
45    /// Sets the name of the System.
46    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
47        self.name = name.as_ref().into();
48        self
49    }
50
51    /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
52    /// uncaught panic is thrown from a worker thread.
53    ///
54    /// Defaults to false.
55    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
56        self.stop_on_panic = stop_on_panic;
57        self
58    }
59
60    /// Sets the size of the stack (in bytes) for the new thread.
61    pub fn stack_size(mut self, size: usize) -> Self {
62        self.stack_size = size;
63        self
64    }
65
66    /// Sets ping interval for spawned arbiters.
67    ///
68    /// Interval is in milliseconds. By default 5000 milliseconds is set.
69    /// To disable pings set value to zero.
70    pub fn ping_interval(mut self, interval: usize) -> Self {
71        self.ping_interval = interval;
72        self
73    }
74
75    /// Set the thread number limit of the inner thread pool, if exists. The
76    /// default value is 256.
77    pub fn thread_pool_limit(mut self, value: usize) -> Self {
78        self.pool_limit = value;
79        self
80    }
81
82    /// Mark system as testing
83    pub fn testing(mut self) -> Self {
84        self.testing = true;
85        self
86    }
87
88    /// Set the waiting timeout of the inner thread, if exists. The default is
89    /// 60 seconds.
90    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
91    where
92        time::Duration: From<T>,
93    {
94        self.pool_recv_timeout = timeout.into();
95        self
96    }
97
98    /// Create new System.
99    ///
100    /// This method panics if it can not create tokio runtime
101    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
102        let name = self.name.clone();
103        let testing = self.testing;
104        let stack_size = self.stack_size;
105        let stop_on_panic = self.stop_on_panic;
106
107        self.build_with(SystemConfig {
108            name,
109            testing,
110            stack_size,
111            stop_on_panic,
112            runner: Arc::new(runner),
113        })
114    }
115
116    /// Create new System.
117    ///
118    /// This method panics if it can not create tokio runtime
119    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
120        let (stop_tx, stop) = oneshot::channel();
121        let (sys_sender, sys_receiver) = unbounded();
122
123        // thread pool
124        let pool = ThreadPool::new(&self.name, self.pool_limit, self.pool_recv_timeout);
125
126        let (arb, controller) = Arbiter::new_system(self.name.clone());
127        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
128        let system = System::construct(sys_sender, arb, config.clone(), pool);
129
130        // system arbiter
131        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
132
133        // init system arbiter and run configuration method
134        SystemRunner {
135            stop,
136            support,
137            controller,
138            system,
139            config,
140            _t: PhantomData,
141        }
142    }
143}
144
145/// Helper object that runs System's event loop
146#[must_use = "SystemRunner must be run"]
147pub struct SystemRunner {
148    stop: oneshot::Receiver<i32>,
149    support: SystemSupport,
150    controller: ArbiterController,
151    system: System,
152    config: SystemConfig,
153    _t: PhantomData<Rc<()>>,
154}
155
156impl SystemRunner {
157    /// Get current system.
158    pub fn system(&self) -> System {
159        self.system.clone()
160    }
161
162    /// This function will start event loop and will finish once the
163    /// `System::stop()` function is called.
164    pub fn run_until_stop(self) -> io::Result<()> {
165        self.run(|| Ok(()))
166    }
167
168    /// This function will start event loop and will finish once the
169    /// `System::stop()` function is called.
170    pub fn run<F>(self, f: F) -> io::Result<()>
171    where
172        F: FnOnce() -> io::Result<()> + 'static,
173    {
174        log::info!("Starting {:?} system", self.config.name);
175
176        let SystemRunner {
177            controller,
178            stop,
179            support,
180            config,
181            ..
182        } = self;
183
184        // run loop
185        config.block_on(async move {
186            f()?;
187
188            let _ = crate::spawn(support.run());
189            let _ = crate::spawn(controller.run());
190            match stop.await {
191                Ok(code) => {
192                    if code != 0 {
193                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
194                    } else {
195                        Ok(())
196                    }
197                }
198                Err(_) => Err(io::Error::other("Closed")),
199            }
200        })
201    }
202
203    /// Execute a future and wait for result.
204    pub fn block_on<F, R>(self, fut: F) -> R
205    where
206        F: Future<Output = R> + 'static,
207        R: 'static,
208    {
209        let SystemRunner {
210            controller,
211            support,
212            config,
213            ..
214        } = self;
215
216        config.block_on(async move {
217            let _ = crate::spawn(support.run());
218            let _ = crate::spawn(controller.run());
219            fut.await
220        })
221    }
222
223    #[cfg(feature = "tokio")]
224    /// Execute a future and wait for result.
225    pub async fn run_local<F, R>(self, fut: F) -> R
226    where
227        F: Future<Output = R> + 'static,
228        R: 'static,
229    {
230        let SystemRunner {
231            controller,
232            support,
233            ..
234        } = self;
235
236        // run loop
237        tok_io::task::LocalSet::new()
238            .run_until(async move {
239                let _ = crate::spawn(support.run());
240                let _ = crate::spawn(controller.run());
241                fut.await
242            })
243            .await
244    }
245}