1use contextdb_core::{Lsn, RowId, Value, VectorIndexRef};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Default, Serialize, Deserialize)]
13pub struct ChangeSet {
14 pub rows: Vec<RowChange>,
15 pub edges: Vec<EdgeChange>,
16 pub vectors: Vec<VectorChange>,
17 pub ddl: Vec<DdlChange>,
18}
19
20impl ChangeSet {
21 pub fn filter_by_direction(
23 &self,
24 directions: &HashMap<String, SyncDirection>,
25 include: &[SyncDirection],
26 ) -> ChangeSet {
27 let include_dir = |table: &str| {
28 let dir = directions
29 .get(table)
30 .copied()
31 .unwrap_or(SyncDirection::Both);
32 include.contains(&dir)
33 };
34
35 ChangeSet {
36 rows: self
37 .rows
38 .iter()
39 .filter(|r| include_dir(&r.table))
40 .cloned()
41 .collect(),
42 edges: self.edges.clone(),
43 vectors: self
44 .vectors
45 .iter()
46 .filter(|v| include_dir(&v.index.table))
47 .cloned()
48 .collect(),
49 ddl: self
50 .ddl
51 .iter()
52 .filter(|d| match d {
53 DdlChange::CreateTable { name, .. } => include_dir(name),
54 DdlChange::DropTable { name } => include_dir(name),
55 DdlChange::AlterTable { name, .. } => include_dir(name),
56 DdlChange::CreateIndex { table, .. } => include_dir(table),
57 DdlChange::DropIndex { table, .. } => include_dir(table),
58 })
59 .cloned()
60 .collect(),
61 }
62 }
63}
64
65#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66pub struct RowChange {
67 pub table: String,
68 pub natural_key: NaturalKey,
69 pub values: HashMap<String, Value>,
70 pub deleted: bool,
71 pub lsn: Lsn,
72}
73
74#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
75pub struct EdgeChange {
76 pub source: Uuid,
77 pub target: Uuid,
78 pub edge_type: String,
79 pub properties: HashMap<String, Value>,
80 pub lsn: Lsn,
81}
82
83#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
84pub struct VectorChange {
85 pub index: VectorIndexRef,
86 pub row_id: RowId,
87 pub vector: Vec<f32>,
88 pub lsn: Lsn,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub enum DdlChange {
93 CreateTable {
94 name: String,
95 columns: Vec<(String, String)>,
96 constraints: Vec<String>,
97 },
98 DropTable {
99 name: String,
100 },
101 AlterTable {
102 name: String,
103 columns: Vec<(String, String)>,
104 constraints: Vec<String>,
105 },
106 CreateIndex {
107 table: String,
108 name: String,
109 columns: Vec<(String, contextdb_core::SortDirection)>,
110 },
111 DropIndex {
112 table: String,
113 name: String,
114 },
115}
116
117#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
118pub struct NaturalKey {
119 pub column: String,
120 pub value: Value,
121}
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
124pub enum ConflictPolicy {
125 InsertIfNotExists,
126 ServerWins,
127 EdgeWins,
128 LatestWins,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct ConflictPolicies {
133 pub per_table: HashMap<String, ConflictPolicy>,
134 pub default: ConflictPolicy,
135}
136
137impl ConflictPolicies {
138 pub fn uniform(policy: ConflictPolicy) -> Self {
139 Self {
140 per_table: HashMap::new(),
141 default: policy,
142 }
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct ApplyResult {
148 pub applied_rows: usize,
149 pub skipped_rows: usize,
150 pub conflicts: Vec<Conflict>,
151 pub new_lsn: Lsn,
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct Conflict {
156 pub natural_key: NaturalKey,
157 pub resolution: ConflictPolicy,
158 pub reason: Option<String>,
159}
160
161#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
162pub enum SyncDirection {
163 Push,
164 Pull,
165 Both,
166 None,
167}