use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::PathBuf;
use std::time::Duration;
/// Outcome of a successful lookup against the daemon.
///
/// Built by `parse_response` from a reply whose `ok` field is `true`.
#[derive(Debug)]
pub enum CombResult {
/// The reply carried a non-null `data` field.
Hit {
/// The payload taken from the reply's `data` field.
data: CombData,
/// Value of the reply's `age_ms` field; `0` when the field is absent or
/// not an unsigned integer.
age_ms: u128,
/// Value of the reply's `stale` field; `false` when the field is absent
/// or not a boolean.
stale: bool,
},
/// The reply had no `data` field, or `data` was JSON `null`.
Miss,
}
/// Payload of a [`CombResult::Hit`]: a thin wrapper around a JSON value.
///
/// The wrapped value may be an object (queried per field through the
/// `get_*` accessors) or any other JSON value (see `as_text` / `as_value`).
#[derive(Debug, Clone)]
pub struct CombData {
// The raw JSON value; kept private and exposed read-only via `as_value`.
value: serde_json::Value,
}
impl CombData {
pub fn from_json(value: serde_json::Value) -> Self {
Self { value }
}
pub fn get_str(&self, field: &str) -> Option<&str> {
if let Some(obj) = self.value.as_object() {
obj.get(field).and_then(|v| v.as_str())
} else {
self.value.as_str()
}
}
pub fn get_bool(&self, field: &str) -> Option<bool> {
if let Some(obj) = self.value.as_object() {
obj.get(field).and_then(|v| v.as_bool())
} else {
self.value.as_bool()
}
}
pub fn get_i64(&self, field: &str) -> Option<i64> {
if let Some(obj) = self.value.as_object() {
obj.get(field).and_then(|v| v.as_i64())
} else {
self.value.as_i64()
}
}
pub fn get_f64(&self, field: &str) -> Option<f64> {
if let Some(obj) = self.value.as_object() {
obj.get(field).and_then(|v| v.as_f64())
} else {
self.value.as_f64()
}
}
pub fn as_value(&self) -> &serde_json::Value {
&self.value
}
pub fn as_text(&self) -> Option<String> {
match &self.value {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Bool(b) => Some(b.to_string()),
serde_json::Value::Null => None,
other => Some(other.to_string()),
}
}
}
/// Errors reported by the client, the session, and the helper functions in
/// this file.
#[derive(Debug)]
pub enum CombError {
/// The daemon socket could not be reached: auto-start is disabled, the
/// `comb` binary was not found on `PATH`, or the daemon did not accept a
/// connection after the start-up retries.
DaemonNotRunning,
/// Connecting to the socket, or spawning the daemon process, failed.
ConnectionFailed(std::io::Error),
/// Any other I/O failure; also the target of `From<std::io::Error>`, so it
/// is what a bare `?` on an I/O call produces.
IoError(std::io::Error),
/// The daemon's reply was not valid JSON; carries the parser's message.
ParseError(String),
/// The reply's `ok` field was missing or not `true`; carries the reply's
/// `error` string, or `"unknown error"` when none was given.
ServerError(String),
/// A socket read hit the configured timeout (reported as `WouldBlock` or
/// `TimedOut`).
Timeout,
}
impl std::fmt::Display for CombError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CombError::DaemonNotRunning => write!(f, "comb daemon is not running"),
CombError::ConnectionFailed(e) => write!(f, "connection failed: {}", e),
CombError::IoError(e) => write!(f, "I/O error: {}", e),
CombError::ParseError(s) => write!(f, "parse error: {}", s),
CombError::ServerError(s) => write!(f, "server error: {}", s),
CombError::Timeout => write!(f, "operation timed out"),
}
}
}
// Marks `CombError` as a standard error type. No `source()` override is
// provided, so a wrapped `std::io::Error` is only visible through the
// `Display` text of the `ConnectionFailed` / `IoError` variants.
impl std::error::Error for CombError {}
impl From<std::io::Error> for CombError {
fn from(e: std::io::Error) -> Self {
CombError::IoError(e)
}
}
/// Tunables for [`Client`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
/// Applied as both the read and the write timeout of every socket the
/// client opens.
pub timeout: Duration,
/// When `true`, the client tries to spawn the daemon if the socket cannot
/// be reached; when `false`, it fails with `CombError::DaemonNotRunning`.
pub auto_start: bool,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
timeout: Duration::from_millis(100),
auto_start: true,
}
}
}
/// Entry point for talking to the daemon.
///
/// The struct itself holds only configuration; each `get` / `poke` call opens
/// its own connection, while `session` hands back a connection that is reused
/// across requests.
pub struct Client {
// Timeout and auto-start settings used for every connection.
config: ClientConfig,
}
impl Client {
pub fn new() -> Self {
Self {
config: ClientConfig::default(),
}
}
pub fn with_config(config: ClientConfig) -> Self {
Self { config }
}
pub fn get(&self, key: &str, path: Option<&str>) -> Result<CombResult, CombError> {
let socket_path = self.find_or_start_socket()?;
let mut stream = self.connect(&socket_path)?;
let mut request = serde_json::json!({ "op": "get", "key": key });
if let Some(p) = path {
request["path"] = serde_json::json!(p);
}
self.send_recv(&mut stream, &request)
}
pub fn poke(&self, key: &str, path: Option<&str>) -> Result<(), CombError> {
let socket_path = self.find_or_start_socket()?;
let mut stream = self.connect(&socket_path)?;
let mut request = serde_json::json!({ "op": "poke", "key": key });
if let Some(p) = path {
request["path"] = serde_json::json!(p);
}
let msg = format!("{}\n", serde_json::to_string(&request).unwrap());
stream.write_all(msg.as_bytes())?;
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line)?;
Ok(())
}
pub fn session(&self) -> Result<Session, CombError> {
let socket_path = self.find_or_start_socket()?;
let stream = self.connect(&socket_path)?;
Ok(Session::new(stream))
}
fn find_or_start_socket(&self) -> Result<PathBuf, CombError> {
let path = socket_path();
if UnixStream::connect(&path).is_ok() {
return Ok(path);
}
if !self.config.auto_start {
return Err(CombError::DaemonNotRunning);
}
start_daemon(&path)?;
let mut delay = Duration::from_millis(10);
for _ in 0..8 {
std::thread::sleep(delay);
if UnixStream::connect(&path).is_ok() {
return Ok(path);
}
delay = (delay * 2).min(Duration::from_millis(500));
}
Err(CombError::DaemonNotRunning)
}
fn connect(&self, path: &PathBuf) -> Result<UnixStream, CombError> {
let stream = UnixStream::connect(path).map_err(CombError::ConnectionFailed)?;
stream.set_read_timeout(Some(self.config.timeout))?;
stream.set_write_timeout(Some(self.config.timeout))?;
Ok(stream)
}
fn send_recv(
&self,
stream: &mut UnixStream,
request: &serde_json::Value,
) -> Result<CombResult, CombError> {
let msg = format!("{}\n", serde_json::to_string(request).unwrap());
stream.write_all(msg.as_bytes())?;
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader.read_line(&mut line).map_err(|e| {
if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::TimedOut
{
CombError::Timeout
} else {
CombError::IoError(e)
}
})?;
parse_response(&line)
}
}
impl Default for Client {
fn default() -> Self {
Self::new()
}
}
/// A connection to the daemon that is kept open across several requests.
///
/// Obtained from `Client::session`.
pub struct Session {
// Buffered reader over the socket; requests are written to the same socket
// through `reader.get_mut()`.
reader: BufReader<UnixStream>,
}
impl Session {
fn new(stream: UnixStream) -> Self {
Self {
reader: BufReader::new(stream),
}
}
pub fn get(&mut self, key: &str, path: Option<&str>) -> Result<CombResult, CombError> {
let mut request = serde_json::json!({ "op": "get", "key": key });
if let Some(p) = path {
request["path"] = serde_json::json!(p);
}
let msg = format!("{}\n", serde_json::to_string(&request).unwrap());
self.reader.get_mut().write_all(msg.as_bytes())?;
let mut line = String::new();
self.reader.read_line(&mut line)?;
parse_response(&line)
}
pub fn set_context(&mut self, path: &str) -> Result<(), CombError> {
let request = serde_json::json!({ "op": "context", "path": path });
let msg = format!("{}\n", serde_json::to_string(&request).unwrap());
self.reader.get_mut().write_all(msg.as_bytes())?;
let mut line = String::new();
self.reader.read_line(&mut line)?;
Ok(())
}
pub fn poke(&mut self, key: &str, path: Option<&str>) -> Result<(), CombError> {
let mut request = serde_json::json!({ "op": "poke", "key": key });
if let Some(p) = path {
request["path"] = serde_json::json!(p);
}
let msg = format!("{}\n", serde_json::to_string(&request).unwrap());
self.reader.get_mut().write_all(msg.as_bytes())?;
let mut line = String::new();
self.reader.read_line(&mut line)?;
Ok(())
}
}
/// Parses one reply line from the daemon into a [`CombResult`].
///
/// The line is trimmed and parsed as JSON, then interpreted as follows:
///
/// * `ok` missing or not `true`: `Err(CombError::ServerError(..))` carrying
///   the reply's `error` string, or `"unknown error"` when there is none.
/// * `data` missing or `null`: `Ok(CombResult::Miss)`.
/// * otherwise: `Ok(CombResult::Hit { .. })`, with `age_ms` defaulting to `0`
///   and `stale` defaulting to `false` when absent or of the wrong type.
///
/// # Errors
///
/// `CombError::ParseError` when the line is not valid JSON (this includes an
/// empty line), and `CombError::ServerError` as described above.
fn parse_response(line: &str) -> Result<CombResult, CombError> {
    let mut resp: serde_json::Value =
        serde_json::from_str(line.trim()).map_err(|e| CombError::ParseError(e.to_string()))?;

    let ok = resp.get("ok").and_then(|v| v.as_bool()).unwrap_or(false);
    if !ok {
        let error = resp
            .get("error")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown error")
            .to_string();
        return Err(CombError::ServerError(error));
    }

    // The reply is owned by this function, so the payload is moved out
    // (leaving `null` behind) rather than deep-cloned.
    let payload = match resp.get_mut("data").map(|slot| slot.take()) {
        None | Some(serde_json::Value::Null) => return Ok(CombResult::Miss),
        Some(payload) => payload,
    };

    let age_ms = resp
        .get("age_ms")
        .and_then(|v| v.as_u64())
        .map(u128::from)
        .unwrap_or(0);
    let stale = resp.get("stale").and_then(|v| v.as_bool()).unwrap_or(false);

    Ok(CombResult::Hit {
        data: CombData { value: payload },
        age_ms,
        stale,
    })
}
/// Computes the path of the daemon's Unix socket.
///
/// 1. If `XDG_RUNTIME_DIR` is set and `<XDG_RUNTIME_DIR>/beachcomber/sock`
///    already exists, that path is returned.
/// 2. Otherwise the result is `<TMPDIR>/beachcomber-<uid>/sock`, with
///    `TMPDIR` falling back to `/tmp` when the variable cannot be read.
///
/// The fallback path is returned whether or not it exists.
pub fn socket_path() -> PathBuf {
    let xdg_socket = std::env::var("XDG_RUNTIME_DIR")
        .ok()
        .map(|runtime_dir| PathBuf::from(runtime_dir).join("beachcomber").join("sock"))
        .filter(|candidate| candidate.exists());
    if let Some(existing) = xdg_socket {
        return existing;
    }

    // SAFETY: `getuid` takes no arguments and only returns the caller's real
    // user id; it touches no memory owned by this program.
    let uid = unsafe { libc::getuid() };
    let base = match std::env::var("TMPDIR") {
        Ok(dir) => dir,
        Err(_) => "/tmp".to_string(),
    };

    let mut fallback = PathBuf::from(base);
    fallback.push(format!("beachcomber-{}", uid));
    fallback.push("sock");
    fallback
}
/// Spawns `comb daemon --socket <socket_path>` with stdin, stdout, and stderr
/// all redirected to the null device.
///
/// The function returns as soon as the process has been spawned; it does not
/// wait for the socket to appear.
///
/// # Errors
///
/// * `CombError::DaemonNotRunning` when no `comb` binary is found on `PATH`.
/// * `CombError::ConnectionFailed` when spawning the process fails.
///
/// NOTE(review): the child handle is dropped without being waited on. If the
/// calling process is long-lived and the spawned process exits, it may linger
/// as a zombie. Confirm how the daemon detaches.
fn start_daemon(socket_path: &std::path::Path) -> Result<(), CombError> {
    use std::process::{Command, Stdio};

    let comb = which_comb().ok_or(CombError::DaemonNotRunning)?;

    if let Some(parent) = socket_path.parent() {
        // Best effort: a failure here is ignored and the spawn is still
        // attempted.
        let _ = std::fs::create_dir_all(parent);
    }

    Command::new(&comb)
        .arg("daemon")
        .arg("--socket")
        .arg(socket_path.as_os_str())
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()
        .map_err(CombError::ConnectionFailed)?;
    Ok(())
}
/// Locates the `comb` executable by scanning the directories in `PATH`.
///
/// Directories are tried in order. The first entry named `comb` that is a
/// regular file with at least one execute permission bit set is returned.
/// Directories and non-executable files named `comb` are skipped, so the
/// search continues the way a shell lookup would.
///
/// `PATH` is read as an `OsString` and split with `std::env::split_paths`, so
/// a non-UTF-8 `PATH` is still searched.
///
/// Returns `None` when `PATH` is unset or no suitable candidate exists.
fn which_comb() -> Option<PathBuf> {
    use std::os::unix::fs::PermissionsExt;

    let search_path = std::env::var_os("PATH")?;
    std::env::split_paths(&search_path)
        .map(|dir| dir.join("comb"))
        .find(|candidate| {
            // `metadata` follows symlinks, so a symlink to an executable
            // still qualifies; an unreadable entry is treated as absent.
            std::fs::metadata(candidate)
                .map(|meta| meta.is_file() && (meta.permissions().mode() & 0o111) != 0)
                .unwrap_or(false)
        })
}