use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::process::Stdio;
use std::time::Duration;
use super::paths;
use super::protocol::{Request, Response};
use crate::contexts::daemon::domain::cache::{CachePort, CacheState};
/// Timeout, in milliseconds, for reading a reply on the daemon socket
/// connection (applied in `DaemonClient::send`).
const READ_TIMEOUT_MS: u64 = 500;
/// Stateless client for the cache daemon.
///
/// The type carries no data; every request opens a fresh Unix-socket
/// connection to the path returned by `paths::socket_path()`.
pub struct DaemonClient;
impl CachePort for DaemonClient {
    /// Asks the daemon for the cached state of `repo_id`.
    ///
    /// Before querying, a daemon reporting a different version than this
    /// client is stopped and started again. If the first request fails, the
    /// daemon is started and the request is retried exactly once.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the retry also fails, or when the daemon answers
    /// with a response that does not describe a cache state.
    fn query(&self, repo_id: &str) -> Result<CacheState, ()> {
        Self::restart_if_version_mismatched();

        // An unreadable working directory is sent as an empty string.
        let cwd = match std::env::current_dir() {
            Ok(dir) => dir.to_string_lossy().into_owned(),
            Err(_) => String::new(),
        };
        let request = Request::Query {
            repo_id: repo_id.to_owned(),
            cwd,
        };

        let response = match Self::send(&request) {
            Ok(first_reply) => first_reply,
            Err(()) => {
                // The daemon may simply not be running yet: start it and try again.
                Self::lazy_start();
                Self::send(&request)?
            }
        };
        response_to_cache_state(response)
    }

    /// Pushes freshly computed `output` for `repo_id` to the daemon.
    ///
    /// Best effort: a failed request is ignored and the daemon is not started.
    fn update(&self, repo_id: &str, output: &str) {
        let request = Request::Update {
            repo_id: repo_id.to_owned(),
            output: output.to_owned(),
        };
        let _ = Self::send(&request);
    }
}
/// Translates a daemon reply into the domain-level [`CacheState`].
///
/// `Fresh`, `Stale` and `Miss` replies map to the variant of the same name;
/// any other reply is reported as `Err(())`.
fn response_to_cache_state(response: Response) -> Result<CacheState, ()> {
    let state = match response {
        Response::Fresh { output } => CacheState::Fresh(output),
        Response::Stale { output } => CacheState::Stale(output),
        Response::Miss => CacheState::Miss,
        _ => return Err(()),
    };
    Ok(state)
}
impl DaemonClient {
    /// Version of this client, taken from `CARGO_PKG_VERSION` at compile time.
    const CLIENT_VERSION: &'static str = env!("CARGO_PKG_VERSION");

    /// Sends a `Stop` request to the daemon.
    ///
    /// Returns `true` when the request round-trip succeeded (a reply was
    /// received and parsed), `false` otherwise.
    pub(super) fn stop() -> bool {
        Self::send(&Request::Stop).is_ok()
    }

    /// Asks the daemon for its status.
    ///
    /// Returns `(pid, entries, uptime_secs, version)` from a `Status` reply, or
    /// `None` when the request fails or the daemon answers with anything else.
    pub(super) fn status_raw() -> Option<(u32, usize, u64, String)> {
        match Self::send(&Request::Status) {
            Ok(Response::Status {
                pid,
                entries,
                uptime_secs,
                version,
            }) => Some((pid, entries, uptime_secs, version)),
            _ => None,
        }
    }

    /// Stops and starts the daemon when it reports a version different from
    /// [`Self::CLIENT_VERSION`].
    ///
    /// Does nothing when the status cannot be obtained (for example because
    /// no daemon is reachable) or when the versions already match.
    fn restart_if_version_mismatched() {
        let Some((_, _, _, daemon_version)) = Self::status_raw() else {
            return;
        };
        if daemon_version != Self::CLIENT_VERSION {
            // Best effort: try to start a matching daemon even if the stop failed.
            let _ = Self::stop();
            Self::lazy_start();
        }
    }

    /// Performs one request/response exchange with the daemon.
    ///
    /// Opens a new connection to `paths::socket_path()`, writes `request` as a
    /// single line of JSON, then reads and parses a single line of JSON back.
    /// Both directions are bounded by `READ_TIMEOUT_MS`, so neither a daemon
    /// that never answers nor one that stops draining the socket can block the
    /// caller on a single read or write for longer than that.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when connecting, configuring the timeouts,
    /// serializing, writing, reading, or parsing the reply fails.
    fn send(request: &Request) -> Result<Response, ()> {
        let mut stream = UnixStream::connect(paths::socket_path()).map_err(|_| ())?;

        let timeout = Some(Duration::from_millis(READ_TIMEOUT_MS));
        stream.set_read_timeout(timeout).map_err(|_| ())?;
        // Without this, `write_all` below could block indefinitely on a wedged daemon.
        stream.set_write_timeout(timeout).map_err(|_| ())?;

        // The protocol is newline-delimited JSON; terminate the request line in place.
        let mut line = serde_json::to_string(request).map_err(|_| ())?;
        line.push('\n');
        stream.write_all(line.as_bytes()).map_err(|_| ())?;

        let mut reply = String::new();
        BufReader::new(&stream)
            .read_line(&mut reply)
            .map_err(|_| ())?;
        serde_json::from_str(reply.trim()).map_err(|_| ())
    }

    /// Runs `<current executable> daemon start` with all standard streams
    /// redirected to null, and waits for that command to exit.
    ///
    /// Failures (unknown executable path, spawn error, non-zero exit) are
    /// ignored; callers find out whether a daemon is available by sending
    /// their next request.
    fn lazy_start() {
        let Ok(exe) = std::env::current_exe() else {
            return;
        };
        // NOTE(review): `status()` blocks until the child exits, so this presumably
        // relies on `daemon start` detaching and returning promptly — confirm.
        let _ = std::process::Command::new(&exe)
            .args(["daemon", "start"])
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
    }
}
/// Looks up `repo_id` through the daemon and returns the text to display.
///
/// Returns `None` when the query fails or the resulting cache state carries
/// no usable output (see `cache_state_to_output`).
pub fn query_via_daemon(repo_id: &str) -> Option<String> {
    let state = DaemonClient.query(repo_id);
    cache_state_to_output(state)
}
/// Reduces a cache lookup result to the output string, if there is one.
///
/// * `Fresh` output is always returned, even when empty.
/// * `Stale` output is returned only when it is non-empty.
/// * Any other state, and any error, yields `None`.
fn cache_state_to_output(state: Result<CacheState, ()>) -> Option<String> {
    match state.ok()? {
        CacheState::Fresh(text) => Some(text),
        CacheState::Stale(text) => (!text.is_empty()).then_some(text),
        _ => None,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, non-empty output is passed through unchanged.
    #[test]
    fn fresh_with_output_returns_some() {
        let state = Ok(CacheState::Fresh(String::from("✓ merge-ready")));
        assert_eq!(
            cache_state_to_output(state),
            Some(String::from("✓ merge-ready"))
        );
    }

    /// Stale, non-empty output is still shown.
    #[test]
    fn stale_with_output_returns_some() {
        let state = Ok(CacheState::Stale(String::from("✓ merge-ready")));
        assert_eq!(
            cache_state_to_output(state),
            Some(String::from("✓ merge-ready"))
        );
    }

    /// A cache miss produces no output.
    #[test]
    fn miss_returns_none() {
        assert!(cache_state_to_output(Ok(CacheState::Miss)).is_none());
    }

    /// A failed lookup produces no output.
    #[test]
    fn error_returns_none() {
        assert!(cache_state_to_output(Err(())).is_none());
    }

    /// Fresh output is returned even when it is the empty string.
    #[test]
    fn fresh_empty_returns_some_empty() {
        let state = Ok(CacheState::Fresh(String::new()));
        assert_eq!(cache_state_to_output(state), Some(String::new()));
    }

    /// Stale output that is empty is treated as having nothing to show.
    #[test]
    fn stale_empty_returns_none() {
        let state = Ok(CacheState::Stale(String::new()));
        assert!(cache_state_to_output(state).is_none());
    }
}