#[cfg(feature = "search")]
mod search;
#[cfg(feature = "search")]
pub use search::*;
#[cfg(feature = "ingest")]
mod ingest;
#[cfg(feature = "ingest")]
pub use ingest::*;
#[cfg(feature = "control")]
mod control;
#[cfg(feature = "control")]
pub use control::*;
use std::cell::RefCell;
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpStream, ToSocketAddrs};
use crate::commands::{StartCommand, StreamCommand};
use crate::protocol::{self, Protocol};
use crate::result::*;
// Buffer size assigned to a freshly connected stream; it stays in effect until
// a successful start command overwrites `max_buffer_size` with the value taken
// from the start response.
const UNINITIALIZED_MODE_MAX_BUFFER_SIZE: usize = 200;
/// Mode a channel is started in.
///
/// Each variant exists only when the crate feature of the same name is
/// enabled, so the set of available modes is decided at compile time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChannelMode {
    /// Available with the `search` feature; its textual name is `"search"`.
    #[cfg(feature = "search")]
    Search,

    /// Available with the `ingest` feature; its textual name is `"ingest"`.
    #[cfg(feature = "ingest")]
    Ingest,

    /// Available with the `control` feature; its textual name is `"control"`.
    #[cfg(feature = "control")]
    Control,
}
impl ChannelMode {
    /// Returns the lowercase textual name of the mode (`"search"`, `"ingest"`
    /// or `"control"`).
    ///
    /// The returned string is a literal, so it is `'static` and does not keep
    /// `self` borrowed.
    pub fn as_str(&self) -> &'static str {
        // Matching on the value rather than on the reference keeps this match
        // exhaustive even when no channel feature is enabled and the enum has
        // no variants at all.
        match *self {
            #[cfg(feature = "search")]
            ChannelMode::Search => "search",
            #[cfg(feature = "ingest")]
            ChannelMode::Ingest => "ingest",
            #[cfg(feature = "control")]
            ChannelMode::Control => "control",
        }
    }
}
impl std::fmt::Display for ChannelMode {
    /// Writes the mode's textual name, exactly as returned by `as_str`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// A TCP connection to the server together with the per-connection state
/// needed to format requests and parse responses.
#[derive(Debug)]
pub struct SonicStream {
    /// Socket handle that requests are written to.
    stream: RefCell<TcpStream>,

    /// Buffered reader over a clone of the same socket; responses are read
    /// from it one line at a time.
    reader: RefCell<BufReader<TcpStream>>,

    /// Mode the stream was started in; `None` until a start command succeeds.
    mode: Option<ChannelMode>,

    /// Capacity reserved for the line buffer on every read.
    max_buffer_size: usize,

    /// Protocol used to format outgoing requests and parse incoming lines.
    protocol: Protocol,
}
impl SonicStream {
fn send<SC: StreamCommand>(&self, command: &SC) -> Result<()> {
let buf = self
.protocol
.format_request(command.request())
.map_err(|_| Error::WriteToStream)?;
self.stream
.borrow_mut()
.write_all(&buf)
.map_err(|_| Error::WriteToStream)?;
Ok(())
}
fn read_line(&self) -> Result<protocol::Response> {
let line = {
let mut line = String::with_capacity(self.max_buffer_size);
self.reader
.borrow_mut()
.read_line(&mut line)
.map_err(|_| Error::ReadStream)?;
line
};
log::debug!("[channel] {}", &line);
self.protocol.parse_response(&line)
}
pub(crate) fn run_command<SC: StreamCommand>(&self, command: SC) -> Result<SC::Response> {
self.send(&command)?;
let res = loop {
let res = self.read_line()?;
if !matches!(&res, protocol::Response::Pending(_)) {
break res;
}
};
command.receive(res)
}
fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self> {
let stream = TcpStream::connect(addr).map_err(|_| Error::ConnectToServer)?;
let read_stream = stream.try_clone().map_err(|_| Error::ConnectToServer)?;
let channel = SonicStream {
reader: RefCell::new(BufReader::new(read_stream)),
stream: RefCell::new(stream),
mode: None,
max_buffer_size: UNINITIALIZED_MODE_MAX_BUFFER_SIZE,
protocol: Default::default(),
};
let res = channel.read_line()?;
if matches!(res, protocol::Response::Connected) {
Ok(channel)
} else {
Err(Error::ConnectToServer)
}
}
fn start<S: ToString>(&mut self, mode: ChannelMode, password: S) -> Result<()> {
if self.mode.is_some() {
return Err(Error::RunCommand);
}
let res = self.run_command(StartCommand {
mode,
password: password.to_string(),
})?;
self.max_buffer_size = res.max_buffer_size;
self.protocol = Protocol::from(res.protocol_version);
self.mode = Some(res.mode);
Ok(())
}
pub(crate) fn connect_with_start<A, S>(mode: ChannelMode, addr: A, password: S) -> Result<Self>
where
A: ToSocketAddrs,
S: ToString,
{
let mut channel = Self::connect(addr)?;
channel.start(mode, password)?;
Ok(channel)
}
}
/// Common interface of the channel types built on top of [`SonicStream`].
pub trait SonicChannel {
    /// Type produced by [`SonicChannel::start`].
    type Channel;

    /// Returns the underlying stream of this channel.
    fn stream(&self) -> &SonicStream;

    /// Creates a channel for the server at `addr`, using `password`.
    ///
    /// # Errors
    ///
    /// Returns an error when the channel cannot be created; the exact
    /// conditions are defined by the implementor.
    fn start<A: ToSocketAddrs, S: ToString>(addr: A, password: S) -> Result<Self::Channel>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every enabled channel mode must display as its lowercase name.
    #[test]
    fn format_channel_enums() {
        #[cfg(feature = "search")]
        assert_eq!(ChannelMode::Search.to_string(), "search");

        #[cfg(feature = "ingest")]
        assert_eq!(ChannelMode::Ingest.to_string(), "ingest");

        #[cfg(feature = "control")]
        assert_eq!(ChannelMode::Control.to_string(), "control");
    }
}