bsp_server/
transporter.rs1use std::io;
2use std::net::TcpStream;
3use std::thread;
4
5use crate::Notification;
6
7use super::{IoThreads, Message};
8use crossbeam_channel::{bounded, Receiver, Sender};
9
10pub struct Transporter(pub Sender<Message>, pub Receiver<Message>, pub IoThreads);
11
12impl Transporter {
13 pub fn stdio() -> Self {
15 let (writer_sender, writer_receiver) = bounded::<Message>(0);
16 let writer = thread::spawn(move || {
17 let stdout = io::stdout();
18 let mut stdout = stdout.lock();
19 writer_receiver
20 .into_iter()
21 .try_for_each(|it| it.write(&mut stdout))?;
22 Ok(())
23 });
24 let (reader_sender, reader_receiver) = bounded::<Message>(0);
25 let reader = thread::spawn(move || {
26 let stdin = io::stdin();
27 let mut stdin = stdin.lock();
28 while let Some(msg) = Message::read(&mut stdin)? {
29 let is_exit = match &msg {
30 Message::Notification(Notification::Exit) => true,
31 _ => false,
32 };
33
34 reader_sender.send(msg).unwrap();
35
36 if is_exit {
37 break;
38 }
39 }
40 Ok(())
41 });
42 let threads = IoThreads { reader, writer };
43 Self(writer_sender, reader_receiver, threads)
44 }
45
46 pub fn socket(stream: TcpStream) -> Self {
48 let (reader_receiver, reader) = {
49 let stream = stream.try_clone().unwrap();
50 let (reader_sender, reader_receiver) = bounded::<Message>(0);
51 let reader = thread::spawn(move || {
52 let mut buf_read = io::BufReader::new(stream);
53 while let Some(msg) = Message::read(&mut buf_read).unwrap() {
54 let is_exit = matches!(&msg, Message::Notification(Notification::Exit));
55 reader_sender.send(msg).unwrap();
56 if is_exit {
57 break;
58 }
59 }
60 Ok(())
61 });
62 (reader_receiver, reader)
63 };
64
65 let (writer_sender, writer) = {
66 let mut stream = stream.try_clone().unwrap();
67 let (writer_sender, writer_receiver) = bounded::<Message>(0);
68 let writer = thread::spawn(move || {
69 writer_receiver
70 .into_iter()
71 .try_for_each(|it| it.write(&mut stream))
72 .unwrap();
73 Ok(())
74 });
75 (writer_sender, writer)
76 };
77
78 let io_threads = IoThreads::new(reader, writer);
79 Self(writer_sender, reader_receiver, io_threads)
80 }
81}