actor_lib/
lib.rs

//! # Actor-lib SDK
//! This is the Rust SDK for Actor-lib.
//!
//! ## Usage
//! ```rust
//! use actor_lib::*;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), BoxDynError> {
//!
//!     Ok(())
//! }
//! ```
13
14use std::collections::HashMap;
15use std::error::Error;
16use std::fmt::Debug;
17use std::sync::{Arc};
18use tokio::sync::{mpsc, oneshot};
19use tokio::runtime::Runtime;
20use futures::lock::Mutex;
21use std::future::Future;
22use std::pin::Pin;
23use tokio::sync::oneshot::Sender;
24
/// Boxed error trait object that is `Send + Sync`; the error type used by every
/// fallible API in this file.
pub type BoxDynError = Box<dyn Error + Send + Sync>;

/// Pinned, heap-allocated, `Send` future resolving to `O`.
///
/// Message handlers and callbacks return this type.
pub type CallbackFuture<O> = Pin<Box<dyn Future<Output = O> + Send>>;

/// Boxed, thread-safe callback that takes a `T` and returns a future resolving
/// to `Result<(), BoxDynError>`.
pub type Callback<T> =
    Box<dyn Fn(T) -> CallbackFuture<Result<(), BoxDynError>> + Send + Sync>;
28
29
30
31#[derive(Debug)]
32pub struct Actor<Message, State, Response> {
33    tx: Mutex<Option<mpsc::Sender<(Message, i32)>>>,
34    rt:  Mutex<Option<Arc<Runtime>>>,
35    join_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
36    state: Option<Arc<Mutex<State>>>,
37    self_ref: Mutex<Option<Arc<Actor<Message, State, Response>>>>,
38    message_id: Mutex<i32>,
39    call_back: Mutex<HashMap<i32, Callback<Result<Response, BoxDynError>>>>,
40    promise: Mutex<HashMap<i32, Sender<Result<Response, BoxDynError>>>>,
41    name: String,
42
43}
44#[derive(Debug)]
45pub struct Context<Message, State, Response> {
46    pub mgs: Message,
47    pub state: Arc<Mutex<State>>,
48    pub self_ref: Arc<Actor<Message, State, Response>>,
49}
50
51
52impl<Message: Debug + Send + Sync + 'static, State: Debug + Send + Sync + 'static, Response: Debug + Send + Sync + 'static> Actor<Message, State, Response> {
53     pub async fn new<F>(name: String, handler: F, state: State, buffer: usize) -> Arc<Self>
54        where
55            F: Fn(Context<Message, State, Response>) -> CallbackFuture<Result<Response, BoxDynError>>,
56            F: Send + Sync + 'static,
57    {
58        let state_arc = Arc::new(Mutex::new(state));
59        let state_clone = state_arc.clone();
60        let (tx, mut rx) = mpsc::channel(buffer);
61        let rt = tokio::runtime::Builder::new_multi_thread()
62            .worker_threads(1)
63            .enable_all()
64            .build()
65            .unwrap();
66
67        let actor = Actor {
68            tx: Mutex::new(Some(tx)),
69            rt: Mutex::new(None),
70            join_handle: Mutex::new(None),
71            state: Some(state_clone),
72            self_ref: Mutex::new(None),
73            message_id: Mutex::new(0),
74            call_back: Mutex::new(HashMap::new()),
75            promise: Mutex::new(HashMap::new()),
76            name: name,
77        };
78
79        let ret = Arc::new(actor);
80        let ret_clone = ret.clone();
81        let ret_clone2 = ret.clone();
82        let ret_clone3 = ret.clone();
83        let mut me = ret.self_ref.lock().await;
84        *me = Some(ret.clone());
85
86
87        let _join_handle = rt.spawn(async move {
88            let me = ret_clone2.clone();
89            while let Some(message) = rx.recv().await {
90                let msg = message.0;
91                let message_id = message.1;
92                let state_clone = state_arc.clone();
93                {
94                    let state_lock = state_clone.lock().await;
95                    log::debug!("<{}> Got message: {:?} Current state: {:?}", me.name, msg, state_lock);
96
97                }
98                let state = state_arc.clone();
99                let context = Context {
100                    mgs: msg,
101                    state: state,
102                    self_ref: me.clone(),
103                };
104                let r = handler(context);
105                {
106                    let result = r.await;
107                    log::trace!("<{}> Work result: {:?}", me.name, result);
108                    if result.is_err() {
109                        log::error!("<{}> Work error: {:?}", me.name, result);
110                    }
111                    let mut call_back_lock = ret_clone3.call_back.lock().await;
112                    let call_back = call_back_lock.remove(&message_id);
113                    match call_back {
114                        None => {
115                            log::trace!("<{}> No callback for message_id: {}", me.name, message_id);
116
117                            let mut promise_lock = ret_clone3.promise.lock().await;
118                            let promise = promise_lock.remove(&message_id);
119                            match promise {
120                                None => {
121                                    log::trace!("<{}> No promise for message_id: {}", me.name, message_id);
122                                }
123                                Some(promise) => {
124                                    log::trace!("<{}> Promise result: {:?}", me.name, result);
125                                    let _ = promise.send(result);
126                                }
127                            }
128
129                        }
130                        Some(call_back) => {
131                            log::trace!("<{}> Callback result: {:?}", me.name, result);
132                            let _ = call_back(result).await;
133                        }
134                    }
135                }
136                let state_clone = state_arc.clone();
137                let state_lock = state_clone.lock().await;
138                log::trace!("<{}> After work on message new state: {:?}", me.name, state_lock);
139            }
140        });
141        let mut rt_mutex = ret.rt.lock().await;
142        *rt_mutex = Some(Arc::new(rt));
143        log::info!("<{}> Actor started", ret_clone.name);
144
145        ret_clone
146    }
147
148    pub async fn callback<F>(&self, msg: Message, handler: F) -> Result<(), BoxDynError>
149        where
150            F: Fn(Result<Response, BoxDynError>) -> CallbackFuture<Result<(), BoxDynError>>,
151            F: Send + Sync + 'static,
152    {
153
154
155        log::trace!("<{}> Ask message: {:?}", self.name, msg);
156        let tx_lock = self.tx.lock().await;
157        let tx = tx_lock.as_ref();
158        match tx {
159            None => {
160                return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
161            }
162            Some(tx) => {
163                let mut message_id_lock = self.message_id.lock().await;
164                *message_id_lock += 1;
165                let mut call_back_lock = self.call_back.lock().await;
166                call_back_lock.insert(*message_id_lock, Box::new(handler));
167                tx.send((msg, *message_id_lock)).await?;
168            }
169        }
170        Ok(())
171    }
172
173    pub async fn ask(&self, mgs: Message) -> Result<Response, BoxDynError>
174    {
175
176        log::trace!("<{}> Result message: {:?}", self.name, mgs);
177        let tx_lock = self.tx.lock().await;
178        let tx = tx_lock.as_ref();
179        match tx {
180            None => {
181                return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
182            }
183            Some(tx) => {
184                let (sender, receiver) = oneshot::channel();
185                {
186                    let mut message_id_lock = self.message_id.lock().await;
187                    *message_id_lock += 1;
188                    let mut promise_lock = self.promise.lock().await;
189                    promise_lock.insert(*message_id_lock, sender);
190                    tx.send((mgs, *message_id_lock)).await?;
191                }
192                let result:Result<Response, BoxDynError> = receiver.await?;
193                result
194            }
195        }
196    }
197
198
199
200    pub async fn send(&self, msg: Message) -> Result<(), BoxDynError> {
201        log::trace!("<{}> Push message: {:?}", self.name, msg);
202        let tx_lock = self.tx.lock().await;
203        let tx = tx_lock.as_ref();
204        match tx {
205            None => {
206                return Err(std::io::Error::new(std::io::ErrorKind::Other, "Err").into());
207            }
208            Some(tx) => {
209                tx.send((msg, 0)).await?;
210            }
211        }
212        Ok(())
213    }
214
215    pub async fn state(&self) -> Result<Arc<Mutex<State>>, BoxDynError> {
216        let state_opt = self.state.clone();
217        log::trace!("<{}> State: {:?}", self.name, state_opt);
218        match state_opt {
219            None => {
220                return Err(std::io::Error::new(std::io::ErrorKind::Other, "No state").into());
221            }
222            Some(state) => {
223                Ok(state)
224            }
225        }
226    }
227
228    pub async fn stop(&self){
229        let mut tx_lock = self.tx.lock().await;
230        *tx_lock = None;
231        let mut rt_lock = self.rt.lock().await;
232        let worker = rt_lock.take().unwrap();
233        std::mem::forget(worker);
234        *rt_lock = None;
235        let mut me_lock = self.self_ref.lock().await;
236        *me_lock = None;
237        // self.state = None;
238        let mut join_handle_lock = self.join_handle.lock().await;
239        *join_handle_lock = None;
240        log::debug!("<{}> Stop worker", self.name);
241
242    }
243}
244
245
246