1use std::thread;
2
3use async_std::future::{Future, ready};
4use async_std::io;
5use async_std::prelude::*;
6use async_std::task::block_on;
7use futures::{AsyncRead, AsyncWriteExt, FutureExt, join, Sink, SinkExt, Stream, StreamExt, TryStreamExt};
8use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
9use vampirc_uci::{ByteVecUciMessage, parse_with_unknown, UciMessage};
10
/// Unsized stream type whose items are either a parsed [`UciMessage`] or an I/O error.
/// Handed around boxed (`Box<UciStream>`) by the functions in this file.
pub type UciStream = dyn Stream<Item=Result<UciMessage, io::Error>> + Unpin + Send + Sync;
/// Unsized sink type that accepts [`UciMessage`] values and fails with an I/O error.
/// Handed around boxed (`Box<UciSink>`) by the functions in this file.
pub type UciSink = dyn Sink<UciMessage, Error=io::Error> + Unpin + Send;
/// Sending half of an unbounded channel carrying plain [`UciMessage`] values.
pub type UciSender = UnboundedSender<UciMessage>;
/// Receiving half of an unbounded channel carrying plain [`UciMessage`] values.
pub type UciReceiver = UnboundedReceiver<UciMessage>;
/// Sending half of an unbounded channel carrying `io::Result<UciMessage>` values,
/// so that read errors can be forwarded alongside messages.
pub type UciTrySender = UnboundedSender<io::Result<UciMessage>>;
/// Receiving half of an unbounded channel carrying `io::Result<UciMessage>` values.
pub type UciTryReceiver = UnboundedReceiver<io::Result<UciMessage>>;
17
18
19
20pub fn from_reader<'a, R>(reader: io::BufReader<R>) -> Box<UciStream> where R: AsyncRead + Unpin + Sync + Send + 'static {
21 let stream = reader.lines()
22 .map_ok(|line| parse_with_unknown(&(line + "\n")))
23 .map_ok(|msg_list| msg_list[0].clone())
24 ;
25
26 Box::new(stream)
27}
28
29pub fn stdin_msg_stream() -> Box<UciStream> {
30 from_reader(io::BufReader::new(io::stdin()))
31
32}
33
34pub fn stdout_msg_sink() -> Box<UciSink> {
35 let sink = io::stdout().into_sink().buffer(256).with(|msg: UciMessage| {
36 ready(Ok(ByteVecUciMessage::from(msg))).boxed()
37 });
38 Box::new(sink)
39}
40
41pub async fn run_loops(
42 mut inbound_source: Box<UciStream>,
43 mut inbound_consumer: UciTrySender,
44 mut outbound_source: UciReceiver,
45 mut outbound_consumer: Box<UciSink>) {
46 let inb = async {
47 loop {
48 let msg_result = inbound_source.try_next().await;
49 if let Ok(msg_opt) = msg_result {
50 if msg_opt.is_none() {
51 break;
52 } else {
53 let msg = msg_opt.unwrap();
54 inbound_consumer.send(Ok(msg)).await.unwrap();
55 }
56 } else {
57 inbound_consumer.send(Err(msg_result.err().unwrap())).await.unwrap();
58 }
59 }
60 };
61
62 let outb = async {
63 while let Some(msg) = StreamExt::next(&mut outbound_source).await {
64 let sr = outbound_consumer.send(msg).await;
65 if let Err(err) = sr {
66 eprintln!("[{:?}] Error while sending message through the outbound channel: {}", thread::current().id(), err);
67 }
68 }
69 };
70
71 join!(inb, outb);
72}
73
74pub async fn run_std_loops(inbound_consumer: UciTrySender, outbound_source: UciReceiver) {
75 run_loops(stdin_msg_stream(), inbound_consumer, outbound_source, stdout_msg_sink()).await;
76}
77
78pub fn new_channel() -> (UciSender, UciReceiver) {
79 unbounded::<UciMessage>()
80}
81
82pub fn new_try_channel() -> (UciTrySender, UciTryReceiver) {
83 unbounded::<io::Result<UciMessage>>()
84}
85
86pub fn run_future<F, T>(future: F) -> T
87 where
88 F: Future<Output=T> + Send,
89 T: Send {
90 block_on(future)
91}
92
93
94
#[cfg(test)]
mod tests {
    use super::*;

    /// Interactive smoke test: echoes every message parsed from standard input.
    /// Ignored by default because it reads from the terminal.
    #[test]
    #[ignore]
    fn test_run_in_loop() {
        block_on(async {
            let mut msg_stream = stdin_msg_stream();
            // Stop on end of input (`Ok(None)`) as well as on a read error,
            // instead of unwrapping the `None` that marks end of stream.
            while let Ok(Some(msg)) = msg_stream.try_next().await {
                println!("MSG RECEIVED VIA STREAM: {}", msg);
            }
        });
    }

    /// Interactive smoke test for the full standard-I/O loop pair: two messages
    /// are queued for standard output while everything read from standard input
    /// is printed. Ignored by default because it reads from the terminal.
    #[test]
    #[ignore]
    fn test_run_loops() {
        let (itx, mut irx) = new_try_channel();
        let (otx, orx) = new_channel();

        otx.unbounded_send(UciMessage::Uci).unwrap();

        let rec = async {
            // Ends on channel close (`Ok(None)`) or on the first forwarded error.
            while let Ok(Some(incoming)) = TryStreamExt::try_next(&mut irx).await {
                println!("Handling received message: {}", incoming)
            }
        };

        block_on(async move {
            otx.unbounded_send(UciMessage::UciOk).unwrap();
            // Drop the only sender so the outbound loop can finish once the queued
            // messages are written; otherwise `run_std_loops` never completes.
            drop(otx);
            join!(run_std_loops(itx, orx), rec);
        });
    }
}