1use crate::lifecycle_service::LifecycleWorkbenchSnapshot;
2use crate::lifecycle_store::LedgerEntry;
3use serde::de::DeserializeOwned;
4use serde_json::{Value, json};
5use std::collections::HashMap;
6use std::io::{BufRead, BufReader, Write};
7use std::path::{Path, PathBuf};
8use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
9use std::sync::{Arc, Mutex, OnceLock};
10
/// Handle for talking to a daemon executable that is started with a given config file.
///
/// The struct only stores the two paths; cloning it copies the paths and nothing else.
#[derive(Clone, Debug)]
pub struct DaemonClient {
    /// Path of the daemon executable to launch.
    daemon_bin: PathBuf,
    /// Path handed to the daemon through its `--config` argument.
    config_path: PathBuf,
}
16
/// Map key identifying one daemon session: the executable plus the config file it runs with.
///
/// Two keys are equal (and hash equally) only when both paths are equal.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct DaemonSessionKey {
    /// Path of the daemon executable.
    daemon_bin: PathBuf,
    /// Path of the config file passed to that executable.
    config_path: PathBuf,
}
22
/// A spawned daemon child process together with the two pipes used to talk to it.
///
/// Requests are written to `stdin` and responses are read line by line from `stdout`.
#[derive(Debug)]
struct DaemonProcess {
    /// Handle of the child process; used to query its exit status and to kill/reap it.
    child: Child,
    /// Write end of the child's standard input (requests go here).
    stdin: ChildStdin,
    /// Buffered read end of the child's standard output (responses come from here).
    stdout: BufReader<ChildStdout>,
}
29
/// Shared, mutex-guarded handle to one running daemon process.
type DaemonSessionHandle = Arc<Mutex<DaemonProcess>>;
/// Mutex-guarded map from session key to the daemon process serving that key.
type DaemonSessionPool = Mutex<HashMap<DaemonSessionKey, DaemonSessionHandle>>;
32
33impl DaemonClient {
34 pub fn new(daemon_bin: &Path, config_path: &Path) -> Self {
35 Self {
36 daemon_bin: daemon_bin.to_path_buf(),
37 config_path: config_path.to_path_buf(),
38 }
39 }
40
41 pub fn load_workbench(&self) -> anyhow::Result<LifecycleWorkbenchSnapshot> {
42 let response = ensure_ok(self.request(json!({ "command": "workbench" }))?)?;
43 Ok(LifecycleWorkbenchSnapshot {
44 pending_review: parse_required(response.get("pending_review"), "pending_review")?,
45 wakeup_ready: parse_required(response.get("wakeup_ready"), "wakeup_ready")?,
46 })
47 }
48
49 pub fn get_record(&self, record_id: &str) -> anyhow::Result<Option<LedgerEntry>> {
50 let response = self.request(json!({ "command": "record", "record_id": record_id }))?;
51 if response.get("ok").and_then(Value::as_bool).unwrap_or(false) {
61 match response.get("record") {
62 None | Some(Value::Null) => Ok(None),
63 Some(_) => parse_required(response.get("record"), "record"),
64 }
65 } else {
66 let error = response
67 .get("error")
68 .and_then(Value::as_str)
69 .unwrap_or("daemon request failed");
70 if error == format!("memory record not found: {record_id}") {
72 return Ok(None);
73 }
74 anyhow::bail!(error.to_string());
75 }
76 }
77
78 pub fn get_history(&self, record_id: &str) -> anyhow::Result<Vec<LedgerEntry>> {
79 let response =
80 ensure_ok(self.request(json!({ "command": "history", "record_id": record_id }))?)?;
81 parse_required(response.get("history"), "history")
82 }
83
84 #[cfg(test)]
85 pub(crate) fn session_pid(&self) -> Option<u32> {
86 let key = self.session_key();
87 let session = session_pool()
88 .lock()
89 .ok()
90 .and_then(|sessions| sessions.get(&key).cloned())?;
91 let process = session.lock().ok()?;
92 Some(process.child.id())
93 }
94
95 fn request(&self, request: Value) -> anyhow::Result<Value> {
96 let key = self.session_key();
97 match self.request_with_session(&key, &request) {
98 Ok(response) => Ok(response),
99 Err(first_error) => {
100 remove_session(&key);
101 self.request_with_session(&key, &request)
102 .map_err(|_| first_error)
103 }
104 }
105 }
106
107 fn request_with_session(
108 &self,
109 key: &DaemonSessionKey,
110 request: &Value,
111 ) -> anyhow::Result<Value> {
112 self.session_handle(key)?
113 .lock()
114 .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?
115 .request(request)
116 }
117
118 fn session_key(&self) -> DaemonSessionKey {
119 DaemonSessionKey {
120 daemon_bin: self.daemon_bin.clone(),
121 config_path: self.config_path.clone(),
122 }
123 }
124
125 fn session_handle(&self, key: &DaemonSessionKey) -> anyhow::Result<DaemonSessionHandle> {
126 let mut sessions = session_pool()
127 .lock()
128 .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
129 if let Some(session) = sessions.get(key) {
130 return Ok(session.clone());
131 }
132
133 let session = Arc::new(Mutex::new(DaemonProcess::spawn(key)?));
134 sessions.insert(key.clone(), session.clone());
135 Ok(session)
136 }
137}
138
139impl DaemonProcess {
140 fn spawn(key: &DaemonSessionKey) -> anyhow::Result<Self> {
141 let config_path = key
142 .config_path
143 .to_str()
144 .ok_or_else(|| anyhow::anyhow!("config path is not valid UTF-8"))?;
145 let mut child = Command::new(&key.daemon_bin)
146 .args(["--config", config_path])
147 .stdin(Stdio::piped())
148 .stdout(Stdio::piped())
149 .spawn()?;
150 let stdin = child
151 .stdin
152 .take()
153 .ok_or_else(|| anyhow::anyhow!("missing daemon stdin"))?;
154 let stdout = child
155 .stdout
156 .take()
157 .ok_or_else(|| anyhow::anyhow!("missing daemon stdout"))?;
158 Ok(Self {
159 child,
160 stdin,
161 stdout: BufReader::new(stdout),
162 })
163 }
164
165 fn request(&mut self, request: &Value) -> anyhow::Result<Value> {
166 serde_json::to_writer(&mut self.stdin, request)?;
167 self.stdin.write_all(b"\n")?;
168 self.stdin.flush()?;
169
170 let mut line = String::new();
171 let bytes = self.stdout.read_line(&mut line)?;
172 if bytes == 0 || line.trim().is_empty() {
173 if let Some(status) = self.child.try_wait()? {
174 anyhow::bail!("daemon exited unsuccessfully: {status}");
175 }
176 anyhow::bail!("daemon returned empty response");
177 }
178
179 Ok(serde_json::from_str(line.trim())?)
180 }
181}
182
183impl Drop for DaemonProcess {
184 fn drop(&mut self) {
185 let _ = self.child.kill();
186 let _ = self.child.wait();
187 }
188}
189
190fn session_pool() -> &'static DaemonSessionPool {
191 static POOL: OnceLock<DaemonSessionPool> = OnceLock::new();
192 POOL.get_or_init(|| Mutex::new(HashMap::new()))
193}
194
195fn remove_session(key: &DaemonSessionKey) {
196 if let Ok(mut sessions) = session_pool().lock() {
197 sessions.remove(key);
198 }
199}
200
201fn ensure_ok(response: Value) -> anyhow::Result<Value> {
202 if response.get("ok").and_then(Value::as_bool).unwrap_or(false) {
203 return Ok(response);
204 }
205 let error = response
206 .get("error")
207 .and_then(Value::as_str)
208 .unwrap_or("daemon request failed");
209 anyhow::bail!(error.to_string())
210}
211
212fn parse_required<T>(value: Option<&Value>, field: &str) -> anyhow::Result<T>
213where
214 T: DeserializeOwned,
215{
216 let value = value.ok_or_else(|| anyhow::anyhow!("missing daemon field: {field}"))?;
217 Ok(serde_json::from_value(value.clone())?)
218}
219
/// Empties the session pool, dropping every pooled session handle.
///
/// Does nothing when the pool lock is poisoned.
#[cfg(test)]
pub(crate) fn reset_daemon_sessions() {
    let Ok(mut pool) = session_pool().lock() else {
        return;
    };
    pool.clear();
}
226
/// Kills and reaps the child process of the pooled session for the given paths,
/// leaving the (now dead) session registered in the pool.
///
/// # Errors
/// Fails when a lock is poisoned, when no session is registered for the paths, or
/// when killing/waiting on the child fails.
#[cfg(test)]
pub(crate) fn kill_daemon_session_for_test(
    daemon_bin: &Path,
    config_path: &Path,
) -> anyhow::Result<()> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_owned(),
        config_path: config_path.to_owned(),
    };

    let pool = session_pool()
        .lock()
        .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
    let found = pool.get(&key).cloned();
    // Release the pool before taking the per-session lock.
    drop(pool);

    let handle = found.ok_or_else(|| anyhow::anyhow!("missing daemon session"))?;
    let mut process = handle
        .lock()
        .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?;
    process.child.kill()?;
    process.child.wait()?;
    Ok(())
}
249
/// Returns the process id of the pooled session for the given paths, or `None` when
/// there is no such session or a lock is poisoned.
#[cfg(test)]
pub(crate) fn daemon_session_pid_for_test(daemon_bin: &Path, config_path: &Path) -> Option<u32> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_owned(),
        config_path: config_path.to_owned(),
    };

    let pool = session_pool().lock().ok()?;
    let handle = pool.get(&key).cloned()?;
    // Release the pool before taking the per-session lock.
    drop(pool);

    let pid = handle.lock().ok()?.child.id();
    Some(pid)
}
260
/// Returns the single process-wide mutex that tests lock to avoid running against the
/// shared session pool concurrently. Every call returns the same mutex.
#[cfg(test)]
pub(crate) fn daemon_test_lock_for_test() -> &'static Mutex<()> {
    static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    LOCK.get_or_init(Mutex::default)
}
266
#[cfg(test)]
mod tests {
    use super::{
        DaemonClient, daemon_test_lock_for_test, kill_daemon_session_for_test,
        reset_daemon_sessions,
    };
    use crate::domain::MemoryScope;
    use crate::lifecycle_service::LifecycleService;
    use crate::lifecycle_store::{RecordMemoryRequest, TransitionMetadata};
    use assert_cmd::cargo::cargo_bin;
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::sync::MutexGuard;
    use tempfile::tempdir;

    /// Takes the shared daemon test lock (recovering it if a previous test panicked
    /// while holding it) and starts the test from an empty session pool.
    fn lock_daemon_tests() -> MutexGuard<'static, ()> {
        let guard = daemon_test_lock_for_test()
            .lock()
            .unwrap_or_else(|error| error.into_inner());
        reset_daemon_sessions();
        guard
    }

    /// Writes the minimal `spool.toml` used by every test into `dir` and returns its path.
    fn write_test_config(dir: &Path) -> PathBuf {
        let config_path = dir.join("spool.toml");
        fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
        config_path
    }

    /// Builds the user-scoped preference record shared by the tests; only `source_ref`
    /// varies between them.
    fn preference_request(source_ref: &str) -> RecordMemoryRequest {
        RecordMemoryRequest {
            title: "简洁输出".to_string(),
            summary: "偏好简洁".to_string(),
            memory_type: "preference".to_string(),
            scope: MemoryScope::User,
            source_ref: source_ref.to_string(),
            project_id: None,
            user_id: Some("long".to_string()),
            sensitivity: None,
            metadata: TransitionMetadata::default(),
            entities: Vec::new(),
            tags: Vec::new(),
            triggers: Vec::new(),
            related_files: Vec::new(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: Vec::new(),
            valid_until: None,
        }
    }

    #[test]
    fn daemon_client_should_reuse_same_child_process_across_requests() {
        let _guard = lock_daemon_tests();

        let temp = tempdir().unwrap();
        let config_path = write_test_config(temp.path());
        let record = LifecycleService::new()
            .record_manual(
                config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();

        let daemon_bin = cargo_bin("spool-daemon");
        let client = DaemonClient::new(daemon_bin.as_path(), config_path.as_path());

        let workbench = client.load_workbench().unwrap();
        assert_eq!(workbench.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        let loaded = client.get_record(&record.entry.record_id).unwrap().unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
        reset_daemon_sessions();
    }

    #[test]
    fn daemon_client_should_reuse_session_across_client_instances() {
        let _guard = lock_daemon_tests();

        let temp = tempdir().unwrap();
        let config_path = write_test_config(temp.path());
        let record = LifecycleService::new()
            .record_manual(
                config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();

        let daemon_bin = cargo_bin("spool-daemon");
        let first_client = DaemonClient::new(daemon_bin.as_path(), config_path.as_path());
        let second_client = DaemonClient::new(daemon_bin.as_path(), config_path.as_path());

        first_client.load_workbench().unwrap();
        let first_pid = first_client.session_pid().unwrap();

        let loaded = second_client
            .get_record(&record.entry.record_id)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = second_client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
        reset_daemon_sessions();
    }

    #[test]
    fn daemon_client_get_record_should_return_none_for_missing_record() {
        let _guard = lock_daemon_tests();

        let temp = tempdir().unwrap();
        let config_path = write_test_config(temp.path());

        let daemon_bin = cargo_bin("spool-daemon");
        let client = DaemonClient::new(daemon_bin.as_path(), config_path.as_path());

        let record = client.get_record("missing-record-id").unwrap();
        assert!(record.is_none());
        reset_daemon_sessions();
    }

    #[test]
    fn daemon_client_should_rebuild_session_after_child_exit() {
        let _guard = lock_daemon_tests();

        let temp = tempdir().unwrap();
        let config_path = write_test_config(temp.path());
        LifecycleService::new()
            .record_manual(
                config_path.as_path(),
                preference_request("manual:daemon-client-rebuild"),
            )
            .unwrap();

        let daemon_bin = cargo_bin("spool-daemon");
        let client = DaemonClient::new(daemon_bin.as_path(), config_path.as_path());

        let first = client.load_workbench().unwrap();
        assert_eq!(first.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        // Kill the pooled child behind the client's back; the dead session stays
        // registered, so the next request must notice and respawn.
        kill_daemon_session_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        let second = client.load_workbench().unwrap();
        assert_eq!(second.wakeup_ready.len(), 1);
        let second_pid = client.session_pid().unwrap();

        assert_ne!(first_pid, second_pid);
        reset_daemon_sessions();
    }
}