async_accumulator/
async_accumulator.rs

1use async_trait::async_trait;
2use std::time::Duration;
3use tokio::time::sleep;
4use tractor::prelude::*;
5
6struct Accumulator {
7    sum: usize,
8}
9
10enum AccumulatorMsg {
11    Add { a: usize },
12    Sub { a: usize },
13}
14
15impl Actor for Accumulator {
16    type Msg = AccumulatorMsg;
17}
18
19#[async_trait]
20impl ActorBehaviorAsync for Accumulator {
21    async fn handle(&mut self, msg: AccumulatorMsg, _: &Context<Self>) {
22        match msg {
23            AccumulatorMsg::Add { a } => self.sum += a,
24            AccumulatorMsg::Sub { a } => self.sum -= a,
25        }
26    }
27}
28
29#[async_trait]
30impl ActorHooksAsync for Accumulator {
31    async fn stopped(&mut self) {
32        println!("Final sum: {}", self.sum);
33    }
34}
35
36#[derive(Clone)]
37struct Accum(Addr<Accumulator>);
38
39impl Accum {
40    fn overloaded(&self) -> bool {
41        self.0.len() > 1000
42    }
43
44    fn add(&self, a: usize) {
45        self.0.send(AccumulatorMsg::Add { a });
46    }
47
48    fn sub(&self, a: usize) {
49        self.0.send(AccumulatorMsg::Sub { a });
50    }
51}
52
53struct Feeder;
54
55impl Actor for Feeder {
56    type Msg = (usize, Accum);
57}
58
59#[async_trait]
60impl ActorBehaviorAsync for Feeder {
61    async fn handle(&mut self, (num, addr): (usize, Accum), _: &Context<Self>) {
62        for _i in 0..num {
63            while addr.overloaded() {
64                sleep(Duration::from_millis(1)).await;
65                // println!("Overload");
66            }
67            addr.add(1);
68        }
69    }
70}
71
72#[async_trait]
73impl ActorHooksAsync for Feeder {}
74
75fn run() {
76    let accum = Accum(Accumulator { sum: 0 }.start());
77    let feeder = Feeder.start();
78
79    feeder.send((1_000_000, accum));
80}
81
82fn main() {
83    ActorSystem::run_to_completion(run);
84}