bsp_server/
transporter.rs

1use std::io;
2use std::net::TcpStream;
3use std::thread;
4
5use crate::Notification;
6
7use super::{IoThreads, Message};
8use crossbeam_channel::{bounded, Receiver, Sender};
9
10pub struct Transporter(pub Sender<Message>, pub Receiver<Message>, pub IoThreads);
11
12impl Transporter {
13    /// Creates an BSP connection via stdio.
14    pub fn stdio() -> Self {
15        let (writer_sender, writer_receiver) = bounded::<Message>(0);
16        let writer = thread::spawn(move || {
17            let stdout = io::stdout();
18            let mut stdout = stdout.lock();
19            writer_receiver
20                .into_iter()
21                .try_for_each(|it| it.write(&mut stdout))?;
22            Ok(())
23        });
24        let (reader_sender, reader_receiver) = bounded::<Message>(0);
25        let reader = thread::spawn(move || {
26            let stdin = io::stdin();
27            let mut stdin = stdin.lock();
28            while let Some(msg) = Message::read(&mut stdin)? {
29                let is_exit = match &msg {
30                    Message::Notification(Notification::Exit) => true,
31                    _ => false,
32                };
33
34                reader_sender.send(msg).unwrap();
35
36                if is_exit {
37                    break;
38                }
39            }
40            Ok(())
41        });
42        let threads = IoThreads { reader, writer };
43        Self(writer_sender, reader_receiver, threads)
44    }
45
46    /// Creates an BSP connection via socket.
47    pub fn socket(stream: TcpStream) -> Self {
48        let (reader_receiver, reader) = {
49            let stream = stream.try_clone().unwrap();
50            let (reader_sender, reader_receiver) = bounded::<Message>(0);
51            let reader = thread::spawn(move || {
52                let mut buf_read = io::BufReader::new(stream);
53                while let Some(msg) = Message::read(&mut buf_read).unwrap() {
54                    let is_exit = matches!(&msg, Message::Notification(Notification::Exit));
55                    reader_sender.send(msg).unwrap();
56                    if is_exit {
57                        break;
58                    }
59                }
60                Ok(())
61            });
62            (reader_receiver, reader)
63        };
64
65        let (writer_sender, writer) = {
66            let mut stream = stream.try_clone().unwrap();
67            let (writer_sender, writer_receiver) = bounded::<Message>(0);
68            let writer = thread::spawn(move || {
69                writer_receiver
70                    .into_iter()
71                    .try_for_each(|it| it.write(&mut stream))
72                    .unwrap();
73                Ok(())
74            });
75            (writer_sender, writer)
76        };
77
78        let io_threads = IoThreads::new(reader, writer);
79        Self(writer_sender, reader_receiver, io_threads)
80    }
81}