Skip to main content

spool/
daemon.rs

1use crate::daemon_client::DaemonClient;
2use crate::lifecycle_service::{LifecycleService, LifecycleWorkbenchSnapshot};
3use crate::lifecycle_store::LedgerEntry;
4use serde_json::{Value, json};
5use std::io::{self, BufRead, Write};
6use std::path::{Path, PathBuf};
7
/// Selects how the lifecycle read helpers in this module obtain their data.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleReadMode {
    /// Call `LifecycleService` in-process. This is the default mode.
    #[default]
    Direct,
    /// Ask a daemon through `DaemonClient` first; the read helpers fall back
    /// to the in-process path when the daemon call fails.
    Daemon,
}
14
15#[derive(Debug, Clone, Default, PartialEq, Eq)]
16pub struct LifecycleReadOptions {
17    pub mode: LifecycleReadMode,
18    pub daemon_bin: Option<PathBuf>,
19}
20
21pub fn serve_stdio(config_path: &Path) -> anyhow::Result<()> {
22    if !config_path.exists() {
23        anyhow::bail!("config not found: {}", config_path.display());
24    }
25
26    let stdin = io::stdin();
27    let stdout = io::stdout();
28    let mut reader = stdin.lock();
29    let mut writer = stdout.lock();
30
31    let mut line = String::new();
32    loop {
33        line.clear();
34        let bytes = reader.read_line(&mut line)?;
35        if bytes == 0 {
36            break;
37        }
38
39        let trimmed = line.trim();
40        if trimmed.is_empty() {
41            continue;
42        }
43
44        let response = match serde_json::from_str::<Value>(trimmed) {
45            Ok(request) => handle_request(config_path, &request),
46            Err(error) => json!({ "ok": false, "error": format!("invalid json: {error}") }),
47        };
48        serde_json::to_writer(&mut writer, &response)?;
49        writer.write_all(b"\n")?;
50        writer.flush()?;
51    }
52
53    Ok(())
54}
55
56pub fn read_workbench(
57    config_path: &Path,
58    options: &LifecycleReadOptions,
59) -> anyhow::Result<LifecycleWorkbenchSnapshot> {
60    match options.mode {
61        LifecycleReadMode::Direct => LifecycleService::new().load_workbench(config_path),
62        LifecycleReadMode::Daemon => {
63            let client = DaemonClient::new(options.daemon_bin(config_path)?, config_path);
64            client
65                .load_workbench()
66                .or_else(|_| LifecycleService::new().load_workbench(config_path))
67        }
68    }
69}
70
71pub fn read_record(
72    config_path: &Path,
73    record_id: &str,
74    options: &LifecycleReadOptions,
75) -> anyhow::Result<Option<LedgerEntry>> {
76    match options.mode {
77        LifecycleReadMode::Direct => LifecycleService::new().get_record(config_path, record_id),
78        LifecycleReadMode::Daemon => {
79            let client = DaemonClient::new(options.daemon_bin(config_path)?, config_path);
80            client
81                .get_record(record_id)
82                .or_else(|_| LifecycleService::new().get_record(config_path, record_id))
83        }
84    }
85}
86
87pub fn read_history(
88    config_path: &Path,
89    record_id: &str,
90    options: &LifecycleReadOptions,
91) -> anyhow::Result<Vec<LedgerEntry>> {
92    match options.mode {
93        LifecycleReadMode::Direct => LifecycleService::new().get_history(config_path, record_id),
94        LifecycleReadMode::Daemon => {
95            let client = DaemonClient::new(options.daemon_bin(config_path)?, config_path);
96            client
97                .get_history(record_id)
98                .or_else(|_| LifecycleService::new().get_history(config_path, record_id))
99        }
100    }
101}
102
103impl LifecycleReadOptions {
104    pub fn with_daemon(daemon_bin: &Path) -> Self {
105        Self {
106            mode: LifecycleReadMode::Daemon,
107            daemon_bin: Some(daemon_bin.to_path_buf()),
108        }
109    }
110
111    fn daemon_bin<'a>(&'a self, config_path: &Path) -> anyhow::Result<&'a Path> {
112        self.daemon_bin.as_deref().ok_or_else(|| {
113            anyhow::anyhow!("missing daemon binary for config {}", config_path.display())
114        })
115    }
116}
117
118fn handle_request(config_path: &Path, request: &Value) -> Value {
119    let command = request.get("command").and_then(Value::as_str).unwrap_or("");
120    match command {
121        "ping" => json!({ "ok": true, "command": "pong" }),
122        "workbench" => {
123            let service = LifecycleService::new();
124            match service.load_workbench(config_path) {
125                Ok(snapshot) => json!({
126                    "ok": true,
127                    "pending_review": snapshot.pending_review,
128                    "wakeup_ready": snapshot.wakeup_ready
129                }),
130                Err(error) => json!({ "ok": false, "error": error.to_string() }),
131            }
132        }
133        "record" => {
134            let Some(record_id) = request.get("record_id").and_then(Value::as_str) else {
135                return json!({ "ok": false, "error": "missing record_id" });
136            };
137            let service = LifecycleService::new();
138            match service.get_record(config_path, record_id) {
139                // Missing records use the same envelope shape as `history`
140                // (ok:true + null payload). We deliberately do NOT signal
141                // "not found" via ok:false anymore — that's a real
142                // protocol error category (transport / config / IO),
143                // and conflating "not found" into it forces the client
144                // to do brittle string matching on the error message.
145                Ok(Some(record)) => json!({ "ok": true, "record": record }),
146                Ok(None) => json!({ "ok": true, "record": Value::Null }),
147                Err(error) => json!({ "ok": false, "error": error.to_string() }),
148            }
149        }
150        "history" => {
151            let Some(record_id) = request.get("record_id").and_then(Value::as_str) else {
152                return json!({ "ok": false, "error": "missing record_id" });
153            };
154            let service = LifecycleService::new();
155            match service.get_history(config_path, record_id) {
156                Ok(history) => json!({ "ok": true, "record_id": record_id, "history": history }),
157                Err(error) => json!({ "ok": false, "error": error.to_string() }),
158            }
159        }
160        _ => json!({ "ok": false, "error": format!("unknown command: {command}") }),
161    }
162}
163
#[cfg(test)]
mod tests {
    use super::handle_request;
    use crate::daemon_client::{
        daemon_session_pid_for_test, daemon_test_lock_for_test, kill_daemon_session_for_test,
        reset_daemon_sessions,
    };
    use crate::domain::MemoryScope;
    use crate::lifecycle_service::LifecycleService;
    use crate::lifecycle_store::{ProposeMemoryRequest, RecordMemoryRequest, TransitionMetadata};
    use serde_json::json;
    use std::fs;
    use tempfile::tempdir;

    /// Creates a temporary directory holding a minimal `spool.toml`.
    ///
    /// The `TempDir` is returned alongside the path so the directory stays
    /// alive for as long as the caller keeps the guard.
    fn setup_config() -> (tempfile::TempDir, std::path::PathBuf) {
        let temp = tempdir().unwrap();
        let config_path = temp.path().join("spool.toml");
        fs::write(&config_path, "[vault]\nroot = \"/tmp\"\n").unwrap();
        (temp, config_path)
    }

    /// The AI-proposed memory used as the shared fixture by the read-helper
    /// tests.
    fn sample_proposal() -> ProposeMemoryRequest {
        ProposeMemoryRequest {
            title: "测试偏好".to_string(),
            summary: "先 smoke 再收口".to_string(),
            memory_type: "workflow".to_string(),
            scope: MemoryScope::User,
            source_ref: "session:1".to_string(),
            project_id: None,
            user_id: Some("long".to_string()),
            sensitivity: None,
            metadata: TransitionMetadata::default(),
            entities: Vec::new(),
            tags: Vec::new(),
            triggers: Vec::new(),
            related_files: Vec::new(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: Vec::new(),
            valid_until: None,
        }
    }

    #[test]
    fn daemon_should_serve_workbench_and_record_history_reads() {
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .record_manual(
                config_path.as_path(),
                RecordMemoryRequest {
                    title: "简洁输出".to_string(),
                    summary: "偏好简洁".to_string(),
                    memory_type: "preference".to_string(),
                    scope: MemoryScope::User,
                    source_ref: "manual:daemon".to_string(),
                    project_id: None,
                    user_id: Some("long".to_string()),
                    sensitivity: None,
                    metadata: TransitionMetadata::default(),
                    entities: Vec::new(),
                    tags: Vec::new(),
                    triggers: Vec::new(),
                    related_files: Vec::new(),
                    related_records: Vec::new(),
                    supersedes: None,
                    applies_to: Vec::new(),
                    valid_until: None,
                },
            )
            .unwrap();

        let workbench = handle_request(config_path.as_path(), &json!({ "command": "workbench" }));
        assert_eq!(workbench["ok"], json!(true));
        assert_eq!(workbench["wakeup_ready"].as_array().unwrap().len(), 1);

        let record_response = handle_request(
            config_path.as_path(),
            &json!({ "command": "record", "record_id": record.entry.record_id }),
        );
        assert_eq!(record_response["ok"], json!(true));

        let history_response = handle_request(
            config_path.as_path(),
            &json!({ "command": "history", "record_id": record.entry.record_id }),
        );
        assert_eq!(history_response["ok"], json!(true));
        assert_eq!(history_response["history"].as_array().unwrap().len(), 1);
    }

    #[test]
    fn daemon_record_command_should_return_ok_true_with_null_for_missing_id() {
        // Contract: a missing record is reported as ok:true + record:null.
        // Folding "not found" into ok:false would force clients to
        // string-match the error message.
        let (_temp, config_path) = setup_config();
        let response = handle_request(
            config_path.as_path(),
            &json!({ "command": "record", "record_id": "definitely-missing" }),
        );
        assert_eq!(response["ok"], json!(true));
        assert_eq!(response["record"], serde_json::Value::Null);
        assert!(
            response.get("error").is_none(),
            "missing record must NOT carry an error field"
        );
    }

    #[test]
    fn read_helpers_should_fallback_to_direct_when_daemon_is_unavailable() {
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();
        let options = super::LifecycleReadOptions::with_daemon(std::path::Path::new(
            "/definitely/missing/spool-daemon",
        ));

        let workbench = super::read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);

        let loaded_record =
            super::read_record(config_path.as_path(), &record.entry.record_id, &options)
                .unwrap()
                .unwrap();
        assert_eq!(
            loaded_record.record.state,
            crate::domain::MemoryLifecycleState::Candidate
        );

        let history =
            super::read_history(config_path.as_path(), &record.entry.record_id, &options).unwrap();
        assert_eq!(history.len(), 1);
    }

    #[test]
    fn daemon_should_return_structured_error_for_invalid_json() {
        let (_temp, config_path) = setup_config();

        let parsed = serde_json::from_str::<serde_json::Value>("{");
        assert!(parsed.is_err());

        // Mirrors the parse-or-report step of `serve_stdio`, which cannot be
        // driven directly here because it is bound to the process's stdio.
        let response = match parsed {
            Ok(request) => handle_request(config_path.as_path(), &request),
            Err(error) => json!({ "ok": false, "error": format!("invalid json: {error}") }),
        };
        assert_eq!(response["ok"], json!(false));
        assert!(
            response["error"]
                .as_str()
                .unwrap()
                .contains("invalid json:")
        );
    }

    #[test]
    fn read_helpers_should_reuse_shared_daemon_session() {
        // Daemon sessions are shared process-wide state, so the session tests
        // serialize on a common lock. A poisoned lock (an earlier test
        // panicked while holding it) is still usable.
        let _guard = daemon_test_lock_for_test()
            .lock()
            .unwrap_or_else(|error| error.into_inner());
        reset_daemon_sessions();
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();
        let daemon_bin = assert_cmd::cargo::cargo_bin("spool-daemon");
        let options = super::LifecycleReadOptions::with_daemon(daemon_bin.as_path());

        let workbench = super::read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);
        let first_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        let loaded_record =
            super::read_record(config_path.as_path(), &record.entry.record_id, &options)
                .unwrap()
                .unwrap();
        assert_eq!(loaded_record.record.title, "测试偏好");

        let history =
            super::read_history(config_path.as_path(), &record.entry.record_id, &options).unwrap();
        assert_eq!(history.len(), 1);
        let second_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        // Same pid across all three reads means the session was reused.
        assert_eq!(first_pid, second_pid);
        reset_daemon_sessions();
    }

    #[test]
    fn read_helpers_should_rebuild_shared_daemon_session_after_exit() {
        // See `read_helpers_should_reuse_shared_daemon_session` for why the
        // lock is taken and why poisoning is tolerated.
        let _guard = daemon_test_lock_for_test()
            .lock()
            .unwrap_or_else(|error| error.into_inner());
        reset_daemon_sessions();
        let (_temp, config_path) = setup_config();
        let record = LifecycleService::new()
            .propose_ai(config_path.as_path(), sample_proposal())
            .unwrap();
        let daemon_bin = assert_cmd::cargo::cargo_bin("spool-daemon");
        let options = super::LifecycleReadOptions::with_daemon(daemon_bin.as_path());

        let workbench = super::read_workbench(config_path.as_path(), &options).unwrap();
        assert_eq!(workbench.pending_review.len(), 1);
        let first_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        kill_daemon_session_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        let loaded_record =
            super::read_record(config_path.as_path(), &record.entry.record_id, &options)
                .unwrap()
                .unwrap();
        assert_eq!(loaded_record.record.title, "测试偏好");
        let second_pid =
            daemon_session_pid_for_test(daemon_bin.as_path(), config_path.as_path()).unwrap();

        // A different pid means a fresh daemon was spawned after the kill.
        assert_ne!(first_pid, second_pid);
        reset_daemon_sessions();
    }
}