tiny_actor/channel/
channel_trait.rs1use event_listener::EventListener;
2use std::{
3 any::Any,
4 fmt::Debug,
5 sync::{atomic::Ordering, Arc},
6};
7
8use crate::*;
9
10pub trait DynChannel {
13 fn close(&self) -> bool;
14 fn halt_some(&self, n: u32);
15 fn halt(&self);
16 fn process_count(&self) -> usize;
17 fn msg_count(&self) -> usize;
18 fn address_count(&self) -> usize;
19 fn is_closed(&self) -> bool;
20 fn capacity(&self) -> &Capacity;
21 fn has_exited(&self) -> bool;
22 fn add_address(&self) -> usize;
23 fn remove_address(&self) -> usize;
24 fn get_exit_listener(&self) -> EventListener;
25 fn actor_id(&self) -> u64;
26 fn is_bounded(&self) -> bool;
27}
28
29pub trait AnyChannel: DynChannel + Debug + Send + Sync + 'static {
30 fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
31}
32
33impl<M: Send + 'static> AnyChannel for Channel<M> {
34 fn into_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
35 self
36 }
37}
38
39impl<M> DynChannel for Channel<M> {
40 fn close(&self) -> bool {
46 if self.queue.close() {
47 self.recv_event.notify(usize::MAX);
48 self.send_event.notify(usize::MAX);
49 true
50 } else {
51 false
52 }
53 }
54
55 fn halt_some(&self, n: u32) {
62 let n = i32::try_from(n).unwrap_or(i32::MAX);
63
64 self.halt_count
65 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
66 if count < 0 {
68 Some(n)
69 } else {
70 Some(count.saturating_add(n))
72 }
73 })
74 .unwrap();
75
76 self.recv_event.notify(usize::MAX);
77 }
78
79 fn process_count(&self) -> usize {
81 self.inbox_count.load(Ordering::Acquire)
82 }
83
84 fn msg_count(&self) -> usize {
86 self.queue.len()
87 }
88
89 fn address_count(&self) -> usize {
91 self.address_count.load(Ordering::Acquire)
92 }
93
94 fn is_closed(&self) -> bool {
96 self.queue.is_closed()
97 }
98
99 fn capacity(&self) -> &Capacity {
101 &self.capacity
102 }
103
104 fn has_exited(&self) -> bool {
106 self.inbox_count.load(Ordering::Acquire) == 0
107 }
108
109 fn add_address(&self) -> usize {
114 self.address_count.fetch_add(1, Ordering::AcqRel)
115 }
116
117 fn remove_address(&self) -> usize {
126 let prev_address_count = self.address_count.fetch_sub(1, Ordering::AcqRel);
128 assert!(prev_address_count >= 1);
129 prev_address_count
130 }
131
132 fn get_exit_listener(&self) -> EventListener {
133 self.get_exit_listener()
134 }
135
136 fn actor_id(&self) -> u64 {
138 self.actor_id
139 }
140
141 fn is_bounded(&self) -> bool {
143 self.capacity.is_bounded()
144 }
145
146 fn halt(&self) {
148 self.close();
149 self.halt_some(u32::MAX);
150 }
151}