tokiactor/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use anyhow::Context;
4use futures::Future;
5use std::marker::PhantomData;
6
7/// Actor to wrap function
8pub struct Actor<I, O, FN>
9where
10    FN: FnMut(I) -> O,
11{
12    /// Function wrapped in actor
13    pub f: FN,
14    _i: PhantomData<I>,
15    _o: PhantomData<O>,
16}
17
18impl<I, O, FN> Clone for Actor<I, O, FN>
19where
20    FN: FnMut(I) -> O + Clone,
21{
22    fn clone(&self) -> Self {
23        Self {
24            f: self.f.clone(),
25            _i: self._i,
26            _o: self._o,
27        }
28    }
29}
30
31impl<I, O, FN> From<FN> for Actor<I, O, FN>
32where
33    FN: FnMut(I) -> O,
34{
35    fn from(f: FN) -> Self {
36        Self {
37            f,
38            _i: PhantomData,
39            _o: PhantomData,
40        }
41    }
42}
43
44/// Channel to communicated with `Actor` instances
45pub struct Handle<I, O> {
46    pub n: usize,
47    tx: async_channel::Sender<(I, tokio::sync::oneshot::Sender<O>)>,
48    rx: async_channel::Receiver<(I, tokio::sync::oneshot::Sender<O>)>,
49}
50
51impl<I, O> Clone for Handle<I, O> {
52    fn clone(&self) -> Self {
53        Self {
54            n: self.n,
55            tx: self.tx.clone(),
56            rx: self.rx.clone(),
57        }
58    }
59}
60
61/// Error container for channel communication
62#[derive(Debug, thiserror::Error)]
63pub enum HandleError<T> {
64    Send(#[from] async_channel::SendError<T>),
65    Recv(#[from] tokio::sync::oneshot::error::RecvError),
66}
67
68/// A trait for inspecting input and output values.
69pub trait Inspector<I, O> {
70    /// Inspects the input value and returns it.
71    fn inspect_i(&mut self, i: I) -> I {
72        i
73    }
74
75    /// Inspects the output value and returns it.
76    fn inspect_o(&mut self, o: O) -> O {
77        o
78    }
79}
80
81impl<I, O> Handle<I, O>
82where
83    I: Send + 'static,
84    O: Send + 'static,
85{
86    /// create a new Handle with `n` of channel depth
87    /// ```rust
88    /// use tokiactor::*;
89    ///
90    /// let handle = Handle::<i32, i32>::new(1);
91    /// ```
92    pub fn new(n: usize) -> Self {
93        let (tx, rx) = match n {
94            0 => async_channel::unbounded(),
95            _ => async_channel::bounded(n),
96        };
97        Self { n, tx, rx }
98    }
99
100    /// spawn `Actor` with type impl `Into<Actor<I, O, FN>>` in system thread
101    /// ```rust
102    /// use tokiactor::*;
103    /// let handle = Handle::new(1).spawn(|i: i32| i + 1);
104    /// ```
105    pub fn spawn<FN, IntoActor>(self, actor: IntoActor) -> Handle<I, O>
106    where
107        FN: FnMut(I) -> O + Send + 'static,
108        IntoActor: Into<Actor<I, O, FN>>,
109    {
110        let rx = self.rx.clone();
111        let mut actor = actor.into();
112        std::thread::spawn(move || {
113            while let Ok((i, tx)) = rx.recv_blocking() {
114                if tx.send((actor.f)(i)).is_err() {
115                    break;
116                }
117            }
118        });
119        self
120    }
121
122    /// spawn `n` `Actor` with type impl `Into<Actor<I, O, FN>>` in system threads when `Actor` is `Clone`
123    /// ```rust
124    /// use tokiactor::*;
125    /// let handle = Handle::new(1).spawn_n(10, |i: i32| i + 1);
126    /// ```
127    pub fn spawn_n<FN, IntoActor>(self, n: usize, actor: IntoActor) -> Handle<I, O>
128    where
129        FN: FnMut(I) -> O + Send + 'static,
130        IntoActor: Into<Actor<I, O, FN>>,
131        Actor<I, O, FN>: Clone,
132    {
133        let rx = self.rx.clone();
134        let actor = actor.into();
135        for _ in 0..n {
136            let mut actor = actor.clone();
137            let rx = rx.clone();
138            std::thread::spawn(move || {
139                while let Ok((i, tx)) = rx.recv_blocking() {
140                    if tx.send((actor.f)(i)).is_err() {
141                        break;
142                    }
143                }
144            });
145        }
146        self
147    }
148
149    /// spawn async `Actor` with type impl `Into<Actor<I, O, FU, FN>>` in tokio thread
150    /// ```rust
151    /// use tokiactor::*;
152    /// use tokio;
153    /// tokio::runtime::Runtime::new().unwrap().block_on(
154    ///     async move {
155    ///         let r = Handle::new(1)
156    ///             .spawn_tokio(move |i: i32| async move {i + 41})
157    ///             .handle(1)
158    ///             .await;
159    ///         assert_eq!(r, 42)
160    ///     }
161    /// );
162    /// ```
163    pub fn spawn_tokio<FN, FU, IntoActor>(self, actor: IntoActor) -> Handle<I, O>
164    where
165        FN: FnMut(I) -> FU + Send + 'static,
166        FU: Future<Output = O> + Send + 'static,
167        IntoActor: Into<Actor<I, FU, FN>>,
168    {
169        let rx = self.rx.clone();
170        let mut actor = actor.into();
171        tokio::spawn(async move {
172            let err = std::sync::Arc::from(std::sync::atomic::AtomicBool::new(false));
173            while let Ok((i, tx)) = rx.recv().await {
174                if err.load(std::sync::atomic::Ordering::SeqCst) {
175                    break;
176                }
177                let o = (actor.f)(i);
178                let err = err.clone();
179                tokio::spawn(async move {
180                    let o = o.await;
181                    if tx.send(o).is_err() {
182                        err.store(true, std::sync::atomic::Ordering::SeqCst);
183                    }
184                });
185            }
186        });
187        self
188    }
189
190    /// Handle input
191    /// ```rust
192    /// use tokiactor::*;
193    /// use tokio;
194    /// tokio::runtime::Runtime::new().unwrap().block_on(
195    ///     async move {
196    ///         let r = Handle::new(1)
197    ///             .spawn_tokio(move |i: i32| async move {i + 41})
198    ///             .handle(1)
199    ///             .await;
200    ///     }
201    /// );
202    /// ```
203    pub async fn handle(&self, i: I) -> O {
204        let (t, r) = tokio::sync::oneshot::channel::<O>();
205        self.tx.send((i, t)).await.expect("failed to send to actor");
206        r.await.expect("failed to recv from actor")
207    }
208
209    /// Try handle input, return error when actor is dead...
210    /// ```rust
211    /// use tokiactor::*;
212    /// use tokio;
213    /// tokio::runtime::Runtime::new().unwrap().block_on(
214    ///     async move {
215    ///         Handle::new(1)
216    ///             .spawn_tokio(move |i: i32| async move {i + 41})
217    ///             .try_handle(1)
218    ///             .await;
219    ///     }
220    /// );
221    /// ```
222    pub async fn try_handle(
223        &self,
224        i: I,
225    ) -> Result<O, HandleError<(I, tokio::sync::oneshot::Sender<O>)>> {
226        let (t, r) = tokio::sync::oneshot::channel::<O>();
227        self.tx.send((i, t)).await?;
228        Ok(r.await?)
229    }
230
231    /// Inspect Handle excution, check `Inspector` trait for more info
232    /// ```rust
233    /// use tokiactor::*;
234    /// use tokio;
235    /// use std::time::Instant;
236    ///
237    /// #[derive(Clone)]
238    /// struct Timer(Instant);
239    ///
240    /// impl Inspector<i32, i32> for Timer
241    /// {
242    ///     fn inspect_i(&mut self, i: i32) -> i32
243    ///     {
244    ///         self.0 = Instant::now();
245    ///         i
246    ///     }
247    ///
248    ///     fn inspect_o(&mut self, o: i32) -> i32
249    ///     {
250    ///         let delta = self.0.elapsed();
251    ///         // get execution time, do logging...
252    ///         dbg!(delta);
253    ///         o
254    ///     }
255    /// }
256    ///
257    /// tokio::runtime::Runtime::new().unwrap().block_on(
258    ///     async move {
259    ///         Handle::new(1)
260    ///             .spawn_tokio(move |i: i32| async move {i + 41})
261    ///             .inspect(Timer(Instant::now()))
262    ///             .try_handle(1)
263    ///             .await;
264    ///     }
265    /// );
266    ///
267    /// ```
268    pub fn inspect<INS>(self, ins: INS) -> Handle<I, O>
269    where
270        INS: Inspector<I, O> + Send + 'static + Clone,
271    {
272        let ss = self;
273        Handle::new(ss.n).spawn_tokio(Actor::from(move |i: I| {
274            let ss = ss.clone();
275            let mut ins = ins.clone();
276            async move {
277                let i = ins.inspect_i(i);
278                let o = ss.handle(i).await;
279                ins.inspect_o(o)
280            }
281        }))
282    }
283
284    ///convert `Handle<I, O>` to `Handle<I, P>` with `Fn(O)->P`
285    /// ```rust
286    /// use tokiactor::*;
287    /// use tokio;
288    ///
289    /// tokio::runtime::Runtime::new().unwrap().block_on(
290    ///     async move {
291    ///         Handle::new(1)
292    ///             .spawn(move |i: i32| i + 1)
293    ///             .convert(|v| v as f32);
294    ///     }
295    /// )
296    /// ```
297    pub fn convert<F, P>(self, f: F) -> Handle<I, P>
298    where
299        P: Send + 'static,
300        F: Fn(O) -> P + Send + 'static + Clone,
301    {
302        let n = self.n;
303        self.then(Handle::new(n).spawn_tokio(move |i| {
304            let f = f.clone();
305            async move { f(i) }
306        }))
307    }
308
309    ///`Handle<I, O> + Handle<O, P> => Handle<I, P>`
310    /// ```rust
311    /// use tokiactor::*;
312    /// use tokio;
313    ///
314    /// tokio::runtime::Runtime::new().unwrap().block_on(
315    ///     async move {
316    ///         let r = Handle::new(1)
317    ///             .spawn(move |i: i32| i + 1)
318    ///             .then(Handle::new(1).spawn(move |i: i32| i * 10)).handle(1).await;
319    ///         assert_eq!(r, 20);
320    ///     }
321    /// )
322    /// ```
323    pub fn then<P>(self, rhs: Handle<O, P>) -> Handle<I, P>
324    where
325        P: Send + 'static,
326    {
327        let ss = self;
328        let rr = rhs;
329        Handle::new(ss.n).spawn_tokio(move |i| {
330            let ss = ss.clone();
331            let rr = rr.clone();
332            async move {
333                let o = ss.handle(i).await;
334                rr.handle(o).await
335            }
336        })
337    }
338
339    /// `Handle<I, O> | Handle<U, V> => Handle<(I,U),(O,V)>`
340    /// ```rust
341    /// use tokiactor::*;
342    /// use tokio;
343    ///
344    /// tokio::runtime::Runtime::new().unwrap().block_on(
345    ///     async move {
346    ///         let r = Handle::new(1)
347    ///             .spawn(move |i: i32| i + 1)
348    ///             .join(Handle::new(1).spawn(move |i: i32| i * 10)).handle((2, 1)).await;
349    ///         assert_eq!(r, (3, 10));
350    ///     }
351    /// )
352    /// ```
353    pub fn join<U, V>(self, rhs: Handle<U, V>) -> Handle<(I, U), (O, V)>
354    where
355        U: Send + 'static,
356        V: Send + 'static,
357    {
358        let ss = self;
359        let rr = rhs;
360        Handle::new(ss.n.max(rr.n)).spawn_tokio(Actor::from(move |(i, u)| {
361            let ss = ss.clone();
362            let rr = rr.clone();
363            async move { futures::join!(ss.handle(i), rr.handle(u)) }
364        }))
365    }
366}
367
368/// Handle return value is an `Option`
369pub type OptionHandle<I, O> = Handle<I, Option<O>>;
370
371impl<I, O> OptionHandle<I, O>
372where
373    I: Send + 'static,
374    O: Send + 'static,
375{
376    /// `OptionHandle<I, O> | OptionHandle<U, V> => OptionHandle<(I,U), (O,V)>`
377    /// ```rust
378    /// use tokiactor::*;
379    /// use tokio;
380    ///
381    /// tokio::runtime::Runtime::new().unwrap().block_on(
382    ///     async move {
383    ///         let r = Handle::new(1)
384    ///             .spawn(move |i: i32| Some(i + 1))
385    ///             .and(Handle::new(1).spawn(move |i: i32| Some(i * 10))).handle((1, 1)).await;
386    ///         assert_eq!(r, Some((2, 10)));
387    ///     }
388    /// )
389    /// ```
390    pub fn and<U, V>(self, rhs: OptionHandle<U, V>) -> OptionHandle<(I, U), (O, V)>
391    where
392        U: Send + 'static,
393        V: Send + 'static,
394    {
395        self.join(rhs)
396            .convert(|(o, v): (Option<O>, Option<V>)| match (o, v) {
397                (Some(o), Some(v)) => Some((o, v)),
398                _ => None,
399            })
400    }
401
402    /// `OptionHandle<I, O> + OptionHandle<O, P> => OptionHandle<I, P>`
403    /// ```rust
404    /// use tokiactor::*;
405    /// use tokio;
406    ///
407    /// tokio::runtime::Runtime::new().unwrap().block_on(
408    ///     async move {
409    ///         let r = Handle::new(1)
410    ///             .spawn(move |i: i32| Some(i + 1))
411    ///             .and_then(Handle::new(1).spawn(move |i: i32| Some(i * 10))).handle(1).await;
412    ///         assert_eq!(r, Some(20));
413    ///     }
414    /// )
415    /// ```
416    pub fn and_then<P>(self, rhs: OptionHandle<O, P>) -> OptionHandle<I, P>
417    where
418        P: Send + 'static,
419    {
420        let ss = self;
421        let rr = rhs;
422        Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
423            let ss = ss.clone();
424            let rr = rr.clone();
425            async move {
426                let o = ss.handle(i).await;
427                match o {
428                    Some(o) => rr.handle(o).await,
429                    None => None,
430                }
431            }
432        }))
433    }
434
435    /// `OptionHandle<I, O> + Handle<O, P> => OptionHandle<I, P>`
436    /// ```rust
437    /// use tokiactor::*;
438    /// use tokio;
439    ///
440    /// tokio::runtime::Runtime::new().unwrap().block_on(
441    ///     async move {
442    ///         let r = Handle::new(1)
443    ///             .spawn(move |i: i32| Some(i + 1))
444    ///             .map(Handle::new(1).spawn(move |i: i32| i * 10)).handle(1).await;
445    ///         assert_eq!(r, Some(20));
446    ///     }
447    /// )
448    /// ```
449    pub fn map<P>(self, rhs: Handle<O, P>) -> OptionHandle<I, P>
450    where
451        P: Send + 'static,
452    {
453        let ss = self;
454        let rr = rhs;
455        Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
456            let ss = ss.clone();
457            let rr = rr.clone();
458            async move {
459                let o = ss.handle(i).await;
460                match o {
461                    Some(o) => Some(rr.handle(o).await),
462                    None => None,
463                }
464            }
465        }))
466    }
467
468    /// convert `OptionHandle` to `ResultHandle`
469    /// ```rust
470    /// use tokiactor::*;
471    /// use tokio;
472    ///
473    /// tokio::runtime::Runtime::new().unwrap().block_on(
474    ///     async move {
475    ///         let r = Handle::new(1)
476    ///             .spawn(move |i: i32| Some(i + 1))
477    ///             .ok_or("null error").handle(1).await;
478    ///         assert_eq!(r.unwrap(), 2);
479    ///     }
480    /// )
481    /// ```
482    pub fn ok_or<C: ToString>(self, context: C) -> ResultHandle<I, O, anyhow::Error> {
483        let context = context.to_string();
484        self.convert(move |r: Option<O>| r.context(context.clone()))
485    }
486}
487
488/// Handle return value is a `Result`
489pub type ResultHandle<I, O, E> = Handle<I, Result<O, E>>;
490
491/// Chain error together when connecto two `ResultHandle` together
492#[derive(Debug, thiserror::Error)]
493#[error("e={0}, en={1}")]
494pub struct ChainError<E, EN>(E, EN);
495
496unsafe impl<E, EN> Sync for ChainError<E, EN>
497where
498    E: Sync,
499    EN: Sync,
500{
501}
502
503impl<I, O, E> ResultHandle<I, O, E>
504where
505    I: Send + 'static,
506    O: Send + 'static,
507    E: Send + 'static,
508{
509    /// `ResultHandle<I, O, E> + ResultHandle<U, V, EN> => ResultHandle<(I, U), (O, V), anyhow::Error>`
510    /// ```rust
511    /// use tokiactor::*;
512    /// use tokio;
513    /// use thiserror;
514    ///
515    /// #[derive(Debug, thiserror::Error)]
516    /// pub enum Error{}
517    ///
518    /// tokio::runtime::Runtime::new().unwrap().block_on(
519    ///     async move {
520    ///         let r = Handle::new(1)
521    ///             .spawn(move |i: i32| {
522    ///                 Result::<i32, Error>::Ok(i+1)
523    ///              })
524    ///             .and(
525    ///                 Handle::new(1).spawn(move |i: i32| {
526    ///                 Result::<i32, Error>::Ok(i*1)
527    ///             })
528    ///         ).handle((1, 1)).await;
529    ///         assert_eq!(r.unwrap(), (2, 1));
530    ///     }
531    /// );
532    /// ```
533    pub fn and<U, V, EN>(
534        self,
535        rhs: ResultHandle<U, V, EN>,
536    ) -> ResultHandle<(I, U), (O, V), anyhow::Error>
537    where
538        E: Sync + std::error::Error,
539        U: Send + 'static,
540        V: Send + 'static,
541        EN: Sync + Send + std::error::Error + 'static,
542    {
543        self.join(rhs)
544            .convert(|(o, v): (Result<O, E>, Result<V, EN>)| match (o, v) {
545                (Ok(o), Ok(v)) => Ok((o, v)),
546                (Err(e), Err(en)) => Err(anyhow::Error::new(ChainError(e, en))),
547                (Err(e), _) => Err(anyhow::Error::new(e)),
548                (_, Err(e)) => Err(anyhow::Error::new(e)),
549            })
550    }
551
552    /// `ResultHandle<I, O, E> | ResultHandle<O, P, EN> => ResultHandle<I, P, anyhow::Error>`
553    /// ```rust
554    /// use tokiactor::*;
555    /// use tokio;
556    /// use thiserror;
557    ///
558    /// #[derive(Debug, thiserror::Error)]
559    /// pub enum Error{}
560    ///
561    /// tokio::runtime::Runtime::new().unwrap().block_on(
562    ///     async move {
563    ///         let r = Handle::new(1)
564    ///             .spawn(move |i: i32| {
565    ///                 Result::<i32, Error>::Ok(i+1)
566    ///              })
567    ///             .and_then(
568    ///                 Handle::new(1).spawn(move |i: i32| {
569    ///                 Result::<i32, Error>::Ok(i*10)
570    ///             })
571    ///         ).handle(1).await;
572    ///         assert_eq!(r.unwrap(), 20);
573    ///     }
574    /// );
575    /// ```
576    pub fn and_then<P, EN>(self, rhs: ResultHandle<O, P, EN>) -> ResultHandle<I, P, anyhow::Error>
577    where
578        E: Sync + std::error::Error,
579        P: Send + 'static,
580        EN: Sync + Send + std::error::Error + 'static,
581    {
582        let ss = self;
583        let rr = rhs;
584        Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
585            let ss = ss.clone();
586            let rr = rr.clone();
587            async move {
588                let o = ss.handle(i).await?;
589                let p = rr.handle(o).await?;
590                anyhow::Result::Ok(p)
591            }
592        }))
593    }
594
595    /// `ResultHandle<I, O, E> | Handle<O, P> => ResultHandle<I, P, E>`
596    /// ```rust
597    /// use tokiactor::*;
598    /// use tokio;
599    /// use thiserror;
600    ///
601    /// #[derive(Debug, thiserror::Error)]
602    /// pub enum Error{}
603    ///
604    /// tokio::runtime::Runtime::new().unwrap().block_on(
605    ///     async move {
606    ///         let r = Handle::new(1)
607    ///             .spawn(move |i: i32| {
608    ///                 Result::<i32, Error>::Ok(i+1)
609    ///              })
610    ///             .map(
611    ///                 Handle::new(1).spawn(move |i: i32| {
612    ///                 i*10
613    ///             })
614    ///         ).handle(1).await;
615    ///         assert_eq!(r.unwrap(), 20);
616    ///     }
617    /// );
618    /// ```
619    pub fn map<P>(self, rhs: Handle<O, P>) -> ResultHandle<I, P, E>
620    where
621        P: Send + 'static,
622    {
623        let ss = self;
624        let rr = rhs;
625        Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
626            let ss = ss.clone();
627            let rr = rr.clone();
628            async move {
629                let o = ss.handle(i).await?;
630                let p = rr.handle(o).await;
631                anyhow::Result::Ok(p)
632            }
633        }))
634    }
635
636    /// `ResultHandle<I, O, E> => OptionHandle<I, O>`
637    /// ```rust
638    /// use tokiactor::*;
639    /// use tokio;
640    /// use thiserror;
641    ///
642    /// #[derive(Debug, thiserror::Error)]
643    /// pub enum Error{}
644    ///
645    /// tokio::runtime::Runtime::new().unwrap().block_on(
646    ///     async move {
647    ///         let r = Handle::new(1)
648    ///             .spawn(move |i: i32| {
649    ///                 Result::<i32, Error>::Ok(i+1)
650    ///              }).ok().handle(1).await;
651    ///         assert_eq!(r.unwrap(), 2);
652    ///     }
653    /// );
654    /// ```
655    pub fn ok(self) -> OptionHandle<I, O> {
656        self.convert(move |r: Result<O, E>| r.ok())
657    }
658}