tiny_actor/channel/
mod.rs

1//! Module containing the [Channel], and [DynChannel] and [AnyChannel] traits. In
2//! general these are never used directly, but just part of an [Address] or [Child].
3
4use crate::*;
5use concurrent_queue::{ConcurrentQueue, PopError, PushError};
6use event_listener::{Event, EventListener};
7use std::{
8    fmt::Debug,
9    sync::atomic::{AtomicI32, AtomicU64, AtomicUsize, Ordering},
10};
11
12mod channel_trait;
13mod receiving;
14mod sending;
15pub use {channel_trait::*, receiving::*, sending::*};
16
17/// Contains all data that should be shared between Addresses, Inboxes and the Child.
18pub struct Channel<M> {
19    /// The underlying queue
20    queue: ConcurrentQueue<M>,
21    /// The capacity of the channel
22    capacity: Capacity,
23    /// The amount of addresses associated to this channel.
24    /// Once this is 0, not more addresses can be created and the Channel is closed.
25    address_count: AtomicUsize,
26    /// The amount of inboxes associated to this channel.
27    /// Once this is 0, it is impossible to spawn new processes, and the Channel.
28    /// has exited.
29    inbox_count: AtomicUsize,
30    /// Subscribe when trying to receive a message from this channel.
31    recv_event: Event,
32    /// Subscribe when trying to send a message into this channel.
33    send_event: Event,
34    /// Subscribe when waiting for Actor to exit.
35    exit_event: Event,
36    /// The amount of processes that should still be halted.
37    /// Can be negative bigger than amount of processes in total.
38    halt_count: AtomicI32,
39    /// The actor_id, generated once and cannot be changed afterwards.
40    actor_id: u64,
41}
42
43impl<M> Channel<M> {
44    /// Create a new channel, given an address count, inbox_count and capacity.
45    ///
46    /// After this, it must be ensured that the correct amount of inboxes and addresses actually exist.
47    pub(crate) fn new(address_count: usize, inbox_count: usize, capacity: Capacity) -> Self {
48        Self {
49            queue: match &capacity {
50                Capacity::Bounded(size) => ConcurrentQueue::bounded(size.to_owned()),
51                Capacity::Unbounded(_) => ConcurrentQueue::unbounded(),
52            },
53            capacity,
54            address_count: AtomicUsize::new(address_count),
55            inbox_count: AtomicUsize::new(inbox_count),
56            recv_event: Event::new(),
57            send_event: Event::new(),
58            exit_event: Event::new(),
59            halt_count: AtomicI32::new(0),
60            actor_id: next_actor_id(),
61        }
62    }
63
64    /// Sets the inbox-count
65    pub(crate) fn set_inbox_count(&self, count: usize) {
66        self.inbox_count.store(count, Ordering::Release)
67    }
68
69    /// Try to add an inbox to the channel, incrementing inbox-count by 1. Afterwards,
70    /// a new Inbox may be created from this channel.
71    ///
72    /// Returns the old inbox-count
73    ///
74    /// Whereas `add_inbox()` also adds an `Inbox` if `inbox-count == 0` this method returns an error instead.
75    /// This method is slower than add_inbox, since it uses `fetch-update` on the inbox-count.
76    pub(crate) fn try_add_inbox(&self) -> Result<usize, ()> {
77        let result = self
78            .inbox_count
79            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |val| {
80                if val < 1 {
81                    None
82                } else {
83                    Some(val + 1)
84                }
85            });
86
87        match result {
88            Ok(prev) => Ok(prev),
89            Err(_) => Err(()),
90        }
91    }
92
93    /// Remove an Inbox from the channel, decrementing inbox-count by 1. This should be
94    /// called during the Inbox's destructor.
95    ///
96    /// If there are no more Inboxes remaining, this will close the channel and set
97    /// `inboxes_dropped` to true. This will also drop any messages still inside the
98    /// channel.
99    ///
100    /// Returns the previous inbox-count.
101    ///
102    /// ## Notifies
103    /// * `prev-inbox-count == 1` -> all exit-listeners
104    ///
105    /// ## Panics
106    /// * `prev-inbox-count == 0`
107    pub(crate) fn remove_inbox(&self) -> usize {
108        // Subtract one from the inbox count
109        let prev_count = self.inbox_count.fetch_sub(1, Ordering::AcqRel);
110        assert!(prev_count != 0);
111
112        // If previous count was 1, then all inboxes have been dropped.
113        if prev_count == 1 {
114            self.close();
115            // Also notify the exit-listeners, since the process exited.
116            self.exit_event.notify(usize::MAX);
117            // drop all messages, since no more inboxes exist.
118            while self.pop_msg().is_ok() {}
119        }
120
121        prev_count
122    }
123
124    /// Takes the next message out of the channel.
125    ///
126    /// Returns an error if the queue is closed, returns none if there is no message
127    /// in the queue.
128    ///
129    /// ## Notifies
130    /// on success -> 1 send_listener & 1 recv_listener
131    pub(crate) fn pop_msg(&self) -> Result<M, PopError> {
132        self.queue.pop().map(|msg| {
133            self.send_event.notify(1);
134            self.recv_event.notify(1);
135            msg
136        })
137    }
138
139    /// Push a message into the channel.
140    ///
141    /// Can fail either because the queue is full, or because it is closed.
142    ///
143    /// ## Notifies
144    /// on success -> 1 recv_listener
145    pub(crate) fn push_msg(&self, msg: M) -> Result<(), PushError<M>> {
146        match self.queue.push(msg) {
147            Ok(()) => {
148                self.recv_event.notify(1);
149                Ok(())
150            }
151            Err(e) => Err(e),
152        }
153    }
154
155    /// Can be called by an inbox to know whether it should halt.
156    ///
157    /// This decrements the halt-counter by one when it is called, therefore every
158    /// inbox should only receive true from this method once! The inbox keeps it's own
159    /// local state about whether it has received true from this method.
160    pub(crate) fn inbox_should_halt(&self) -> bool {
161        // If the count is bigger than 0, we might have to halt.
162        if self.halt_count.load(Ordering::Acquire) > 0 {
163            // Now subtract 1 from the count
164            let prev_count = self.halt_count.fetch_sub(1, Ordering::AcqRel);
165            // If the count before updating was bigger than 0, we halt.
166            // If this decrements below 0, we treat it as if it's 0.
167            if prev_count > 0 {
168                return true;
169            }
170        }
171
172        // Otherwise, just continue
173        false
174    }
175
176    /// Get a new recv-event listener
177    pub(crate) fn get_recv_listener(&self) -> EventListener {
178        self.recv_event.listen()
179    }
180
181    /// Get a new send-event listener
182    pub(crate) fn get_send_listener(&self) -> EventListener {
183        self.send_event.listen()
184    }
185
186    /// Get a new exit-event listener
187    pub(crate) fn get_exit_listener(&self) -> EventListener {
188        self.exit_event.listen()
189    }
190}
191
192impl<M> Debug for Channel<M> {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("Channel")
195            .field("queue", &self.queue)
196            .field("capacity", &self.capacity)
197            .field("address_count", &self.address_count)
198            .field("inbox_count", &self.inbox_count)
199            .field("halt_count", &self.halt_count)
200            .finish()
201    }
202}
203
204fn next_actor_id() -> u64 {
205    static ACTOR_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
206    ACTOR_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
207}
208
209#[cfg(test)]
210mod test {
211    use std::{
212        sync::{atomic::Ordering, Arc},
213        time::Duration,
214    };
215
216    use super::{next_actor_id, Channel};
217    use crate::*;
218    use concurrent_queue::{PopError, PushError};
219    use event_listener::EventListener;
220    use futures::FutureExt;
221
222    #[test]
223    fn actor_ids_increase() {
224        let mut old_id = next_actor_id();
225        for _ in 0..100 {
226            let id = next_actor_id();
227            assert!(id > old_id);
228            old_id = id;
229        }
230    }
231
232    #[test]
233    fn channels_have_actor_ids() {
234        let id1 = Channel::<()>::new(1, 1, Capacity::Bounded(10)).actor_id();
235        let id2 = Channel::<()>::new(1, 1, Capacity::Bounded(10)).actor_id();
236        assert!(id1 < id2);
237    }
238
239    #[test]
240    fn capacity_types_are_correct() {
241        let channel = Channel::<()>::new(1, 1, Capacity::Bounded(10));
242        assert!(channel.queue.capacity().is_some());
243        assert!(channel.is_bounded());
244        let channel = Channel::<()>::new(1, 1, Capacity::Unbounded(BackPressure::default()));
245        assert!(channel.queue.capacity().is_none());
246        assert!(!channel.is_bounded());
247    }
248
249    #[test]
250    fn adding_removing_addresses() {
251        let channel = Channel::<()>::new(1, 1, Capacity::default());
252        assert_eq!(channel.address_count(), 1);
253        channel.add_address();
254        assert_eq!(channel.address_count(), 2);
255        channel.remove_address();
256        assert_eq!(channel.address_count(), 1);
257        channel.remove_address();
258        assert_eq!(channel.address_count(), 0);
259    }
260
261    #[test]
262    #[should_panic]
263    fn remove_address_below_0() {
264        let channel = Channel::<()>::new(0, 1, Capacity::default());
265        channel.remove_address();
266    }
267
268    #[test]
269    fn adding_removing_inboxes() {
270        let channel = Channel::<()>::new(1, 1, Capacity::default());
271        assert_eq!(channel.process_count(), 1);
272        channel.try_add_inbox().unwrap();
273        assert_eq!(channel.process_count(), 2);
274        channel.remove_inbox();
275        assert_eq!(channel.process_count(), 1);
276        channel.remove_inbox();
277        assert_eq!(channel.process_count(), 0);
278    }
279
280    #[test]
281    #[should_panic]
282    fn remove_inbox_below_0() {
283        let channel = Channel::<()>::new(1, 0, Capacity::default());
284        channel.remove_inbox();
285    }
286
287    #[test]
288    fn closing() {
289        let channel = Channel::<()>::new(1, 1, Capacity::default());
290        let listeners = Listeners::size_10(&channel);
291
292        channel.close();
293
294        assert!(channel.is_closed());
295        assert!(!channel.has_exited());
296        assert_eq!(channel.push_msg(()), Err(PushError::Closed(())));
297        assert_eq!(channel.pop_msg(), Err(PopError::Closed));
298        listeners.assert_notified(Assert {
299            recv: 10,
300            exit: 0,
301            send: 10,
302        });
303    }
304
305    #[test]
306    fn exiting() {
307        let channel = Channel::<()>::new(1, 1, Capacity::default());
308        let listeners = Listeners::size_10(&channel);
309
310        channel.remove_inbox();
311
312        assert!(channel.is_closed());
313        assert!(channel.has_exited());
314        assert_eq!(channel.process_count(), 0);
315        assert_eq!(channel.address_count(), 1);
316        assert_eq!(channel.push_msg(()), Err(PushError::Closed(())));
317        assert_eq!(channel.pop_msg(), Err(PopError::Closed));
318        listeners.assert_notified(Assert {
319            recv: 10,
320            exit: 10,
321            send: 10,
322        });
323    }
324
325    #[test]
326    fn removing_all_addresses() {
327        let channel = Channel::<()>::new(1, 1, Capacity::default());
328        let listeners = Listeners::size_10(&channel);
329
330        channel.remove_address();
331
332        assert!(!channel.is_closed());
333        assert!(!channel.has_exited());
334        assert_eq!(channel.address_count(), 0);
335        assert_eq!(channel.process_count(), 1);
336        assert_eq!(channel.push_msg(()), Ok(()));
337        listeners.assert_notified(Assert {
338            recv: 1,
339            exit: 0,
340            send: 0,
341        });
342    }
343
344    #[test]
345    fn exiting_drops_all_messages() {
346        let msg = Arc::new(());
347
348        let channel = Channel::new(1, 1, Capacity::Bounded(10));
349        channel.send_now(msg.clone()).unwrap();
350
351        assert_eq!(Arc::strong_count(&msg), 2);
352        channel.remove_inbox();
353        assert_eq!(Arc::strong_count(&msg), 1);
354    }
355
356    #[test]
357    fn closing_doesnt_drop_messages() {
358        let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
359        let msg = Arc::new(());
360        channel.push_msg(msg.clone()).unwrap();
361        assert_eq!(Arc::strong_count(&msg), 2);
362        channel.close();
363        assert_eq!(Arc::strong_count(&msg), 2);
364    }
365
366    #[tokio::test]
367    async fn immedeate_halt() {
368        for i in 0..100 {
369            let (_child, address) = spawn(Config::default(), basic_actor!());
370            spin_sleep::sleep(Duration::from_nanos(i));
371            address.halt();
372            address.await;
373        }
374    }
375
376    #[test]
377    fn add_inbox_with_0_inboxes_is_err() {
378        let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
379        channel.remove_inbox();
380        assert_eq!(channel.try_add_inbox(), Err(()));
381        assert_eq!(channel.process_count(), 0);
382    }
383
384    #[test]
385    fn add_inbox_with_0_addresses_is_ok() {
386        let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
387        channel.remove_inbox();
388        assert!(matches!(channel.try_add_inbox(), Err(_)));
389        assert_eq!(channel.process_count(), 0);
390    }
391
392    #[test]
393    fn push_msg() {
394        let channel = Channel::<()>::new(1, 1, Capacity::default());
395        let listeners = Listeners::size_10(&channel);
396
397        channel.push_msg(()).unwrap();
398
399        assert_eq!(channel.msg_count(), 1);
400        listeners.assert_notified(Assert {
401            recv: 1,
402            exit: 0,
403            send: 0,
404        });
405    }
406
407    #[test]
408    fn pop_msg() {
409        let channel = Channel::<()>::new(1, 1, Capacity::default());
410        channel.push_msg(()).unwrap();
411        let listeners = Listeners::size_10(&channel);
412
413        channel.pop_msg().unwrap();
414        assert_eq!(channel.msg_count(), 0);
415        listeners.assert_notified(Assert {
416            recv: 1,
417            exit: 0,
418            send: 1,
419        });
420    }
421
422    #[test]
423    fn halt() {
424        let channel = Channel::<()>::new(1, 3, Capacity::default());
425        let listeners = Listeners::size_10(&channel);
426
427        channel.halt();
428
429        assert_eq!(channel.halt_count.load(Ordering::Acquire), i32::MAX);
430        listeners.assert_notified(Assert {
431            recv: 10,
432            exit: 0,
433            send: 10,
434        });
435    }
436
437    #[test]
438    fn halt_closes_channel() {
439        let channel = Channel::<()>::new(1, 3, Capacity::default());
440        channel.halt();
441        assert!(channel.is_closed());
442    }
443
444    #[test]
445    fn partial_halt() {
446        let channel = Channel::<()>::new(1, 3, Capacity::default());
447        let listeners = Listeners::size_10(&channel);
448
449        channel.halt_some(2);
450
451        assert_eq!(channel.halt_count.load(Ordering::Acquire), 2);
452        listeners.assert_notified(Assert {
453            recv: 10,
454            exit: 0,
455            send: 0,
456        });
457    }
458
459    #[test]
460    fn inbox_should_halt() {
461        let channel = Channel::<()>::new(1, 3, Capacity::default());
462        channel.halt_some(2);
463
464        assert!(channel.inbox_should_halt());
465        assert!(channel.inbox_should_halt());
466        assert!(!channel.inbox_should_halt());
467    }
468
469    struct Listeners {
470        recv: Vec<EventListener>,
471        exit: Vec<EventListener>,
472        send: Vec<EventListener>,
473    }
474
475    struct Assert {
476        recv: usize,
477        exit: usize,
478        send: usize,
479    }
480
481    impl Listeners {
482        fn size_10<T>(channel: &Channel<T>) -> Self {
483            Self {
484                recv: (0..10)
485                    .into_iter()
486                    .map(|_| channel.get_recv_listener())
487                    .collect(),
488                exit: (0..10)
489                    .into_iter()
490                    .map(|_| channel.get_exit_listener())
491                    .collect(),
492                send: (0..10)
493                    .into_iter()
494                    .map(|_| channel.get_send_listener())
495                    .collect(),
496            }
497        }
498
499        fn assert_notified(self, assert: Assert) {
500            let recv = self
501                .recv
502                .into_iter()
503                .map(|l| l.now_or_never().is_some())
504                .filter(|bool| *bool)
505                .collect::<Vec<_>>()
506                .len();
507            let exit = self
508                .exit
509                .into_iter()
510                .map(|l| l.now_or_never().is_some())
511                .filter(|bool| *bool)
512                .collect::<Vec<_>>()
513                .len();
514            let send = self
515                .send
516                .into_iter()
517                .map(|l| l.now_or_never().is_some())
518                .filter(|bool| *bool)
519                .collect::<Vec<_>>()
520                .len();
521
522            assert_eq!(assert.recv, recv);
523            assert_eq!(assert.exit, exit);
524            assert_eq!(assert.send, send);
525        }
526    }
527}