Skip to main content

zer_cluster/
provenance.rs

1use rusqlite::Connection;
2use zer_core::{
3    entity::{EntityId, ResolutionMethod},
4    error::ZerError,
5    record::RecordId,
6};
7
8/// Events written to the `resolution_events` table to provide an audit trail
9/// for every structural change to the entity store.
10#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
11pub enum ResolutionEvent {
12    EntityCreated {
13        entity_id: EntityId,
14        record_ids: Vec<RecordId>,
15    },
16    RecordsAdded {
17        entity_id: EntityId,
18        record_ids: Vec<RecordId>,
19        method: ResolutionMethod,
20    },
21    EntityMerged {
22        source_a: EntityId,
23        source_b: EntityId,
24        into: EntityId,
25    },
26    EntitySplit {
27        source: EntityId,
28        into: Vec<EntityId>,
29    },
30    JudgeApplied {
31        entity_id: EntityId,
32        pair: (RecordId, RecordId),
33        verdict: String,
34    },
35}
36
37/// Append a provenance event to `resolution_events`.
38///
39/// Called from `ZalEntityStore` with the locked connection, no additional
40/// locking is needed here.
41pub fn append_event(conn: &Connection, event: &ResolutionEvent) -> Result<(), ZerError> {
42    let (event_type, entity_id, record_ids, score, judge_verdict) = match event {
43        ResolutionEvent::EntityCreated {
44            entity_id,
45            record_ids,
46        } => (
47            "EntityCreated",
48            *entity_id,
49            record_ids.clone(),
50            None::<f32>,
51            None::<String>,
52        ),
53        ResolutionEvent::RecordsAdded {
54            entity_id,
55            record_ids,
56            ..
57        } => ("RecordsAdded", *entity_id, record_ids.clone(), None, None),
58        ResolutionEvent::EntityMerged {
59            into,
60            source_a,
61            source_b,
62        } => (
63            "EntityMerged",
64            *into,
65            vec![*source_a, *source_b],
66            None,
67            None,
68        ),
69        ResolutionEvent::EntitySplit { source, into } => {
70            ("EntitySplit", *source, into.clone(), None, None)
71        }
72        ResolutionEvent::JudgeApplied {
73            entity_id,
74            pair,
75            verdict,
76        } => (
77            "JudgeApplied",
78            *entity_id,
79            vec![pair.0, pair.1],
80            None,
81            Some(verdict.clone()),
82        ),
83    };
84
85    let ids_json =
86        serde_json::to_string(&record_ids).map_err(|e| ZerError::Serialization(e.to_string()))?;
87    let now = unix_now();
88
89    conn.execute(
90        "INSERT INTO resolution_events
91             (event_type, entity_id, record_ids, score, judge_verdict, occurred_at)
92         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
93        rusqlite::params![
94            event_type,
95            entity_id as i64,
96            ids_json,
97            score,
98            judge_verdict,
99            now
100        ],
101    )
102    .map_err(|e| ZerError::Store(e.to_string()))?;
103
104    Ok(())
105}
106
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub(crate) fn unix_now() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        // Clock is set before 1970-01-01: fall back to the epoch itself.
        Err(_) => 0,
    }
}
112}