g_set/
g_set.rs

1/// ```bash
2/// $ cargo build --examples
3/// $ RUST_LOG=debug ~/Projects/maelstrom/maelstrom test -w g-set --bin ./target/debug/examples/g_set --node-count 2 --concurrency 2n --time-limit 20 --rate 10 --log-stderr
4/// ```
5use async_trait::async_trait;
6use log::debug;
7use maelstrom::protocol::Message;
8use maelstrom::{done, Node, Result, Runtime};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::time::Duration;
13
14pub(crate) fn main() -> Result<()> {
15    Runtime::init(try_main())
16}
17
18async fn try_main() -> Result<()> {
19    let runtime = Runtime::new();
20    let handler = Arc::new(Handler::default());
21    runtime.with_handler(handler).run().await
22}
23
/// Node state for the grow-only set example.
///
/// Cloning a `Handler` is cheap and yields a handle to the *same* set,
/// because the only field is an `Arc`.
#[derive(Default, Clone)]
struct Handler {
    /// The set of elements known to this node, shared between all clones
    /// of the handler and guarded by a mutex.
    s: Arc<Mutex<HashSet<i64>>>,
}
28
29#[async_trait]
30impl Node for Handler {
31    async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32        let msg: Result<Request> = req.body.as_obj();
33        match msg {
34            Ok(Request::Read {}) => {
35                let data = to_seq(&self.s.lock().unwrap());
36                return runtime.reply(req, Request::ReadOk { value: data }).await;
37            }
38            Ok(Request::Add { element }) => {
39                self.s.lock().unwrap().insert(element);
40                return runtime.reply(req, Request::AddOk {}).await;
41            }
42            Ok(Request::ReplicateOne { element }) => {
43                self.s.lock().unwrap().insert(element);
44                return Ok(());
45            }
46            Ok(Request::ReplicateFull { value }) => {
47                let mut s = self.s.lock().unwrap();
48                for v in value {
49                    s.insert(v);
50                }
51                return Ok(());
52            }
53            Ok(Request::Init {}) => {
54                // spawn into tokio (instead of runtime) to not to wait
55                // until it is completed, as it will never be.
56                let (r0, h0) = (runtime.clone(), self.clone());
57                tokio::spawn(async move {
58                    loop {
59                        tokio::time::sleep(Duration::from_secs(5)).await;
60                        debug!("emit replication signal");
61                        let s = h0.s.lock().unwrap();
62                        for n in r0.neighbours() {
63                            let msg = Request::ReplicateFull { value: to_seq(&s) };
64                            drop(r0.send_async(n, msg));
65                        }
66                    }
67                });
68                return Ok(());
69            }
70            _ => done(runtime, req),
71        }
72    }
73}
74
/// Copies every element of the locked set into a newly allocated vector.
///
/// The elements appear in the set's iteration order, which is unspecified.
fn to_seq(s: &MutexGuard<HashSet<i64>>) -> Vec<i64> {
    let mut items = Vec::with_capacity(s.len());
    items.extend(s.iter().copied());
    items
}
78
79#[derive(Serialize, Deserialize)]
80#[serde(rename_all = "snake_case", tag = "type")]
81enum Request {
82    Init {},
83    Read {},
84    ReadOk { value: Vec<i64> },
85    Add { element: i64 },
86    AddOk {},
87    ReplicateOne { element: i64 },
88    ReplicateFull { value: Vec<i64> },
89}