Skip to main content

second_brain_sync/
export.rs

1use std::io::Write;
2
3use anyhow::Result;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6
7use second_brain_core::kuzu_store::KuzuStore;
8use second_brain_core::schema::{SyncNodeType, SyncOp};
9
10#[derive(Debug, Serialize, Deserialize)]
11pub struct SyncRecord {
12    pub seq: u64,
13    pub op: SyncOp,
14    pub node_type: SyncNodeType,
15    pub node_id: String,
16    pub machine_id: String,
17    pub timestamp: DateTime<Utc>,
18    pub data: Option<serde_json::Value>,
19}
20
21pub fn export_changes<W: Write>(store: &KuzuStore, after_seq: u64, writer: &mut W) -> Result<u64> {
22    let entries = store.sync_log_since(after_seq)?;
23    let mut max_seq = after_seq;
24
25    let mut records: Vec<SyncRecord> = entries
26        .into_iter()
27        .map(|e| {
28            let data = e.data.as_ref().and_then(|d| serde_json::from_str(d).ok());
29            SyncRecord {
30                seq: e.seq,
31                op: e.op,
32                node_type: e.node_type,
33                node_id: e.node_id,
34                machine_id: e.machine_id,
35                timestamp: e.timestamp,
36                data,
37            }
38        })
39        .collect();
40
41    sort_for_import(&mut records);
42
43    for record in &records {
44        serde_json::to_writer(&mut *writer, record)?;
45        writer.write_all(b"\n")?;
46        if record.seq > max_seq {
47            max_seq = record.seq;
48        }
49    }
50
51    Ok(max_seq)
52}
53
54pub fn export_to_stdout(store: &KuzuStore, after_seq: u64) -> Result<u64> {
55    let stdout = std::io::stdout();
56    let mut handle = stdout.lock();
57    export_changes(store, after_seq, &mut handle)
58}
59
60fn sort_for_import(records: &mut [SyncRecord]) {
61    records.sort_by(|a, b| {
62        let type_order = |r: &SyncRecord| -> u8 {
63            match (&r.op, &r.node_type) {
64                (SyncOp::Delete, SyncNodeType::Relation) => 0,
65                (SyncOp::Delete, SyncNodeType::Memory) => 1,
66                (SyncOp::Delete, SyncNodeType::Entity) => 2,
67                (SyncOp::Delete, SyncNodeType::Conversation) => 3,
68                (SyncOp::Create, SyncNodeType::Conversation) => 4,
69                (SyncOp::Create, SyncNodeType::Entity) => 5,
70                (SyncOp::Create, SyncNodeType::Memory) => 6,
71                (SyncOp::Create, SyncNodeType::Relation) => 7,
72                (SyncOp::Update, SyncNodeType::Conversation) => 8,
73                (SyncOp::Update, SyncNodeType::Entity) => 9,
74                (SyncOp::Update, SyncNodeType::Memory) => 10,
75                (SyncOp::Update, SyncNodeType::Relation) => 11,
76            }
77        };
78        type_order(a).cmp(&type_order(b)).then(a.seq.cmp(&b.seq))
79    });
80}