Skip to main content

zer_cluster/
provenance.rs

1use rusqlite::Connection;
2use zer_core::{
3    entity::{EntityId, ResolutionMethod},
4    error::ZerError,
5    record::RecordId,
6};
7
8/// Events written to the `resolution_events` table to provide an audit trail
9/// for every structural change to the entity store.
10#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
11pub enum ResolutionEvent {
12    EntityCreated { entity_id: EntityId, record_ids: Vec<RecordId> },
13    RecordsAdded  { entity_id: EntityId, record_ids: Vec<RecordId>, method: ResolutionMethod },
14    EntityMerged  { source_a: EntityId, source_b: EntityId, into: EntityId },
15    EntitySplit   { source: EntityId, into: Vec<EntityId> },
16    JudgeApplied  { entity_id: EntityId, pair: (RecordId, RecordId), verdict: String },
17}
18
19/// Append a provenance event to `resolution_events`.
20///
21/// Called from `ZalEntityStore` with the locked connection, no additional
22/// locking is needed here.
23pub fn append_event(conn: &Connection, event: &ResolutionEvent) -> Result<(), ZerError> {
24    let (event_type, entity_id, record_ids, score, judge_verdict) = match event {
25        ResolutionEvent::EntityCreated { entity_id, record_ids } => (
26            "EntityCreated",
27            *entity_id,
28            record_ids.clone(),
29            None::<f32>,
30            None::<String>,
31        ),
32        ResolutionEvent::RecordsAdded { entity_id, record_ids, .. } => (
33            "RecordsAdded",
34            *entity_id,
35            record_ids.clone(),
36            None,
37            None,
38        ),
39        ResolutionEvent::EntityMerged { into, source_a, source_b } => (
40            "EntityMerged",
41            *into,
42            vec![*source_a, *source_b],
43            None,
44            None,
45        ),
46        ResolutionEvent::EntitySplit { source, into } => (
47            "EntitySplit",
48            *source,
49            into.clone(),
50            None,
51            None,
52        ),
53        ResolutionEvent::JudgeApplied { entity_id, pair, verdict } => (
54            "JudgeApplied",
55            *entity_id,
56            vec![pair.0, pair.1],
57            None,
58            Some(verdict.clone()),
59        ),
60    };
61
62    let ids_json = serde_json::to_string(&record_ids)
63        .map_err(|e| ZerError::Serialization(e.to_string()))?;
64    let now = unix_now();
65
66    conn.execute(
67        "INSERT INTO resolution_events
68             (event_type, entity_id, record_ids, score, judge_verdict, occurred_at)
69         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
70        rusqlite::params![event_type, entity_id as i64, ids_json, score, judge_verdict, now],
71    )
72    .map_err(|e| ZerError::Store(e.to_string()))?;
73
74    Ok(())
75}
76
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time earlier than the epoch.
pub(crate) fn unix_now() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}