Skip to main content

spool/lifecycle_store/
mod.rs

1//! Lifecycle ledger store: append-only JSONL + 投影缓存的统一入口。
2//!
3//! 这个 module 的物理拆分:
4//! - `mod.rs`:对外类型与 `LifecycleStore` / `LifecycleProjection` 实现、常量、顶层 helper
5//! - `api.rs`:对外 mutation 与查询 API(原 `lifecycle_store.rs` 里的 pub fn)
6//! - `projection.rs`:投影缓存 + 快照读写 + fingerprint
7//! - `internal.rs`:私有的 ID/scope/timestamp/写入/状态转移辅助
8//! - `tests.rs`:单元测试
9//!
10//! 外部 `use crate::lifecycle_store::xxx` 的路径保持不变。
11
12mod api;
13mod internal;
14mod projection;
15
16use crate::domain::{
17    MemoryLedgerAction, MemoryLifecycleState, MemoryRecord, MemoryScope, MemorySourceKind,
18};
19use serde::{Deserialize, Serialize};
20use std::collections::BTreeMap;
21use std::fs::{self, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use ts_rs::TS;
25
/// File name of the append-only JSONL ledger; `LifecycleStore::new` joins it onto the store root.
pub(crate) const LEDGER_FILE_NAME: &str = "memory-ledger.jsonl";
/// Schema identifier for ledger entries.
// NOTE(review): presumably the value written into `LedgerEntry::schema_version` — confirm in `internal.rs`.
pub(crate) const LEDGER_SCHEMA_VERSION: &str = "memory-ledger.v1";
28
29pub use api::{
30    accept_memory, accept_memory_with_metadata, archive_memory, archive_memory_with_metadata,
31    latest_state_by_scope, latest_state_by_state, latest_state_entries, lifecycle_query_plan,
32    pending_review_entries, project_latest_state, promote_memory_to_canonical,
33    promote_memory_to_canonical_with_metadata, propose_ai_memory, read_events_for_record,
34    record_manual_memory, review_queue_for_scope, review_queue_plan, wakeup_ready_entries,
35    wakeup_ready_for_scope,
36};
37pub use projection::read_projection;
38
39#[cfg(test)]
40pub(crate) use projection::{
41    clear_projection_cache, read_projection_with_cache_hit, read_projection_with_source,
42};
43
/// Optional context attached to a ledger entry.
///
/// Every field is optional on the wire: missing fields deserialize to their
/// defaults, and `None` / empty values are omitted when serializing. The type
/// is also exported as a TypeScript definition for the frontend.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq, TS)]
#[ts(export, export_to = "../frontend/src/lib/types/generated/")]
pub struct TransitionMetadata {
    /// Presumably who or what performed the transition — confirm against callers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actor: Option<String>,
    /// Presumably a free-form explanation of why the transition happened — confirm against callers.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
    /// References to supporting evidence; the reference format is not visible in this file.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub evidence_refs: Vec<String>,
}
54
/// One event in the lifecycle ledger.
///
/// `LifecycleStore::append` serializes each entry as a single JSON line and
/// `LifecycleStore::read_all` parses each non-empty line back into this type.
/// The type is also exported as a TypeScript definition for the frontend.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[ts(export, export_to = "../frontend/src/lib/types/generated/")]
pub struct LedgerEntry {
    /// Schema identifier of this entry (presumably `LEDGER_SCHEMA_VERSION` — confirm at the write site).
    pub schema_version: String,
    /// When the event was recorded; the timestamp format is not visible in this file.
    pub recorded_at: String,
    /// Identifier of the record this event belongs to; `LifecycleProjection` keys on it.
    pub record_id: String,
    /// Scope key compared by `LifecycleProjection::by_scope`.
    pub scope_key: String,
    /// The ledger action this event represents.
    pub action: MemoryLedgerAction,
    /// Kind of source that produced this event.
    pub source_kind: MemorySourceKind,
    /// Optional transition context; falls back to the default when absent from the JSON.
    #[serde(default)]
    pub metadata: TransitionMetadata,
    /// The memory record carried by this event.
    // NOTE(review): looks like a full snapshot of the record after the action — confirm.
    pub record: MemoryRecord,
}
68
/// Handle to the on-disk lifecycle ledger (an append-only JSONL file).
///
/// The handle only stores the ledger path, so cloning it is cheap.
#[derive(Debug, Clone)]
pub struct LifecycleStore {
    /// Full path of the ledger file (`<root>/memory-ledger.jsonl`).
    ledger_path: PathBuf,
}
73
74impl LifecycleStore {
75    pub fn new(root: &Path) -> Self {
76        Self {
77            ledger_path: root.join(LEDGER_FILE_NAME),
78        }
79    }
80
81    pub fn ledger_path(&self) -> &Path {
82        &self.ledger_path
83    }
84
85    pub fn projection_snapshot_path(&self) -> PathBuf {
86        self.ledger_path
87            .parent()
88            .unwrap_or_else(|| Path::new("."))
89            .join(projection::PROJECTION_SNAPSHOT_FILE_NAME)
90    }
91
92    pub fn append(&self, entry: &LedgerEntry) -> anyhow::Result<()> {
93        if let Some(parent) = self.ledger_path.parent() {
94            fs::create_dir_all(parent)?;
95        }
96        let mut file = OpenOptions::new()
97            .create(true)
98            .append(true)
99            .open(&self.ledger_path)?;
100        writeln!(file, "{}", serde_json::to_string(entry)?)?;
101        file.sync_data()?;
102        projection::invalidate_projection_cache(
103            &self.ledger_path,
104            self.projection_snapshot_path().as_path(),
105        );
106        Ok(())
107    }
108
109    pub fn read_all(&self) -> anyhow::Result<Vec<LedgerEntry>> {
110        if !self.ledger_path.exists() {
111            return Ok(Vec::new());
112        }
113
114        let file = fs::File::open(&self.ledger_path)?;
115        let reader = BufReader::new(file);
116        // We deliberately skip-and-warn malformed lines instead of
117        // returning Err: a single corrupted line (incomplete write
118        // after SIGKILL, accidental hand-edit, git merge marker, …)
119        // would otherwise take the entire query surface offline —
120        // wakeup, review queue, show, history all go dark together.
121        //
122        // The trade-off: a malformed entry is silently ignored from
123        // the projection. We log it to stderr so `spool mcp doctor`
124        // and operators can surface it. Loss-of-truth is preferable
125        // to total-blackout because the ledger remains the source
126        // of truth — the next valid append re-establishes state.
127        let mut entries = Vec::new();
128        for (idx, line) in reader.lines().enumerate() {
129            let line_no = idx + 1;
130            let raw = match line {
131                Ok(raw) => raw,
132                Err(err) => {
133                    eprintln!(
134                        "[spool ledger] read error at {}:{line_no}: {err}",
135                        self.ledger_path.display()
136                    );
137                    continue;
138                }
139            };
140            if raw.trim().is_empty() {
141                continue;
142            }
143            match serde_json::from_str::<LedgerEntry>(&raw) {
144                Ok(entry) => entries.push(entry),
145                Err(err) => {
146                    eprintln!(
147                        "[spool ledger] malformed entry at {}:{line_no}: {err}",
148                        self.ledger_path.display()
149                    );
150                }
151            }
152        }
153        Ok(entries)
154    }
155}
156
/// In-memory view of the ledger that keeps only the most recent entry per record.
///
/// Built by `LifecycleProjection::from_entries`.
#[derive(Debug, Clone)]
pub struct LifecycleProjection {
    /// Latest entry for each record, positioned by the record's first appearance in the input.
    latest_entries: Vec<LedgerEntry>,
    /// Maps a `record_id` to the index of its entry in `latest_entries`.
    latest_index_by_record_id: BTreeMap<String, usize>,
}
162
163impl LifecycleProjection {
164    pub fn from_entries(entries: Vec<LedgerEntry>) -> Self {
165        let mut latest_entries = Vec::new();
166        let mut latest_index_by_record_id = BTreeMap::new();
167
168        for entry in entries {
169            if let Some(index) = latest_index_by_record_id.get(&entry.record_id).copied() {
170                latest_entries[index] = entry;
171            } else {
172                latest_index_by_record_id.insert(entry.record_id.clone(), latest_entries.len());
173                latest_entries.push(entry);
174            }
175        }
176
177        Self {
178            latest_entries,
179            latest_index_by_record_id,
180        }
181    }
182
183    pub fn latest_entries(&self) -> &[LedgerEntry] {
184        &self.latest_entries
185    }
186
187    pub fn latest_by_record_id(&self, record_id: &str) -> Option<&LedgerEntry> {
188        self.latest_index_by_record_id
189            .get(record_id)
190            .and_then(|index| self.latest_entries.get(*index))
191    }
192
193    pub fn by_scope(&self, scope: MemoryScope, scope_key: &str) -> Vec<LedgerEntry> {
194        self.latest_entries
195            .iter()
196            .filter(|entry| entry.record.scope == scope && entry.scope_key == scope_key)
197            .cloned()
198            .collect()
199    }
200
201    pub fn by_state(&self, state: MemoryLifecycleState) -> Vec<LedgerEntry> {
202        self.latest_entries
203            .iter()
204            .filter(|entry| entry.record.state == state)
205            .cloned()
206            .collect()
207    }
208
209    pub fn pending_review(&self) -> Vec<LedgerEntry> {
210        self.latest_entries
211            .iter()
212            .filter(|entry| entry.record.requires_review())
213            .cloned()
214            .collect()
215    }
216
217    pub fn wakeup_ready(&self) -> Vec<LedgerEntry> {
218        self.latest_entries
219            .iter()
220            .filter(|entry| entry.record.can_be_returned_in_wakeup())
221            .cloned()
222            .collect()
223    }
224}
225
/// Input describing a memory to be recorded.
///
/// Field-for-field identical to `ProposeMemoryRequest`.
// NOTE(review): presumably the argument of `record_manual_memory` — confirm in `api.rs`.
#[derive(Debug, Clone)]
pub struct RecordMemoryRequest {
    pub title: String,
    pub summary: String,
    /// Free-form type label; the accepted values are not visible in this file.
    pub memory_type: String,
    pub scope: MemoryScope,
    /// Reference to where the memory came from; the format is not visible in this file.
    pub source_ref: String,
    pub project_id: Option<String>,
    pub user_id: Option<String>,
    pub sensitivity: Option<String>,
    /// Transition context to attach to the resulting ledger entry (presumably — confirm in `api.rs`).
    pub metadata: TransitionMetadata,
    // Structured retrieval signals
    pub entities: Vec<String>,
    pub tags: Vec<String>,
    pub triggers: Vec<String>,
    pub related_files: Vec<String>,
    pub related_records: Vec<String>,
    /// Presumably the id of a record this memory replaces — TODO confirm.
    pub supersedes: Option<String>,
    pub applies_to: Vec<String>,
    /// Presumably an expiry timestamp; the format is not visible in this file — TODO confirm.
    pub valid_until: Option<String>,
}
247
/// Input describing a memory to be proposed.
///
/// Field-for-field identical to `RecordMemoryRequest`.
// NOTE(review): presumably the argument of `propose_ai_memory` — confirm in `api.rs`.
#[derive(Debug, Clone)]
pub struct ProposeMemoryRequest {
    pub title: String,
    pub summary: String,
    /// Free-form type label; the accepted values are not visible in this file.
    pub memory_type: String,
    pub scope: MemoryScope,
    /// Reference to where the memory came from; the format is not visible in this file.
    pub source_ref: String,
    pub project_id: Option<String>,
    pub user_id: Option<String>,
    pub sensitivity: Option<String>,
    /// Transition context to attach to the resulting ledger entry (presumably — confirm in `api.rs`).
    pub metadata: TransitionMetadata,
    // Structured retrieval signals
    pub entities: Vec<String>,
    pub tags: Vec<String>,
    pub triggers: Vec<String>,
    pub related_files: Vec<String>,
    pub related_records: Vec<String>,
    /// Presumably the id of a record this memory replaces — TODO confirm.
    pub supersedes: Option<String>,
    pub applies_to: Vec<String>,
    /// Presumably an expiry timestamp; the format is not visible in this file — TODO confirm.
    pub valid_until: Option<String>,
}
269
/// Returns the ledger file name (`LEDGER_FILE_NAME`) for callers outside this crate,
/// where the `pub(crate)` constant itself is not visible.
pub fn ledger_file_name() -> &'static str {
    LEDGER_FILE_NAME
}
273
/// Resolves the lifecycle root directory for a given config directory.
///
/// If the final component of `config_dir` is already `.spool`, the path is
/// returned unchanged; otherwise `.spool` is appended to it. A final component
/// that is missing or not valid UTF-8 is treated as "not `.spool`".
pub fn lifecycle_root_from_config(config_dir: &Path) -> PathBuf {
    let final_component = config_dir.file_name().and_then(|name| name.to_str());
    match final_component {
        Some(".spool") => config_dir.to_path_buf(),
        _ => config_dir.join(".spool"),
    }
}
281
282#[cfg(test)]
283mod tests;