vampirc_io/
io.rs

1use std::thread;
2
3use async_std::future::{Future, ready};
4use async_std::io;
5use async_std::prelude::*;
6use async_std::task::block_on;
7use futures::{AsyncRead, AsyncWriteExt, FutureExt, join, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
8use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
9use vampirc_uci::{ByteVecUciMessage, parse_with_unknown, UciMessage};
10
11pub type UciStream = dyn Stream<Item=Result<UciMessage, io::Error>> + Unpin + Send + Sync;
12pub type UciSink = dyn Sink<UciMessage, Error=io::Error> + Unpin + Send;
13pub type UciSender = UnboundedSender<UciMessage>;
14pub type UciReceiver = UnboundedReceiver<UciMessage>;
15pub type UciTrySender = UnboundedSender<io::Result<UciMessage>>;
16pub type UciTryReceiver = UnboundedReceiver<io::Result<UciMessage>>;
17
18
19
20pub fn from_reader<'a, R>(reader: io::BufReader<R>) -> Box<UciStream> where R: AsyncRead + Unpin + Sync + Send + 'static {
21    let stream = reader.lines()
22        .map_ok(|line| parse_with_unknown(&(line + "\n")))
23        .map_ok(|msg_list| msg_list[0].clone())
24        ;
25
26    Box::new(stream)
27}
28
29pub fn stdin_msg_stream() -> Box<UciStream> {
30    from_reader(io::BufReader::new(io::stdin()))
31
32}
33
34pub fn stdout_msg_sink() -> Box<UciSink> {
35    let sink = io::stdout().into_sink().buffer(256).with(|msg: UciMessage| {
36        ready(Ok(ByteVecUciMessage::from(msg))).boxed()
37    });
38    Box::new(sink)
39}
40
41pub async fn run_loops(
42    mut inbound_source: Box<UciStream>,
43    mut inbound_consumer: UciTrySender,
44    mut outbound_source: UciReceiver,
45    mut outbound_consumer: Box<UciSink>) {
46    let inb = async {
47        loop {
48            let msg_result = inbound_source.try_next().await;
49            if let Ok(msg_opt) = msg_result {
50                if msg_opt.is_none() {
51                    break;
52                } else {
53                    let msg = msg_opt.unwrap();
54                    inbound_consumer.send(Ok(msg)).await.unwrap();
55                }
56            } else {
57                inbound_consumer.send(Err(msg_result.err().unwrap())).await.unwrap();
58            }
59        }
60    };
61
62    let outb = async {
63        while let Some(msg) = StreamExt::next(&mut outbound_source).await {
64            let sr = outbound_consumer.send(msg).await;
65            if let Err(err) = sr {
66                eprintln!("[{:?}] Error while sending message through the outbound channel: {}", thread::current().id(), err);
67            }
68        }
69    };
70
71    join!(inb, outb);
72}
73
74pub async fn run_std_loops(inbound_consumer: UciTrySender, outbound_source: UciReceiver) {
75    run_loops(stdin_msg_stream(), inbound_consumer, outbound_source, stdout_msg_sink()).await;
76}
77
78pub fn new_channel() -> (UciSender, UciReceiver) {
79    unbounded::<UciMessage>()
80}
81
82pub fn new_try_channel() -> (UciTrySender, UciTryReceiver) {
83    unbounded::<io::Result<UciMessage>>()
84}
85
86pub fn run_future<F, T>(future: F) -> T
87    where
88        F: Future<Output=T> + Send,
89        T: Send {
90    block_on(future)
91}
92
93
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98
99    #[test]
100    #[ignore]
101    fn test_run_in_loop() {
102        block_on(async {
103            let mut msg_stream = stdin_msg_stream();
104            while let Ok(msg_opt) = msg_stream.try_next().await {
105                let msg = msg_opt.unwrap();
106                println!("MSG RECEIVED VIA STREAM: {}", msg);
107            }
108        });
109    }
110
111    #[test]
112    #[ignore]
113    fn test_run_loops() {
114        let (itx, mut irx) = new_try_channel();
115        let (otx, orx) = new_channel();
116
117        otx.unbounded_send(UciMessage::Uci).unwrap();
118
119        let rec = async {
120            while let Ok(incoming) = TryStreamExt::try_next(&mut irx).await {
121                println!("Handling received message: {}", incoming.unwrap())
122            }
123        };
124
125        block_on(async {
126            otx.unbounded_send(UciMessage::UciOk).unwrap();
127            join!(run_std_loops(itx, orx), rec);
128        });
129    }
130
131}