use crate::daemon_client::DaemonClient;
use crate::lifecycle_service::{LifecycleService, LifecycleWorkbenchSnapshot};
use crate::lifecycle_store::LedgerEntry;
use serde_json::{Value, json};
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
/// Selects how the `read_*` helpers in this module obtain lifecycle data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LifecycleReadMode {
/// Call `LifecycleService` in-process. This is the default mode.
#[default]
Direct,
/// Go through a `DaemonClient`; the helpers retry via `LifecycleService` when the
/// client call returns an error.
Daemon,
}
/// Options accepted by the `read_*` helpers in this module.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LifecycleReadOptions {
/// How reads are performed.
pub mode: LifecycleReadMode,
/// Daemon binary path handed to `DaemonClient::new`. The helpers return an error in
/// `Daemon` mode when this is `None`.
pub daemon_bin: Option<PathBuf>,
}
pub fn serve_stdio(config_path: &Path) -> anyhow::Result<()> {
if !config_path.exists() {
anyhow::bail!("config not found: {}", config_path.display());
}
let stdin = io::stdin();
let stdout = io::stdout();
let mut reader = stdin.lock();
let mut writer = stdout.lock();
let mut line = String::new();
loop {
line.clear();
let bytes = reader.read_line(&mut line)?;
if bytes == 0 {
break;
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let response = match serde_json::from_str::<Value>(trimmed) {
Ok(request) => handle_request(config_path, &request),
Err(error) => json!({ "ok": false, "error": format!("invalid json: {error}") }),
};
serde_json::to_writer(&mut writer, &response)?;
writer.write_all(b"\n")?;
writer.flush()?;
}
Ok(())
}
/// Loads the workbench snapshot for `config_path` according to `options.mode`.
///
/// In `Direct` mode the snapshot comes straight from `LifecycleService`. In `Daemon` mode a
/// `DaemonClient` is asked first; if that call fails, its error is discarded and the
/// in-process service is used instead.
///
/// # Errors
///
/// Returns an error when `Daemon` mode is selected without a daemon binary (no fallback is
/// attempted in that case), or when the in-process load itself fails.
pub fn read_workbench(
    config_path: &Path,
    options: &LifecycleReadOptions,
) -> anyhow::Result<LifecycleWorkbenchSnapshot> {
    let load_directly = || LifecycleService::new().load_workbench(config_path);

    match options.mode {
        LifecycleReadMode::Direct => load_directly(),
        LifecycleReadMode::Daemon => {
            let daemon_bin = options.daemon_bin(config_path)?;
            match DaemonClient::new(daemon_bin, config_path).load_workbench() {
                Ok(snapshot) => Ok(snapshot),
                // Best effort: any daemon failure falls back to the in-process read.
                Err(_) => load_directly(),
            }
        }
    }
}
/// Looks up the ledger entry for `record_id` according to `options.mode`.
///
/// In `Direct` mode the lookup goes straight to `LifecycleService`. In `Daemon` mode a
/// `DaemonClient` is asked first; if that call fails, its error is discarded and the
/// in-process service is used instead. The `Option` is passed through unchanged from
/// whichever lookup answered.
///
/// # Errors
///
/// Returns an error when `Daemon` mode is selected without a daemon binary (no fallback is
/// attempted in that case), or when the in-process lookup itself fails.
pub fn read_record(
    config_path: &Path,
    record_id: &str,
    options: &LifecycleReadOptions,
) -> anyhow::Result<Option<LedgerEntry>> {
    let lookup_directly = || LifecycleService::new().get_record(config_path, record_id);

    match options.mode {
        LifecycleReadMode::Direct => lookup_directly(),
        LifecycleReadMode::Daemon => {
            let daemon_bin = options.daemon_bin(config_path)?;
            match DaemonClient::new(daemon_bin, config_path).get_record(record_id) {
                Ok(entry) => Ok(entry),
                // Best effort: any daemon failure falls back to the in-process read.
                Err(_) => lookup_directly(),
            }
        }
    }
}
/// Fetches the ledger history for `record_id` according to `options.mode`.
///
/// In `Direct` mode the history comes straight from `LifecycleService`. In `Daemon` mode a
/// `DaemonClient` is asked first; if that call fails, its error is discarded and the
/// in-process service is used instead.
///
/// # Errors
///
/// Returns an error when `Daemon` mode is selected without a daemon binary (no fallback is
/// attempted in that case), or when the in-process read itself fails.
pub fn read_history(
    config_path: &Path,
    record_id: &str,
    options: &LifecycleReadOptions,
) -> anyhow::Result<Vec<LedgerEntry>> {
    let history_directly = || LifecycleService::new().get_history(config_path, record_id);

    match options.mode {
        LifecycleReadMode::Direct => history_directly(),
        LifecycleReadMode::Daemon => {
            let daemon_bin = options.daemon_bin(config_path)?;
            match DaemonClient::new(daemon_bin, config_path).get_history(record_id) {
                Ok(entries) => Ok(entries),
                // Best effort: any daemon failure falls back to the in-process read.
                Err(_) => history_directly(),
            }
        }
    }
}
impl LifecycleReadOptions {
pub fn with_daemon(daemon_bin: &Path) -> Self {
Self {
mode: LifecycleReadMode::Daemon,
daemon_bin: Some(daemon_bin.to_path_buf()),
}
}
fn daemon_bin<'a>(&'a self, config_path: &Path) -> anyhow::Result<&'a Path> {
self.daemon_bin.as_deref().ok_or_else(|| {
anyhow::anyhow!("missing daemon binary for config {}", config_path.display())
})
}
}
fn handle_request(config_path: &Path, request: &Value) -> Value {
let command = request.get("command").and_then(Value::as_str).unwrap_or("");
match command {
"ping" => json!({ "ok": true, "command": "pong" }),
"workbench" => {
let service = LifecycleService::new();
match service.load_workbench(config_path) {
Ok(snapshot) => json!({
"ok": true,
"pending_review": snapshot.pending_review,
"wakeup_ready": snapshot.wakeup_ready
}),
Err(error) => json!({ "ok": false, "error": error.to_string() }),
}
}
"record" => {
let Some(record_id) = request.get("record_id").and_then(Value::as_str) else {
return json!({ "ok": false, "error": "missing record_id" });
};
let service = LifecycleService::new();
match service.get_record(config_path, record_id) {
Ok(Some(record)) => json!({ "ok": true, "record": record }),
Ok(None) => json!({ "ok": true, "record": Value::Null }),
Err(error) => json!({ "ok": false, "error": error.to_string() }),
}
}
"history" => {
let Some(record_id) = request.get("record_id").and_then(Value::as_str) else {
return json!({ "ok": false, "error": "missing record_id" });
};
let service = LifecycleService::new();
match service.get_history(config_path, record_id) {
Ok(history) => json!({ "ok": true, "record_id": record_id, "history": history }),
Err(error) => json!({ "ok": false, "error": error.to_string() }),
}
}
_ => json!({ "ok": false, "error": format!("unknown command: {command}") }),
}
}
#[cfg(test)]
mod tests {
    //! Tests for the daemon request handler and the `read_*` helpers.
    //!
    //! The tests that spawn a real `spool-daemon` serialise themselves through
    //! `daemon_test_lock_for_test` and reset the shared daemon sessions before and after.

    use super::{LifecycleReadOptions, handle_request, read_history, read_record, read_workbench};
    use crate::daemon_client::{
        daemon_session_pid_for_test, daemon_test_lock_for_test, kill_daemon_session_for_test,
        reset_daemon_sessions,
    };
    use crate::domain::{MemoryLifecycleState, MemoryScope};
    use crate::lifecycle_service::LifecycleService;
    use crate::lifecycle_store::{ProposeMemoryRequest, RecordMemoryRequest, TransitionMetadata};
    use serde_json::{Value, json};
    use std::fs;
    use std::path::{Path, PathBuf};
    use tempfile::{TempDir, tempdir};

    /// Writes a minimal config into a fresh temp dir.
    ///
    /// The `TempDir` is returned alongside the path so the directory outlives the test body.
    fn setup_config() -> (TempDir, PathBuf) {
        let temp = tempdir().unwrap();
        let config_path = temp.path().join("spool.toml");
        fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
        (temp, config_path)
    }

    /// Builds the AI proposal shared by every read-helper test.
    fn sample_proposal() -> ProposeMemoryRequest {
        ProposeMemoryRequest {
            title: "测试偏好".to_string(),
            summary: "先 smoke 再收口".to_string(),
            memory_type: "workflow".to_string(),
            scope: MemoryScope::User,
            source_ref: "session:1".to_string(),
            project_id: None,
            user_id: Some("long".to_string()),
            sensitivity: None,
            metadata: TransitionMetadata::default(),
            entities: Vec::new(),
            tags: Vec::new(),
            triggers: Vec::new(),
            related_files: Vec::new(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: Vec::new(),
            valid_until: None,
        }
    }

    #[test]
    fn daemon_should_serve_workbench_and_record_history_reads() {
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .record_manual(
                config_path.as_path(),
                RecordMemoryRequest {
                    title: "简洁输出".to_string(),
                    summary: "偏好简洁".to_string(),
                    memory_type: "preference".to_string(),
                    scope: MemoryScope::User,
                    source_ref: "manual:daemon".to_string(),
                    project_id: None,
                    user_id: Some("long".to_string()),
                    sensitivity: None,
                    metadata: TransitionMetadata::default(),
                    entities: Vec::new(),
                    tags: Vec::new(),
                    triggers: Vec::new(),
                    related_files: Vec::new(),
                    related_records: Vec::new(),
                    supersedes: None,
                    applies_to: Vec::new(),
                    valid_until: None,
                },
            )
            .unwrap();

        let workbench = handle_request(config_path.as_path(), &json!({ "command": "workbench" }));
        assert_eq!(workbench["ok"], json!(true));
        assert_eq!(workbench["wakeup_ready"].as_array().unwrap().len(), 1);

        let record_response = handle_request(
            config_path.as_path(),
            &json!({ "command": "record", "record_id": record.entry.record_id }),
        );
        assert_eq!(record_response["ok"], json!(true));

        let history_response = handle_request(
            config_path.as_path(),
            &json!({ "command": "history", "record_id": record.entry.record_id }),
        );
        assert_eq!(history_response["ok"], json!(true));
        assert_eq!(history_response["history"].as_array().unwrap().len(), 1);
    }

    #[test]
    fn daemon_record_command_should_return_ok_true_with_null_for_missing_id() {
        let (_temp, config_path) = setup_config();
        let response = handle_request(
            config_path.as_path(),
            &json!({ "command": "record", "record_id": "definitely-missing" }),
        );
        assert_eq!(response["ok"], json!(true));
        assert_eq!(response["record"], Value::Null);
        assert!(
            response.get("error").is_none(),
            "missing record must NOT carry an error field"
        );
    }

    #[test]
    fn read_helpers_should_fallback_to_direct_when_daemon_is_unavailable() {
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();

        // A binary path that cannot exist forces the daemon call to fail.
        let options =
            LifecycleReadOptions::with_daemon(Path::new("/definitely/missing/spool-daemon"));

        let workbench = read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);

        let loaded_record = read_record(config_path.as_path(), &record.entry.record_id, &options)
            .unwrap()
            .unwrap();
        assert_eq!(loaded_record.record.state, MemoryLifecycleState::Candidate);

        let history =
            read_history(config_path.as_path(), &record.entry.record_id, &options).unwrap();
        assert_eq!(history.len(), 1);
    }

    #[test]
    fn daemon_should_return_structured_error_for_invalid_json() {
        let (_temp, config_path) = setup_config();

        // The error envelope itself must be well-formed JSON.
        let response = serde_json::from_str::<Value>(
            "{\"ok\":false,\"error\":\"invalid json: EOF while parsing an object at line 1 column 1\"}",
        );
        assert!(response.is_ok());

        // Replays the parse-then-dispatch step of the stdio loop on a truncated request.
        let invalid = "{";
        let parsed = serde_json::from_str::<Value>(invalid);
        assert!(parsed.is_err());
        let response = match parsed {
            Ok(request) => handle_request(config_path.as_path(), &request),
            Err(error) => json!({ "ok": false, "error": format!("invalid json: {error}") }),
        };
        assert_eq!(response["ok"], json!(false));
        assert!(
            response["error"]
                .as_str()
                .unwrap()
                .contains("invalid json:")
        );
    }

    #[test]
    fn read_helpers_should_reuse_shared_daemon_session() {
        // A poisoned lock only means another daemon test panicked; keep going.
        let _guard = daemon_test_lock_for_test()
            .lock()
            .unwrap_or_else(|error| error.into_inner());
        reset_daemon_sessions();

        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();
        let daemon_bin = assert_cmd::cargo::cargo_bin("spool-daemon");
        let options = LifecycleReadOptions::with_daemon(daemon_bin.as_path());

        let workbench = read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);
        let first_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        let loaded_record = read_record(config_path.as_path(), &record.entry.record_id, &options)
            .unwrap()
            .unwrap();
        assert_eq!(loaded_record.record.title, "测试偏好");

        let history =
            read_history(config_path.as_path(), &record.entry.record_id, &options).unwrap();
        assert_eq!(history.len(), 1);

        let second_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
        assert_eq!(first_pid, second_pid);

        reset_daemon_sessions();
    }

    #[test]
    fn read_helpers_should_rebuild_shared_daemon_session_after_exit() {
        // A poisoned lock only means another daemon test panicked; keep going.
        let _guard = daemon_test_lock_for_test()
            .lock()
            .unwrap_or_else(|error| error.into_inner());
        reset_daemon_sessions();

        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();
        let daemon_bin = assert_cmd::cargo::cargo_bin("spool-daemon");
        let options = LifecycleReadOptions::with_daemon(daemon_bin.as_path());

        let workbench = read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);
        let first_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        kill_daemon_session_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        let loaded_record = read_record(config_path.as_path(), &record.entry.record_id, &options)
            .unwrap()
            .unwrap();
        assert_eq!(loaded_record.record.title, "测试偏好");

        let second_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();
        assert_ne!(first_pid, second_pid);

        reset_daemon_sessions();
    }
}