Skip to main content

daemon/
persister.rs

//! Append-only JSONL disk persister.
//!
//! Events are written one per line to `<state_dir>/events.jsonl` with an
//! fsync after every write so a crash or kill -9 still leaves the log in a
//! recoverable state. On startup the file is replayed: events are
//! deduplicated to the latest SET/DEL per `(item_type, item.id)` and
//! surviving SETs are pushed into the server's `StoreRegistry` via
//! `apply_event_batch` with persistence and relationship cascades both
//! suppressed (we are restoring a snapshot — the original cascades already
//! fired at write time).
//!
//! After replay, an optional filesystem watcher tails the log: any
//! externally-appended lines (e.g. a hand-edit migration against a running
//! daemon) are picked up, deserialized, and dispatched into the live store
//! "as if just emitted internally" — but with `prevent_persist = true` so
//! the writer doesn't echo them back to disk and trigger a feedback loop.
//! See `start_watcher` for details.
18
19use std::{
20    collections::HashMap,
21    fs::{File, OpenOptions},
22    io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
23    os::unix::fs::MetadataExt,
24    path::{Path, PathBuf},
25    sync::{Arc, Mutex},
26    time::Duration,
27};
28
29use myko::{
30    server::{CellServerCtx, PersistError, PersistHealth, Persister},
31    wire::{MEvent, MEventType},
32};
33use notify_debouncer_mini::{
34    DebounceEventResult, Debouncer, new_debouncer,
35    notify::{RecommendedWatcher, RecursiveMode},
36};
37
/// Resolve the directory the daemon keeps its state in.
///
/// Precedence, first match wins:
/// 1. `$MARSHAL_STATE_DIR`, taken verbatim;
/// 2. `$HOME/.local/state/marshal`;
/// 3. `.marshal`, relative to the current working directory.
pub fn default_state_dir() -> PathBuf {
    std::env::var("MARSHAL_STATE_DIR")
        .map(PathBuf::from)
        .or_else(|_| {
            std::env::var("HOME").map(|home| PathBuf::from(home).join(".local/state/marshal"))
        })
        .unwrap_or_else(|_| PathBuf::from(".marshal"))
}
49
50/// One-time JSONL persistence migration from the previous claude-coord
51/// path. Called once at startup before the persister opens its file:
52///
53/// - If the old default (`~/.local/state/claude-coord/events.jsonl`) or
54///   `$CLAUDE_COORD_STATE_DIR/events.jsonl` exists AND the new path does
55///   not, move the old file to the new path. Idempotent: a second run
56///   sees the new file in place and no-ops.
57/// - If both exist, leave both alone and warn — preferring the new path,
58///   so we never clobber state the user has already started writing under
59///   the marshal name.
60/// - If only the new (or neither) exists, no-op.
61///
62/// Cross-filesystem moves are not handled: the old and new defaults are
63/// both under `~/.local/state` so a `rename(2)` succeeds in practice. A
64/// failure here logs and returns the error; the daemon will then open
65/// the (still-empty) new path and start fresh.
66pub fn migrate_from_claude_coord(new_log_path: &Path) -> std::io::Result<()> {
67    let Some(old_log) = legacy_log_path() else {
68        return Ok(());
69    };
70    if !old_log.exists() {
71        return Ok(());
72    }
73    if new_log_path.exists() {
74        log::warn!(
75            "[migrate] both legacy ({}) and current ({}) event logs exist; \
76             leaving both in place. Reconcile manually if needed — current \
77             is preferred.",
78            old_log.display(),
79            new_log_path.display(),
80        );
81        return Ok(());
82    }
83    if let Some(parent) = new_log_path.parent() {
84        std::fs::create_dir_all(parent)?;
85    }
86    log::info!(
87        "[migrate] moving event log: {} → {}",
88        old_log.display(),
89        new_log_path.display(),
90    );
91    std::fs::rename(&old_log, new_log_path)?;
92    Ok(())
93}
94
/// Locate the claude-coord-era event log, if its directory can be resolved.
///
/// The historical `CLAUDE_COORD_STATE_DIR` override wins. Otherwise the log
/// is looked for under `$HOME/.local/state/claude-coord`. Returns `None` only
/// when neither variable is available.
fn legacy_log_path() -> Option<PathBuf> {
    let state_dir = match std::env::var("CLAUDE_COORD_STATE_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(std::env::var("HOME").ok()?).join(".local/state/claude-coord"),
    };
    Some(state_dir.join("events.jsonl"))
}
105
/// Mutable state shared by `persist()` and `tail_apply()`.
///
/// It lives behind a single mutex so an append can never land between the
/// watcher's length check and its read of the new bytes.
struct PersistState {
    /// Handle on the event log, opened read + append.
    file: File,
    /// High-water mark. Every byte before this offset has already been
    /// written by the daemon or consumed by the watcher. Tail reads start
    /// here, which is what keeps the daemon's own appends from being applied
    /// a second time.
    applied_offset: u64,
    /// Inode number of `file`. If the inode at the log path differs, the log
    /// was replaced (e.g. via `mv`) and the handle has to be reopened.
    inode: u64,
}
119
120pub struct DiskPersister {
121    state: Arc<Mutex<PersistState>>,
122    path: PathBuf,
123    health: Arc<PersistHealth>,
124}
125
126impl DiskPersister {
127    /// Open (or create) the JSONL log at `path`. The parent directory is
128    /// created if missing.
129    pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
130        let path = path.as_ref().to_path_buf();
131        if let Some(parent) = path.parent() {
132            std::fs::create_dir_all(parent)?;
133        }
134        let file = OpenOptions::new()
135            .read(true)
136            .append(true)
137            .create(true)
138            .open(&path)?;
139        let meta = file.metadata()?;
140        let state = PersistState {
141            file,
142            applied_offset: 0, // replay() will set this to file end
143            inode: meta.ino(),
144        };
145        Ok(Self {
146            state: Arc::new(Mutex::new(state)),
147            path,
148            health: Arc::new(PersistHealth::default()),
149        })
150    }
151
152    pub fn path(&self) -> &Path {
153        &self.path
154    }
155
156    /// Replay the JSONL log into `ctx`'s store. Call once after
157    /// `CellServer::builder().build()` and before `server.run()` so the
158    /// registry is hot before any client connects. Returns the number of
159    /// entities restored.
160    pub fn replay(&self, ctx: &CellServerCtx) -> std::io::Result<usize> {
161        let mut state = self.state.lock().expect("disk persister mutex poisoned");
162        let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
163        // Position the cursor at end-of-file for subsequent appends and
164        // record that offset as our high-water mark. (Append mode ignores
165        // the seek for writes, but stream_position now reflects EOF.)
166        let end = state.file.seek(SeekFrom::End(0))?;
167        state.applied_offset = end;
168        drop(state);
169
170        let surviving_count = surviving.len();
171        let applied = ctx.apply_event_batch(surviving).map_err(|e| {
172            std::io::Error::new(std::io::ErrorKind::Other, format!("apply_event_batch: {e}"))
173        })?;
174
175        log::info!(
176            "[disk-persister] replayed event log {} ({} survived dedup, {} applied)",
177            self.path.display(),
178            surviving_count,
179            applied,
180        );
181
182        Ok(applied)
183    }
184
185    /// Spawn a filesystem watcher that tails the event log for external
186    /// appends, truncations, and replacements. Caller must keep the
187    /// returned `DiskWatcher` alive for the lifetime of the daemon — it
188    /// owns the underlying notify debouncer thread.
189    ///
190    /// Self-write suppression: every persist/replay updates
191    /// `applied_offset`. The watcher only reads bytes past that mark, so
192    /// the daemon's own appends are skipped (file size doesn't grow
193    /// beyond the new offset before the watcher tick fires).
194    ///
195    /// Truncation / replacement: detected by `current_len <
196    /// applied_offset` or by an inode change. In either case the
197    /// persister reopens its file handle (so subsequent appends land in
198    /// the new file, not an unlinked one) and re-runs the
199    /// dedupe-from-top loader. Pre-existing entities not in the new file
200    /// are not DEL'd — hot-reload is a recovery affordance, not a full
201    /// state-replacement primitive.
202    pub fn start_watcher(
203        self: &Arc<Self>,
204        ctx: CellServerCtx,
205    ) -> notify_debouncer_mini::notify::Result<DiskWatcher> {
206        let me = Arc::clone(self);
207        let mut debouncer: Debouncer<RecommendedWatcher> = new_debouncer(
208            Duration::from_millis(150),
209            move |result: DebounceEventResult| match result {
210                Ok(_events) => {
211                    if let Err(e) = me.tail_apply(&ctx) {
212                        log::warn!("[watcher] tail apply failed: {e}");
213                    }
214                }
215                Err(errors) => {
216                    log::warn!("[watcher] notify error: {errors:?}");
217                }
218            },
219        )?;
220        debouncer
221            .watcher()
222            .watch(&self.path, RecursiveMode::NonRecursive)?;
223        log::info!("[watcher] tailing event log {}", self.path.display());
224        Ok(DiskWatcher {
225            _debouncer: debouncer,
226        })
227    }
228
229    /// Read any new bytes at the tail of the log and apply them as live
230    /// events. Idempotent against the daemon's own writes via the
231    /// `applied_offset` watermark. Mutates `state.applied_offset` to the
232    /// new file length before releasing the lock so any cascading
233    /// `persist()` calls inside `apply_event_batch` extend it further
234    /// rather than re-applying what we just read.
235    fn tail_apply(&self, ctx: &CellServerCtx) -> std::io::Result<()> {
236        // Hold the state lock across the read so persist() can't
237        // interleave a partial line into our buffer.
238        let mut state = self.state.lock().expect("disk persister mutex poisoned");
239
240        let meta = std::fs::metadata(&self.path)?;
241        let current_len = meta.len();
242        let current_inode = meta.ino();
243
244        let inode_changed = current_inode != state.inode;
245        let truncated = current_len < state.applied_offset;
246        if inode_changed || truncated {
247            log::warn!(
248                "[watcher] events.jsonl {} (inode {} → {}, len {} → {}); reloading from top",
249                if truncated { "truncated" } else { "replaced" },
250                state.inode,
251                current_inode,
252                state.applied_offset,
253                current_len,
254            );
255            // Reopen the path so future appends land in the new inode.
256            let new_file = OpenOptions::new()
257                .read(true)
258                .append(true)
259                .create(true)
260                .open(&self.path)?;
261            state.file = new_file;
262            state.inode = current_inode;
263            let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
264            let end = state.file.seek(SeekFrom::End(0))?;
265            state.applied_offset = end;
266            drop(state);
267            apply_or_log(ctx, surviving, "full-reload");
268            return Ok(());
269        }
270
271        if current_len == state.applied_offset {
272            return Ok(()); // No new bytes; nothing to do.
273        }
274
275        let start = state.applied_offset;
276        let span = current_len - start;
277        state.file.seek(SeekFrom::Start(start))?;
278        let mut buf = String::new();
279        // Read exactly the bytes that were present when we observed the
280        // length, not whatever may have been appended since. This keeps
281        // the offset bookkeeping aligned with what we actually consumed.
282        BufReader::new(&mut state.file)
283            .take(span)
284            .read_to_string(&mut buf)?;
285        // Restore stream position to EOF (informational under append mode).
286        state.file.seek(SeekFrom::End(0))?;
287        state.applied_offset = current_len;
288        drop(state);
289
290        let mut events: Vec<MEvent> = Vec::new();
291        for line in buf.lines() {
292            if line.trim().is_empty() {
293                continue;
294            }
295            match serde_json::from_str::<MEvent>(line) {
296                Ok(mut e) => {
297                    let mut opts = e.options.clone().unwrap_or_default();
298                    opts.prevent_persist = true;
299                    e.options = Some(opts);
300                    events.push(e);
301                }
302                Err(e) => log::warn!("[watcher] skipping malformed line: {e}"),
303            }
304        }
305
306        apply_or_log(ctx, events, "tail");
307        Ok(())
308    }
309}
310
311impl Persister for DiskPersister {
312    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
313        self.health.record_enqueue();
314
315        let serialized = match serde_json::to_string(&event) {
316            Ok(s) => s,
317            Err(e) => {
318                self.health.record_dropped(e.to_string());
319                return Err(PersistError {
320                    entity_type: event.item_type,
321                    message: format!("serialize: {e}"),
322                });
323            }
324        };
325
326        let mut state = self.state.lock().expect("disk persister mutex poisoned");
327        let write_result = state
328            .file
329            .write_all(serialized.as_bytes())
330            .and_then(|_| state.file.write_all(b"\n"))
331            .and_then(|_| state.file.sync_data());
332
333        if let Err(e) = write_result {
334            self.health.record_error(e.to_string());
335            return Err(PersistError {
336                entity_type: event.item_type,
337                message: format!("write/fsync: {e}"),
338            });
339        }
340
341        // Bump the watermark so the watcher's next tick recognizes this
342        // append as our own and skips it. +1 for the trailing newline.
343        state.applied_offset += serialized.len() as u64 + 1;
344
345        self.health.record_success();
346        Ok(())
347    }
348
349    fn health(&self) -> Arc<PersistHealth> {
350        self.health.clone()
351    }
352}
353
354/// Opaque handle returned by `start_watcher`. Owns the notify debouncer
355/// thread; dropping it tears the watcher down.
356pub struct DiskWatcher {
357    _debouncer: Debouncer<RecommendedWatcher>,
358}
359
360/// Stream the file once, collecting per `(item_type, id)` to the latest
361/// MEvent, then return the surviving SETs marked replay-only
362/// (`prevent_persist` + `prevent_relationship_updates`). Used by both
363/// startup `replay()` and the watcher's full-reload path.
364fn read_and_dedupe_from_top(file: &mut File, path: &Path) -> std::io::Result<Vec<MEvent>> {
365    file.seek(SeekFrom::Start(0))?;
366    let mut latest: HashMap<(String, String), MEvent> = HashMap::new();
367    let mut total = 0usize;
368    let mut malformed = 0usize;
369
370    let reader = BufReader::new(&mut *file);
371    for line in reader.lines() {
372        let line = line?;
373        if line.trim().is_empty() {
374            continue;
375        }
376        match serde_json::from_str::<MEvent>(&line) {
377            Ok(event) => {
378                total += 1;
379                let id = event
380                    .item
381                    .get("id")
382                    .and_then(|v| v.as_str())
383                    .unwrap_or("")
384                    .to_string();
385                if id.is_empty() {
386                    malformed += 1;
387                    continue;
388                }
389                latest.insert((event.item_type.clone(), id), event);
390            }
391            Err(e) => {
392                malformed += 1;
393                log::warn!(
394                    "[disk-persister] skipping malformed line in {}: {e}",
395                    path.display()
396                );
397            }
398        }
399    }
400
401    let surviving: Vec<MEvent> = latest
402        .into_values()
403        .filter(|e| matches!(e.change_type, MEventType::SET))
404        .map(|mut e| {
405            let mut opts = e.options.clone().unwrap_or_default();
406            opts.prevent_persist = true;
407            opts.prevent_relationship_updates = true;
408            e.options = Some(opts);
409            e
410        })
411        .collect();
412
413    log::debug!(
414        "[disk-persister] read {} ({} total, {} malformed, {} survived dedup)",
415        path.display(),
416        total,
417        malformed,
418        surviving.len(),
419    );
420
421    Ok(surviving)
422}
423
424fn apply_or_log(ctx: &CellServerCtx, events: Vec<MEvent>, label: &str) {
425    if events.is_empty() {
426        return;
427    }
428    let count = events.len();
429    match ctx.apply_event_batch(events) {
430        Ok(applied) => {
431            log::info!("[watcher] {label}: applied {applied}/{count} hot-reloaded event(s)")
432        }
433        Err(e) => log::warn!("[watcher] {label}: apply_event_batch failed: {e}"),
434    }
435}