async_accumulator/
async_accumulator.rs1use async_trait::async_trait;
2use std::time::Duration;
3use tokio::time::sleep;
4use tractor::prelude::*;
5
6struct Accumulator {
7 sum: usize,
8}
9
10enum AccumulatorMsg {
11 Add { a: usize },
12 Sub { a: usize },
13}
14
15impl Actor for Accumulator {
16 type Msg = AccumulatorMsg;
17}
18
19#[async_trait]
20impl ActorBehaviorAsync for Accumulator {
21 async fn handle(&mut self, msg: AccumulatorMsg, _: &Context<Self>) {
22 match msg {
23 AccumulatorMsg::Add { a } => self.sum += a,
24 AccumulatorMsg::Sub { a } => self.sum -= a,
25 }
26 }
27}
28
29#[async_trait]
30impl ActorHooksAsync for Accumulator {
31 async fn stopped(&mut self) {
32 println!("Final sum: {}", self.sum);
33 }
34}
35
36#[derive(Clone)]
37struct Accum(Addr<Accumulator>);
38
39impl Accum {
40 fn overloaded(&self) -> bool {
41 self.0.len() > 1000
42 }
43
44 fn add(&self, a: usize) {
45 self.0.send(AccumulatorMsg::Add { a });
46 }
47
48 fn sub(&self, a: usize) {
49 self.0.send(AccumulatorMsg::Sub { a });
50 }
51}
52
53struct Feeder;
54
55impl Actor for Feeder {
56 type Msg = (usize, Accum);
57}
58
59#[async_trait]
60impl ActorBehaviorAsync for Feeder {
61 async fn handle(&mut self, (num, addr): (usize, Accum), _: &Context<Self>) {
62 for _i in 0..num {
63 while addr.overloaded() {
64 sleep(Duration::from_millis(1)).await;
65 }
67 addr.add(1);
68 }
69 }
70}
71
72#[async_trait]
73impl ActorHooksAsync for Feeder {}
74
75fn run() {
76 let accum = Accum(Accumulator { sum: 0 }.start());
77 let feeder = Feeder.start();
78
79 feeder.send((1_000_000, accum));
80}
81
82fn main() {
83 ActorSystem::run_to_completion(run);
84}