tiny_actor/channel/
channel_trait.rs

1use event_listener::EventListener;
2use std::{
3    any::Any,
4    fmt::Debug,
5    sync::{atomic::Ordering, Arc},
6};
7
8use crate::*;
9
10/// A [Channel]-trait, without information about it's message type. Therefore, it's impossible
11/// to send or receive messages through this.
12pub trait DynChannel {
13    fn close(&self) -> bool;
14    fn halt_some(&self, n: u32);
15    fn halt(&self);
16    fn process_count(&self) -> usize;
17    fn msg_count(&self) -> usize;
18    fn address_count(&self) -> usize;
19    fn is_closed(&self) -> bool;
20    fn capacity(&self) -> &Capacity;
21    fn has_exited(&self) -> bool;
22    fn add_address(&self) -> usize;
23    fn remove_address(&self) -> usize;
24    fn get_exit_listener(&self) -> EventListener;
25    fn actor_id(&self) -> u64;
26    fn is_bounded(&self) -> bool;
27}
28
29pub trait AnyChannel: DynChannel + Debug + Send + Sync + 'static {
30    fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
31}
32
33impl<M: Send + 'static> AnyChannel for Channel<M> {
34    fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
35        self
36    }
37}
38
39impl<M> DynChannel for Channel<M> {
40    /// Close the channel. Returns `true` if the channel was not closed before this.
41    /// Otherwise, this returns `false`.
42    ///
43    /// ## Notifies
44    /// * if `true` -> all send_listeners & recv_listeners
45    fn close(&self) -> bool {
46        if self.queue.close() {
47            self.recv_event.notify(usize::MAX);
48            self.send_event.notify(usize::MAX);
49            true
50        } else {
51            false
52        }
53    }
54
55    /// Halt n inboxes associated with this channel. If `n >= #inboxes`, all inboxes
56    /// will be halted. This might leave `halt-count > inbox-count`, however that's not
57    /// a problem. If n > i32::MAX, n = i32::MAX.
58    ///
59    /// # Notifies
60    /// * all recv-listeners
61    fn halt_some(&self, n: u32) {
62        let n = i32::try_from(n).unwrap_or(i32::MAX);
63
64        self.halt_count
65            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
66                // If the count < 0, act as if it's 0.
67                if count < 0 {
68                    Some(n)
69                } else {
70                    // Otherwise, add both together.
71                    Some(count.saturating_add(n))
72                }
73            })
74            .unwrap();
75
76        self.recv_event.notify(usize::MAX);
77    }
78
79    /// Returns the amount of inboxes this channel has.
80    fn process_count(&self) -> usize {
81        self.inbox_count.load(Ordering::Acquire)
82    }
83
84    /// Returns the amount of messages currently in the channel.
85    fn msg_count(&self) -> usize {
86        self.queue.len()
87    }
88
89    /// Returns the amount of addresses this channel has.
90    fn address_count(&self) -> usize {
91        self.address_count.load(Ordering::Acquire)
92    }
93
94    /// Whether the queue asscociated to the channel has been closed.
95    fn is_closed(&self) -> bool {
96        self.queue.is_closed()
97    }
98
99    /// Capacity of the inbox.
100    fn capacity(&self) -> &Capacity {
101        &self.capacity
102    }
103
104    /// Whether all inboxes linked to this channel have exited.
105    fn has_exited(&self) -> bool {
106        self.inbox_count.load(Ordering::Acquire) == 0
107    }
108
109    /// Add an Address to the channel, incrementing address-count by 1. Afterwards,
110    /// a new Address may be created from this channel.
111    ///
112    /// Returns the previous inbox-count
113    fn add_address(&self) -> usize {
114        self.address_count.fetch_add(1, Ordering::AcqRel)
115    }
116
117    /// Remove an Address from the channel, decrementing address-count by 1. This should
118    /// be called from the destructor of the Address.
119    ///
120    /// ## Notifies
121    /// * `prev-address-count == 1` -> all send_listeners & recv_listeners
122    ///
123    /// ## Panics
124    /// * `prev-address-count == 0`
125    fn remove_address(&self) -> usize {
126        // Subtract one from the inbox count
127        let prev_address_count = self.address_count.fetch_sub(1, Ordering::AcqRel);
128        assert!(prev_address_count >= 1);
129        prev_address_count
130    }
131
132    fn get_exit_listener(&self) -> EventListener {
133        self.get_exit_listener()
134    }
135
136    /// Get the actor_id.
137    fn actor_id(&self) -> u64 {
138        self.actor_id
139    }
140
141    /// Whether the channel is bounded.
142    fn is_bounded(&self) -> bool {
143        self.capacity.is_bounded()
144    }
145
146    // Closes the channel and halts all actors.
147    fn halt(&self) {
148        self.close();
149        self.halt_some(u32::MAX);
150    }
151}