use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
use super::resolve::socket_path;
use super::types::{
ClientMessage, DaemonSockError, Result, SearchQuery, SearchResults, ServerMessage,
};
pub struct DaemonClient {
pub(super) stream: BufReader<UnixStream>,
}
impl DaemonClient {
pub fn connect(root: &Path) -> Result<Self> {
let sp = socket_path(root);
let stream = UnixStream::connect(&sp)?;
stream.set_read_timeout(Some(std::time::Duration::from_secs(5)))?;
stream.set_write_timeout(Some(std::time::Duration::from_secs(5)))?;
Ok(Self {
stream: BufReader::new(stream),
})
}
pub fn recv(&mut self) -> Result<ServerMessage> {
let mut line = String::new();
let bytes = self.stream.read_line(&mut line).map_err(|e| {
if e.kind() == std::io::ErrorKind::TimedOut {
DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"recv timed out after 5s",
))
} else {
DaemonSockError::Io(e)
}
})?;
if bytes == 0 {
return Err(DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"daemon closed connection",
)));
}
let msg: ServerMessage = serde_json::from_str(line.trim_end()).map_err(|e| {
DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid JSON: {e}"),
))
})?;
Ok(msg)
}
pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
let stream = self.stream.get_mut();
let mut line = serde_json::to_string(msg)?;
line.push('\n');
stream.write_all(line.as_bytes())?;
stream.flush()?;
Ok(())
}
pub fn search(&mut self, query: SearchQuery) -> Result<SearchResults> {
use std::io::Write;
let stream = self.stream.get_mut();
stream.set_write_timeout(Some(std::time::Duration::from_secs(3)))?;
let mut line = serde_json::to_string(&ClientMessage::SearchQuery(query))?;
line.push('\n');
stream.write_all(line.as_bytes())?;
stream.flush()?;
stream.set_read_timeout(Some(std::time::Duration::from_secs(3)))?;
let mut response_line = String::new();
self.stream.read_line(&mut response_line)?;
match serde_json::from_str::<ServerMessage>(response_line.trim_end()) {
Ok(ServerMessage::SearchResults(results)) => {
if let Some(ref error) = results.error {
return Err(DaemonSockError::Io(std::io::Error::other(format!(
"daemon search error: {error}"
))));
}
Ok(results)
}
Ok(other) => Err(DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("expected SearchResults, got {other:?}"),
))),
Err(e) => Err(DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid JSON: {e}"),
))),
}
}
pub fn search_progressive(&mut self, query: SearchQuery) -> Result<SearchResultsIter<'_>> {
use std::io::Write;
let mut prog_query = query;
prog_query.progressive = true;
let stream = self.stream.get_mut();
stream.set_write_timeout(Some(std::time::Duration::from_secs(3)))?;
let mut line = serde_json::to_string(&ClientMessage::SearchQuery(prog_query))?;
line.push('\n');
stream.write_all(line.as_bytes())?;
stream.flush()?;
Ok(SearchResultsIter {
client: self,
done_received: false,
})
}
}
pub struct SearchResultsIter<'a> {
client: &'a mut DaemonClient,
done_received: bool,
}
impl Iterator for SearchResultsIter<'_> {
type Item = Result<SearchResults>;
fn next(&mut self) -> Option<Self::Item> {
if self.done_received {
return None;
}
match self.client.recv() {
Ok(ServerMessage::SearchResults(results)) => {
if let Some(ref error) = results.error {
return Some(Err(DaemonSockError::Io(std::io::Error::other(format!(
"daemon search error: {error}"
)))));
}
let is_done = results.done;
if is_done {
self.done_received = true;
}
Some(Ok(results))
}
Ok(_) => Some(Err(DaemonSockError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"expected SearchResults message from daemon",
)))),
Err(e) => {
if matches!(
&e,
DaemonSockError::Io(io_err) if io_err.kind() == std::io::ErrorKind::UnexpectedEof
) {
None
} else {
Some(Err(e))
}
}
}
}
}