Skip to main content

ntex_rt/
builder.rs

1use std::{fmt, future::Future, io, marker::PhantomData, rc::Rc, sync::Arc, time};
2
3use async_channel::unbounded;
4
5use crate::arbiter::{Arbiter, ArbiterController};
6use crate::driver::Runner;
7use crate::pool::ThreadPool;
8use crate::system::{System, SystemCommand, SystemConfig, SystemSupport};
9
10#[derive(Debug, Clone)]
11/// Builder struct for a ntex runtime.
12///
13/// Either use `Builder::build` to create a system and start actors.
14/// Alternatively, use `Builder::run` to start the tokio runtime and
15/// run a function in its context.
16pub struct Builder {
17    /// Name of the System. Defaults to "ntex" if unset.
18    name: String,
19    /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
20    stop_on_panic: bool,
21    /// New thread stack size
22    stack_size: usize,
23    /// Arbiters ping interval
24    ping_interval: usize,
25    /// Thread pool config
26    pool_limit: usize,
27    pool_recv_timeout: time::Duration,
28    /// testing flag
29    testing: bool,
30}
31
32impl Builder {
33    pub(super) fn new() -> Self {
34        Builder {
35            name: "ntex".into(),
36            stop_on_panic: false,
37            stack_size: 0,
38            ping_interval: 1000,
39            testing: false,
40            pool_limit: 256,
41            pool_recv_timeout: time::Duration::from_secs(60),
42        }
43    }
44
45    #[must_use]
46    /// Sets the name of the System.
47    pub fn name<N: AsRef<str>>(mut self, name: N) -> Self {
48        self.name = name.as_ref().into();
49        self
50    }
51
52    #[must_use]
53    /// Sets the option `stop_on_panic`
54    ///
55    /// It controls whether the System is stopped when an
56    /// uncaught panic is thrown from a worker thread.
57    ///
58    /// Defaults is set to false.
59    pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
60        self.stop_on_panic = stop_on_panic;
61        self
62    }
63
64    #[must_use]
65    /// Sets the size of the stack (in bytes) for the new thread.
66    pub fn stack_size(mut self, size: usize) -> Self {
67        self.stack_size = size;
68        self
69    }
70
71    #[must_use]
72    /// Sets ping interval for spawned arbiters.
73    ///
74    /// Interval is in milliseconds. By default 5000 milliseconds is set.
75    /// To disable pings set value to zero.
76    pub fn ping_interval(mut self, interval: usize) -> Self {
77        self.ping_interval = interval;
78        self
79    }
80
81    #[must_use]
82    /// Set the thread number limit of the inner thread pool, if exists. The
83    /// default value is 256.
84    pub fn thread_pool_limit(mut self, value: usize) -> Self {
85        self.pool_limit = value;
86        self
87    }
88
89    #[must_use]
90    /// Mark system as testing
91    pub fn testing(mut self) -> Self {
92        self.testing = true;
93        self
94    }
95
96    #[must_use]
97    /// Set the waiting timeout of the inner thread, if exists. The default is
98    /// 60 seconds.
99    pub fn thread_pool_recv_timeout<T>(mut self, timeout: T) -> Self
100    where
101        time::Duration: From<T>,
102    {
103        self.pool_recv_timeout = timeout.into();
104        self
105    }
106
107    /// Create new System.
108    ///
109    /// This method panics if it can not create tokio runtime
110    pub fn build<R: Runner>(self, runner: R) -> SystemRunner {
111        let name = self.name.clone();
112        let testing = self.testing;
113        let stack_size = self.stack_size;
114        let stop_on_panic = self.stop_on_panic;
115
116        self.build_with(SystemConfig {
117            name,
118            testing,
119            stack_size,
120            stop_on_panic,
121            runner: Arc::new(runner),
122        })
123    }
124
125    /// Create new System.
126    ///
127    /// This method panics if it can not create tokio runtime
128    pub fn build_with(self, config: SystemConfig) -> SystemRunner {
129        let (stop_tx, stop) = oneshot::channel();
130        let (sys_sender, sys_receiver) = unbounded();
131
132        // thread pool
133        let pool = ThreadPool::new(&self.name, self.pool_limit, self.pool_recv_timeout);
134
135        let (arb, controller) = Arbiter::new_system(self.name.clone());
136        let _ = sys_sender.try_send(SystemCommand::RegisterArbiter(arb.id(), arb.clone()));
137        let system = System::construct(sys_sender, arb, config.clone(), pool);
138
139        // system arbiter
140        let support = SystemSupport::new(stop_tx, sys_receiver, self.ping_interval);
141
142        // init system arbiter and run configuration method
143        SystemRunner {
144            stop,
145            support,
146            controller,
147            system,
148            config,
149            _t: PhantomData,
150        }
151    }
152}
153
154/// Helper object that runs System's event loop
155#[must_use = "SystemRunner must be run"]
156pub struct SystemRunner {
157    stop: oneshot::Receiver<i32>,
158    support: SystemSupport,
159    controller: ArbiterController,
160    system: System,
161    config: SystemConfig,
162    _t: PhantomData<Rc<()>>,
163}
164
165impl SystemRunner {
166    /// Get current system.
167    pub fn system(&self) -> System {
168        self.system.clone()
169    }
170
171    /// This function will start event loop and will finish once the
172    /// `System::stop()` function is called.
173    pub fn run_until_stop(self) -> io::Result<()> {
174        self.run(|| Ok(()))
175    }
176
177    /// This function will start event loop and will finish once the
178    /// `System::stop()` function is called.
179    pub fn run<F>(self, f: F) -> io::Result<()>
180    where
181        F: FnOnce() -> io::Result<()> + 'static,
182    {
183        log::info!("Starting {:?} system", self.config.name);
184
185        let SystemRunner {
186            controller,
187            stop,
188            support,
189            config,
190            ..
191        } = self;
192
193        // run loop
194        config.block_on(async move {
195            f()?;
196
197            crate::spawn(support.run());
198            crate::spawn(controller.run());
199            match stop.await {
200                Ok(code) => {
201                    if code != 0 {
202                        Err(io::Error::other(format!("Non-zero exit code: {code}")))
203                    } else {
204                        Ok(())
205                    }
206                }
207                Err(_) => Err(io::Error::other("Closed")),
208            }
209        })
210    }
211
212    /// Execute a future and wait for result.
213    pub fn block_on<F, R>(self, fut: F) -> R
214    where
215        F: Future<Output = R> + 'static,
216        R: 'static,
217    {
218        let SystemRunner {
219            controller,
220            support,
221            config,
222            ..
223        } = self;
224
225        config.block_on(async move {
226            crate::spawn(support.run());
227            crate::spawn(controller.run());
228            fut.await
229        })
230    }
231
232    #[cfg(feature = "tokio")]
233    /// Execute a future and wait for result.
234    pub async fn run_local<F, R>(self, fut: F) -> R
235    where
236        F: Future<Output = R> + 'static,
237        R: 'static,
238    {
239        let SystemRunner {
240            controller,
241            support,
242            ..
243        } = self;
244
245        // run loop
246        tok_io::task::LocalSet::new()
247            .run_until(async move {
248                crate::spawn(support.run());
249                crate::spawn(controller.run());
250                fut.await
251            })
252            .await
253    }
254}
255
256impl fmt::Debug for SystemRunner {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("SystemRunner")
259            .field("system", &self.system)
260            .field("support", &self.support)
261            .field("config", &self.config)
262            .finish()
263    }
264}