1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
//! Provides an [`Actor`] type that wraps a state and allows mutating it
//! in turns using [`invoke`] and [`invoke_async`].
//!
//! [`invoke`]: Actor::invoke
//! [`invoke_async`]: Actor::invoke_async
//!
//! # Example
//!
//! It is recommended to create a wrapper type around the [`Actor`], and
//! implement async functions that use [`invoke`]/[`invoke_async`] to interact
//! with the inner private state.
//!
//! ```rust
//! use std::time::Duration;
//! use simple_actor::Actor;
//! use futures::FutureExt;
//!
//! #[derive(Clone)]
//! pub struct Adder(Actor<u32>);
//!
//! impl Adder {
//!     pub fn new(initial_value: u32) -> Self {
//!         let (actor, driver) = Actor::new(initial_value);
//!         tokio::spawn(driver);
//!         Self(actor)
//!     }
//!
//!     pub async fn add(&self, x: u32) {
//!         let _ = self.0.invoke(move |state| {
//!             // We can update the state.
//!             *state += x
//!         }).await;
//!     }
//!
//!     pub async fn add_twice_with_delay(&self, x: u32) -> Option<u32> {
//!         self.0
//!             .invoke_async(move |state| {
//!                 async move {
//!                     *state += x;
//!                     // We can .await while holding the state.
//!                     tokio::time::sleep(Duration::from_millis(500)).await;
//! 
//!                     *state += x;
//!                     // We can return a value at the end.
//!                     *state
//!                 }
//!                 .boxed()
//!             })
//!             .await
//!     }
//!
//!     pub async fn result(&self) -> Option<u32> {
//!         self.0.invoke(move |state| *state).await
//!     }
//!
//!     pub fn shutdown(&self) {
//!         self.0.shutdown()
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let adder = Adder::new(5);
//!
//!     adder.add(3).await;
//!     assert_eq!(adder.result().await, Some(8));
//!
//!     adder.add(2).await;
//!     assert_eq!(adder.result().await, Some(10));
//!
//!     assert_eq!(adder.add_twice_with_delay(3).await, Some(16));
//!     assert_eq!(adder.result().await, Some(16));
//!
//!     adder.shutdown();
//!     assert_eq!(adder.result().await, None);
//! }
//! ```
//!
//! ## Inspiration
//!
//! This crate is inspired by [`ghost_actor`], with a simpler implementation and
//! API.
//!
//! This crate's [`invoke`] function returns `None` if the actor is down, which
//! avoids dealing with error type conversions.
//!
//! It also allows holding the state across `.await` points in
//! [`invoke_async`], and thus using async-based state.
//!  
//! [`ghost_actor`]: https://github.com/holochain/ghost_actor

use futures::{
    channel::{mpsc, oneshot},
    Future, FutureExt, SinkExt, StreamExt,
};
use std::{hash::Hash, pin::Pin};

/// Pinned, heap-allocated, `Send` future that may borrow data for `'a`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
/// Type-erased unit of work queued for the driver: a one-shot closure that
/// receives exclusive access to the state and returns a future borrowing it.
type InnerInvoke<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send>;
/// Sending half of the channel that carries queued invokes to the driver.
type SendInvoke<T> = mpsc::Sender<InnerInvoke<T>>;

/// Actor wrapping a state.
///
/// This type only holds the sending half of the invoke channel; the state
/// itself is moved into the driver future returned by [`Actor::new`] /
/// [`Actor::new_with_capacity`].
///
/// Cloning the actor provides a handle to the same actor.
pub struct Actor<T: 'static + Send>(SendInvoke<T>);
impl<T: 'static + Send> Actor<T> {
    /// Inbound channel capacity used by [`Actor::new`].
    const DEFAULT_CAPACITY: usize = 1024;

    /// Builds an `Actor` around `state` using the default inbound channel
    /// capacity (1024).
    ///
    /// The second element of the returned tuple is the driver future: it owns
    /// the state and must be spawned in an async executor for invokes to be
    /// processed.
    #[must_use]
    pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
        Self::new_with_capacity(state, Self::DEFAULT_CAPACITY)
    }

    /// Builds an `Actor` around `state` whose inbound channel is created with
    /// the given `capacity`.
    ///
    /// The second element of the returned tuple is the driver future: it owns
    /// the state and must be spawned in an async executor for invokes to be
    /// processed.
    #[must_use]
    pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
        let (sender, mut receiver) = mpsc::channel::<InnerInvoke<T>>(capacity);

        // Invokes run strictly one after another: each job's future is awaited
        // to completion before the next job is pulled from the channel.
        let driver = async move {
            while let Some(job) = receiver.next().await {
                job(&mut state).await;
            }
        }
        .boxed();

        (Self(sender), driver)
    }

    /// Interacts with the state through a closure that returns a future.
    ///
    /// That future holds the mutable reference to the state, which prevents
    /// the actor from processing any further invoke until it completes.
    ///
    /// The future needs to be boxed using [`futures::FutureExt::boxed`].
    ///
    /// Resolves to the closure's output, or to `None` if the actor is no
    /// longer running.
    pub fn invoke_async<F, R>(&self, invoke: F) -> impl Future<Output = Option<R>>
    where
        F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send + 'static,
        R: 'static + Send,
    {
        // Cloned up front so the returned future owns its own sender.
        let mut sender = self.0.clone();

        async move {
            let (reply_tx, reply_rx) = oneshot::channel();

            // Wrap the user closure so the driver only sees a `()` future; the
            // actual output travels back through the oneshot channel.
            let job: InnerInvoke<T> = Box::new(move |state: &mut T| {
                async move {
                    let output = invoke(state).await;
                    // The send result is deliberately ignored.
                    let _ = reply_tx.send(output);
                }
                .boxed()
            });

            // The job could not be queued: report the actor as down.
            if sender.send(job).await.is_err() {
                return None;
            }

            // An error here means the reply sender was dropped without a reply.
            reply_rx.await.ok()
        }
    }

    /// Interacts with the state through a plain (synchronous) closure.
    ///
    /// Resolves to the closure's output, or to `None` if the actor is no
    /// longer running.
    pub async fn invoke<F, R>(&self, invoke: F) -> Option<R>
    where
        F: FnOnce(&mut T) -> R + 'static + Send,
        R: 'static + Send,
    {
        // The closure is written inline so its higher-ranked signature is
        // inferred from `invoke_async`'s bound.
        let pending = self.invoke_async(|state| async move { invoke(state) }.boxed());
        pending.await
    }

    /// Tells whether the actor still accepts new invokes.
    pub fn is_active(&self) -> bool {
        let closed = self.0.is_closed();
        !closed
    }

    /// Stops the actor. All invokes that are already queued are still
    /// processed before it really stops.
    pub fn shutdown(&self) {
        // `close_channel` takes `&mut self` on the sender, and only `&self` is
        // available here, so the call goes through a temporary clone.
        let mut handle = self.0.clone();
        handle.close_channel();
    }
}

impl<T: 'static + Send> Clone for Actor<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

impl<T: 'static + Send> PartialEq for Actor<T> {
    fn eq(&self, other: &Self) -> bool {
        self.0.same_receiver(&other.0)
    }
}

impl<T: 'static + Send> Eq for Actor<T> {}

impl<T: 'static + Send> Hash for Actor<T> {
    fn hash<Hasher: std::hash::Hasher>(&self, hasher: &mut Hasher) {
        self.0.hash_receiver(hasher);
    }
}