1use crate::*;
5use concurrent_queue::{ConcurrentQueue, PopError, PushError};
6use event_listener::{Event, EventListener};
7use std::{
8 fmt::Debug,
9 sync::atomic::{AtomicI32, AtomicU64, AtomicUsize, Ordering},
10};
11
12mod channel_trait;
13mod receiving;
14mod sending;
15pub use {channel_trait::*, receiving::*, sending::*};
16
17pub struct Channel<M> {
19 queue: ConcurrentQueue<M>,
21 capacity: Capacity,
23 address_count: AtomicUsize,
26 inbox_count: AtomicUsize,
30 recv_event: Event,
32 send_event: Event,
34 exit_event: Event,
36 halt_count: AtomicI32,
39 actor_id: u64,
41}
42
43impl<M> Channel<M> {
44 pub(crate) fn new(address_count: usize, inbox_count: usize, capacity: Capacity) -> Self {
48 Self {
49 queue: match &capacity {
50 Capacity::Bounded(size) => ConcurrentQueue::bounded(size.to_owned()),
51 Capacity::Unbounded(_) => ConcurrentQueue::unbounded(),
52 },
53 capacity,
54 address_count: AtomicUsize::new(address_count),
55 inbox_count: AtomicUsize::new(inbox_count),
56 recv_event: Event::new(),
57 send_event: Event::new(),
58 exit_event: Event::new(),
59 halt_count: AtomicI32::new(0),
60 actor_id: next_actor_id(),
61 }
62 }
63
64 pub(crate) fn set_inbox_count(&self, count: usize) {
66 self.inbox_count.store(count, Ordering::Release)
67 }
68
69 pub(crate) fn try_add_inbox(&self) -> Result<usize, ()> {
77 let result = self
78 .inbox_count
79 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |val| {
80 if val < 1 {
81 None
82 } else {
83 Some(val + 1)
84 }
85 });
86
87 match result {
88 Ok(prev) => Ok(prev),
89 Err(_) => Err(()),
90 }
91 }
92
93 pub(crate) fn remove_inbox(&self) -> usize {
108 let prev_count = self.inbox_count.fetch_sub(1, Ordering::AcqRel);
110 assert!(prev_count != 0);
111
112 if prev_count == 1 {
114 self.close();
115 self.exit_event.notify(usize::MAX);
117 while self.pop_msg().is_ok() {}
119 }
120
121 prev_count
122 }
123
124 pub(crate) fn pop_msg(&self) -> Result<M, PopError> {
132 self.queue.pop().map(|msg| {
133 self.send_event.notify(1);
134 self.recv_event.notify(1);
135 msg
136 })
137 }
138
139 pub(crate) fn push_msg(&self, msg: M) -> Result<(), PushError<M>> {
146 match self.queue.push(msg) {
147 Ok(()) => {
148 self.recv_event.notify(1);
149 Ok(())
150 }
151 Err(e) => Err(e),
152 }
153 }
154
155 pub(crate) fn inbox_should_halt(&self) -> bool {
161 if self.halt_count.load(Ordering::Acquire) > 0 {
163 let prev_count = self.halt_count.fetch_sub(1, Ordering::AcqRel);
165 if prev_count > 0 {
168 return true;
169 }
170 }
171
172 false
174 }
175
176 pub(crate) fn get_recv_listener(&self) -> EventListener {
178 self.recv_event.listen()
179 }
180
181 pub(crate) fn get_send_listener(&self) -> EventListener {
183 self.send_event.listen()
184 }
185
186 pub(crate) fn get_exit_listener(&self) -> EventListener {
188 self.exit_event.listen()
189 }
190}
191
192impl<M> Debug for Channel<M> {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("Channel")
195 .field("queue", &self.queue)
196 .field("capacity", &self.capacity)
197 .field("address_count", &self.address_count)
198 .field("inbox_count", &self.inbox_count)
199 .field("halt_count", &self.halt_count)
200 .finish()
201 }
202}
203
204fn next_actor_id() -> u64 {
205 static ACTOR_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
206 ACTOR_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
207}
208
209#[cfg(test)]
210mod test {
211 use std::{
212 sync::{atomic::Ordering, Arc},
213 time::Duration,
214 };
215
216 use super::{next_actor_id, Channel};
217 use crate::*;
218 use concurrent_queue::{PopError, PushError};
219 use event_listener::EventListener;
220 use futures::FutureExt;
221
222 #[test]
223 fn actor_ids_increase() {
224 let mut old_id = next_actor_id();
225 for _ in 0..100 {
226 let id = next_actor_id();
227 assert!(id > old_id);
228 old_id = id;
229 }
230 }
231
232 #[test]
233 fn channels_have_actor_ids() {
234 let id1 = Channel::<()>::new(1, 1, Capacity::Bounded(10)).actor_id();
235 let id2 = Channel::<()>::new(1, 1, Capacity::Bounded(10)).actor_id();
236 assert!(id1 < id2);
237 }
238
239 #[test]
240 fn capacity_types_are_correct() {
241 let channel = Channel::<()>::new(1, 1, Capacity::Bounded(10));
242 assert!(channel.queue.capacity().is_some());
243 assert!(channel.is_bounded());
244 let channel = Channel::<()>::new(1, 1, Capacity::Unbounded(BackPressure::default()));
245 assert!(channel.queue.capacity().is_none());
246 assert!(!channel.is_bounded());
247 }
248
249 #[test]
250 fn adding_removing_addresses() {
251 let channel = Channel::<()>::new(1, 1, Capacity::default());
252 assert_eq!(channel.address_count(), 1);
253 channel.add_address();
254 assert_eq!(channel.address_count(), 2);
255 channel.remove_address();
256 assert_eq!(channel.address_count(), 1);
257 channel.remove_address();
258 assert_eq!(channel.address_count(), 0);
259 }
260
261 #[test]
262 #[should_panic]
263 fn remove_address_below_0() {
264 let channel = Channel::<()>::new(0, 1, Capacity::default());
265 channel.remove_address();
266 }
267
268 #[test]
269 fn adding_removing_inboxes() {
270 let channel = Channel::<()>::new(1, 1, Capacity::default());
271 assert_eq!(channel.process_count(), 1);
272 channel.try_add_inbox().unwrap();
273 assert_eq!(channel.process_count(), 2);
274 channel.remove_inbox();
275 assert_eq!(channel.process_count(), 1);
276 channel.remove_inbox();
277 assert_eq!(channel.process_count(), 0);
278 }
279
280 #[test]
281 #[should_panic]
282 fn remove_inbox_below_0() {
283 let channel = Channel::<()>::new(1, 0, Capacity::default());
284 channel.remove_inbox();
285 }
286
287 #[test]
288 fn closing() {
289 let channel = Channel::<()>::new(1, 1, Capacity::default());
290 let listeners = Listeners::size_10(&channel);
291
292 channel.close();
293
294 assert!(channel.is_closed());
295 assert!(!channel.has_exited());
296 assert_eq!(channel.push_msg(()), Err(PushError::Closed(())));
297 assert_eq!(channel.pop_msg(), Err(PopError::Closed));
298 listeners.assert_notified(Assert {
299 recv: 10,
300 exit: 0,
301 send: 10,
302 });
303 }
304
305 #[test]
306 fn exiting() {
307 let channel = Channel::<()>::new(1, 1, Capacity::default());
308 let listeners = Listeners::size_10(&channel);
309
310 channel.remove_inbox();
311
312 assert!(channel.is_closed());
313 assert!(channel.has_exited());
314 assert_eq!(channel.process_count(), 0);
315 assert_eq!(channel.address_count(), 1);
316 assert_eq!(channel.push_msg(()), Err(PushError::Closed(())));
317 assert_eq!(channel.pop_msg(), Err(PopError::Closed));
318 listeners.assert_notified(Assert {
319 recv: 10,
320 exit: 10,
321 send: 10,
322 });
323 }
324
325 #[test]
326 fn removing_all_addresses() {
327 let channel = Channel::<()>::new(1, 1, Capacity::default());
328 let listeners = Listeners::size_10(&channel);
329
330 channel.remove_address();
331
332 assert!(!channel.is_closed());
333 assert!(!channel.has_exited());
334 assert_eq!(channel.address_count(), 0);
335 assert_eq!(channel.process_count(), 1);
336 assert_eq!(channel.push_msg(()), Ok(()));
337 listeners.assert_notified(Assert {
338 recv: 1,
339 exit: 0,
340 send: 0,
341 });
342 }
343
344 #[test]
345 fn exiting_drops_all_messages() {
346 let msg = Arc::new(());
347
348 let channel = Channel::new(1, 1, Capacity::Bounded(10));
349 channel.send_now(msg.clone()).unwrap();
350
351 assert_eq!(Arc::strong_count(&msg), 2);
352 channel.remove_inbox();
353 assert_eq!(Arc::strong_count(&msg), 1);
354 }
355
356 #[test]
357 fn closing_doesnt_drop_messages() {
358 let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
359 let msg = Arc::new(());
360 channel.push_msg(msg.clone()).unwrap();
361 assert_eq!(Arc::strong_count(&msg), 2);
362 channel.close();
363 assert_eq!(Arc::strong_count(&msg), 2);
364 }
365
366 #[tokio::test]
367 async fn immedeate_halt() {
368 for i in 0..100 {
369 let (_child, address) = spawn(Config::default(), basic_actor!());
370 spin_sleep::sleep(Duration::from_nanos(i));
371 address.halt();
372 address.await;
373 }
374 }
375
376 #[test]
377 fn add_inbox_with_0_inboxes_is_err() {
378 let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
379 channel.remove_inbox();
380 assert_eq!(channel.try_add_inbox(), Err(()));
381 assert_eq!(channel.process_count(), 0);
382 }
383
384 #[test]
385 fn add_inbox_with_0_addresses_is_ok() {
386 let channel = Channel::<Arc<()>>::new(1, 1, Capacity::default());
387 channel.remove_inbox();
388 assert!(matches!(channel.try_add_inbox(), Err(_)));
389 assert_eq!(channel.process_count(), 0);
390 }
391
392 #[test]
393 fn push_msg() {
394 let channel = Channel::<()>::new(1, 1, Capacity::default());
395 let listeners = Listeners::size_10(&channel);
396
397 channel.push_msg(()).unwrap();
398
399 assert_eq!(channel.msg_count(), 1);
400 listeners.assert_notified(Assert {
401 recv: 1,
402 exit: 0,
403 send: 0,
404 });
405 }
406
407 #[test]
408 fn pop_msg() {
409 let channel = Channel::<()>::new(1, 1, Capacity::default());
410 channel.push_msg(()).unwrap();
411 let listeners = Listeners::size_10(&channel);
412
413 channel.pop_msg().unwrap();
414 assert_eq!(channel.msg_count(), 0);
415 listeners.assert_notified(Assert {
416 recv: 1,
417 exit: 0,
418 send: 1,
419 });
420 }
421
422 #[test]
423 fn halt() {
424 let channel = Channel::<()>::new(1, 3, Capacity::default());
425 let listeners = Listeners::size_10(&channel);
426
427 channel.halt();
428
429 assert_eq!(channel.halt_count.load(Ordering::Acquire), i32::MAX);
430 listeners.assert_notified(Assert {
431 recv: 10,
432 exit: 0,
433 send: 10,
434 });
435 }
436
437 #[test]
438 fn halt_closes_channel() {
439 let channel = Channel::<()>::new(1, 3, Capacity::default());
440 channel.halt();
441 assert!(channel.is_closed());
442 }
443
444 #[test]
445 fn partial_halt() {
446 let channel = Channel::<()>::new(1, 3, Capacity::default());
447 let listeners = Listeners::size_10(&channel);
448
449 channel.halt_some(2);
450
451 assert_eq!(channel.halt_count.load(Ordering::Acquire), 2);
452 listeners.assert_notified(Assert {
453 recv: 10,
454 exit: 0,
455 send: 0,
456 });
457 }
458
459 #[test]
460 fn inbox_should_halt() {
461 let channel = Channel::<()>::new(1, 3, Capacity::default());
462 channel.halt_some(2);
463
464 assert!(channel.inbox_should_halt());
465 assert!(channel.inbox_should_halt());
466 assert!(!channel.inbox_should_halt());
467 }
468
469 struct Listeners {
470 recv: Vec<EventListener>,
471 exit: Vec<EventListener>,
472 send: Vec<EventListener>,
473 }
474
475 struct Assert {
476 recv: usize,
477 exit: usize,
478 send: usize,
479 }
480
481 impl Listeners {
482 fn size_10<T>(channel: &Channel<T>) -> Self {
483 Self {
484 recv: (0..10)
485 .into_iter()
486 .map(|_| channel.get_recv_listener())
487 .collect(),
488 exit: (0..10)
489 .into_iter()
490 .map(|_| channel.get_exit_listener())
491 .collect(),
492 send: (0..10)
493 .into_iter()
494 .map(|_| channel.get_send_listener())
495 .collect(),
496 }
497 }
498
499 fn assert_notified(self, assert: Assert) {
500 let recv = self
501 .recv
502 .into_iter()
503 .map(|l| l.now_or_never().is_some())
504 .filter(|bool| *bool)
505 .collect::<Vec<_>>()
506 .len();
507 let exit = self
508 .exit
509 .into_iter()
510 .map(|l| l.now_or_never().is_some())
511 .filter(|bool| *bool)
512 .collect::<Vec<_>>()
513 .len();
514 let send = self
515 .send
516 .into_iter()
517 .map(|l| l.now_or_never().is_some())
518 .filter(|bool| *bool)
519 .collect::<Vec<_>>()
520 .len();
521
522 assert_eq!(assert.recv, recv);
523 assert_eq!(assert.exit, exit);
524 assert_eq!(assert.send, send);
525 }
526 }
527}