sonic_channel/
channels.rs

1#[cfg(feature = "search")]
2mod search;
3#[cfg(feature = "search")]
4pub use search::*;
5
6#[cfg(feature = "ingest")]
7mod ingest;
8#[cfg(feature = "ingest")]
9pub use ingest::*;
10
11#[cfg(feature = "control")]
12mod control;
13#[cfg(feature = "control")]
14pub use control::*;
15
16use std::cell::RefCell;
17use std::io::{BufRead, BufReader, Write};
18use std::net::{TcpStream, ToSocketAddrs};
19
20use crate::commands::{StartCommand, StreamCommand};
21use crate::protocol::{self, Protocol};
22use crate::result::*;
23
24const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
25
26/// Channel modes supported by sonic search backend.
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28pub enum ChannelMode {
29    /// Sonic server search channel mode.
30    ///
31    /// In this mode you can use `query`, `pag_query`, `suggest`, `lim_suggest`, `ping`
32    /// and `quit` commands.
33    ///
34    /// Note: This mode requires enabling the `search` feature.
35    #[cfg(feature = "search")]
36    Search,
37
38    /// Sonic server ingest channel mode.
39    ///
40    /// In this mode you can use `push`, `pop`, `flush`, `count` `ping` and `quit` commands.
41    ///
42    /// Note: This mode requires enabling the `ingest` feature.
43    #[cfg(feature = "ingest")]
44    Ingest,
45
46    /// Sonic server control channel mode.
47    ///
48    /// In this mode you can use `trigger`, `consolidate`, `backup`, `restore`,
49    /// `ping` and `quit` commands.
50    ///
51    /// Note: This mode requires enabling the `control` feature.
52    #[cfg(feature = "control")]
53    Control,
54}
55
56impl ChannelMode {
57    /// Converts enum to &str
58    pub fn as_str(&self) -> &str {
59        match self {
60            #[cfg(feature = "search")]
61            ChannelMode::Search => "search",
62
63            #[cfg(feature = "ingest")]
64            ChannelMode::Ingest => "ingest",
65
66            #[cfg(feature = "control")]
67            ChannelMode::Control => "control",
68        }
69    }
70}
71
72impl std::fmt::Display for ChannelMode {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "{}", self.as_str())
75    }
76}
77
78/// Root and Heart of this library.
79///
80/// You can connect to the sonic search backend and run all supported protocol methods.
81///
82#[derive(Debug)]
83pub struct SonicStream {
84    stream: RefCell<TcpStream>,
85    reader: RefCell<BufReader<TcpStream>>,
86    mode: Option<ChannelMode>, // None – Uninitialized mode
87    max_buffer_size: usize,
88    protocol: Protocol,
89}
90
91impl SonicStream {
92    fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
93        let buf = self
94            .protocol
95            .format_request(command.request())
96            .map_err(|_| Error::WriteToStream)?;
97        self.stream
98            .borrow_mut()
99            .write_all(&buf)
100            .map_err(|_| Error::WriteToStream)?;
101        Ok(())
102    }
103
104    fn read_line(&self) -> Result<protocol::Response> {
105        let line = {
106            let mut line = String::with_capacity(self.max_buffer_size);
107            self.reader
108                .borrow_mut()
109                .read_line(&mut line)
110                .map_err(|_| Error::ReadStream)?;
111            line
112        };
113
114        log::debug!("[channel] {}", &line);
115        self.protocol.parse_response(&line)
116    }
117
118    pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
119        self.send(&command)?;
120        let res = loop {
121            let res = self.read_line()?;
122            if !matches!(&res, protocol::Response::Pending(_)) {
123                break res;
124            }
125        };
126        command.receive(res)
127    }
128
129    fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
130        let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
131        let read_stream = stream.try_clone().map_err(|_| Error::ConnectToServer)?;
132
133        let channel = SonicStream {
134            reader: RefCell::new(BufReader::new(read_stream)),
135            stream: RefCell::new(stream),
136            mode: None,
137            max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
138            protocol: Default::default(),
139        };
140
141        let res = channel.read_line()?;
142        if matches!(res, protocol::Response::Connected) {
143            Ok(channel)
144        } else {
145            Err(Error::ConnectToServer)
146        }
147    }
148
149    fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
150        if self.mode.is_some() {
151            return Err(Error::RunCommand);
152        }
153
154        let res = self.run_command(StartCommand {
155            mode,
156            password: password.to_string(),
157        })?;
158
159        self.max_buffer_size = res.max_buffer_size;
160        self.protocol = Protocol::from(res.protocol_version);
161        self.mode = Some(res.mode);
162
163        Ok(())
164    }
165
166    /// Connect to the search backend in chosen mode.
167    ///
168    /// I think we shouldn't separate commands connect and start because we haven't
169    /// possibility to change channel in sonic server, if we already chosen one of them. 🤔
170    pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
171    where
172        A: ToSocketAddrs,
173        S: ToString,
174    {
175        let mut channel = Self::connect(addr)?;
176        channel.start(mode, password)?;
177        Ok(channel)
178    }
179}
180
181/// This trait should be implemented for all supported sonic channels
182pub trait SonicChannel {
183    /// Sonic channel struct
184    type Channel;
185
186    /// Returns reference for sonic stream of connection
187    fn stream(&self) -> &SonicStream;
188
189    /// Connects to sonic backend and run start command.
190    ///
191    /// ```rust,no_run
192    /// # use sonic_channel::*;
193    /// # fn main() -> result::Result<()> {
194    /// let search_channel = SearchChannel::start(
195    ///     "localhost:1491",
196    ///     "SecretPassword",
197    /// )?;
198    /// # Ok(())
199    /// # }
200    /// ```
201    fn start<A, S>(addr: A, password: S) -> Result<Self::Channel>
202    where
203        A: ToSocketAddrs,
204        S: ToString;
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210
211    #[test]
212    fn format_channel_enums() {
213        #[cfg(feature = "search")]
214        assert_eq!(format!("{}", ChannelMode::Search), String::from("search"));
215        #[cfg(feature = "ingest")]
216        assert_eq!(format!("{}", ChannelMode::Ingest), String::from("ingest"));
217        #[cfg(feature = "control")]
218        assert_eq!(format!("{}", ChannelMode::Control), String::from("control"));
219    }
220}