jsonl/
connection.rs

1#[cfg(not(feature = "tokio"))]
2mod imports {
3    pub(super) use std::io::{self, BufRead, BufReader, Stdin, Stdout, Write};
4    pub(super) use std::net::{Shutdown, TcpStream};
5    pub(super) use std::process::{Child, ChildStdin, ChildStdout};
6}
7#[cfg(feature = "tokio")]
8mod imports {
9    pub(super) use tokio::io::{
10        self, AsyncBufRead as BufRead, AsyncWrite as Write, AsyncWriteExt, BufReader, Stdin, Stdout,
11    };
12    pub(super) use tokio::net::tcp::{ReadHalf, WriteHalf};
13    pub(super) use tokio::net::TcpStream;
14    pub(super) use tokio::process::{Child, ChildStdin, ChildStdout};
15}
16
17use imports::*;
18
19/// Use this type when you have both a reader and writer, and want them to be grouped together.
20///
21/// There are situations in which you have both a reader and a writer being passed around code,
22/// always kept together. This forms what is known as a ‘[data clump]’, and harms code readability.
23/// By grouping the two together it makes clear that they are both needed, and prevents mistakes
24/// when one is forgotten.
25///
26/// `Connection` is internally a pair of a reader and a writer, and delegates to [`crate::read`] and
27/// [`crate::write`] for [`Connection::read`] and [`Connection::write`] respectively.
28///
29/// [data clump]: https://youtu.be/DC-pQPq0acs?t=521
30#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
31pub struct Connection<R: BufRead, W: Write> {
32    reader: R,
33    writer: W,
34}
35
36impl<R: BufRead, W: Write> Connection<R, W> {
37    pub fn new(reader: R, writer: W) -> Self {
38        Self { reader, writer }
39    }
40}
41
42impl<'a> Connection<BufReader<&'a mut ChildStdout>, &'a mut ChildStdin> {
43    /// Creates a new `Connection` that uses the `stdin` of a child process as the writer and the
44    /// child process’ `stdout` as the reader. This facilitates communication with this child process
45    /// by passing data into its `stdin` and reading from its `stdout`.
46    pub fn new_from_child(child: &'a mut Child) -> Option<Self> {
47        let stdin = child.stdin.as_mut()?;
48        let stdout = child.stdout.as_mut()?;
49
50        Some(Self {
51            reader: BufReader::new(stdout),
52            writer: stdin,
53        })
54    }
55}
56
57impl Connection<BufReader<Stdin>, Stdout> {
58    /// Creates a new `Connection` from the stdio of the current process – `stdin` is used as the reader
59    /// and `stdout` is used as the writer.
60    pub fn new_from_stdio() -> Self {
61        Self {
62            reader: BufReader::new(io::stdin()),
63            writer: io::stdout(),
64        }
65    }
66}
67
68#[cfg(not(feature = "tokio"))]
69impl Connection<BufReader<TcpStream>, TcpStream> {
70    /// Creates a new `Connection` from a TCP stream.
71    pub fn new_from_tcp_stream(tcp_stream: TcpStream) -> io::Result<Self> {
72        Ok(Self {
73            reader: BufReader::new(tcp_stream.try_clone()?),
74            writer: tcp_stream,
75        })
76    }
77
78    /// Closes the TCP stream.
79    pub fn shutdown(self) -> io::Result<()> {
80        self.writer.shutdown(Shutdown::Both)
81    }
82}
83
84#[cfg(feature = "tokio")]
85impl<'a> Connection<BufReader<ReadHalf<'a>>, WriteHalf<'a>> {
86    /// Creates a new `Connection` from a mutable reference to a TCP stream.
87    pub fn new_from_tcp_stream(tcp_stream: &'a mut TcpStream) -> io::Result<Self> {
88        let (read_half, write_half) = tcp_stream.split();
89
90        Ok(Self {
91            reader: BufReader::new(read_half),
92            writer: write_half,
93        })
94    }
95
96    /// Closes the TCP stream.
97    pub async fn shutdown(mut self) -> io::Result<()> {
98        self.writer.shutdown().await
99    }
100}
101
102#[cfg(not(feature = "tokio"))]
103impl<R: BufRead, W: Write> Connection<R, W> {
104    /// Reads a line from the reader and deserializes it into a given type.
105    pub fn read<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, crate::ReadError> {
106        crate::read(&mut self.reader)
107    }
108
109    /// Writes a given value to the writer, serializing it into JSON.
110    pub fn write<T: serde::Serialize>(&mut self, t: &T) -> Result<(), crate::WriteError> {
111        crate::write(&mut self.writer, t)
112    }
113
114    /// Flushes the contained writer’s buffer.
115    pub fn flush(&mut self) -> io::Result<()> {
116        self.writer.flush()
117    }
118}
119
120#[cfg(feature = "tokio")]
121impl<R: BufRead + Unpin, W: Write + Unpin> Connection<R, W> {
122    /// Reads a line from the reader and deserializes it into a given type.
123    pub async fn read<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, crate::ReadError> {
124        crate::read(&mut self.reader).await
125    }
126
127    /// Writes a given value to the writer, serializing it into JSON.
128    pub async fn write<T: serde::Serialize>(&mut self, t: &T) -> Result<(), crate::WriteError> {
129        crate::write(&mut self.writer, t).await
130    }
131
132    /// Flushes the contained writer’s buffer.
133    pub async fn flush(&mut self) -> io::Result<()> {
134        self.writer.flush().await
135    }
136}