tower_actor/
lib.rs

1mod actor;
2mod future;
3mod message;
4
5pub use actor::*;
6pub use future::*;
7pub use message::*;
8
9#[cfg(test)]
10mod tests {
11    use crate::actor::Actor;
12    use crate::ActorError;
13    use std::sync::Arc;
14    use std::task::{Context, Poll};
15    use thiserror::Error;
16    use tokio::sync::{oneshot, Mutex};
17
18    use tower::Service;
19
20    #[derive(Error, Debug, PartialEq, Eq)]
21    enum TestError {
22        #[error("Poll Error")]
23        ActorTerminated,
24        #[error("Poll Sender error")]
25        ActorHungUp,
26    }
27
28    impl From<ActorError> for TestError {
29        fn from(value: ActorError) -> Self {
30            match value {
31                ActorError::ActorTerminated => TestError::ActorTerminated,
32                ActorError::ActorHungUp => TestError::ActorHungUp,
33            }
34        }
35    }
36
37    #[test]
38    #[should_panic]
39    fn follows_service_contract() {
40        let mut actor = Actor::<(), (), TestError>::new(10, |_| async move { Ok(()) });
41        actor.call(());
42    }
43
44    // Test misbehaving actor dropping the call
45    #[tokio::test]
46    async fn test_misbehaving_actor() {
47        let mut actor = Actor::<(), (), TestError>::new(10, |mut rx| async move {
48            let msg = rx.recv().await;
49            drop(msg);
50            Ok(())
51        });
52        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
53
54        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
55        assert_eq!(actor.call(()).await, Err(TestError::ActorHungUp));
56    }
57
58    // Test failed actor crashing the receiver on another thread
59    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
60    async fn test_crashing_actor() {
61        let mut actor = Actor::<(), (), TestError>::new(10, |mut rx| async move {
62            let _msg = rx.recv().await;
63            panic!();
64        });
65        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
66
67        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
68        assert_eq!(actor.call(()).await, Err(TestError::ActorHungUp));
69    }
70
71    // Test failed actor crashing on another thread before processing messages
72    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
73    async fn test_fast_crashing_actor() {
74        let mut actor = Actor::<(), (), TestError>::new(10, |mut _rx| async move {
75            panic!();
76        });
77        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
78
79        assert_eq!(
80            actor.poll_ready(&mut cx),
81            Poll::Ready(Err(TestError::ActorTerminated))
82        );
83    }
84
85    // Test buffer full
86    #[tokio::test]
87    async fn test_actor_pending() {
88        let (outer_tx, outer_rx) = oneshot::channel::<()>();
89        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
90
91        let mut actor = Actor::<(), (), TestError>::new(1, |mut rx| async move {
92            // Wait to complete our first message until signaled
93            outer_rx.await.unwrap();
94
95            let msg = rx.recv().await.unwrap();
96            msg.rsp_sender.send(Ok(())).unwrap();
97
98            // Make sure we can receive one more message for the last poll_ready
99            rx.recv().await;
100            Ok(())
101        });
102
103        // Prep the actor
104        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
105
106        // Launch the message
107        let fut = actor.call(());
108
109        // Observe the pending status
110        assert_eq!(actor.poll_ready(&mut cx), Poll::Pending);
111
112        // Complete the message
113        outer_tx.send(()).unwrap();
114
115        // Observe the value and actor are OK
116        assert_eq!(fut.await, Ok(()));
117        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
118    }
119
120    // End to end test of 'happy path' where Actor receives messages and can mutate itself
121    #[tokio::test]
122    async fn test_happy_path() {
123        let mut actor = Actor::<u32, u32, TestError>::new(10, |mut rx| async move {
124            // Simulate some actor state...
125            let counter = Arc::new(Mutex::new(0));
126
127            while let Some(msg) = rx.recv().await {
128                let rsp_sender = msg.rsp_sender;
129                let counter = counter.clone();
130                let new_count = msg.req;
131
132                // Which farms out the work to another thread
133                tokio::spawn(async move {
134                    let mut lock = counter.lock().await;
135                    *lock += new_count;
136                    rsp_sender.send(Ok(*lock))
137                });
138            }
139            Ok(())
140        });
141        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
142
143        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
144        assert_eq!(actor.call(10).await, Ok(10));
145
146        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
147        assert_eq!(actor.call(20).await, Ok(30));
148
149        assert_eq!(actor.poll_ready(&mut cx), Poll::Ready(Ok(())));
150        assert_eq!(actor.call(30).await, Ok(60));
151    }
152}