async_maelstrom/
msg.rs

1//! Maelstrom [network message protocol](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#messages)
2//!
//! A Maelstrom workload client message can be handled and answered as follows
//! ```ignore
//! use async_maelstrom::msg::Body::Workload;
//! use async_maelstrom::msg::{Echo, Msg};
//!
//! // Receive an echo request
//! let request: Msg<Echo, ()> = recv();
//! if let Msg {
//!     src: client_id,
//!     body: Workload(Echo::Echo { msg_id, echo }),
//!     ..
//! } = request {
//!     // Create an echo response
//!     let node_id = "n1".to_string();
//!     let response: Msg<Echo, ()> = Msg {
//!         src: node_id,
//!         dest: client_id,
//!         body: Workload(Echo::EchoOk {
//!             in_reply_to: msg_id,
//!             msg_id: Some(5),
//!             echo,
//!         }),
//!     };
//!     send(response);
//! }
//! ```
28use std::fmt::Debug;
29
30#[cfg(test)]
31use serde::de::DeserializeOwned;
32use serde::{Deserialize, Serialize};
33#[cfg(test)]
34use serde_json::json;
35use serde_json::Value;
36
37#[cfg(test)]
38use crate::msg::Body::Application;
39#[cfg(test)]
40use crate::msg::Body::Workload;
41use crate::{ErrorCode, Id};
42
/// Maelstrom network [message](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#messages)
///
/// A message envelope containing
/// - the source node identifier,
/// - the destination node identifier,
/// - and the body
///
/// Maelstrom defined bodies have a `type` field. Inter-node messages may also have a `type`
/// field, populated with their specified message type value.
///
/// Messages are parameterized on a workload body type, `W`, and an application body type, `A`.
/// The workload parameter is required to disambiguate Maelstrom messages when deserializing into
/// the target Rust type.
///
/// Parameters
/// - `W` the workload body type, e.g. [Echo]
/// - `A` the application body type
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
pub struct Msg<W, A> {
    /// Identifier of the node that sent the message
    pub src: Id,
    /// Identifier of the node the message is addressed to
    pub dest: Id,
    /// Message payload; see [Body] for the supported kinds
    pub body: Body<W, A>,
}
66
/// Body of a [Msg]
///
/// The enum is `untagged`: no variant name appears in the serialized form, and on
/// deserialization serde tries the variants in declaration order, keeping the first one that
/// matches. The order of the variants below is therefore significant.
///
/// Parameters
/// - `W` the workload body type
/// - `A` the application body type
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
#[serde(untagged)]
pub enum Body<W, A> {
    /// An application defined node-to-node message
    ///
    /// From the Maelstrom [message documentation](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#messages)
    /// > Messages exchanged between your server nodes may have any body
    ///   structure you like; you are not limited to request-response, and may
    ///   invent any message semantics you choose. If some of your messages do
    ///   use the body format described above, Maelstrom can help generate useful
    ///   visualizations and statistics for those messages.
    Application(A),
    /// A Maelstrom error body, see [Error]
    Error(Error),
    /// A node initialization request or response, see [Init]
    Init(Init),
    /// A workload specific body of type `W`
    Workload(W),
}
83
84/// Maelstrom [client message body](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#message-bodies)
85#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
86#[serde(tag = "type")]
87pub enum Echo {
88    #[serde(rename = "echo")]
89    Echo { msg_id: MsgId, echo: Value },
90    #[serde(rename = "echo_ok")]
91    EchoOk {
92        in_reply_to: MsgId,
93        #[serde(skip_serializing_if = "Option::is_none")]
94        msg_id: Option<MsgId>,
95        echo: Value,
96    },
97}
98
99/// Maelstrom [errors](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#errors)
100#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
101#[serde(tag = "type")]
102#[serde(rename = "error")]
103pub struct Error {
104    pub in_reply_to: MsgId,
105    pub code: ErrorCode,
106    pub text: String,
107}
108
109/// Maelstrom node [initialization](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#initialization)
110#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
111#[serde(tag = "type")]
112pub enum Init {
113    #[serde(rename = "init")]
114    Init {
115        msg_id: MsgId,
116        node_id: Id,
117        node_ids: Vec<Id>,
118    },
119    #[serde(rename = "init_ok")]
120    InitOk { in_reply_to: MsgId, msg_id: MsgId },
121}
122
123/// Maelstrom [Lin-kv workload messages](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#workload-lin-kv)
124#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
125#[serde(tag = "type")]
126pub enum LinKv {
127    #[serde(rename = "cas")]
128    Cas {
129        msg_id: MsgId,
130        key: Key,
131        from: Val,
132        to: Val,
133    },
134    #[serde(rename = "cas_ok")]
135    CasOk {
136        in_reply_to: MsgId,
137        #[serde(skip_serializing_if = "Option::is_none")]
138        msg_id: Option<MsgId>,
139    },
140    #[serde(rename = "read")]
141    Read { msg_id: MsgId, key: Key },
142    #[serde(rename = "read_ok")]
143    ReadOk {
144        in_reply_to: MsgId,
145        #[serde(skip_serializing_if = "Option::is_none")]
146        msg_id: Option<MsgId>,
147        value: Val,
148    },
149    #[serde(rename = "write")]
150    Write { msg_id: MsgId, key: Key, value: Val },
151    #[serde(rename = "write_ok")]
152    WriteOk { in_reply_to: MsgId },
153}
154
155/// Maelstrom [Lin-kv workload messages](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#workload-pn-counter)
156#[derive(Deserialize, Serialize, Debug, Eq, PartialEq)]
157#[serde(tag = "type")]
158pub enum PnCounter {
159    #[serde(rename = "add")]
160    Add { msg_id: MsgId, delta: i64 },
161    #[serde(rename = "read")]
162    Read { msg_id: MsgId },
163    #[serde(rename = "read_ok")]
164    ReadOk {
165        in_reply_to: MsgId,
166        #[serde(skip_serializing_if = "Option::is_none")]
167        msg_id: Option<MsgId>,
168        value: i64,
169    },
170}
171
/// Maelstrom [message ID](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#message-bodies)
///
/// Used for both the `msg_id` and `in_reply_to` fields of message bodies.
pub type MsgId = u64;

/// Maelstrom [Lin-kv workload key](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#workload-lin-kv)
///
/// Kept as an arbitrary JSON value.
pub type Key = Value;

/// Maelstrom [Lin-kv workload value](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#workload-lin-kv)
///
/// Kept as an arbitrary JSON value.
pub type Val = Value;
180
/// A Lin-kv `cas` request deserializes into [LinKv::Cas] and survives a serde round trip.
#[test]
fn serde_cas_msg() {
    let raw = r#"{"dest":"n1","body":{"key":0,"from":4,"to":2,"type":"cas","msg_id":1},"src":"c11","id":11}"#;
    let msg: Msg<LinKv, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body:
                Workload(LinKv::Cas {
                    msg_id,
                    key,
                    from,
                    to,
                }),
        } => {
            assert_eq!(src, "c11");
            assert_eq!(dest, "n1");
            assert_eq!(*msg_id, 1);
            assert_eq!(key, &json!(0));
            assert_eq!(from, &json!(4));
            assert_eq!(to, &json!(2));
        }
        _ => panic!("expected cas message"),
    }
    assert_serde_preserves_identity(&msg);
}
208
/// A Lin-kv `cas_ok` response without `msg_id` deserializes into [LinKv::CasOk] with
/// `msg_id == None` and survives a serde round trip.
#[test]
fn serde_cas_ok_msg() {
    let raw = r#"{"dest":"n1","body":{ "type": "cas_ok", "in_reply_to": 1 },"src":"c11","id":11}"#;
    let msg: Msg<LinKv, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body: Workload(LinKv::CasOk { in_reply_to, msg_id }),
        } => {
            assert_eq!(src, "c11");
            assert_eq!(dest, "n1");
            assert_eq!(*in_reply_to, 1);
            assert_eq!(msg_id, &None);
        }
        _ => panic!("expected cas_ok message"),
    }
    assert_serde_preserves_identity(&msg);
}
231
/// An `echo` request deserializes into [Echo::Echo] and survives a serde round trip.
#[test]
fn serde_echo_msg() {
    let raw = r#"{"dest":"n1","body":{"echo":"Please echo 36","type":"echo","msg_id":1},"src":"c10","id":10}"#;
    let msg: Msg<Echo, ()> = serde_json::from_str(raw).expect("echo message");
    match &msg {
        Msg {
            src,
            dest,
            body: Workload(Echo::Echo { msg_id, echo }),
        } => {
            assert_eq!(src, "c10");
            assert_eq!(dest, "n1");
            assert_eq!(*msg_id, 1);
            assert_eq!(echo, "Please echo 36");
        }
        _ => panic!("expected echo message"),
    }
    assert_serde_preserves_identity(&msg);
}
251
/// An `init` request deserializes into [Init::Init] and survives a serde round trip.
#[test]
fn serde_init_msg() {
    let raw = r#"{"dest":"n1","body":{"type":"init","node_id":"n1","node_ids":["n1","n2","n3","n4","n5"],"msg_id":1},"src":"c4","id":4}"#;
    let msg: Msg<Echo, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body:
                Body::Init(Init::Init {
                    msg_id,
                    node_id,
                    node_ids,
                }),
        } => {
            let expected_ids: Vec<String> = ["n1", "n2", "n3", "n4", "n5"]
                .iter()
                .map(|id| id.to_string())
                .collect();
            assert_eq!(src, "c4");
            assert_eq!(dest, "n1");
            assert_eq!(node_id, "n1");
            assert_eq!(node_ids, &expected_ids);
            assert_eq!(*msg_id, 1);
        }
        _ => panic!("expected init message"),
    }
    assert_serde_preserves_identity(&msg);
}
283
/// An `init_ok` response deserializes into [Init::InitOk] and survives a serde round trip.
#[test]
fn serde_init_ok_msg() {
    let raw = r#"{"src":"n1","dest":"c4","body":{"type":"init_ok","in_reply_to":1,"msg_id":0}}"#;
    let msg: Msg<Init, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body: Body::Init(Init::InitOk { in_reply_to, msg_id }),
        } => {
            assert_eq!(src, "n1");
            assert_eq!(dest, "c4");
            assert_eq!(*in_reply_to, 1);
            assert_eq!(*msg_id, 0);
        }
        _ => panic!("expected init_ok message, got {:?}", msg),
    }
    assert_serde_preserves_identity(&msg);
}
306
/// A Lin-kv `read` request deserializes into [LinKv::Read] and survives a serde round trip.
#[test]
fn serde_linkv_read_msg() {
    let raw = r#"{"dest":"n4","body":{"key":0,"type":"read","msg_id":1},"src":"c10","id":10}"#;
    let msg: Msg<LinKv, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body: Workload(LinKv::Read { msg_id, key }),
        } => {
            assert_eq!(src, "c10");
            assert_eq!(dest, "n4");
            assert_eq!(*msg_id, 1);
            assert_eq!(key, &json!(0));
        }
        _ => panic!("expected read message"),
    }

    assert_serde_preserves_identity(&msg);
}
327
/// A Lin-kv `read_ok` response deserializes into [LinKv::ReadOk] and survives a serde round trip.
#[test]
fn serde_linkv_read_ok_msg() {
    let buf = r#"{"dest":"n4","body":{"type": "read_ok", "value": 1, "msg_id": 0 , "in_reply_to": 2},"src":"c10","id":10}"#;
    let msg: Msg<LinKv, ()> = serde_json::from_str(buf).expect("message");
    if let Msg {
        src,
        dest,
        body:
            Workload(LinKv::ReadOk {
                in_reply_to,
                msg_id,
                value,
            }),
    } = &msg
    {
        assert_eq!(dest, "n4");
        assert_eq!(src, "c10");
        assert_eq!(value, &json!(1));
        assert_eq!(msg_id, &Some(0));
        assert_eq!(*in_reply_to, 2);
    } else {
        // Name the variant actually under test and show what was decoded instead.
        panic!("expected read_ok message, got {:?}", msg);
    }

    assert_serde_preserves_identity(&msg);
}
354
/// A PN-counter `read` request deserializes into [PnCounter::Read] and survives a serde
/// round trip.
#[test]
fn serde_pn_read_msg() {
    let raw = r#"{"dest":"n1","body":{"type":"read","msg_id":1},"src":"c10","id":10}"#;
    let msg: Msg<PnCounter, ()> = serde_json::from_str(raw).expect("message");
    match &msg {
        Msg {
            src,
            dest,
            body: Workload(PnCounter::Read { msg_id }),
        } => {
            assert_eq!(src, "c10");
            assert_eq!(dest, "n1");
            assert_eq!(*msg_id, 1);
        }
        _ => panic!("expected read message"),
    }

    assert_serde_preserves_identity(&msg);
}
374
/// A PN-counter `read_ok` response deserializes into [PnCounter::ReadOk] and survives a serde
/// round trip.
#[test]
fn serde_pncounter_read_ok_msg() {
    let buf = r#"{"dest":"n1","body":{"type":"read_ok","value":1,"msg_id": 2,"in_reply_to":0},"src":"c10","id":10}"#;
    let msg: Msg<PnCounter, ()> = serde_json::from_str(buf).expect("message");
    if let Msg {
        src,
        dest,
        body:
            Workload(PnCounter::ReadOk {
                in_reply_to,
                msg_id,
                value,
            }),
    } = &msg
    {
        assert_eq!(dest, "n1");
        assert_eq!(src, "c10");
        assert_eq!(value, &json!(1));
        assert_eq!(msg_id, &Some(2));
        assert_eq!(*in_reply_to, 0);
    } else {
        // Name the variant actually under test and show what was decoded instead.
        panic!("expected read_ok message, got {:?}", msg);
    }

    assert_serde_preserves_identity(&msg);
}
401
/// A [Msg] carrying a [Typed::Bar] application body survives a JSON round trip.
#[test]
fn serde_typed_bar() {
    let bar = Typed::Bar {
        id: 0x2a,
        value: "boo".to_string(),
    };
    let m = &Msg::<Echo, Typed> {
        src: "A".to_string(),
        dest: "B".to_string(),
        body: Application(bar.clone()),
    };
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Format the failure message only if deserialization actually fails.
    let de_m: Msg<Echo, Typed> =
        serde_json::from_str(&data).unwrap_or_else(|e| panic!("{:?}: {:?}", m, e));
    assert_eq!(
        m, &de_m,
        "expected deserialized Msg={:?} from data={}, but got Msg={:?}",
        m, data, &de_m
    );
    let de_bar = match de_m.body {
        Application(body) => body,
        _ => panic!("expected application body"),
    };
    assert_eq!(
        &bar, &de_bar,
        "expected deserialized Typed={:?} from data={}, but got Typed={:?}",
        bar, data, de_bar
    );
}
431
/// A [Msg] carrying a [Typed::Baz] application body survives a JSON round trip.
#[test]
fn serde_typed_baz() {
    let baz = Typed::Baz {
        id: 0x2a,
        value: "boo".to_string(),
    };
    let m = &Msg::<Echo, Typed> {
        src: "A".to_string(),
        dest: "B".to_string(),
        body: Application(baz.clone()),
    };
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Format the failure message only if deserialization actually fails.
    let de_m: Msg<Echo, Typed> =
        serde_json::from_str(&data).unwrap_or_else(|e| panic!("{:?}: {:?}", m, e));
    assert_eq!(
        m, &de_m,
        "expected deserialized Msg={:?} from data={}, but got Msg={:?}",
        m, data, &de_m
    );
    let de_baz = match de_m.body {
        Application(body) => body,
        _ => panic!("expected application body"),
    };
    assert_eq!(
        &baz, &de_baz,
        "expected deserialized Typed={:?} from data={}, but got Typed={:?}",
        baz, data, de_baz
    );
}
461
/// A [Msg] carrying an [Untyped::Bar] application body survives a JSON round trip.
#[test]
fn serde_untyped_bar() {
    let bar = Untyped::Bar {
        id: 0x2a,
        value: "boo".to_string(),
    };
    let m = &Msg::<Echo, Untyped> {
        src: "A".to_string(),
        dest: "B".to_string(),
        body: Application(bar.clone()),
    };
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Format the failure message only if deserialization actually fails.
    let de_m: Msg<Echo, Untyped> =
        serde_json::from_str(&data).unwrap_or_else(|e| panic!("{:?}: {:?}", m, e));
    assert_eq!(
        m, &de_m,
        "expected deserialized Msg={:?} from data={}, but got Msg={:?}",
        m, data, &de_m
    );
    let de_bar = match de_m.body {
        Application(body) => body,
        _ => panic!("expected application body"),
    };
    assert_eq!(
        &bar, &de_bar,
        "expected deserialized Untyped={:?} from data={}, but got Untyped={:?}",
        bar, data, de_bar
    );
}
491
/// A [Msg] carrying an [Untyped::Baz] application body survives a JSON round trip.
#[test]
fn serde_untyped_baz() {
    let baz = Untyped::Baz {
        key: 0x2a,
        value: "boo".to_string(),
    };
    let m = &Msg::<Echo, Untyped> {
        src: "A".to_string(),
        dest: "B".to_string(),
        body: Application(baz.clone()),
    };
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Format the failure message only if deserialization actually fails.
    let de_m: Msg<Echo, Untyped> =
        serde_json::from_str(&data).unwrap_or_else(|e| panic!("{:?}: {:?}", m, e));
    assert_eq!(
        m, &de_m,
        "expected deserialized Msg={:?} from data={}, but got Msg={:?}",
        m, data, &de_m
    );
    let de_baz = match de_m.body {
        Application(body) => body,
        _ => panic!("expected application body"),
    };
    assert_eq!(
        &baz, &de_baz,
        "expected deserialized Untyped={:?} from data={}, but got Untyped={:?}",
        baz, data, de_baz
    );
}
521
/// Verify that a message with an [Untyped] body can't be deserialized as a message with a
/// [Typed] body
///
/// The serialized [Untyped] body carries no `type` field, so it must not match [Typed], which
/// is selected by that field.
#[test]
fn serde_untyped_into_typed() {
    let bar = Untyped::Bar {
        id: 0x2a,
        value: "boo".to_string(),
    };
    let m = &Msg::<Echo, Untyped> {
        src: "A".to_string(),
        dest: "B".to_string(),
        body: Application(bar),
    };
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Decode the whole message (not a bare body) so the check exercises the typed body path.
    let result = serde_json::from_str::<Msg<Echo, Typed>>(&data);
    assert!(result.is_err(), "invalid deserialization: {:?}", result);
}
540
/// Test-only application body carrying a `type` tag
///
/// The `type` field holds the snake_case variant name (`bar`, `baz`) and selects the
/// deserialization target variant.
#[cfg(test)]
#[derive(Clone, Deserialize, Serialize, Debug, Eq, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
enum Typed {
    Bar {
        id: u64,
        value: String,
    },
    Baz {
        id: u64,
        value: String,
    },
}
551
/// Untyped body has no `type` tag to indicate deserialization target type
///
/// Untyped bodies are deserialized into a specified struct or the first enumerated variant that
/// fits. The two variants below differ in one field name (`id` vs `key`), which is what lets
/// them be told apart.
///
/// NOTE(review): the `rename` attributes on the variants presumably have no effect, since an
/// untagged enum does not serialize variant names — confirm before removing them.
#[cfg(test)]
#[derive(Clone, Deserialize, Serialize, Debug, Eq, PartialEq)]
#[serde(untagged)]
enum Untyped {
    #[serde(rename = "bar")]
    Bar { id: u64, value: String },
    #[serde(rename = "baz")]
    Baz { key: u64, value: String },
}
564
/// Assert `deserialize(serialize(m)) == m`
///
/// Serializes `m` to JSON, prints the JSON, deserializes it back into `M` and compares the
/// result with `m`.
///
/// # Panics
/// Panics if serialization or deserialization fails, or if the round-tripped value differs
/// from `m`.
#[cfg(test)]
fn assert_serde_preserves_identity<M>(m: &M)
where
    M: Debug + Eq + PartialEq + Serialize + DeserializeOwned,
{
    let data = serde_json::to_string(m).expect("JSON data");
    println!("{}", data);
    // Format the failure message only if deserialization actually fails.
    let de_m: M = serde_json::from_str(&data).unwrap_or_else(|e| panic!("{:?}: {:?}", m, e));
    assert_eq!(
        m, &de_m,
        "expected deserialized M={:?} from data={}, but got M={:?}",
        m, data, &de_m
    );
}