pub fn done(runtime: Runtime, message: Message) -> Result<()>
Expand description
Returns Ok(()) for an init message. For any other message it returns a
NotSupported error, meaning that Node.process() is not aware of that
specific message type.
Example:
use async_trait::async_trait;
use maelstrom::{Node, Runtime, Result, done};
use maelstrom::protocol::Message;
struct Handler {}
#[async_trait]
impl Node for Handler {
async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
// Skips init and responds with error Code == 10 (NotSupported) for any other message type.
done(runtime, req)
}
}
Examples found in repository?
More examples
examples/echo_failure.rs (line 41)
28 async fn process(&self, runtime: Runtime, message: Message) -> Result<()> {
29 if message.get_type() == "echo" {
30 if self.inter.fetch_add(1, Ordering::SeqCst) > 0 {
31 let err = maelstrom::Error::TemporarilyUnavailable {};
32 let body = ErrorMessageBody::from_error(err);
33 return runtime.reply(message, body).await;
34 }
35
36 let echo = format!("Another echo {}", message.body.msg_id);
37 let msg = Value::Object(Map::from_iter([("echo".to_string(), Value::String(echo))]));
38 return runtime.reply(message, msg).await;
39 }
40
41 done(runtime, message)
42 }
examples/lin_kv.rs (line 46)
30 async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
31 let (ctx, _handler) = Context::new();
32 let msg: Result<Request> = req.body.as_obj();
33 match msg {
34 Ok(Request::Read { key }) => {
35 let value = self.s.get(ctx, key.to_string()).await?;
36 return runtime.reply(req, Request::ReadOk { value }).await;
37 }
38 Ok(Request::Write { key, value }) => {
39 self.s.put(ctx, key.to_string(), value).await?;
40 return runtime.reply(req, Request::WriteOk {}).await;
41 }
42 Ok(Request::Cas { key, from, to, put }) => {
43 self.s.cas(ctx, key.to_string(), from, to, put).await?;
44 return runtime.reply(req, Request::CasOk {}).await;
45 }
46 _ => done(runtime, req),
47 }
48 }
examples/broadcast.rs (line 59)
35 async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
36 let msg: Result<Request> = req.body.as_obj();
37 match msg {
38 Ok(Request::Read {}) => {
39 let data = self.snapshot();
40 let msg = Request::ReadOk { messages: data };
41 return runtime.reply(req, msg).await;
42 }
43 Ok(Request::Broadcast { message: element }) => {
44 if self.try_add(element) {
45 info!("messages now {}", element);
46 for node in runtime.neighbours() {
47 runtime.call_async(node, Request::Broadcast { message: element });
48 }
49 }
50
51 return runtime.reply_ok(req).await;
52 }
53 Ok(Request::Topology { topology }) => {
54 let neighbours = topology.get(runtime.node_id()).unwrap();
55 self.inner.lock().unwrap().t = neighbours.clone();
56 info!("My neighbors are {:?}", neighbours);
57 return runtime.reply_ok(req).await;
58 }
59 _ => done(runtime, req),
60 }
61 }
examples/g_set.rs (line 70)
31 async fn process(&self, runtime: Runtime, req: Message) -> Result<()> {
32 let msg: Result<Request> = req.body.as_obj();
33 match msg {
34 Ok(Request::Read {}) => {
35 let data = to_seq(&self.s.lock().unwrap());
36 return runtime.reply(req, Request::ReadOk { value: data }).await;
37 }
38 Ok(Request::Add { element }) => {
39 self.s.lock().unwrap().insert(element);
40 return runtime.reply(req, Request::AddOk {}).await;
41 }
42 Ok(Request::ReplicateOne { element }) => {
43 self.s.lock().unwrap().insert(element);
44 return Ok(());
45 }
46 Ok(Request::ReplicateFull { value }) => {
47 let mut s = self.s.lock().unwrap();
48 for v in value {
49 s.insert(v);
50 }
51 return Ok(());
52 }
53 Ok(Request::Init {}) => {
54 // spawn into tokio (instead of runtime) to not to wait
55 // until it is completed, as it will never be.
56 let (r0, h0) = (runtime.clone(), self.clone());
57 tokio::spawn(async move {
58 loop {
59 tokio::time::sleep(Duration::from_secs(5)).await;
60 debug!("emit replication signal");
61 let s = h0.s.lock().unwrap();
62 for n in r0.neighbours() {
63 let msg = Request::ReplicateFull { value: to_seq(&s) };
64 drop(r0.send_async(n, msg));
65 }
66 }
67 });
68 return Ok(());
69 }
70 _ => done(runtime, req),
71 }
72 }