1#[cfg(test)]
4use std::sync::Arc;
5
6use async_std::channel::{bounded, Receiver, Sender};
7use async_std::io::stdin;
8use async_std::io::stdout;
9use async_std::io::WriteExt;
10use async_trait::async_trait;
11use log::info;
12use serde::de::DeserializeOwned;
13use serde::Serialize;
14#[cfg(test)]
15use serde_json::Value;
16#[cfg(test)]
17use tokio::spawn;
18#[cfg(test)]
19use tokio::test;
20
21use crate::msg::Body;
22#[cfg(test)]
23use crate::msg::Body::Workload;
24use crate::msg::Echo;
25use crate::msg::Init;
26use crate::msg::{Msg, MsgId};
27use crate::process::{ProcNet, Process};
28#[cfg(test)]
29use crate::Error::TestIO;
30use crate::Error::{Deserialize, UnexpectedMsg};
31#[cfg(test)]
32use crate::Id;
33use crate::Result;
34use crate::Status;
35
/// Capacity of each bounded channel created between the runtime and the process.
const QUEUE_DEPTH: usize = 16;
37
38pub struct Runtime<W, A, P: Process<W, A>>
46where
47 W: DeserializeOwned + Serialize,
48 A: DeserializeOwned + Serialize,
49{
50 line_io: Box<dyn LineIO + Send + Sync>,
51 process: P,
52 process_rxq: Sender<Msg<W, A>>,
54 process_txq: Receiver<Msg<W, A>>,
56}
57
58impl<W, A, P: Process<W, A>> Runtime<W, A, P>
59where
60 W: DeserializeOwned + Serialize,
61 A: DeserializeOwned + Serialize,
62{
63 pub async fn new(args: Vec<String>, process: P) -> Result<Self> {
65 Self::new_with_line_io(args, process, Box::new(StdLineIO {})).await
66 }
67
68 #[cfg(test)]
70 async fn new_for_test(
71 args: Vec<String>,
72 process: P,
73 rxq: Receiver<String>,
74 txq: Sender<String>,
75 ) -> Result<Self> {
76 Self::new_with_line_io(args, process, Box::new(QLineIO { rxq, txq })).await
77 }
78
79 async fn new_with_line_io(
80 args: Vec<String>,
81 mut process: P,
82 line_io: Box<dyn LineIO + Send + Sync>,
83 ) -> Result<Self> {
84 let msg_id = 0;
85 let (id, ids, start_msg_id) = Self::get_init(&*line_io, msg_id).await?;
86 let (process_rxq, rxq) = bounded(QUEUE_DEPTH);
87 let (txq, process_txq) = bounded(QUEUE_DEPTH);
88 let process_net = ProcNet { txq, rxq };
89 process.init(args, process_net, id, ids, start_msg_id);
90 Ok(Self {
91 line_io,
92 process,
93 process_rxq,
94 process_txq,
95 })
96 }
97
98 pub async fn run_process(&self) -> Status {
104 self.process.run().await
105 }
106
107 pub async fn run_io_egress(&self) {
109 while let Ok(_) = self.run_one_io_egress().await {}
110 }
111
112 async fn run_one_io_egress(&self) -> Status {
113 self.send_msg(&self.process_txq.recv().await?).await?;
114 Ok(())
115 }
116
117 pub async fn run_io_ingress(&self) {
119 while let Ok(_) = self.run_one_io_ingress().await {}
120 }
121
122 async fn run_one_io_ingress(&self) -> Status {
123 self.process_rxq.send(self.recv_msg().await?).await?;
124 Ok(())
125 }
126
127 pub fn shutdown(&self) {
129 self.process_rxq.close();
130 self.process_txq.close();
131 self.line_io.close();
132 }
133
134 async fn get_init(
140 line_io: &dyn LineIO,
141 start_msg_id: MsgId,
142 ) -> Result<(String, Vec<String>, MsgId)> {
143 let init_data = line_io.read_line().await?;
144 let Msg { src, body, .. }: Msg<Echo, ()> = serde_json::from_str(&init_data)?;
145 match body {
146 Body::Init(Init::Init {
147 msg_id,
148 node_id,
149 node_ids,
150 }) => {
151 let rsp: Msg<Echo, ()> = Msg {
152 src: node_id.clone(),
153 dest: src,
154 body: Body::Init(Init::InitOk {
155 in_reply_to: msg_id,
156 msg_id: start_msg_id,
157 }),
158 };
159 let line = serde_json::to_string(&rsp)?;
160 line_io.write_line(&line).await?;
161 Ok((node_id, node_ids, start_msg_id + 1))
162 }
163 _ => Err(UnexpectedMsg { expected: "Init" }),
164 }
165 }
166
167 async fn recv_msg(&self) -> Result<Msg<W, A>> {
169 let line = self.line_io.read_line().await?;
170 serde_json::from_str::<Msg<W, A>>(&line).map_err(|e| Deserialize(e))
171 }
172
173 async fn send_msg(&self, msg: &Msg<W, A>) -> Status {
175 let line = serde_json::to_string(&msg)?;
176 self.line_io.write_line(&line).await?;
177 Ok(())
178 }
179}
180
/// Line-oriented transport used by the runtime.
///
/// Implemented in this file by `StdLineIO` (stdin/stdout) and, in test
/// builds, by `QLineIO` (channels).
#[async_trait]
trait LineIO {
    /// Reads the next incoming line.
    async fn read_line(&self) -> Result<String>;
    /// Sends `line` as one outgoing line.
    async fn write_line(&self, line: &str) -> Status;
    /// Closes the transport.
    fn close(&self);
}
191
192struct StdLineIO {}
194
195#[async_trait]
196impl LineIO for StdLineIO {
197 async fn read_line(&self) -> Result<String> {
198 let mut line = String::new();
199 stdin().read_line(&mut line).await?;
200 info!("received: {}", line);
201 Ok(line)
202 }
203
204 async fn write_line(&self, line: &str) -> Status {
205 stdout().write_all(line.as_bytes()).await?;
206 stdout().write_all("\n".as_bytes()).await?;
207 info!("sent {}", line);
208 Ok(())
209 }
210
211 fn close(&self) {
212 }
214}
215
/// Channel-backed [`LineIO`] used by tests in place of stdin/stdout.
#[cfg(test)]
struct QLineIO {
    /// Source of incoming lines.
    rxq: Receiver<String>,
    /// Sink for outgoing lines.
    txq: Sender<String>,
}

#[cfg(test)]
#[async_trait]
impl LineIO for QLineIO {
    /// Takes the next line from `rxq`; any receive failure becomes `TestIO`.
    async fn read_line(&self) -> Result<String> {
        match self.rxq.recv().await {
            Ok(line) => Ok(line),
            Err(_) => Err(TestIO),
        }
    }

    /// Pushes an owned copy of `line` onto `txq`; any send failure becomes `TestIO`.
    async fn write_line(&self, line: &str) -> Status {
        let owned = line.to_string();
        match self.txq.send(owned).await {
            Ok(()) => Ok(()),
            Err(_) => Err(TestIO),
        }
    }

    /// Closes both underlying channels.
    fn close(&self) {
        self.txq.close();
        self.rxq.close();
    }
}
239
/// Test process whose fields are filled in by `Process::init`.
///
/// `Default` is derived: every field simply takes its own default, which is
/// exactly what the previous hand-written impl did.
#[cfg(test)]
#[derive(Default)]
struct EchoProcess {
    /// Arguments passed to `init`.
    args: Vec<String>,
    /// Channel endpoints passed to `init`.
    net: ProcNet<Echo, ()>,
    /// This node's id, as passed to `init`.
    id: Id,
    /// Node ids passed to `init`.
    ids: Vec<Id>,
}
259
#[cfg(test)]
#[async_trait]
impl Process<Echo, ()> for EchoProcess {
    /// Stores everything handed over by the runtime; the starting message id
    /// is ignored.
    fn init(
        &mut self,
        args: Vec<String>,
        net: ProcNet<Echo, ()>,
        id: Id,
        ids: Vec<Id>,
        _start_msg_id: MsgId,
    ) {
        *self = Self { args, net, id, ids };
    }

    /// Answers each `Echo` request with an `EchoOk` carrying the same payload,
    /// addressed back to the sender.
    ///
    /// Returns `Ok(())` once receiving fails, and propagates send failures.
    ///
    /// # Panics
    ///
    /// Panics on any received message that is not an `Echo` request.
    async fn run(&self) -> Status {
        while let Ok(request) = self.net.rxq.recv().await {
            match request {
                Msg {
                    src,
                    body: Workload(Echo::Echo { msg_id, echo }),
                    ..
                } => {
                    let reply = Msg {
                        src: self.id.clone(),
                        dest: src,
                        body: Workload(Echo::EchoOk {
                            in_reply_to: msg_id,
                            msg_id: None,
                            echo,
                        }),
                    };
                    self.net.txq.send(reply).await?;
                }
                _ => panic!("unexpected message type"),
            }
        }
        Ok(())
    }
}
305
/// End-to-end check of the runtime with an echo process over channel I/O:
/// performs the init handshake, then sends five echo requests and verifies
/// each reply before shutting everything down.
#[test]
async fn test_runtime() {
    let e = EchoProcess::default();
    // `txq`/`rxq` are the test's ends; `erxq`/`etxq` are given to the runtime.
    let (txq, erxq) = bounded(10);
    let (etxq, rxq) = bounded(10);

    let a = "a".to_string();
    let test = "test".to_string();
    let init = Msg::<Echo, ()> {
        src: test.clone(),
        dest: a.clone(),
        body: Body::Init(Init::Init {
            msg_id: 0,
            node_id: a.clone(),
            node_ids: vec![a.clone()],
        }),
    };
    // The init message must be queued first: the constructor blocks on it.
    txq.send(serde_json::to_string(&init).expect("serialize init"))
        .await
        .expect("send message");

    let r = Arc::new(
        Runtime::new_for_test(Default::default(), e, erxq, etxq)
            .await
            .expect("new runtime"),
    );

    let init_ok_data: String = rxq.recv().await.expect("recv init_ok");
    if let Msg::<Echo, ()> {
        src,
        dest,
        body: Body::Init(Init::InitOk { in_reply_to, .. }),
    } = serde_json::from_str(&init_ok_data).expect("deserialized init_ok")
    {
        assert_eq!(in_reply_to, 0);
        assert_eq!(dest, test);
        assert_eq!(src, a);
    } else {
        panic!("expected init_ok")
    }

    let r1 = r.clone();
    let r2 = r.clone();
    let r3 = r.clone();
    let t1 = spawn(async move { r1.run_io_egress().await });
    let t2 = spawn(async move { r2.run_io_ingress().await });
    let t3 = spawn(async move { r3.run_process().await });

    for msg_id in 0..5 {
        let echo_data = Value::String(format!("boo! {}", msg_id));
        let echo = Msg::<Echo, ()> {
            src: test.clone(),
            dest: a.clone(),
            body: Workload(Echo::Echo {
                msg_id,
                echo: echo_data.clone(),
            }),
        };
        println!("echo request: {:?}", echo);
        txq.send(serde_json::to_string(&echo).expect("serialized"))
            .await
            .expect("sent echo request");
        let echoed: Msg<Echo, ()> =
            serde_json::from_str(&rxq.recv().await.expect("response")).expect("deserialized");
        // The process replies with `EchoOk`; anything else must fail the test
        // instead of silently skipping the assertions.
        match &echoed {
            Msg {
                body:
                    Workload(Echo::EchoOk {
                        in_reply_to,
                        echo: echoed_data,
                        ..
                    }),
                ..
            } => {
                assert_eq!(&msg_id, in_reply_to);
                assert_eq!(&echo_data, echoed_data);
            }
            _ => panic!("expected echo_ok"),
        }
        println!(
            "echo response: {}",
            serde_json::to_string(&echoed).expect("serialized")
        );
    }

    r.shutdown();
    let _ = tokio::join!(t1, t2, t3);
}