use std::path::Path;
use anyhow::{Context, Result};
use mimir_core::replicate::{self, ApplyStats};
use mimir_core::Mimir;
pub struct FileSummary {
pub wrote: usize,
pub peers_merged: usize,
pub applied: ApplyStats,
}
pub fn sync(mimir: &mut Mimir, dir: &Path) -> Result<FileSummary> {
let wrote = write_snapshot(mimir, dir)?;
let (peers_merged, applied) = merge_peers(mimir, dir)?;
let _ = mimir.embed_pending();
Ok(FileSummary {
wrote,
peers_merged,
applied,
})
}
pub fn write_snapshot(mimir: &Mimir, dir: &Path) -> Result<usize> {
std::fs::create_dir_all(dir).with_context(|| format!("create {}", dir.display()))?;
let id = replicate::machine_id(&mimir.conn)?;
let batch = replicate::snapshot(&mimir.conn)?;
let count = batch.nodes.len();
let final_path = dir.join(format!("{id}.json"));
let tmp_path = dir.join(format!(".{id}.json.tmp"));
std::fs::write(&tmp_path, serde_json::to_vec(&batch)?)
.with_context(|| format!("write {}", tmp_path.display()))?;
std::fs::rename(&tmp_path, &final_path)
.with_context(|| format!("rename into {}", final_path.display()))?;
Ok(count)
}
pub fn merge_peers(mimir: &mut Mimir, dir: &Path) -> Result<(usize, ApplyStats)> {
let id = replicate::machine_id(&mimir.conn)?;
let mine = format!("{id}.json");
let mut total = ApplyStats::default();
let mut peers = 0;
if !dir.is_dir() {
return Ok((0, total));
}
for entry in std::fs::read_dir(dir).with_context(|| format!("read {}", dir.display()))? {
let entry = entry?;
let name = entry.file_name();
let name = name.to_string_lossy();
if !name.ends_with(".json") || name == mine || name.starts_with('.') {
continue;
}
let data = std::fs::read(entry.path())?;
let batch: replicate::SyncBatch = serde_json::from_slice(&data)
.with_context(|| format!("parse {}", entry.path().display()))?;
let s = replicate::apply_changes(&mimir.conn, &batch)?;
total.nodes_upserted += s.nodes_upserted;
total.nodes_skipped += s.nodes_skipped;
total.edges_upserted += s.edges_upserted;
total.edges_skipped += s.edges_skipped;
total.tombstones += s.tombstones;
peers += 1;
}
Ok((peers, total))
}
#[cfg(test)]
mod tests {
use super::*;
use mimir_core::config::Paths;
use mimir_core::memory::{remember, Remember};
use mimir_core::model::MemoryType;
fn store(root: &std::path::Path) -> Mimir {
std::fs::create_dir_all(root).unwrap();
Mimir::open_at(Paths::under_root(root)).unwrap()
}
fn glob(m: &Mimir, text: &str) {
remember(
&m.conn,
Remember {
text: text.into(),
mtype: MemoryType::Note,
tags: vec![],
project_id: None,
force: true,
},
)
.unwrap();
}
fn live_count(m: &Mimir) -> i64 {
m.conn
.query_row(
"SELECT count(*) FROM node WHERE kind='memory' AND project_id IS NULL AND deleted_at IS NULL",
[],
|r| r.get(0),
)
.unwrap()
}
#[test]
fn two_stores_converge_via_shared_dir() {
let tmp = tempfile::tempdir().unwrap();
let shared = tmp.path().join("shared");
let mut a = store(&tmp.path().join("a"));
let mut b = store(&tmp.path().join("b"));
glob(&a, "fact from machine a");
sync(&mut a, &shared).unwrap();
let s = sync(&mut b, &shared).unwrap();
assert_eq!(s.applied.nodes_upserted, 1);
assert_eq!(live_count(&b), 1, "b received a's memory");
let id: i64 = a
.conn
.query_row("SELECT id FROM node WHERE kind='memory'", [], |r| r.get(0))
.unwrap();
mimir_core::store::soft_delete(&a.conn, id).unwrap();
sync(&mut a, &shared).unwrap();
sync(&mut b, &shared).unwrap();
assert_eq!(live_count(&b), 0, "delete propagated");
}
}