tokiactor/lib.rs
1#![doc = include_str!("../README.md")]
2
3use anyhow::Context;
4use futures::Future;
5use std::marker::PhantomData;
6
/// A function wrapped so that it can be driven as an actor.
///
/// `I` is the input type accepted by the function and `O` is the value it
/// produces for each input.
pub struct Actor<I, O, FN>
where
    FN: FnMut(I) -> O,
{
    /// The function wrapped by this actor.
    pub f: FN,
    // Zero-sized markers that tie the otherwise unused `I` and `O` to the type.
    _i: PhantomData<I>,
    _o: PhantomData<O>,
}
17
18impl<I, O, FN> Clone for Actor<I, O, FN>
19where
20 FN: FnMut(I) -> O + Clone,
21{
22 fn clone(&self) -> Self {
23 Self {
24 f: self.f.clone(),
25 _i: self._i,
26 _o: self._o,
27 }
28 }
29}
30
31impl<I, O, FN> From<FN> for Actor<I, O, FN>
32where
33 FN: FnMut(I) -> O,
34{
35 fn from(f: FN) -> Self {
36 Self {
37 f,
38 _i: PhantomData,
39 _o: PhantomData,
40 }
41 }
42}
43
44/// Channel to communicated with `Actor` instances
45pub struct Handle<I, O> {
46 pub n: usize,
47 tx: async_channel::Sender<(I, tokio::sync::oneshot::Sender<O>)>,
48 rx: async_channel::Receiver<(I, tokio::sync::oneshot::Sender<O>)>,
49}
50
51impl<I, O> Clone for Handle<I, O> {
52 fn clone(&self) -> Self {
53 Self {
54 n: self.n,
55 tx: self.tx.clone(),
56 rx: self.rx.clone(),
57 }
58 }
59}
60
61/// Error container for channel communication
62#[derive(Debug, thiserror::Error)]
63pub enum HandleError<T> {
64 Send(#[from] async_channel::SendError<T>),
65 Recv(#[from] tokio::sync::oneshot::error::RecvError),
66}
67
/// Hook for observing the values that flow into and out of a handle.
///
/// Both methods default to returning their argument untouched, so an
/// implementor only overrides the side it cares about.
pub trait Inspector<I, O> {
    /// Receives the input value and returns the value to pass on.
    fn inspect_i(&mut self, i: I) -> I {
        i
    }

    /// Receives the output value and returns the value to pass on.
    fn inspect_o(&mut self, o: O) -> O {
        o
    }
}
80
81impl<I, O> Handle<I, O>
82where
83 I: Send + 'static,
84 O: Send + 'static,
85{
86 /// create a new Handle with `n` of channel depth
87 /// ```rust
88 /// use tokiactor::*;
89 ///
90 /// let handle = Handle::<i32, i32>::new(1);
91 /// ```
92 pub fn new(n: usize) -> Self {
93 let (tx, rx) = match n {
94 0 => async_channel::unbounded(),
95 _ => async_channel::bounded(n),
96 };
97 Self { n, tx, rx }
98 }
99
100 /// spawn `Actor` with type impl `Into<Actor<I, O, FN>>` in system thread
101 /// ```rust
102 /// use tokiactor::*;
103 /// let handle = Handle::new(1).spawn(|i: i32| i + 1);
104 /// ```
105 pub fn spawn<FN, IntoActor>(self, actor: IntoActor) -> Handle<I, O>
106 where
107 FN: FnMut(I) -> O + Send + 'static,
108 IntoActor: Into<Actor<I, O, FN>>,
109 {
110 let rx = self.rx.clone();
111 let mut actor = actor.into();
112 std::thread::spawn(move || {
113 while let Ok((i, tx)) = rx.recv_blocking() {
114 if tx.send((actor.f)(i)).is_err() {
115 break;
116 }
117 }
118 });
119 self
120 }
121
122 /// spawn `n` `Actor` with type impl `Into<Actor<I, O, FN>>` in system threads when `Actor` is `Clone`
123 /// ```rust
124 /// use tokiactor::*;
125 /// let handle = Handle::new(1).spawn_n(10, |i: i32| i + 1);
126 /// ```
127 pub fn spawn_n<FN, IntoActor>(self, n: usize, actor: IntoActor) -> Handle<I, O>
128 where
129 FN: FnMut(I) -> O + Send + 'static,
130 IntoActor: Into<Actor<I, O, FN>>,
131 Actor<I, O, FN>: Clone,
132 {
133 let rx = self.rx.clone();
134 let actor = actor.into();
135 for _ in 0..n {
136 let mut actor = actor.clone();
137 let rx = rx.clone();
138 std::thread::spawn(move || {
139 while let Ok((i, tx)) = rx.recv_blocking() {
140 if tx.send((actor.f)(i)).is_err() {
141 break;
142 }
143 }
144 });
145 }
146 self
147 }
148
149 /// spawn async `Actor` with type impl `Into<Actor<I, O, FU, FN>>` in tokio thread
150 /// ```rust
151 /// use tokiactor::*;
152 /// use tokio;
153 /// tokio::runtime::Runtime::new().unwrap().block_on(
154 /// async move {
155 /// let r = Handle::new(1)
156 /// .spawn_tokio(move |i: i32| async move {i + 41})
157 /// .handle(1)
158 /// .await;
159 /// assert_eq!(r, 42)
160 /// }
161 /// );
162 /// ```
163 pub fn spawn_tokio<FN, FU, IntoActor>(self, actor: IntoActor) -> Handle<I, O>
164 where
165 FN: FnMut(I) -> FU + Send + 'static,
166 FU: Future<Output = O> + Send + 'static,
167 IntoActor: Into<Actor<I, FU, FN>>,
168 {
169 let rx = self.rx.clone();
170 let mut actor = actor.into();
171 tokio::spawn(async move {
172 let err = std::sync::Arc::from(std::sync::atomic::AtomicBool::new(false));
173 while let Ok((i, tx)) = rx.recv().await {
174 if err.load(std::sync::atomic::Ordering::SeqCst) {
175 break;
176 }
177 let o = (actor.f)(i);
178 let err = err.clone();
179 tokio::spawn(async move {
180 let o = o.await;
181 if tx.send(o).is_err() {
182 err.store(true, std::sync::atomic::Ordering::SeqCst);
183 }
184 });
185 }
186 });
187 self
188 }
189
190 /// Handle input
191 /// ```rust
192 /// use tokiactor::*;
193 /// use tokio;
194 /// tokio::runtime::Runtime::new().unwrap().block_on(
195 /// async move {
196 /// let r = Handle::new(1)
197 /// .spawn_tokio(move |i: i32| async move {i + 41})
198 /// .handle(1)
199 /// .await;
200 /// }
201 /// );
202 /// ```
203 pub async fn handle(&self, i: I) -> O {
204 let (t, r) = tokio::sync::oneshot::channel::<O>();
205 self.tx.send((i, t)).await.expect("failed to send to actor");
206 r.await.expect("failed to recv from actor")
207 }
208
209 /// Try handle input, return error when actor is dead...
210 /// ```rust
211 /// use tokiactor::*;
212 /// use tokio;
213 /// tokio::runtime::Runtime::new().unwrap().block_on(
214 /// async move {
215 /// Handle::new(1)
216 /// .spawn_tokio(move |i: i32| async move {i + 41})
217 /// .try_handle(1)
218 /// .await;
219 /// }
220 /// );
221 /// ```
222 pub async fn try_handle(
223 &self,
224 i: I,
225 ) -> Result<O, HandleError<(I, tokio::sync::oneshot::Sender<O>)>> {
226 let (t, r) = tokio::sync::oneshot::channel::<O>();
227 self.tx.send((i, t)).await?;
228 Ok(r.await?)
229 }
230
231 /// Inspect Handle excution, check `Inspector` trait for more info
232 /// ```rust
233 /// use tokiactor::*;
234 /// use tokio;
235 /// use std::time::Instant;
236 ///
237 /// #[derive(Clone)]
238 /// struct Timer(Instant);
239 ///
240 /// impl Inspector<i32, i32> for Timer
241 /// {
242 /// fn inspect_i(&mut self, i: i32) -> i32
243 /// {
244 /// self.0 = Instant::now();
245 /// i
246 /// }
247 ///
248 /// fn inspect_o(&mut self, o: i32) -> i32
249 /// {
250 /// let delta = self.0.elapsed();
251 /// // get execution time, do logging...
252 /// dbg!(delta);
253 /// o
254 /// }
255 /// }
256 ///
257 /// tokio::runtime::Runtime::new().unwrap().block_on(
258 /// async move {
259 /// Handle::new(1)
260 /// .spawn_tokio(move |i: i32| async move {i + 41})
261 /// .inspect(Timer(Instant::now()))
262 /// .try_handle(1)
263 /// .await;
264 /// }
265 /// );
266 ///
267 /// ```
268 pub fn inspect<INS>(self, ins: INS) -> Handle<I, O>
269 where
270 INS: Inspector<I, O> + Send + 'static + Clone,
271 {
272 let ss = self;
273 Handle::new(ss.n).spawn_tokio(Actor::from(move |i: I| {
274 let ss = ss.clone();
275 let mut ins = ins.clone();
276 async move {
277 let i = ins.inspect_i(i);
278 let o = ss.handle(i).await;
279 ins.inspect_o(o)
280 }
281 }))
282 }
283
284 ///convert `Handle<I, O>` to `Handle<I, P>` with `Fn(O)->P`
285 /// ```rust
286 /// use tokiactor::*;
287 /// use tokio;
288 ///
289 /// tokio::runtime::Runtime::new().unwrap().block_on(
290 /// async move {
291 /// Handle::new(1)
292 /// .spawn(move |i: i32| i + 1)
293 /// .convert(|v| v as f32);
294 /// }
295 /// )
296 /// ```
297 pub fn convert<F, P>(self, f: F) -> Handle<I, P>
298 where
299 P: Send + 'static,
300 F: Fn(O) -> P + Send + 'static + Clone,
301 {
302 let n = self.n;
303 self.then(Handle::new(n).spawn_tokio(move |i| {
304 let f = f.clone();
305 async move { f(i) }
306 }))
307 }
308
309 ///`Handle<I, O> + Handle<O, P> => Handle<I, P>`
310 /// ```rust
311 /// use tokiactor::*;
312 /// use tokio;
313 ///
314 /// tokio::runtime::Runtime::new().unwrap().block_on(
315 /// async move {
316 /// let r = Handle::new(1)
317 /// .spawn(move |i: i32| i + 1)
318 /// .then(Handle::new(1).spawn(move |i: i32| i * 10)).handle(1).await;
319 /// assert_eq!(r, 20);
320 /// }
321 /// )
322 /// ```
323 pub fn then<P>(self, rhs: Handle<O, P>) -> Handle<I, P>
324 where
325 P: Send + 'static,
326 {
327 let ss = self;
328 let rr = rhs;
329 Handle::new(ss.n).spawn_tokio(move |i| {
330 let ss = ss.clone();
331 let rr = rr.clone();
332 async move {
333 let o = ss.handle(i).await;
334 rr.handle(o).await
335 }
336 })
337 }
338
339 /// `Handle<I, O> | Handle<U, V> => Handle<(I,U),(O,V)>`
340 /// ```rust
341 /// use tokiactor::*;
342 /// use tokio;
343 ///
344 /// tokio::runtime::Runtime::new().unwrap().block_on(
345 /// async move {
346 /// let r = Handle::new(1)
347 /// .spawn(move |i: i32| i + 1)
348 /// .join(Handle::new(1).spawn(move |i: i32| i * 10)).handle((2, 1)).await;
349 /// assert_eq!(r, (3, 10));
350 /// }
351 /// )
352 /// ```
353 pub fn join<U, V>(self, rhs: Handle<U, V>) -> Handle<(I, U), (O, V)>
354 where
355 U: Send + 'static,
356 V: Send + 'static,
357 {
358 let ss = self;
359 let rr = rhs;
360 Handle::new(ss.n.max(rr.n)).spawn_tokio(Actor::from(move |(i, u)| {
361 let ss = ss.clone();
362 let rr = rr.clone();
363 async move { futures::join!(ss.handle(i), rr.handle(u)) }
364 }))
365 }
366}
367
368/// Handle return value is an `Option`
369pub type OptionHandle<I, O> = Handle<I, Option<O>>;
370
371impl<I, O> OptionHandle<I, O>
372where
373 I: Send + 'static,
374 O: Send + 'static,
375{
376 /// `OptionHandle<I, O> | OptionHandle<U, V> => OptionHandle<(I,U), (O,V)>`
377 /// ```rust
378 /// use tokiactor::*;
379 /// use tokio;
380 ///
381 /// tokio::runtime::Runtime::new().unwrap().block_on(
382 /// async move {
383 /// let r = Handle::new(1)
384 /// .spawn(move |i: i32| Some(i + 1))
385 /// .and(Handle::new(1).spawn(move |i: i32| Some(i * 10))).handle((1, 1)).await;
386 /// assert_eq!(r, Some((2, 10)));
387 /// }
388 /// )
389 /// ```
390 pub fn and<U, V>(self, rhs: OptionHandle<U, V>) -> OptionHandle<(I, U), (O, V)>
391 where
392 U: Send + 'static,
393 V: Send + 'static,
394 {
395 self.join(rhs)
396 .convert(|(o, v): (Option<O>, Option<V>)| match (o, v) {
397 (Some(o), Some(v)) => Some((o, v)),
398 _ => None,
399 })
400 }
401
402 /// `OptionHandle<I, O> + OptionHandle<O, P> => OptionHandle<I, P>`
403 /// ```rust
404 /// use tokiactor::*;
405 /// use tokio;
406 ///
407 /// tokio::runtime::Runtime::new().unwrap().block_on(
408 /// async move {
409 /// let r = Handle::new(1)
410 /// .spawn(move |i: i32| Some(i + 1))
411 /// .and_then(Handle::new(1).spawn(move |i: i32| Some(i * 10))).handle(1).await;
412 /// assert_eq!(r, Some(20));
413 /// }
414 /// )
415 /// ```
416 pub fn and_then<P>(self, rhs: OptionHandle<O, P>) -> OptionHandle<I, P>
417 where
418 P: Send + 'static,
419 {
420 let ss = self;
421 let rr = rhs;
422 Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
423 let ss = ss.clone();
424 let rr = rr.clone();
425 async move {
426 let o = ss.handle(i).await;
427 match o {
428 Some(o) => rr.handle(o).await,
429 None => None,
430 }
431 }
432 }))
433 }
434
435 /// `OptionHandle<I, O> + Handle<O, P> => OptionHandle<I, P>`
436 /// ```rust
437 /// use tokiactor::*;
438 /// use tokio;
439 ///
440 /// tokio::runtime::Runtime::new().unwrap().block_on(
441 /// async move {
442 /// let r = Handle::new(1)
443 /// .spawn(move |i: i32| Some(i + 1))
444 /// .map(Handle::new(1).spawn(move |i: i32| i * 10)).handle(1).await;
445 /// assert_eq!(r, Some(20));
446 /// }
447 /// )
448 /// ```
449 pub fn map<P>(self, rhs: Handle<O, P>) -> OptionHandle<I, P>
450 where
451 P: Send + 'static,
452 {
453 let ss = self;
454 let rr = rhs;
455 Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
456 let ss = ss.clone();
457 let rr = rr.clone();
458 async move {
459 let o = ss.handle(i).await;
460 match o {
461 Some(o) => Some(rr.handle(o).await),
462 None => None,
463 }
464 }
465 }))
466 }
467
468 /// convert `OptionHandle` to `ResultHandle`
469 /// ```rust
470 /// use tokiactor::*;
471 /// use tokio;
472 ///
473 /// tokio::runtime::Runtime::new().unwrap().block_on(
474 /// async move {
475 /// let r = Handle::new(1)
476 /// .spawn(move |i: i32| Some(i + 1))
477 /// .ok_or("null error").handle(1).await;
478 /// assert_eq!(r.unwrap(), 2);
479 /// }
480 /// )
481 /// ```
482 pub fn ok_or<C: ToString>(self, context: C) -> ResultHandle<I, O, anyhow::Error> {
483 let context = context.to_string();
484 self.convert(move |r: Option<O>| r.context(context.clone()))
485 }
486}
487
488/// Handle return value is a `Result`
489pub type ResultHandle<I, O, E> = Handle<I, Result<O, E>>;
490
491/// Chain error together when connecto two `ResultHandle` together
492#[derive(Debug, thiserror::Error)]
493#[error("e={0}, en={1}")]
494pub struct ChainError<E, EN>(E, EN);
495
496unsafe impl<E, EN> Sync for ChainError<E, EN>
497where
498 E: Sync,
499 EN: Sync,
500{
501}
502
503impl<I, O, E> ResultHandle<I, O, E>
504where
505 I: Send + 'static,
506 O: Send + 'static,
507 E: Send + 'static,
508{
509 /// `ResultHandle<I, O, E> + ResultHandle<U, V, EN> => ResultHandle<(I, U), (O, V), anyhow::Error>`
510 /// ```rust
511 /// use tokiactor::*;
512 /// use tokio;
513 /// use thiserror;
514 ///
515 /// #[derive(Debug, thiserror::Error)]
516 /// pub enum Error{}
517 ///
518 /// tokio::runtime::Runtime::new().unwrap().block_on(
519 /// async move {
520 /// let r = Handle::new(1)
521 /// .spawn(move |i: i32| {
522 /// Result::<i32, Error>::Ok(i+1)
523 /// })
524 /// .and(
525 /// Handle::new(1).spawn(move |i: i32| {
526 /// Result::<i32, Error>::Ok(i*1)
527 /// })
528 /// ).handle((1, 1)).await;
529 /// assert_eq!(r.unwrap(), (2, 1));
530 /// }
531 /// );
532 /// ```
533 pub fn and<U, V, EN>(
534 self,
535 rhs: ResultHandle<U, V, EN>,
536 ) -> ResultHandle<(I, U), (O, V), anyhow::Error>
537 where
538 E: Sync + std::error::Error,
539 U: Send + 'static,
540 V: Send + 'static,
541 EN: Sync + Send + std::error::Error + 'static,
542 {
543 self.join(rhs)
544 .convert(|(o, v): (Result<O, E>, Result<V, EN>)| match (o, v) {
545 (Ok(o), Ok(v)) => Ok((o, v)),
546 (Err(e), Err(en)) => Err(anyhow::Error::new(ChainError(e, en))),
547 (Err(e), _) => Err(anyhow::Error::new(e)),
548 (_, Err(e)) => Err(anyhow::Error::new(e)),
549 })
550 }
551
552 /// `ResultHandle<I, O, E> | ResultHandle<O, P, EN> => ResultHandle<I, P, anyhow::Error>`
553 /// ```rust
554 /// use tokiactor::*;
555 /// use tokio;
556 /// use thiserror;
557 ///
558 /// #[derive(Debug, thiserror::Error)]
559 /// pub enum Error{}
560 ///
561 /// tokio::runtime::Runtime::new().unwrap().block_on(
562 /// async move {
563 /// let r = Handle::new(1)
564 /// .spawn(move |i: i32| {
565 /// Result::<i32, Error>::Ok(i+1)
566 /// })
567 /// .and_then(
568 /// Handle::new(1).spawn(move |i: i32| {
569 /// Result::<i32, Error>::Ok(i*10)
570 /// })
571 /// ).handle(1).await;
572 /// assert_eq!(r.unwrap(), 20);
573 /// }
574 /// );
575 /// ```
576 pub fn and_then<P, EN>(self, rhs: ResultHandle<O, P, EN>) -> ResultHandle<I, P, anyhow::Error>
577 where
578 E: Sync + std::error::Error,
579 P: Send + 'static,
580 EN: Sync + Send + std::error::Error + 'static,
581 {
582 let ss = self;
583 let rr = rhs;
584 Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
585 let ss = ss.clone();
586 let rr = rr.clone();
587 async move {
588 let o = ss.handle(i).await?;
589 let p = rr.handle(o).await?;
590 anyhow::Result::Ok(p)
591 }
592 }))
593 }
594
595 /// `ResultHandle<I, O, E> | Handle<O, P> => ResultHandle<I, P, E>`
596 /// ```rust
597 /// use tokiactor::*;
598 /// use tokio;
599 /// use thiserror;
600 ///
601 /// #[derive(Debug, thiserror::Error)]
602 /// pub enum Error{}
603 ///
604 /// tokio::runtime::Runtime::new().unwrap().block_on(
605 /// async move {
606 /// let r = Handle::new(1)
607 /// .spawn(move |i: i32| {
608 /// Result::<i32, Error>::Ok(i+1)
609 /// })
610 /// .map(
611 /// Handle::new(1).spawn(move |i: i32| {
612 /// i*10
613 /// })
614 /// ).handle(1).await;
615 /// assert_eq!(r.unwrap(), 20);
616 /// }
617 /// );
618 /// ```
619 pub fn map<P>(self, rhs: Handle<O, P>) -> ResultHandle<I, P, E>
620 where
621 P: Send + 'static,
622 {
623 let ss = self;
624 let rr = rhs;
625 Handle::new(ss.n).spawn_tokio(Actor::from(move |i| {
626 let ss = ss.clone();
627 let rr = rr.clone();
628 async move {
629 let o = ss.handle(i).await?;
630 let p = rr.handle(o).await;
631 anyhow::Result::Ok(p)
632 }
633 }))
634 }
635
636 /// `ResultHandle<I, O, E> => OptionHandle<I, O>`
637 /// ```rust
638 /// use tokiactor::*;
639 /// use tokio;
640 /// use thiserror;
641 ///
642 /// #[derive(Debug, thiserror::Error)]
643 /// pub enum Error{}
644 ///
645 /// tokio::runtime::Runtime::new().unwrap().block_on(
646 /// async move {
647 /// let r = Handle::new(1)
648 /// .spawn(move |i: i32| {
649 /// Result::<i32, Error>::Ok(i+1)
650 /// }).ok().handle(1).await;
651 /// assert_eq!(r.unwrap(), 2);
652 /// }
653 /// );
654 /// ```
655 pub fn ok(self) -> OptionHandle<I, O> {
656 self.convert(move |r: Result<O, E>| r.ok())
657 }
658}