1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
use std::{
    cell::RefCell,
    fmt,
    future::Future,
    pin::Pin,
    sync::atomic::{AtomicUsize, Ordering},
    task::{Context, Poll},
    thread,
};

use futures_core::ready;
use tokio::sync::mpsc;

use crate::system::{System, SystemCommand};

pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);

thread_local!(
    static HANDLE: RefCell<Option<ArbiterHandle>> = const { RefCell::new(None) };
);

pub(crate) enum ArbiterCommand {
    Stop,
    Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
}

impl fmt::Debug for ArbiterCommand {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
            ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
        }
    }
}

/// A handle for sending spawn and stop messages to an [Arbiter].
#[derive(Debug, Clone)]
pub struct ArbiterHandle {
    tx: mpsc::UnboundedSender<ArbiterCommand>,
}

impl ArbiterHandle {
    pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
        Self { tx }
    }

    /// Send a future to the [Arbiter]'s thread and spawn it.
    ///
    /// If you require a result, include a response channel in the future.
    ///
    /// Returns true if future was sent successfully and false if the [Arbiter] has died.
    pub fn spawn<Fut>(&self, future: Fut) -> bool
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        self.tx
            .send(ArbiterCommand::Execute(Box::pin(future)))
            .is_ok()
    }

    /// Send a function to the [Arbiter]'s thread and execute it.
    ///
    /// Any result from the function is discarded. If you require a result, include a response
    /// channel in the function.
    ///
    /// Returns true if function was sent successfully and false if the [Arbiter] has died.
    pub fn spawn_fn<F>(&self, f: F) -> bool
    where
        F: FnOnce() + Send + 'static,
    {
        self.spawn(async { f() })
    }

    /// Instruct [Arbiter] to stop processing it's event loop.
    ///
    /// Returns true if stop message was sent successfully and false if the [Arbiter] has
    /// been dropped.
    pub fn stop(&self) -> bool {
        self.tx.send(ArbiterCommand::Stop).is_ok()
    }
}

/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
    tx: mpsc::UnboundedSender<ArbiterCommand>,
    thread_handle: thread::JoinHandle<()>,
}

impl Arbiter {
    /// Spawn a new Arbiter thread and start its event loop.
    ///
    /// # Panics
    /// Panics if a [System] is not registered on the current thread.
    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
    #[allow(clippy::new_without_default)]
    pub fn new() -> Arbiter {
        Self::with_tokio_rt(|| {
            crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
        })
    }

    /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
    ///
    /// [tokio-runtime]: tokio::runtime::Runtime
    #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
    pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
    where
        F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
    {
        let sys = System::current();
        let system_id = sys.id();
        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

        let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
        let (tx, rx) = mpsc::unbounded_channel();

        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();

        let thread_handle = thread::Builder::new()
            .name(name.clone())
            .spawn({
                let tx = tx.clone();
                move || {
                    let rt = crate::runtime::Runtime::from(runtime_factory());
                    let hnd = ArbiterHandle::new(tx);

                    System::set_current(sys);

                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

                    // register arbiter
                    let _ = System::current()
                        .tx()
                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));

                    ready_tx.send(()).unwrap();

                    // run arbiter event processing loop
                    rt.block_on(ArbiterRunner { rx });

                    // deregister arbiter
                    let _ = System::current()
                        .tx()
                        .send(SystemCommand::DeregisterArbiter(arb_id));
                }
            })
            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));

        ready_rx.recv().unwrap();

        Arbiter { tx, thread_handle }
    }

    /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
    ///
    /// # Panics
    /// Panics if a [System] is not registered on the current thread.
    #[cfg(all(target_os = "linux", feature = "io-uring"))]
    #[allow(clippy::new_without_default)]
    pub fn new() -> Arbiter {
        let sys = System::current();
        let system_id = sys.id();
        let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

        let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
        let (tx, rx) = mpsc::unbounded_channel();

        let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();

        let thread_handle = thread::Builder::new()
            .name(name.clone())
            .spawn({
                let tx = tx.clone();
                move || {
                    let hnd = ArbiterHandle::new(tx);

                    System::set_current(sys);

                    HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

                    // register arbiter
                    let _ = System::current()
                        .tx()
                        .send(SystemCommand::RegisterArbiter(arb_id, hnd));

                    ready_tx.send(()).unwrap();

                    // run arbiter event processing loop
                    tokio_uring::start(ArbiterRunner { rx });

                    // deregister arbiter
                    let _ = System::current()
                        .tx()
                        .send(SystemCommand::DeregisterArbiter(arb_id));
                }
            })
            .unwrap_or_else(|err| panic!("Cannot spawn Arbiter's thread: {name:?}: {err:?}"));

        ready_rx.recv().unwrap();

        Arbiter { tx, thread_handle }
    }

    /// Sets up an Arbiter runner in a new System using the environment's local set.
    pub(crate) fn in_new_system() -> ArbiterHandle {
        let (tx, rx) = mpsc::unbounded_channel();

        let hnd = ArbiterHandle::new(tx);

        HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

        crate::spawn(ArbiterRunner { rx });

        hnd
    }

    /// Return a handle to the this Arbiter's message sender.
    pub fn handle(&self) -> ArbiterHandle {
        ArbiterHandle::new(self.tx.clone())
    }

    /// Return a handle to the current thread's Arbiter's message sender.
    ///
    /// # Panics
    /// Panics if no Arbiter is running on the current thread.
    pub fn current() -> ArbiterHandle {
        HANDLE.with(|cell| match *cell.borrow() {
            Some(ref hnd) => hnd.clone(),
            None => panic!("Arbiter is not running."),
        })
    }

    /// Try to get current running arbiter handle.
    ///
    /// Returns `None` if no Arbiter has been started.
    ///
    /// Unlike [`current`](Self::current), this never panics.
    pub fn try_current() -> Option<ArbiterHandle> {
        HANDLE.with(|cell| cell.borrow().clone())
    }

    /// Stop Arbiter from continuing it's event loop.
    ///
    /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
    pub fn stop(&self) -> bool {
        self.tx.send(ArbiterCommand::Stop).is_ok()
    }

    /// Send a future to the Arbiter's thread and spawn it.
    ///
    /// If you require a result, include a response channel in the future.
    ///
    /// Returns true if future was sent successfully and false if the Arbiter has died.
    #[track_caller]
    pub fn spawn<Fut>(&self, future: Fut) -> bool
    where
        Fut: Future<Output = ()> + Send + 'static,
    {
        self.tx
            .send(ArbiterCommand::Execute(Box::pin(future)))
            .is_ok()
    }

    /// Send a function to the Arbiter's thread and execute it.
    ///
    /// Any result from the function is discarded. If you require a result, include a response
    /// channel in the function.
    ///
    /// Returns true if function was sent successfully and false if the Arbiter has died.
    #[track_caller]
    pub fn spawn_fn<F>(&self, f: F) -> bool
    where
        F: FnOnce() + Send + 'static,
    {
        self.spawn(async { f() })
    }

    /// Wait for Arbiter's event loop to complete.
    ///
    /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
    pub fn join(self) -> thread::Result<()> {
        self.thread_handle.join()
    }
}

/// A persistent future that processes [Arbiter] commands.
struct ArbiterRunner {
    rx: mpsc::UnboundedReceiver<ArbiterCommand>,
}

impl Future for ArbiterRunner {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // process all items currently buffered in channel
        loop {
            match ready!(self.rx.poll_recv(cx)) {
                // channel closed; no more messages can be received
                None => return Poll::Ready(()),

                // process arbiter command
                Some(item) => match item {
                    ArbiterCommand::Stop => {
                        return Poll::Ready(());
                    }
                    ArbiterCommand::Execute(task_fut) => {
                        tokio::task::spawn_local(task_fut);
                    }
                },
            }
        }
    }
}