sonic_channel/
channels.rs1#[cfg(feature = "search")]
2mod search;
3#[cfg(feature = "search")]
4pub use search::*;
5
6#[cfg(feature = "ingest")]
7mod ingest;
8#[cfg(feature = "ingest")]
9pub use ingest::*;
10
11#[cfg(feature = "control")]
12mod control;
13#[cfg(feature = "control")]
14pub use control::*;
15
16use std::cell::RefCell;
17use std::io::{BufRead, BufReader, Write};
18use std::net::{TcpStream, ToSocketAddrs};
19
20use crate::commands::{StartCommand, StreamCommand};
21use crate::protocol::{self, Protocol};
22use crate::result::*;
23
24const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum ChannelMode {
29 #[cfg(feature = "search")]
36 Search,
37
38 #[cfg(feature = "ingest")]
44 Ingest,
45
46 #[cfg(feature = "control")]
53 Control,
54}
55
56impl ChannelMode {
57 pub fn as_str(&self) -> &str {
59 match self {
60 #[cfg(feature = "search")]
61 ChannelMode::Search => "search",
62
63 #[cfg(feature = "ingest")]
64 ChannelMode::Ingest => "ingest",
65
66 #[cfg(feature = "control")]
67 ChannelMode::Control => "control",
68 }
69 }
70}
71
72impl std::fmt::Display for ChannelMode {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 write!(f, "{}", self.as_str())
75 }
76}
77
78#[derive(Debug)]
83pub struct SonicStream {
84 stream: RefCell<TcpStream>,
85 reader: RefCell<BufReader<TcpStream>>,
86 mode: Option<ChannelMode>, max_buffer_size: usize,
88 protocol: Protocol,
89}
90
91impl SonicStream {
92 fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
93 let buf = self
94 .protocol
95 .format_request(command.request())
96 .map_err(|_| Error::WriteToStream)?;
97 self.stream
98 .borrow_mut()
99 .write_all(&buf)
100 .map_err(|_| Error::WriteToStream)?;
101 Ok(())
102 }
103
104 fn read_line(&self) -> Result<protocol::Response> {
105 let line = {
106 let mut line = String::with_capacity(self.max_buffer_size);
107 self.reader
108 .borrow_mut()
109 .read_line(&mut line)
110 .map_err(|_| Error::ReadStream)?;
111 line
112 };
113
114 log::debug!("[channel] {}", &line);
115 self.protocol.parse_response(&line)
116 }
117
118 pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
119 self.send(&command)?;
120 let res = loop {
121 let res = self.read_line()?;
122 if !matches!(&res, protocol::Response::Pending(_)) {
123 break res;
124 }
125 };
126 command.receive(res)
127 }
128
129 fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
130 let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
131 let read_stream = stream.try_clone().map_err(|_| Error::ConnectToServer)?;
132
133 let channel = SonicStream {
134 reader: RefCell::new(BufReader::new(read_stream)),
135 stream: RefCell::new(stream),
136 mode: None,
137 max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
138 protocol: Default::default(),
139 };
140
141 let res = channel.read_line()?;
142 if matches!(res, protocol::Response::Connected) {
143 Ok(channel)
144 } else {
145 Err(Error::ConnectToServer)
146 }
147 }
148
149 fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
150 if self.mode.is_some() {
151 return Err(Error::RunCommand);
152 }
153
154 let res = self.run_command(StartCommand {
155 mode,
156 password: password.to_string(),
157 })?;
158
159 self.max_buffer_size = res.max_buffer_size;
160 self.protocol = Protocol::from(res.protocol_version);
161 self.mode = Some(res.mode);
162
163 Ok(())
164 }
165
166 pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
171 where
172 A: ToSocketAddrs,
173 S: ToString,
174 {
175 let mut channel = Self::connect(addr)?;
176 channel.start(mode, password)?;
177 Ok(channel)
178 }
179}
180
181pub trait SonicChannel {
183 type Channel;
185
186 fn stream(&self) -> &SonicStream;
188
189 fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
202 where
203 A: ToSocketAddrs,
204 S: ToString;
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210
211 #[test]
212 fn format_channel_enums() {
213 #[cfg(feature = "search")]
214 assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
215 #[cfg(feature = "ingest")]
216 assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
217 #[cfg(feature = "control")]
218 assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
219 }
220}