1use std::{
20 collections::HashMap,
21 fs::{File, OpenOptions},
22 io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
23 os::unix::fs::MetadataExt,
24 path::{Path, PathBuf},
25 sync::{Arc, Mutex},
26 time::Duration,
27};
28
29use myko::{
30 server::{CellServerCtx, PersistError, PersistHealth, Persister},
31 wire::{MEvent, MEventType},
32};
33use notify_debouncer_mini::{
34 DebounceEventResult, Debouncer, new_debouncer,
35 notify::{RecommendedWatcher, RecursiveMode},
36};
37
38pub fn default_state_dir() -> PathBuf {
41 if let Ok(s) = std::env::var("MARSHAL_STATE_DIR") {
42 return PathBuf::from(s);
43 }
44 if let Ok(home) = std::env::var("HOME") {
45 return PathBuf::from(home).join(".local/state/marshal");
46 }
47 PathBuf::from(".marshal")
48}
49
50pub fn migrate_from_claude_coord(new_log_path: &Path) -> std::io::Result<()> {
67 let Some(old_log) = legacy_log_path() else {
68 return Ok(());
69 };
70 if !old_log.exists() {
71 return Ok(());
72 }
73 if new_log_path.exists() {
74 log::warn!(
75 "[migrate] both legacy ({}) and current ({}) event logs exist; \
76 leaving both in place. Reconcile manually if needed — current \
77 is preferred.",
78 old_log.display(),
79 new_log_path.display(),
80 );
81 return Ok(());
82 }
83 if let Some(parent) = new_log_path.parent() {
84 std::fs::create_dir_all(parent)?;
85 }
86 log::info!(
87 "[migrate] moving event log: {} → {}",
88 old_log.display(),
89 new_log_path.display(),
90 );
91 std::fs::rename(&old_log, new_log_path)?;
92 Ok(())
93}
94
95fn legacy_log_path() -> Option<PathBuf> {
99 if let Ok(s) = std::env::var("CLAUDE_COORD_STATE_DIR") {
100 return Some(PathBuf::from(s).join("events.jsonl"));
101 }
102 let home = std::env::var("HOME").ok()?;
103 Some(PathBuf::from(home).join(".local/state/claude-coord/events.jsonl"))
104}
105
106struct PersistState {
110 file: File,
111 applied_offset: u64,
115 inode: u64,
118}
119
120pub struct DiskPersister {
121 state: Arc<Mutex<PersistState>>,
122 path: PathBuf,
123 health: Arc<PersistHealth>,
124}
125
126impl DiskPersister {
127 pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
130 let path = path.as_ref().to_path_buf();
131 if let Some(parent) = path.parent() {
132 std::fs::create_dir_all(parent)?;
133 }
134 let file = OpenOptions::new()
135 .read(true)
136 .append(true)
137 .create(true)
138 .open(&path)?;
139 let meta = file.metadata()?;
140 let state = PersistState {
141 file,
142 applied_offset: 0, inode: meta.ino(),
144 };
145 Ok(Self {
146 state: Arc::new(Mutex::new(state)),
147 path,
148 health: Arc::new(PersistHealth::default()),
149 })
150 }
151
152 pub fn path(&self) -> &Path {
153 &self.path
154 }
155
156 pub fn replay(&self, ctx: &CellServerCtx) -> std::io::Result<usize> {
161 let mut state = self.state.lock().expect("disk persister mutex poisoned");
162 let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
163 let end = state.file.seek(SeekFrom::End(0))?;
167 state.applied_offset = end;
168 drop(state);
169
170 let surviving_count = surviving.len();
171 let applied = ctx
172 .apply_event_batch(surviving)
173 .map_err(|e| std::io::Error::other(format!("apply_event_batch: {e}")))?;
174
175 log::info!(
176 "[disk-persister] replayed event log {} ({} survived dedup, {} applied)",
177 self.path.display(),
178 surviving_count,
179 applied,
180 );
181
182 Ok(applied)
183 }
184
185 pub fn start_watcher(
203 self: &Arc<Self>,
204 ctx: CellServerCtx,
205 ) -> notify_debouncer_mini::notify::Result<DiskWatcher> {
206 let me = Arc::clone(self);
207 let mut debouncer: Debouncer<RecommendedWatcher> = new_debouncer(
208 Duration::from_millis(150),
209 move |result: DebounceEventResult| match result {
210 Ok(_events) => {
211 if let Err(e) = me.tail_apply(&ctx) {
212 log::warn!("[watcher] tail apply failed: {e}");
213 }
214 }
215 Err(errors) => {
216 log::warn!("[watcher] notify error: {errors:?}");
217 }
218 },
219 )?;
220 debouncer
221 .watcher()
222 .watch(&self.path, RecursiveMode::NonRecursive)?;
223 log::info!("[watcher] tailing event log {}", self.path.display());
224 Ok(DiskWatcher {
225 _debouncer: debouncer,
226 })
227 }
228
229 fn tail_apply(&self, ctx: &CellServerCtx) -> std::io::Result<()> {
236 let mut state = self.state.lock().expect("disk persister mutex poisoned");
239
240 let meta = std::fs::metadata(&self.path)?;
241 let current_len = meta.len();
242 let current_inode = meta.ino();
243
244 let inode_changed = current_inode != state.inode;
245 let truncated = current_len < state.applied_offset;
246 if inode_changed || truncated {
247 log::warn!(
248 "[watcher] events.jsonl {} (inode {} → {}, len {} → {}); reloading from top",
249 if truncated { "truncated" } else { "replaced" },
250 state.inode,
251 current_inode,
252 state.applied_offset,
253 current_len,
254 );
255 let new_file = OpenOptions::new()
257 .read(true)
258 .append(true)
259 .create(true)
260 .open(&self.path)?;
261 state.file = new_file;
262 state.inode = current_inode;
263 let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
264 let end = state.file.seek(SeekFrom::End(0))?;
265 state.applied_offset = end;
266 drop(state);
267 apply_or_log(ctx, surviving, "full-reload");
268 return Ok(());
269 }
270
271 if current_len == state.applied_offset {
272 return Ok(()); }
274
275 let start = state.applied_offset;
276 let span = current_len - start;
277 state.file.seek(SeekFrom::Start(start))?;
278 let mut buf = String::new();
279 BufReader::new(&mut state.file)
283 .take(span)
284 .read_to_string(&mut buf)?;
285 state.file.seek(SeekFrom::End(0))?;
287 state.applied_offset = current_len;
288 drop(state);
289
290 let mut events: Vec<MEvent> = Vec::new();
291 for line in buf.lines() {
292 if line.trim().is_empty() {
293 continue;
294 }
295 match serde_json::from_str::<MEvent>(line) {
296 Ok(mut e) => {
297 let mut opts = e.options.clone().unwrap_or_default();
298 opts.prevent_persist = true;
299 e.options = Some(opts);
300 events.push(e);
301 }
302 Err(e) => log::warn!("[watcher] skipping malformed line: {e}"),
303 }
304 }
305
306 apply_or_log(ctx, events, "tail");
307 Ok(())
308 }
309}
310
311impl Persister for DiskPersister {
312 fn persist(&self, event: MEvent) -> Result<(), PersistError> {
313 self.health.record_enqueue();
314
315 let serialized = match serde_json::to_string(&event) {
316 Ok(s) => s,
317 Err(e) => {
318 self.health.record_dropped(e.to_string());
319 return Err(PersistError {
320 entity_type: event.item_type,
321 message: format!("serialize: {e}"),
322 });
323 }
324 };
325
326 let mut state = self.state.lock().expect("disk persister mutex poisoned");
327 let write_result = state
328 .file
329 .write_all(serialized.as_bytes())
330 .and_then(|_| state.file.write_all(b"\n"))
331 .and_then(|_| state.file.sync_data());
332
333 if let Err(e) = write_result {
334 self.health.record_error(e.to_string());
335 return Err(PersistError {
336 entity_type: event.item_type,
337 message: format!("write/fsync: {e}"),
338 });
339 }
340
341 state.applied_offset += serialized.len() as u64 + 1;
344
345 self.health.record_success();
346 Ok(())
347 }
348
349 fn health(&self) -> Arc<PersistHealth> {
350 self.health.clone()
351 }
352}
353
354pub struct DiskWatcher {
357 _debouncer: Debouncer<RecommendedWatcher>,
358}
359
360fn read_and_dedupe_from_top(file: &mut File, path: &Path) -> std::io::Result<Vec<MEvent>> {
365 file.seek(SeekFrom::Start(0))?;
366 let mut latest: HashMap<(String, String), MEvent> = HashMap::new();
367 let mut total = 0usize;
368 let mut malformed = 0usize;
369
370 let reader = BufReader::new(&mut *file);
371 for line in reader.lines() {
372 let line = line?;
373 if line.trim().is_empty() {
374 continue;
375 }
376 match serde_json::from_str::<MEvent>(&line) {
377 Ok(event) => {
378 total += 1;
379 let id = event
380 .item
381 .get("id")
382 .and_then(|v| v.as_str())
383 .unwrap_or("")
384 .to_string();
385 if id.is_empty() {
386 malformed += 1;
387 continue;
388 }
389 latest.insert((event.item_type.clone(), id), event);
390 }
391 Err(e) => {
392 malformed += 1;
393 log::warn!(
394 "[disk-persister] skipping malformed line in {}: {e}",
395 path.display()
396 );
397 }
398 }
399 }
400
401 let surviving: Vec<MEvent> = latest
402 .into_values()
403 .filter(|e| matches!(e.change_type, MEventType::SET))
404 .map(|mut e| {
405 let mut opts = e.options.clone().unwrap_or_default();
406 opts.prevent_persist = true;
407 opts.prevent_relationship_updates = true;
408 e.options = Some(opts);
409 e
410 })
411 .collect();
412
413 log::debug!(
414 "[disk-persister] read {} ({} total, {} malformed, {} survived dedup)",
415 path.display(),
416 total,
417 malformed,
418 surviving.len(),
419 );
420
421 Ok(surviving)
422}
423
424fn apply_or_log(ctx: &CellServerCtx, events: Vec<MEvent>, label: &str) {
425 if events.is_empty() {
426 return;
427 }
428 let count = events.len();
429 match ctx.apply_event_batch(events) {
430 Ok(applied) => {
431 log::info!("[watcher] {label}: applied {applied}/{count} hot-reloaded event(s)")
432 }
433 Err(e) => log::warn!("[watcher] {label}: apply_event_batch failed: {e}"),
434 }
435}