1mod actor;
2mod future;
3mod message;
4
5pub use actor::*;
6pub use future::*;
7pub use message::*;
8
#[cfg(test)]
mod tests {
    //! Tests that drive `Actor` through its `tower::Service` interface
    //! (`poll_ready` followed by `call`) against well-behaved, misbehaving and
    //! panicking worker closures.

    use crate::actor::Actor;
    use crate::ActorError;
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use thiserror::Error;
    use tokio::sync::{oneshot, Mutex};

    use tower::Service;

    /// Error type used by the tests; it mirrors the two `ActorError` variants
    /// one-to-one so results can be compared with `assert_eq!`.
    #[derive(Error, Debug, PartialEq, Eq)]
    enum TestError {
        #[error("Poll Error")]
        ActorTerminated,
        #[error("Poll Sender error")]
        ActorHungUp,
    }

    impl From<ActorError> for TestError {
        /// Maps each `ActorError` variant onto the `TestError` variant of the same name.
        fn from(value: ActorError) -> Self {
            match value {
                ActorError::ActorTerminated => TestError::ActorTerminated,
                ActorError::ActorHungUp => TestError::ActorHungUp,
            }
        }
    }

    /// Calling `call` without a preceding successful `poll_ready` must panic.
    ///
    /// NOTE(review): this runs inside a Tokio runtime on purpose. The other tests
    /// show the worker running independently of the caller, so `Actor::new`
    /// presumably spawns it onto the ambient runtime. Without a runtime the bare
    /// `#[should_panic]` could be satisfied by a panic during construction instead
    /// of by the contract violation this test is meant to check.
    #[tokio::test]
    #[should_panic]
    async fn follows_service_contract() {
        let mut actor = Actor::<(), (), TestError>::new(10, |_| async move { Ok(()) });
        // The returned future is irrelevant here; only the panic in `call` matters.
        let _ = actor.call(());
    }

    /// A worker that receives a request and drops it without responding makes the
    /// pending call resolve to `ActorHungUp`.
    #[tokio::test]
    async fn test_misbehaving_actor() {
        let mut actor = Actor::<(), (), TestError>::new(10, |mut rx| async move {
            let msg = rx.recv().await;
            // Dropping the message also drops its response sender without a reply.
            drop(msg);
            Ok(())
        });
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
        assert_eq!(actor.call(()).await, Err(TestError::ActorHungUp));
    }

    /// A worker that panics after receiving a request makes the pending call
    /// resolve to `ActorHungUp`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_crashing_actor() {
        let mut actor = Actor::<(), (), TestError>::new(10, |mut rx| async move {
            let _msg = rx.recv().await;
            panic!();
        });
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
        assert_eq!(actor.call(()).await, Err(TestError::ActorHungUp));
    }

    /// A worker that panics before receiving anything is reported by `poll_ready`
    /// as `ActorTerminated`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_fast_crashing_actor() {
        let mut actor = Actor::<(), (), TestError>::new(10, |_rx| async move {
            panic!();
        });
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        assert_eq!(
            actor.poll_ready(&mut cx),
            Poll::Ready(Err(TestError::ActorTerminated))
        );
    }

    /// With `1` as the first constructor argument (presumably the queue capacity),
    /// `poll_ready` reports `Pending` while one request is outstanding and becomes
    /// ready again once the worker has answered it.
    #[tokio::test]
    async fn test_actor_pending() {
        let (outer_tx, outer_rx) = oneshot::channel::<()>();
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        let mut actor = Actor::<(), (), TestError>::new(1, |mut rx| async move {
            // Hold the worker back until the test releases it, so the first
            // request stays unprocessed while readiness is probed.
            outer_rx.await.unwrap();

            let msg = rx.recv().await.unwrap();
            msg.rsp_sender.send(Ok(())).unwrap();

            // Wait for a further message instead of returning right away;
            // presumably this keeps the worker alive for the final readiness check.
            rx.recv().await;
            Ok(())
        });

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));

        let fut = actor.call(());

        // The worker is still blocked on `outer_rx`, so the request is unanswered.
        assert_eq!(actor.poll_ready(&mut cx), Poll::Pending);

        outer_tx.send(()).unwrap();

        assert_eq!(fut.await, Ok(()));
        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
    }

    /// A worker that keeps a running sum answers every request with the updated total.
    #[tokio::test]
    async fn test_happy_path() {
        let mut actor = Actor::<u32, u32, TestError>::new(10, |mut rx| async move {
            let counter = Arc::new(Mutex::new(0));

            while let Some(msg) = rx.recv().await {
                let rsp_sender = msg.rsp_sender;
                let counter = counter.clone();
                let new_count = msg.req;

                // Each request is handled on its own task; the shared counter is
                // updated under the async mutex before the reply is sent.
                tokio::spawn(async move {
                    let mut lock = counter.lock().await;
                    *lock += new_count;
                    rsp_sender.send(Ok(*lock))
                });
            }
            Ok(())
        });
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
        assert_eq!(actor.call(10).await, Ok(10));

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
        assert_eq!(actor.call(20).await, Ok(30));

        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
        assert_eq!(actor.call(30).await, Ok(60));
    }
}