use std::{
collections::HashMap,
fs::{File, OpenOptions},
io::{BufRead, BufReader, Read, Seek, SeekFrom, Write},
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::Duration,
};
use myko::{
server::{CellServerCtx, PersistError, PersistHealth, Persister},
wire::{MEvent, MEventType},
};
use notify_debouncer_mini::{
DebounceEventResult, Debouncer, new_debouncer,
notify::{RecommendedWatcher, RecursiveMode},
};
/// Returns the directory in which marshal keeps its on-disk state.
///
/// Resolution order:
/// 1. `$MARSHAL_STATE_DIR`, used verbatim when set;
/// 2. `$HOME/.local/state/marshal` when `HOME` is set;
/// 3. the relative path `.marshal` otherwise.
///
/// The variables are read with [`std::env::var_os`], so a value that is not
/// valid UTF-8 (which is legal for a Unix path) is honored rather than being
/// treated as if the variable were unset.
pub fn default_state_dir() -> PathBuf {
    if let Some(dir) = std::env::var_os("MARSHAL_STATE_DIR") {
        return PathBuf::from(dir);
    }
    if let Some(home) = std::env::var_os("HOME") {
        return PathBuf::from(home).join(".local/state/marshal");
    }
    PathBuf::from(".marshal")
}
pub fn migrate_from_claude_coord(new_log_path: &Path) -> std::io::Result<()> {
let Some(old_log) = legacy_log_path() else {
return Ok(());
};
if !old_log.exists() {
return Ok(());
}
if new_log_path.exists() {
log::warn!(
"[migrate] both legacy ({}) and current ({}) event logs exist; \
leaving both in place. Reconcile manually if needed — current \
is preferred.",
old_log.display(),
new_log_path.display(),
);
return Ok(());
}
if let Some(parent) = new_log_path.parent() {
std::fs::create_dir_all(parent)?;
}
log::info!(
"[migrate] moving event log: {} → {}",
old_log.display(),
new_log_path.display(),
);
std::fs::rename(&old_log, new_log_path)?;
Ok(())
}
/// Returns where the legacy `claude-coord` event log would live.
///
/// Uses `$CLAUDE_COORD_STATE_DIR/events.jsonl` when that variable is set,
/// otherwise `$HOME/.local/state/claude-coord/events.jsonl`. Returns `None`
/// when neither variable is set.
///
/// The variables are read with [`std::env::var_os`], so directory values that
/// are not valid UTF-8 are still honored instead of being treated as unset.
fn legacy_log_path() -> Option<PathBuf> {
    if let Some(dir) = std::env::var_os("CLAUDE_COORD_STATE_DIR") {
        return Some(PathBuf::from(dir).join("events.jsonl"));
    }
    let home = std::env::var_os("HOME")?;
    Some(PathBuf::from(home).join(".local/state/claude-coord/events.jsonl"))
}
/// Mutable state of a `DiskPersister`, kept behind its mutex.
struct PersistState {
/// Open handle to the event log (opened for reading and appending).
file: File,
/// Byte offset into the log up to which events are considered already
/// applied: set to end-of-file after a replay or reload, advanced by each
/// persisted write, and compared against the file length when tailing.
applied_offset: u64,
/// Inode of the log recorded when the handle was opened; compared against
/// the path's current inode to detect that the file was replaced.
inode: u64,
}
/// Event persister backed by a newline-delimited JSON log file on disk.
pub struct DiskPersister {
/// File handle plus offset/inode bookkeeping, shared behind a mutex.
state: Arc<Mutex<PersistState>>,
/// Path of the event log this persister reads and appends to.
path: PathBuf,
/// Health counters updated on every persist attempt and handed out by
/// `health()`.
health: Arc<PersistHealth>,
}
impl DiskPersister {
pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
let path = path.as_ref().to_path_buf();
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let file = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&path)?;
let meta = file.metadata()?;
let state = PersistState {
file,
applied_offset: 0, inode: meta.ino(),
};
Ok(Self {
state: Arc::new(Mutex::new(state)),
path,
health: Arc::new(PersistHealth::default()),
})
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn replay(&self, ctx: &CellServerCtx) -> std::io::Result<usize> {
let mut state = self.state.lock().expect("disk persister mutex poisoned");
let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
let end = state.file.seek(SeekFrom::End(0))?;
state.applied_offset = end;
drop(state);
let surviving_count = surviving.len();
let applied = ctx.apply_event_batch(surviving).map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("apply_event_batch: {e}"))
})?;
log::info!(
"[disk-persister] replayed event log {} ({} survived dedup, {} applied)",
self.path.display(),
surviving_count,
applied,
);
Ok(applied)
}
pub fn start_watcher(
self: &Arc<Self>,
ctx: CellServerCtx,
) -> notify_debouncer_mini::notify::Result<DiskWatcher> {
let me = Arc::clone(self);
let mut debouncer: Debouncer<RecommendedWatcher> = new_debouncer(
Duration::from_millis(150),
move |result: DebounceEventResult| match result {
Ok(_events) => {
if let Err(e) = me.tail_apply(&ctx) {
log::warn!("[watcher] tail apply failed: {e}");
}
}
Err(errors) => {
log::warn!("[watcher] notify error: {errors:?}");
}
},
)?;
debouncer
.watcher()
.watch(&self.path, RecursiveMode::NonRecursive)?;
log::info!("[watcher] tailing event log {}", self.path.display());
Ok(DiskWatcher {
_debouncer: debouncer,
})
}
fn tail_apply(&self, ctx: &CellServerCtx) -> std::io::Result<()> {
let mut state = self.state.lock().expect("disk persister mutex poisoned");
let meta = std::fs::metadata(&self.path)?;
let current_len = meta.len();
let current_inode = meta.ino();
let inode_changed = current_inode != state.inode;
let truncated = current_len < state.applied_offset;
if inode_changed || truncated {
log::warn!(
"[watcher] events.jsonl {} (inode {} → {}, len {} → {}); reloading from top",
if truncated { "truncated" } else { "replaced" },
state.inode,
current_inode,
state.applied_offset,
current_len,
);
let new_file = OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&self.path)?;
state.file = new_file;
state.inode = current_inode;
let surviving = read_and_dedupe_from_top(&mut state.file, &self.path)?;
let end = state.file.seek(SeekFrom::End(0))?;
state.applied_offset = end;
drop(state);
apply_or_log(ctx, surviving, "full-reload");
return Ok(());
}
if current_len == state.applied_offset {
return Ok(()); }
let start = state.applied_offset;
let span = current_len - start;
state.file.seek(SeekFrom::Start(start))?;
let mut buf = String::new();
BufReader::new(&mut state.file)
.take(span)
.read_to_string(&mut buf)?;
state.file.seek(SeekFrom::End(0))?;
state.applied_offset = current_len;
drop(state);
let mut events: Vec<MEvent> = Vec::new();
for line in buf.lines() {
if line.trim().is_empty() {
continue;
}
match serde_json::from_str::<MEvent>(line) {
Ok(mut e) => {
let mut opts = e.options.clone().unwrap_or_default();
opts.prevent_persist = true;
e.options = Some(opts);
events.push(e);
}
Err(e) => log::warn!("[watcher] skipping malformed line: {e}"),
}
}
apply_or_log(ctx, events, "tail");
Ok(())
}
}
impl Persister for DiskPersister {
    /// Appends `event` to the log as one JSON line and syncs it to disk.
    ///
    /// The payload and its terminating newline are written from a single
    /// buffer in one `write_all` call, so another appender to the same file
    /// cannot slip data in between an event and its newline.
    ///
    /// On success the applied offset is advanced by the number of bytes
    /// written, so the watcher does not re-apply this process's own events.
    ///
    /// NOTE(review): that bump assumes nothing else appended to the file
    /// since the offset was last updated — confirm for multi-writer setups.
    ///
    /// # Errors
    ///
    /// Returns a `PersistError` if the event cannot be serialized (recorded
    /// as dropped) or if the write / `sync_data` fails (recorded as an
    /// error).
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
        self.health.record_enqueue();
        let mut line = match serde_json::to_string(&event) {
            Ok(s) => s,
            Err(e) => {
                self.health.record_dropped(e.to_string());
                return Err(PersistError {
                    entity_type: event.item_type,
                    message: format!("serialize: {e}"),
                });
            }
        };
        line.push('\n');

        let mut state = self.state.lock().expect("disk persister mutex poisoned");
        let write_result = state
            .file
            .write_all(line.as_bytes())
            .and_then(|()| state.file.sync_data());
        if let Err(e) = write_result {
            self.health.record_error(e.to_string());
            return Err(PersistError {
                entity_type: event.item_type,
                message: format!("write/fsync: {e}"),
            });
        }
        // `line` already includes the trailing newline.
        state.applied_offset += line.len() as u64;
        self.health.record_success();
        Ok(())
    }

    /// Returns a shared handle to this persister's health counters.
    fn health(&self) -> Arc<PersistHealth> {
        Arc::clone(&self.health)
    }
}
/// Handle returned by `DiskPersister::start_watcher`; owns the debounced
/// file watcher.
pub struct DiskWatcher {
/// Never read; held only so the debouncer lives as long as this value.
/// NOTE(review): presumably dropping it stops the watch — confirm against
/// the `notify_debouncer_mini` documentation.
_debouncer: Debouncer<RecommendedWatcher>,
}
/// Reads `file` from its first byte and collapses the log to the latest
/// event per `(item_type, id)` pair.
///
/// Blank lines are ignored. Lines that fail to parse are logged and counted
/// as malformed; events whose `item` has no non-empty string `id` are counted
/// as malformed too. Of the latest events, only those whose change type is
/// `SET` are returned, each marked with `prevent_persist` and
/// `prevent_relationship_updates`. `path` is used for log messages only.
///
/// The order of the returned events is the iteration order of a `HashMap`
/// and therefore unspecified.
///
/// # Errors
///
/// Returns the I/O error if seeking or reading a line fails.
fn read_and_dedupe_from_top(file: &mut File, path: &Path) -> std::io::Result<Vec<MEvent>> {
    file.seek(SeekFrom::Start(0))?;

    let mut newest_by_key: HashMap<(String, String), MEvent> = HashMap::new();
    let mut parsed_count = 0usize;
    let mut rejected_count = 0usize;

    for raw in BufReader::new(&mut *file).lines() {
        let raw = raw?;
        if raw.trim().is_empty() {
            continue;
        }
        let event = match serde_json::from_str::<MEvent>(&raw) {
            Ok(event) => event,
            Err(e) => {
                rejected_count += 1;
                log::warn!(
                    "[disk-persister] skipping malformed line in {}: {e}",
                    path.display()
                );
                continue;
            }
        };
        parsed_count += 1;

        // An event without a usable string id cannot be keyed; drop it.
        let entity_id = match event.item.get("id").and_then(|v| v.as_str()) {
            Some(id) if !id.is_empty() => id.to_string(),
            _ => {
                rejected_count += 1;
                continue;
            }
        };
        // Later lines overwrite earlier ones for the same key.
        newest_by_key.insert((event.item_type.clone(), entity_id), event);
    }

    let mut surviving: Vec<MEvent> = Vec::new();
    for mut event in newest_by_key.into_values() {
        if !matches!(event.change_type, MEventType::SET) {
            continue;
        }
        let mut opts = event.options.take().unwrap_or_default();
        opts.prevent_persist = true;
        opts.prevent_relationship_updates = true;
        event.options = Some(opts);
        surviving.push(event);
    }

    log::debug!(
        "[disk-persister] read {} ({} total, {} malformed, {} survived dedup)",
        path.display(),
        parsed_count,
        rejected_count,
        surviving.len(),
    );
    Ok(surviving)
}
/// Applies `events` to `ctx` as one batch and logs the outcome.
///
/// An empty batch is a no-op. Success is logged at info level with the
/// applied/total counts, failure at warn level; the error is not propagated.
/// `label` identifies the calling path in the log line.
fn apply_or_log(ctx: &CellServerCtx, events: Vec<MEvent>, label: &str) {
    let count = events.len();
    if count == 0 {
        return;
    }
    let outcome = ctx.apply_event_batch(events);
    match outcome {
        Err(e) => log::warn!("[watcher] {label}: apply_event_batch failed: {e}"),
        Ok(applied) => {
            log::info!("[watcher] {label}: applied {applied}/{count} hot-reloaded event(s)")
        }
    }
}