Skip to main content

spool/
daemon_client.rs

1use crate::lifecycle_service::LifecycleWorkbenchSnapshot;
2use crate::lifecycle_store::LedgerEntry;
3use serde::de::DeserializeOwned;
4use serde_json::{Value, json};
5use std::collections::HashMap;
6use std::io::{BufRead, BufReader, Write};
7use std::path::{Path, PathBuf};
8use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
9use std::sync::{Arc, Mutex, OnceLock};
10
/// Client handle for a spool daemon executable.
///
/// The client itself stores only two paths. The daemon process is looked up
/// (or spawned) per request through a session keyed by these same two paths,
/// so cloning a client is cheap and clones address the same session.
#[derive(Clone, Debug)]
pub struct DaemonClient {
    /// Path of the daemon executable that gets spawned.
    daemon_bin: PathBuf,
    /// Configuration file handed to the daemon via `--config`.
    config_path: PathBuf,
}
16
/// Identity of a pooled daemon session.
///
/// Two requests share a daemon process exactly when both the executable path
/// and the config path compare equal.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct DaemonSessionKey {
    /// Path of the daemon executable.
    daemon_bin: PathBuf,
    /// Path of the configuration file the daemon was started with.
    config_path: PathBuf,
}
22
/// A running daemon child process together with the pipe ends used to talk
/// to it.
///
/// Requests are written to `stdin` and responses are read line by line from
/// `stdout`. The child is killed and reaped when this value is dropped.
#[derive(Debug)]
struct DaemonProcess {
    /// Handle to the spawned child; used for liveness checks, kill and wait.
    child: Child,
    /// Write end of the child's standard input (request channel).
    stdin: ChildStdin,
    /// Buffered read end of the child's standard output (response channel).
    stdout: BufReader<ChildStdout>,
}
29
30type DaemonSessionHandle = Arc<Mutex<DaemonProcess>>;
31type DaemonSessionPool = Mutex<HashMap<DaemonSessionKey, DaemonSessionHandle>>;
32
33impl DaemonClient {
34    pub fn new(daemon_bin: &Path, config_path: &Path) -> Self {
35        Self {
36            daemon_bin: daemon_bin.to_path_buf(),
37            config_path: config_path.to_path_buf(),
38        }
39    }
40
41    pub fn load_workbench(&self) -> anyhow::Result<LifecycleWorkbenchSnapshot> {
42        let response = ensure_ok(self.request(json!({ "command": "workbench" }))?)?;
43        Ok(LifecycleWorkbenchSnapshot {
44            pending_review: parse_required(response.get("pending_review"), "pending_review")?,
45            wakeup_ready: parse_required(response.get("wakeup_ready"), "wakeup_ready")?,
46        })
47    }
48
49    pub fn get_record(&self, record_id: &str) -> anyhow::Result<Option<LedgerEntry>> {
50        let response = self.request(json!({ "command": "record", "record_id": record_id }))?;
51        // New protocol (post bug #1 fix): missing records arrive as
52        //   { "ok": true, "record": null }
53        // and present records as
54        //   { "ok": true, "record": <object> }.
55        //
56        // For backward compatibility with older daemon binaries that
57        // still emit { "ok": false, "error": "memory record not found: …" }
58        // we keep the legacy string match below — but only for the
59        // exact legacy phrase, never as a generic ok:false branch.
60        if response.get("ok").and_then(Value::as_bool).unwrap_or(false) {
61            match response.get("record") {
62                None | Some(Value::Null) => Ok(None),
63                Some(_) => parse_required(response.get("record"), "record"),
64            }
65        } else {
66            let error = response
67                .get("error")
68                .and_then(Value::as_str)
69                .unwrap_or("daemon request failed");
70            // Legacy daemon: "memory record not found: <id>"
71            if error == format!("memory record not found: {record_id}") {
72                return Ok(None);
73            }
74            anyhow::bail!(error.to_string());
75        }
76    }
77
78    pub fn get_history(&self, record_id: &str) -> anyhow::Result<Vec<LedgerEntry>> {
79        let response =
80            ensure_ok(self.request(json!({ "command": "history", "record_id": record_id }))?)?;
81        parse_required(response.get("history"), "history")
82    }
83
84    #[cfg(test)]
85    pub(crate) fn session_pid(&self) -> Option<u32> {
86        let key = self.session_key();
87        let session = session_pool()
88            .lock()
89            .ok()
90            .and_then(|sessions| sessions.get(&key).cloned())?;
91        let process = session.lock().ok()?;
92        Some(process.child.id())
93    }
94
95    fn request(&self, request: Value) -> anyhow::Result<Value> {
96        let key = self.session_key();
97        match self.request_with_session(&key, &request) {
98            Ok(response) => Ok(response),
99            Err(first_error) => {
100                remove_session(&key);
101                self.request_with_session(&key, &request)
102                    .map_err(|_| first_error)
103            }
104        }
105    }
106
107    fn request_with_session(
108        &self,
109        key: &DaemonSessionKey,
110        request: &Value,
111    ) -> anyhow::Result<Value> {
112        self.session_handle(key)?
113            .lock()
114            .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?
115            .request(request)
116    }
117
118    fn session_key(&self) -> DaemonSessionKey {
119        DaemonSessionKey {
120            daemon_bin: self.daemon_bin.clone(),
121            config_path: self.config_path.clone(),
122        }
123    }
124
125    fn session_handle(&self, key: &DaemonSessionKey) -> anyhow::Result<DaemonSessionHandle> {
126        let mut sessions = session_pool()
127            .lock()
128            .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
129        if let Some(session) = sessions.get(key) {
130            return Ok(session.clone());
131        }
132
133        let session = Arc::new(Mutex::new(DaemonProcess::spawn(key)?));
134        sessions.insert(key.clone(), session.clone());
135        Ok(session)
136    }
137}
138
139impl DaemonProcess {
140    fn spawn(key: &DaemonSessionKey) -> anyhow::Result<Self> {
141        let config_path = key
142            .config_path
143            .to_str()
144            .ok_or_else(|| anyhow::anyhow!("config path is not valid UTF-8"))?;
145        let mut child = Command::new(&key.daemon_bin)
146            .args(["--config", config_path])
147            .stdin(Stdio::piped())
148            .stdout(Stdio::piped())
149            .spawn()?;
150        let stdin = child
151            .stdin
152            .take()
153            .ok_or_else(|| anyhow::anyhow!("missing daemon stdin"))?;
154        let stdout = child
155            .stdout
156            .take()
157            .ok_or_else(|| anyhow::anyhow!("missing daemon stdout"))?;
158        Ok(Self {
159            child,
160            stdin,
161            stdout: BufReader::new(stdout),
162        })
163    }
164
165    fn request(&mut self, request: &Value) -> anyhow::Result<Value> {
166        serde_json::to_writer(&mut self.stdin, request)?;
167        self.stdin.write_all(b"\n")?;
168        self.stdin.flush()?;
169
170        let mut line = String::new();
171        let bytes = self.stdout.read_line(&mut line)?;
172        if bytes == 0 || line.trim().is_empty() {
173            if let Some(status) = self.child.try_wait()? {
174                anyhow::bail!("daemon exited unsuccessfully: {status}");
175            }
176            anyhow::bail!("daemon returned empty response");
177        }
178
179        Ok(serde_json::from_str(line.trim())?)
180    }
181}
182
183impl Drop for DaemonProcess {
184    fn drop(&mut self) {
185        let _ = self.child.kill();
186        let _ = self.child.wait();
187    }
188}
189
190fn session_pool() -> &'static DaemonSessionPool {
191    static POOL: OnceLock<DaemonSessionPool> = OnceLock::new();
192    POOL.get_or_init(|| Mutex::new(HashMap::new()))
193}
194
195fn remove_session(key: &DaemonSessionKey) {
196    if let Ok(mut sessions) = session_pool().lock() {
197        sessions.remove(key);
198    }
199}
200
201fn ensure_ok(response: Value) -> anyhow::Result<Value> {
202    if response.get("ok").and_then(Value::as_bool).unwrap_or(false) {
203        return Ok(response);
204    }
205    let error = response
206        .get("error")
207        .and_then(Value::as_str)
208        .unwrap_or("daemon request failed");
209    anyhow::bail!(error.to_string())
210}
211
212fn parse_required<T>(value: Option<&Value>, field: &str) -> anyhow::Result<T>
213where
214    T: DeserializeOwned,
215{
216    let value = value.ok_or_else(|| anyhow::anyhow!("missing daemon field: {field}"))?;
217    Ok(serde_json::from_value(value.clone())?)
218}
219
/// Test helper: empties the session pool, dropping every pooled session.
///
/// Does nothing when the pool lock is poisoned.
#[cfg(test)]
pub(crate) fn reset_daemon_sessions() {
    let Ok(mut pool) = session_pool().lock() else {
        return;
    };
    pool.clear();
}
226
/// Test helper: kills and reaps the pooled daemon for the given paths while
/// leaving its (now dead) session registered in the pool.
///
/// # Errors
///
/// Fails when a lock is poisoned, when no session exists for the paths, or
/// when killing / waiting on the child fails.
#[cfg(test)]
pub(crate) fn kill_daemon_session_for_test(
    daemon_bin: &Path,
    config_path: &Path,
) -> anyhow::Result<()> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_path_buf(),
        config_path: config_path.to_path_buf(),
    };

    let pool = session_pool()
        .lock()
        .map_err(|_| anyhow::anyhow!("daemon session pool lock poisoned"))?;
    let found = pool.get(&key).cloned();
    // Release the pool before taking the per-session lock.
    drop(pool);
    let session = found.ok_or_else(|| anyhow::anyhow!("missing daemon session"))?;

    let mut process = session
        .lock()
        .map_err(|_| anyhow::anyhow!("daemon session lock poisoned"))?;
    let child = &mut process.child;
    child.kill()?;
    child.wait()?;
    Ok(())
}
249
/// Test helper: returns the OS process id of the pooled daemon for the given
/// paths, or `None` when no such session exists or a lock is poisoned.
#[cfg(test)]
pub(crate) fn daemon_session_pid_for_test(daemon_bin: &Path, config_path: &Path) -> Option<u32> {
    let key = DaemonSessionKey {
        daemon_bin: daemon_bin.to_path_buf(),
        config_path: config_path.to_path_buf(),
    };

    let pool = session_pool().lock().ok()?;
    let found = pool.get(&key).cloned();
    // Release the pool before taking the per-session lock.
    drop(pool);

    let session = found?;
    let pid = session.lock().ok()?.child.id();
    Some(pid)
}
260
/// Test helper: returns the single process-wide mutex that tests lock to run
/// daemon-backed scenarios one at a time.
#[cfg(test)]
pub(crate) fn daemon_test_lock_for_test() -> &'static Mutex<()> {
    static LOCK: Mutex<()> = Mutex::new(());
    &LOCK
}
266
#[cfg(test)]
mod tests {
    use super::{
        DaemonClient, daemon_test_lock_for_test, kill_daemon_session_for_test,
        reset_daemon_sessions,
    };
    use crate::domain::MemoryScope;
    use crate::lifecycle_service::LifecycleService;
    use crate::lifecycle_store::{RecordMemoryRequest, TransitionMetadata};
    use assert_cmd::cargo::cargo_bin;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::MutexGuard;
    use tempfile::{TempDir, tempdir};

    /// Shared per-test environment: serializes daemon tests, starts from an
    /// empty session pool, and provides a throwaway config file.
    ///
    /// Cleanup lives in `Drop`, so pooled daemons are torn down even when an
    /// assertion fails. `Drop::drop` runs before the fields are released,
    /// which keeps the order "reset sessions, delete temp dir, release lock".
    struct DaemonFixture {
        config_path: PathBuf,
        daemon_bin: PathBuf,
        _temp: TempDir,
        _serial: MutexGuard<'static, ()>,
    }

    impl DaemonFixture {
        /// Takes the test lock (recovering it if a previous test panicked),
        /// clears the pool and writes a minimal `spool.toml`.
        fn new() -> Self {
            let serial = daemon_test_lock_for_test()
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            reset_daemon_sessions();

            let temp = tempdir().unwrap();
            let config_path = temp.path().join("spool.toml");
            fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();

            Self {
                config_path,
                daemon_bin: cargo_bin("spool-daemon"),
                _temp: temp,
                _serial: serial,
            }
        }

        /// Builds a client bound to this fixture's daemon binary and config.
        fn client(&self) -> DaemonClient {
            DaemonClient::new(&self.daemon_bin, &self.config_path)
        }
    }

    impl Drop for DaemonFixture {
        fn drop(&mut self) {
            reset_daemon_sessions();
        }
    }

    /// The user-scoped preference record every scenario stores; only the
    /// `source_ref` differs between tests.
    fn preference_request(source_ref: &str) -> RecordMemoryRequest {
        RecordMemoryRequest {
            title: "简洁输出".to_string(),
            summary: "偏好简洁".to_string(),
            memory_type: "preference".to_string(),
            scope: MemoryScope::User,
            source_ref: source_ref.to_string(),
            project_id: None,
            user_id: Some("long".to_string()),
            sensitivity: None,
            metadata: TransitionMetadata::default(),
            entities: Vec::new(),
            tags: Vec::new(),
            triggers: Vec::new(),
            related_files: Vec::new(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: Vec::new(),
            valid_until: None,
        }
    }

    #[test]
    fn daemon_client_should_reuse_same_child_process_across_requests() {
        let fixture = DaemonFixture::new();
        let record = LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();

        let client = fixture.client();

        let workbench = client.load_workbench().unwrap();
        assert_eq!(workbench.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        let loaded = client.get_record(&record.entry.record_id).unwrap().unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
    }

    #[test]
    fn daemon_client_should_reuse_session_across_client_instances() {
        let fixture = DaemonFixture::new();
        let record = LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client"),
            )
            .unwrap();

        let first_client = fixture.client();
        let second_client = fixture.client();

        first_client.load_workbench().unwrap();
        let first_pid = first_client.session_pid().unwrap();

        let loaded = second_client
            .get_record(&record.entry.record_id)
            .unwrap()
            .unwrap();
        assert_eq!(loaded.record.title, "简洁输出");
        let second_pid = second_client.session_pid().unwrap();

        assert_eq!(first_pid, second_pid);
    }

    #[test]
    fn daemon_client_get_record_should_return_none_for_missing_record() {
        let fixture = DaemonFixture::new();
        let client = fixture.client();

        let record = client.get_record("missing-record-id").unwrap();
        assert!(record.is_none());
    }

    #[test]
    fn daemon_client_should_rebuild_session_after_child_exit() {
        let fixture = DaemonFixture::new();
        LifecycleService::new()
            .record_manual(
                fixture.config_path.as_path(),
                preference_request("manual:daemon-client-rebuild"),
            )
            .unwrap();

        let client = fixture.client();

        let first = client.load_workbench().unwrap();
        assert_eq!(first.wakeup_ready.len(), 1);
        let first_pid = client.session_pid().unwrap();

        // Kill the pooled child behind the client's back; the dead session
        // stays registered, so the next request must detect and replace it.
        kill_daemon_session_for_test(&fixture.daemon_bin, &fixture.config_path).unwrap();

        let second = client.load_workbench().unwrap();
        assert_eq!(second.wakeup_ready.len(), 1);
        let second_pid = client.session_pid().unwrap();

        assert_ne!(first_pid, second_pid);
    }
}
464}