1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! Provides an [`Actor`] type that wraps a state and allows mutating it
//! in turns using [`invoke`].
//!
//! [`invoke`]: Actor::invoke
//!
//! # Example
//!
//! It is recommended to create a wrapper type around the [`Actor`],
//! and implement async functions that use [`invoke`] to interact with
//! the inner private state.
//!
//! ```
//! use simple_actor::Actor;
//!
//! #[derive(Clone)]
//! pub struct Adder(Actor<u32>);
//!
//! impl Adder {
//!     pub fn new(initial_value: u32) -> Self {
//!         let (actor, driver) = Actor::new(initial_value);
//!         tokio::spawn(driver);
//!         Self(actor)
//!     }
//!
//!     pub async fn add(&self, x: u32) {
//!         let _ = self.0.invoke(move |state| *state += x).await;
//!     }
//!
//!     pub async fn result(&self) -> Option<u32> {
//!         self.0.invoke(move |state| *state).await
//!     }
//!
//!     pub fn shutdown(&self) {
//!         self.0.shutdown()
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let adder = Adder::new(5);
//!
//!     adder.add(3).await;
//!     assert_eq!(adder.result().await, Some(8));
//!
//!     adder.add(2).await;
//!     assert_eq!(adder.result().await, Some(10));
//!
//!     adder.shutdown();
//!     assert_eq!(adder.result().await, None);
//! }
//! ```
//! 
//! ## Inspiration
//! 
//! This crate is inspired by [`ghost_actor`], with a simpler implementation and
//! API. This crate `invoke` function returns `None` if the actor is down, which
//! avoids dealing with error type conversions.
//!  
//! [`ghost_actor`]: https://github.com/holochain/ghost_actor

use futures::{
    channel::{mpsc, oneshot},
    Future, FutureExt, SinkExt, StreamExt,
};

type InnerInvoke<T> = Box<dyn FnOnce(&mut T) + 'static + Send>;
type SendInvoke<T> = mpsc::Sender<InnerInvoke<T>>;

/// Actor wrapping a state.
#[derive(Clone)]
pub struct Actor<T: 'static + Send>(SendInvoke<T>);
impl<T: 'static + Send> Actor<T> {
    /// Creates a new `Actor` with default inbound channel capacity (1024).
    ///
    /// Returned future must be spawned in an async executor.
    #[must_use]
    pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
        Self::new_with_capacity(state, 1024)
    }

    /// Creates a new `Actor` with given capacity for its inbound channel.
    ///
    /// Returned future must be spawned in an async executor.
    #[must_use]
    pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
        let (invoke_tx, invoke_rx) = mpsc::channel::<InnerInvoke<T>>(capacity);

        let driver = FutureExt::boxed(async move {
            let mut invoke_rx = StreamExt::ready_chunks(invoke_rx, 1024);

            while let Some(invokes) = invoke_rx.next().await {
                for invoke in invokes {
                    invoke(&mut state);
                }
            }
        });

        (Self(invoke_tx), driver)
    }

    /// Interact with the state.
    /// Returns `None` if the actor is no longer running.
    pub fn invoke<F, R>(&self, invoke: F) -> impl Future<Output = Option<R>>
    where
        F: FnOnce(&mut T) -> R + 'static + Send,
        R: 'static + Send,
    {
        let mut invoke_tx = self.0.clone();

        async move {
            let (response_tx, response_rx) = oneshot::channel();

            let real_invoke: InnerInvoke<T> = Box::new(move |state: &mut T| {
                let res = invoke(state);
                let _ = response_tx.send(res);
            });

            invoke_tx.send(real_invoke).await.ok()?;

            response_rx.await.ok()
        }
    }

    /// Interact with the state then awaits a Future.
    /// This Future cannot hold the reference to the state.
    pub async fn invoke_future<Fun, Fut, R>(&self, invoke: Fun) -> Option<R>
    where
        Fun: FnOnce(&mut T) -> Fut + 'static + Send,
        Fut: Future<Output = R> + 'static + Send,
        R: 'static + Send,
    {
        Some(self.invoke(invoke).await?.await)
    }

    /// Tells if the actor still accepts new invokes.
    pub fn is_active(&self) -> bool {
        !self.0.is_closed()
    }

    /// Stop the actor, which will process every already queued invokes
    /// before really stopping.
    pub fn shutdown(&self) {
        self.0.clone().close_channel()
    }
}