1use async_trait::async_trait;
6use log::debug;
7use maelstrom::protocol::Message;
8use maelstrom::{done, Node, Result, Runtime};
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::{Arc, Mutex, MutexGuard};
12use std::time::Duration;
13
14pub(crate) fn main() -> Result<()> {
15 Runtime::init(try_main())
16}
17
18async fn try_main() -> Result<()> {
19 let runtime = Runtime::new();
20 let handler = Arc::new(Handler::default());
21 runtime.with_handler(handler).run().await
22}
23
24#[derive(Clone, Default)]
25struct Handler {
26 s: Arc<Mutex<HashSet<i64>>>,
27}
28
29#[async_trait]
30impl Node for Handler {
31 async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32 let msg: Result<Request> = req.body.as_obj();
33 match msg {
34 Ok(Request::Read {}) => {
35 let data = to_seq(&self.s.lock().unwrap());
36 return runtime.reply(req, Request::ReadOk { value: data }).await;
37 }
38 Ok(Request::Add { element }) => {
39 self.s.lock().unwrap().insert(element);
40 return runtime.reply(req, Request::AddOk {}).await;
41 }
42 Ok(Request::ReplicateOne { element }) => {
43 self.s.lock().unwrap().insert(element);
44 return Ok(());
45 }
46 Ok(Request::ReplicateFull { value }) => {
47 let mut s = self.s.lock().unwrap();
48 for v in value {
49 s.insert(v);
50 }
51 return Ok(());
52 }
53 Ok(Request::Init {}) => {
54 let (r0, h0) = (runtime.clone(), self.clone());
57 tokio::spawn(async move {
58 loop {
59 tokio::time::sleep(Duration::from_secs(5)).await;
60 debug!("emit replication signal");
61 let s = h0.s.lock().unwrap();
62 for n in r0.neighbours() {
63 let msg = Request::ReplicateFull { value: to_seq(&s) };
64 drop(r0.send_async(n, msg));
65 }
66 }
67 });
68 return Ok(());
69 }
70 _ => done(runtime, req),
71 }
72 }
73}
74
75fn to_seq(s: &MutexGuard<HashSet<i64>>) -> Vec<i64> {
76 s.iter().copied().collect()
77}
78
79#[derive(Serialize, Deserialize)]
80#[serde(rename_all = "snake_case", tag = "type")]
81enum Request {
82 Init {},
83 Read {},
84 ReadOk { value: Vec<i64> },
85 Add { element: i64 },
86 AddOk {},
87 ReplicateOne { element: i64 },
88 ReplicateFull { value: Vec<i64> },
89}