use crate::lifecycle_service::LifecycleWorkbenchSnapshot;
use crate::lifecycle_store::LedgerEntry;
use serde::de::DeserializeOwned;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::{Arc, Mutex, OnceLock};
/// Client for talking to a daemon executable over its stdin/stdout.
///
/// The struct only stores two paths; the child processes themselves live in the
/// module-level session pool and are looked up by these same two paths.
#[derive(Debug, Clone)]
pub struct DaemonClient {
/// Path of the daemon executable that gets spawned.
daemon_bin: PathBuf,
/// Path handed to the daemon through its `--config` argument.
config_path: PathBuf,
}
/// Hashable identity of a pooled daemon session.
///
/// Two clients with equal binary and config paths map to the same key and
/// therefore share the same pooled child process.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DaemonSessionKey {
/// Path of the daemon executable.
daemon_bin: PathBuf,
/// Path of the config file the daemon is started with.
config_path: PathBuf,
}
/// A spawned daemon child process together with the pipe ends used to talk to it.
///
/// Dropping this value kills the child and waits for it (see the `Drop` impl in
/// this file).
#[derive(Debug)]
struct DaemonProcess {
/// Handle to the child; used to read its id, poll its exit status, kill and reap it.
child: Child,
/// Write end of the child's stdin; requests are written here.
stdin: ChildStdin,
/// Buffered read end of the child's stdout; responses are read from here one line at a time.
stdout: BufReader<ChildStdout>,
}
/// Shared, lock-guarded handle to one daemon process. The lock is taken for the
/// whole write-request/read-response exchange.
type DaemonSessionHandle = Arc<Mutex<DaemonProcess>>;
/// Lock-guarded map from session key to the shared handle of its daemon process.
type DaemonSessionPool = Mutex<HashMap<DaemonSessionKey, DaemonSessionHandle>>;
impl DaemonClient {
    /// Creates a client for the daemon executable at `daemon_bin`, configured by
    /// `config_path`.
    ///
    /// Nothing is spawned here; a child process is started on the first request
    /// that finds no pooled session for this pair of paths.
    pub fn new(daemon_bin: &Path, config_path: &Path) -> Self {
        let daemon_bin = daemon_bin.to_owned();
        let config_path = config_path.to_owned();
        Self {
            daemon_bin,
            config_path,
        }
    }

    /// Sends the `workbench` command and decodes the `pending_review` and
    /// `wakeup_ready` fields of the reply.
    ///
    /// # Errors
    /// Fails when the request fails, the daemon does not answer with `ok: true`,
    /// or either field is missing or cannot be deserialized.
    pub fn load_workbench(&self) -> anyhow::Result<LifecycleWorkbenchSnapshot> {
        let reply = self.request(json!({ "command": "workbench" }))?;
        let reply = ensure_ok(reply)?;
        let pending_review = parse_required(reply.get("pending_review"), "pending_review")?;
        let wakeup_ready = parse_required(reply.get("wakeup_ready"), "wakeup_ready")?;
        Ok(LifecycleWorkbenchSnapshot {
            pending_review,
            wakeup_ready,
        })
    }

    /// Sends the `record` command for `record_id`.
    ///
    /// Returns `Ok(None)` when the reply is successful but carries no (or a
    /// `null`) `record`, and also when the daemon reports the exact error
    /// `memory record not found: <record_id>`.
    ///
    /// # Errors
    /// Any other daemon-reported error, a transport failure, or a `record`
    /// payload that cannot be deserialized.
    pub fn get_record(&self, record_id: &str) -> anyhow::Result<Option<LedgerEntry>> {
        let reply = self.request(json!({ "command": "record", "record_id": record_id }))?;
        let succeeded = reply.get("ok").and_then(Value::as_bool).unwrap_or(false);

        if !succeeded {
            let message = reply
                .get("error")
                .and_then(Value::as_str)
                .unwrap_or("daemon request failed");
            // A "not found" report is an expected outcome, not a failure.
            let not_found = format!("memory record not found: {record_id}");
            if message == not_found {
                return Ok(None);
            }
            anyhow::bail!(message.to_string());
        }

        match reply.get("record") {
            Some(record) if !record.is_null() => parse_required(Some(record), "record"),
            _ => Ok(None),
        }
    }

    /// Sends the `history` command for `record_id` and decodes the `history`
    /// field of the reply.
    ///
    /// # Errors
    /// Fails when the request fails, the daemon does not answer with `ok: true`,
    /// or `history` is missing or cannot be deserialized.
    pub fn get_history(&self, record_id: &str) -> anyhow::Result<Vec<LedgerEntry>> {
        let reply = self.request(json!({ "command": "history", "record_id": record_id }))?;
        let reply = ensure_ok(reply)?;
        parse_required(reply.get("history"), "history")
    }

    /// Test helper: process id of the pooled child for this client, or `None`
    /// when no session is pooled or a lock is poisoned.
    #[cfg(test)]
    pub(crate) fn session_pid(&self) -> Option<u32> {
        daemon_session_pid_for_test(&self.daemon_bin, &self.config_path)
    }

    /// Performs one request against the pooled session.
    ///
    /// If the attempt fails for any reason, the pooled session is discarded and
    /// the request is retried once on a fresh session. When the retry fails as
    /// well, the error of the *first* attempt is returned.
    fn request(&self, request: Value) -> anyhow::Result<Value> {
        let key = self.session_key();
        self.request_with_session(&key, &request)
            .or_else(|first_error| {
                remove_session(&key);
                self.request_with_session(&key, &request)
                    .map_err(|_| first_error)
            })
    }

    /// Looks up (or spawns) the session for `key`, locks it and runs a single
    /// request/response exchange on it.
    fn request_with_session(
        &self,
        key: &DaemonSessionKey,
        request: &Value,
    ) -> anyhow::Result<Value> {
        let handle = self.session_handle(key)?;
        let mut process = handle
            .lock()
            .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?;
        process.request(request)
    }

    /// Pool key derived from this client's two paths.
    fn session_key(&self) -> DaemonSessionKey {
        let Self {
            daemon_bin,
            config_path,
        } = self;
        DaemonSessionKey {
            daemon_bin: daemon_bin.clone(),
            config_path: config_path.clone(),
        }
    }

    /// Returns the pooled handle for `key`, spawning and registering a new
    /// daemon process when none is pooled yet. The pool lock is held while
    /// spawning.
    fn session_handle(&self, key: &DaemonSessionKey) -> anyhow::Result<DaemonSessionHandle> {
        let mut pool = session_pool()
            .lock()
            .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
        let handle = match pool.get(key) {
            Some(existing) => Arc::clone(existing),
            None => {
                let fresh = Arc::new(Mutex::new(DaemonProcess::spawn(key)?));
                pool.insert(key.clone(), Arc::clone(&fresh));
                fresh
            }
        };
        Ok(handle)
    }
}
impl DaemonProcess {
fn spawn(key: &DaemonSessionKey) -> anyhow::Result<Self> {
let config_path = key
.config_path
.to_str()
.ok_or_else(|| anyhow::anyhow!("config path is not valid UTF-8"))?;
let mut child = Command::new(&key.daemon_bin)
.args(["--config", config_path])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let stdin = child
.stdin
.take()
.ok_or_else(|| anyhow::anyhow!("missing daemon stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow::anyhow!("missing daemon stdout"))?;
Ok(Self {
child,
stdin,
stdout: BufReader::new(stdout),
})
}
fn request(&mut self, request: &Value) -> anyhow::Result<Value> {
serde_json::to_writer(&mut self.stdin, request)?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut line = String::new();
let bytes = self.stdout.read_line(&mut line)?;
if bytes == 0 || line.trim().is_empty() {
if let Some(status) = self.child.try_wait()? {
anyhow::bail!("daemon exited unsuccessfully: {status}");
}
anyhow::bail!("daemon returned empty response");
}
Ok(serde_json::from_str(line.trim())?)
}
}
impl Drop for DaemonProcess {
    /// Best-effort teardown: kill the child, then wait on it so it is reaped.
    /// Failures of either step are deliberately ignored.
    fn drop(&mut self) {
        let child = &mut self.child;
        child.kill().ok();
        child.wait().ok();
    }
}
fn session_pool() -> &'static DaemonSessionPool {
static POOL: OnceLock<DaemonSessionPool> = OnceLock::new();
POOL.get_or_init(|| Mutex::new(HashMap::new()))
}
/// Drops the pooled session stored under `key`, if any.
///
/// Does nothing when the pool lock is poisoned.
fn remove_session(key: &DaemonSessionKey) {
    let Ok(mut pool) = session_pool().lock() else {
        return;
    };
    pool.remove(key);
}
/// Passes `response` through when its `ok` field is the boolean `true`.
///
/// # Errors
/// Otherwise fails with the response's string `error` field, or with
/// `daemon request failed` when that field is absent or not a string.
fn ensure_ok(response: Value) -> anyhow::Result<Value> {
    let succeeded = matches!(response.get("ok").and_then(Value::as_bool), Some(true));
    if !succeeded {
        let message = response
            .get("error")
            .and_then(Value::as_str)
            .unwrap_or("daemon request failed");
        anyhow::bail!(message.to_string());
    }
    Ok(response)
}
/// Deserializes a required field of a daemon response into `T`.
///
/// `value` is the result of looking the field up in the response; `field` is
/// only used for the error message.
///
/// # Errors
/// Fails with `missing daemon field: <field>` when `value` is `None`, or with
/// the serde error when the JSON does not match `T`.
fn parse_required<T>(value: Option<&Value>, field: &str) -> anyhow::Result<T>
where
    T: DeserializeOwned,
{
    let value = value.ok_or_else(|| anyhow::anyhow!("missing daemon field: {field}"))?;
    // `&Value` is itself a serde deserializer, so decode straight from the
    // borrow instead of deep-cloning the JSON subtree for `from_value`.
    let parsed = T::deserialize(value)?;
    Ok(parsed)
}
/// Test helper: empties the session pool, dropping every pooled session handle
/// it holds. Does nothing when the pool lock is poisoned.
#[cfg(test)]
pub(crate) fn reset_daemon_sessions() {
    let Ok(mut pool) = session_pool().lock() else {
        return;
    };
    pool.clear();
}
/// Test helper: kills and reaps the pooled child for the given paths while
/// leaving its (now dead) session in the pool.
///
/// # Errors
/// Fails when a lock is poisoned, no session is pooled for the paths, or
/// killing/waiting on the child fails.
#[cfg(test)]
pub(crate) fn kill_daemon_session_for_test(
    daemon_bin: &Path,
    config_path: &Path,
) -> anyhow::Result<()> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_owned(),
        config_path: config_path.to_owned(),
    };
    let pooled = {
        let pool = session_pool()
            .lock()
            .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
        pool.get(&key).cloned()
    };
    let handle = pooled.ok_or_else(|| anyhow::anyhow!("missing daemon session"))?;
    let mut process = handle
        .lock()
        .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?;
    let child = &mut process.child;
    child.kill()?;
    child.wait()?;
    Ok(())
}
/// Test helper: process id of the pooled child for the given paths.
///
/// Returns `None` when no session is pooled for them or a lock is poisoned.
#[cfg(test)]
pub(crate) fn daemon_session_pid_for_test(daemon_bin: &Path, config_path: &Path) -> Option<u32> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_owned(),
        config_path: config_path.to_owned(),
    };
    let pooled = {
        let pool = session_pool().lock().ok()?;
        pool.get(&key).cloned()
    };
    let handle = pooled?;
    let process = handle.lock().ok()?;
    let pid = process.child.id();
    Some(pid)
}
/// Test helper: process-wide mutex, created on first use, that tests lock to
/// run one at a time.
#[cfg(test)]
pub(crate) fn daemon_test_lock_for_test() -> &'static Mutex<()> {
    static SERIAL: OnceLock<Mutex<()>> = OnceLock::new();
    SERIAL.get_or_init(Mutex::default)
}
#[cfg(test)]
mod tests {
    use super::{
        DaemonClient, daemon_test_lock_for_test, kill_daemon_session_for_test,
        reset_daemon_sessions,
    };
    use crate::domain::MemoryScope;
    use crate::lifecycle_service::LifecycleService;
    use crate::lifecycle_store::{RecordMemoryRequest, TransitionMetadata};
    use assert_cmd::cargo::cargo_bin;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::MutexGuard;
    use tempfile::{TempDir, tempdir};

    /// Per-test environment: serializes daemon tests, starts from an empty
    /// session pool, and owns a temp directory holding a minimal config file.
    ///
    /// On drop (including during a panic unwind) the session pool is reset
    /// first, then the fields are dropped in declaration order: the temp
    /// directory is removed and the serialization lock is released last.
    struct DaemonFixture {
        daemon_bin: PathBuf,
        config_path: PathBuf,
        _temp: TempDir,
        _serial: MutexGuard<'static, ()>,
    }

    impl DaemonFixture {
        /// Takes the test lock (recovering it if a previous test panicked),
        /// clears pooled sessions and writes the config file.
        fn new() -> Self {
            let serial = daemon_test_lock_for_test()
                .lock()
                .unwrap_or_else(|error| error.into_inner());
            reset_daemon_sessions();
            let temp = tempdir().unwrap();
            let config_path = temp.path().join("spool.toml");
            fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
            Self {
                daemon_bin: cargo_bin("spool-daemon"),
                config_path,
                _temp: temp,
                _serial: serial,
            }
        }

        /// A new client pointing at the test daemon binary and this fixture's config.
        fn client(&self) -> DaemonClient {
            DaemonClient::new(self.daemon_bin.as_path(), self.config_path.as_path())
        }
    }

    impl Drop for DaemonFixture {
        fn drop(&mut self) {
            // Runs on the panic path too, so a failed assertion does not leave
            // a session in the pool.
            reset_daemon_sessions();
        }
    }

    /// The user-scoped preference record every test stores; only `source_ref`
    /// differs between tests.
    fn preference_request(source_ref: &str) -> RecordMemoryRequest {
        RecordMemoryRequest {
            title: "简洁输出".to_string(),
            summary: "偏好简洁".to_string(),
            memory_type: "preference".to_string(),
            scope: MemoryScope::User,
            source_ref: source_ref.to_string(),
            project_id: None,
            user_id: Some("long".to_string()),
            sensitivity: None,
            metadata: TransitionMetadata::default(),
            entities: Vec::new(),
            tags: Vec::new(),
            triggers: Vec::new(),
            related_files: Vec::new(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: Vec::new(),
            valid_until: None,
        }
    }

    #[test]
    fn daemon_client_should_reuse_same_child_process_across_requests() {
        let fixture = DaemonFixture::new();
        let record = LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();
        let client = fixture.client();

        let workbench = client.load_workbench().unwrap();
        assert_eq!(workbench.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        let loaded = client.get_record(&record.entry.record_id).unwrap().unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
    }

    #[test]
    fn daemon_client_should_reuse_session_across_client_instances() {
        let fixture = DaemonFixture::new();
        let record = LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();
        let first_client = fixture.client();
        let second_client = fixture.client();

        first_client.load_workbench().unwrap();
        let first_pid = first_client.session_pid().unwrap();

        let loaded = second_client
            .get_record(&record.entry.record_id)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = second_client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
    }

    #[test]
    fn daemon_client_get_record_should_return_none_for_missing_record() {
        let fixture = DaemonFixture::new();
        let client = fixture.client();

        let record = client.get_record("missing-record-id").unwrap();

        assert!(record.is_none());
    }

    #[test]
    fn daemon_client_should_rebuild_session_after_child_exit() {
        let fixture = DaemonFixture::new();
        LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client-rebuild"),
            )
            .unwrap();
        let client = fixture.client();

        let first = client.load_workbench().unwrap();
        assert_eq!(first.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        // Kill the pooled child behind the client's back; the dead session
        // stays in the pool, so the next request has to recover from it.
        kill_daemon_session_for_test(fixture.daemon_bin.as_path(), fixture.config_path.as_path())
            .unwrap();

        let second = client.load_workbench().unwrap();
        assert_eq!(second.wakeup_ready.len(), 1);
        let second_pid = client.session_pid().unwrap();

        assert_ne!(first_pid, second_pid);
    }
}