async_maelstrom/
runtime.rs

1//! Node runtime for [Process]es and [Maelstrom networking](https://github.com/jepsen-io/maelstrom/blob/main/doc/protocol.md#protocol)
2
3#[cfg(test)]
4use std::sync::Arc;
5
6use async_std::channel::{bounded, Receiver, Sender};
7use async_std::io::stdin;
8use async_std::io::stdout;
9use async_std::io::WriteExt;
10use async_trait::async_trait;
11use log::info;
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14#[cfg(test)]
15use serde_json::Value;
16#[cfg(test)]
17use tokio::spawn;
18#[cfg(test)]
19use tokio::test;
20
21use crate::msg::Body;
22#[cfg(test)]
23use crate::msg::Body::Workload;
24use crate::msg::Echo;
25use crate::msg::Init;
26use crate::msg::{Msg, MsgId};
27use crate::process::{ProcNet, Process};
28#[cfg(test)]
29use crate::Error::TestIO;
30use crate::Error::{Deserialize, UnexpectedMsg};
31#[cfg(test)]
32use crate::Id;
33use crate::Result;
34use crate::Status;
35
/// Capacity of each bounded queue created between the runtime and its process
const QUEUE_DEPTH: usize = 16;
37
38/// Node runtime
39///
40/// A runtime will create, initialize and run an instance of `P`.
41///
42/// Parameters
43/// - `W` the workload body type, e.g. [Echo]
44/// - `A` the application body type
45pub struct Runtime<W, A, P: Process<W, A>>
46where
47    W: DeserializeOwned + Serialize,
48    A: DeserializeOwned + Serialize,
49{
50    line_io: Box<dyn LineIO + Send + Sync>,
51    process: P,
52    /// The process` receive queue
53    process_rxq: Sender<Msg<W, A>>,
54    /// The process` transmit queue
55    process_txq: Receiver<Msg<W, A>>,
56}
57
58impl<W, A, P: Process<W, A>> Runtime<W, A, P>
59where
60    W: DeserializeOwned + Serialize,
61    A: DeserializeOwned + Serialize,
62{
63    // Create a new runtime
64    pub async fn new(args: Vec<String>, process: P) -> Result<Self> {
65        Self::new_with_line_io(args, process, Box::new(StdLineIO {})).await
66    }
67
68    // Create a new runtime for testing
69    #[cfg(test)]
70    async fn new_for_test(
71        args: Vec<String>,
72        process: P,
73        rxq: Receiver<String>,
74        txq: Sender<String>,
75    ) -> Result<Self> {
76        Self::new_with_line_io(args, process, Box::new(QLineIO { rxq, txq })).await
77    }
78
79    async fn new_with_line_io(
80        args: Vec<String>,
81        mut process: P,
82        line_io: Box<dyn LineIO + Send + Sync>,
83    ) -> Result<Self> {
84        let msg_id = 0;
85        let (id, ids, start_msg_id) = Self::get_init(&*line_io, msg_id).await?;
86        let (process_rxq, rxq) = bounded(QUEUE_DEPTH);
87        let (txq, process_txq) = bounded(QUEUE_DEPTH);
88        let process_net = ProcNet { txq, rxq };
89        process.init(args, process_net, id, ids, start_msg_id);
90        Ok(Self {
91            line_io,
92            process,
93            process_rxq,
94            process_txq,
95        })
96    }
97
98    /// Run the process
99    ///
100    /// Run the runtime`s node process. The call will return
101    /// - on encountering a fatal error, or
102    /// - after [Self::shutdown] is called
103    pub async fn run_process(&self) -> Status {
104        self.process.run().await
105    }
106
107    /// Run IO egress until [Self::shutdown] is called
108    pub async fn run_io_egress(&self) {
109        while let Ok(_) = self.run_one_io_egress().await {}
110    }
111
112    async fn run_one_io_egress(&self) -> Status {
113        self.send_msg(&self.process_txq.recv().await?).await?;
114        Ok(())
115    }
116
117    /// Run IO ingress until [Self::shutdown] is called
118    pub async fn run_io_ingress(&self) {
119        while let Ok(_) = self.run_one_io_ingress().await {}
120    }
121
122    async fn run_one_io_ingress(&self) -> Status {
123        self.process_rxq.send(self.recv_msg().await?).await?;
124        Ok(())
125    }
126
127    /// Shutdown the runtime
128    pub fn shutdown(&self) {
129        self.process_rxq.close();
130        self.process_txq.close();
131        self.line_io.close();
132    }
133
134    /// Get initialization for the node
135    ///
136    /// Receives the next message and asserts it is [Init] message, responds if valid.
137    ///
138    /// Return the node's ID, all the participating node IDs, the next message ID to use.
139    async fn get_init(
140        line_io: &dyn LineIO,
141        start_msg_id: MsgId,
142    ) -> Result<(String, Vec<String>, MsgId)> {
143        let init_data = line_io.read_line().await?;
144        let Msg { src, body, .. }: Msg<Echo, ()> = serde_json::from_str(&init_data)?;
145        match body {
146            Body::Init(Init::Init {
147                msg_id,
148                node_id,
149                node_ids,
150            }) => {
151                let rsp: Msg<Echo, ()> = Msg {
152                    src: node_id.clone(),
153                    dest: src,
154                    body: Body::Init(Init::InitOk {
155                        in_reply_to: msg_id,
156                        msg_id: start_msg_id,
157                    }),
158                };
159                let line = serde_json::to_string(&rsp)?;
160                line_io.write_line(&line).await?;
161                Ok((node_id, node_ids, start_msg_id + 1))
162            }
163            _ => Err(UnexpectedMsg { expected: "Init" }),
164        }
165    }
166
167    /// Get the next message
168    async fn recv_msg(&self) -> Result<Msg<W, A>> {
169        let line = self.line_io.read_line().await?;
170        serde_json::from_str::<Msg<W, A>>(&line).map_err(|e| Deserialize(e))
171    }
172
173    /// Send a message
174    async fn send_msg(&self, msg: &Msg<W, A>) -> Status {
175        let line = serde_json::to_string(&msg)?;
176        self.line_io.write_line(&line).await?;
177        Ok(())
178    }
179}
180
181/// Line IO
182///
183/// Line IO to send and receive message to and from the Maelstrom OS process.
184/// The trait allows an implementation for testing within the local OS process.
185#[async_trait]
186trait LineIO {
187    async fn read_line(&self) -> Result<String>;
188    async fn write_line(&self, line: &str) -> Status;
189    fn close(&self);
190}
191
192/// Line IO from `stdin` and `stdout`
193struct StdLineIO {}
194
195#[async_trait]
196impl LineIO for StdLineIO {
197    async fn read_line(&self) -> Result<String> {
198        let mut line = String::new();
199        stdin().read_line(&mut line).await?;
200        info!("received: {}", line);
201        Ok(line)
202    }
203
204    async fn write_line(&self, line: &str) -> Status {
205        stdout().write_all(line.as_bytes()).await?;
206        stdout().write_all("\n".as_bytes()).await?;
207        info!("sent {}", line);
208        Ok(())
209    }
210
211    fn close(&self) {
212        // No op. stdin and stdout will close when Maelstrom closes its end.
213    }
214}
215
/// Channel-backed [LineIO] for testing within the local OS process
#[cfg(test)]
struct QLineIO {
    /// Source of the lines returned by `read_line`
    rxq: Receiver<String>,
    /// Sink for the lines passed to `write_line`
    txq: Sender<String>,
}

#[async_trait]
#[cfg(test)]
impl LineIO for QLineIO {
    /// Receive the next line from `rxq`; any channel failure becomes `TestIO`
    async fn read_line(&self) -> Result<String> {
        match self.rxq.recv().await {
            Ok(line) => Ok(line),
            Err(_) => Err(TestIO),
        }
    }

    /// Send an owned copy of `line` on `txq`; any channel failure becomes `TestIO`
    async fn write_line(&self, line: &str) -> Status {
        let owned = String::from(line);
        match self.txq.send(owned).await {
            Ok(()) => Ok(()),
            Err(_) => Err(TestIO),
        }
    }

    /// Close both channels
    fn close(&self) {
        self.rxq.close();
        self.txq.close();
    }
}
239
/// Test process that answers echo requests
#[cfg(test)]
struct EchoProcess {
    /// Arguments handed over in `init`
    args: Vec<String>,
    /// Queues shared with the runtime
    net: ProcNet<Echo, ()>,
    /// This node's ID
    id: Id,
    /// IDs handed over in `init`
    ids: Vec<Id>,
}
247
#[cfg(test)]
impl Default for EchoProcess {
    /// Build a process whose every field holds its type's default value
    fn default() -> Self {
        let args = Vec::new();
        let net = ProcNet::default();
        let id = Id::default();
        let ids = Vec::new();
        Self { args, net, id, ids }
    }
}
259
#[cfg(test)]
#[async_trait]
impl Process<Echo, ()> for EchoProcess {
    /// Store the arguments, queues and node IDs; the start message ID is unused
    fn init(
        &mut self,
        args: Vec<String>,
        net: ProcNet<Echo, ()>,
        id: Id,
        ids: Vec<Id>,
        _start_msg_id: MsgId,
    ) {
        self.ids = ids;
        self.id = id;
        self.net = net;
        self.args = args;
    }

    /// Answer every echo request with an echo_ok carrying the same `echo` value
    ///
    /// Returns `Ok(())` once the receive queue reports an error (the runtime is
    /// shutting down), propagates a failed send, and panics on any message
    /// that is not an echo request.
    async fn run(&self) -> Status {
        while let Ok(request) = self.net.rxq.recv().await {
            match request {
                Msg {
                    src,
                    body: Workload(Echo::Echo { msg_id, echo }),
                    ..
                } => {
                    let reply = Msg {
                        src: self.id.clone(),
                        dest: src,
                        body: Workload(Echo::EchoOk {
                            in_reply_to: msg_id,
                            msg_id: None,
                            echo,
                        }),
                    };
                    self.net.txq.send(reply).await?;
                }
                _ => panic!("unexpected message type"),
            }
        }
        // The receive queue failed: the runtime is shutting down.
        Ok(())
    }
}
305
/// End-to-end test of the runtime over in-memory line IO
///
/// Performs the init handshake, then sends five echo requests and checks that
/// each one is answered by a matching echo_ok, and finally shuts the runtime
/// down and checks that all three tasks finished cleanly.
#[test]
async fn test_runtime() {
    // Create the process and the communication channels
    let e = EchoProcess::default();
    let (txq, erxq) = bounded(10);
    let (etxq, rxq) = bounded(10);

    // Queue the init message up front so the runtime finds it during construction
    let a = "a".to_string();
    let test = "test".to_string();
    let init = Msg::<Echo, ()> {
        src: test.clone(),
        dest: a.clone(),
        body: Body::Init(Init::Init {
            msg_id: 0,
            node_id: a.clone(),
            node_ids: vec![a.clone()],
        }),
    };
    txq.send(serde_json::to_string(&init).expect("serialize init"))
        .await
        .expect("send message");

    // Create and drive the runtime
    let r = Arc::new(
        Runtime::new_for_test(Default::default(), e, erxq, etxq)
            .await
            .expect("new runtime"),
    );

    // Verify the runtime responds with init_ok
    let init_ok_data: String = rxq.recv().await.expect("recv init_ok");
    if let Msg::<Echo, ()> {
        src,
        dest,
        body: Body::Init(Init::InitOk { in_reply_to, .. }),
    } = serde_json::from_str(&init_ok_data).expect("deserialized init_ok")
    {
        assert_eq!(in_reply_to, 0);
        assert_eq!(dest, test);
        assert_eq!(src, a);
    } else {
        panic!("expected init_ok")
    }

    let r1 = r.clone();
    let r2 = r.clone();
    let r3 = r.clone();
    let t1 = spawn(async move { r1.run_io_egress().await });
    let t2 = spawn(async move { r2.run_io_ingress().await });
    let t3 = spawn(async move { r3.run_process().await });

    // Send echo requests and receive responses ...
    for msg_id in 0..5 {
        let echo_data = Value::String(format!("boo! {}", msg_id));
        let echo = Msg::<Echo, ()> {
            src: test.clone(),
            dest: a.clone(),
            body: Workload(Echo::Echo {
                msg_id,
                echo: echo_data.clone(),
            }),
        };
        println!("echo request:  {:?}", echo);
        txq.send(serde_json::to_string(&echo).expect("serialized"))
            .await
            .expect("sent echo request");
        let echoed: Msg<Echo, ()> =
            serde_json::from_str(&rxq.recv().await.expect("response")).expect("deserialized");
        // The process replies with `EchoOk`, so that is the variant to check.
        // Anything else fails the test instead of skipping the assertions.
        match &echoed {
            Msg {
                src,
                dest,
                body:
                    Workload(Echo::EchoOk {
                        in_reply_to,
                        echo: echoed_data,
                        ..
                    }),
            } => {
                assert_eq!(src, &a);
                assert_eq!(dest, &test);
                assert_eq!(&msg_id, in_reply_to);
                assert_eq!(&echo_data, echoed_data);
            }
            _ => panic!("expected echo_ok"),
        }
        println!(
            "echo response:      {}",
            serde_json::to_string(&echoed).expect("serialized")
        );
    }

    // Shutdown, then make sure no task panicked and the process exited cleanly
    r.shutdown();
    let (egress, ingress, process) = tokio::join!(t1, t2, t3);
    egress.expect("egress task");
    ingress.expect("ingress task");
    process.expect("process task").expect("process status");
}