Skip to main content

contextdb_server/
protocol.rs

1use crate::error::SyncError;
2use contextdb_core::Value;
3use contextdb_engine::sync_types::{
4    ApplyResult, ChangeSet, Conflict, DdlChange, EdgeChange, NaturalKey, RowChange, VectorChange,
5};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Wire-protocol version written into every `Envelope` by `encode`.
///
/// `decode` rejects any envelope whose `version` is greater than this value.
const PROTOCOL_VERSION: u8 = 1;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Envelope {
13    pub version: u8,
14    pub message_type: MessageType,
15    pub payload: Vec<u8>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum MessageType {
20    PushRequest,
21    PushResponse,
22    PullRequest,
23    PullResponse,
24    Chunk,
25    ChunkAck,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct PushRequest {
30    pub changeset: WireChangeSet,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PushResponse {
35    pub result: Option<WireApplyResult>,
36    pub error: Option<String>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PullRequest {
41    pub since_lsn: u64,
42    pub max_entries: Option<u32>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct PullResponse {
47    pub changeset: WireChangeSet,
48    pub has_more: bool,
49    pub cursor: Option<u64>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ChunkMessage {
54    pub chunk_id: uuid::Uuid,
55    pub sequence: u32,
56    pub total_chunks: u32,
57    #[serde(with = "serde_bytes")]
58    pub payload: Vec<u8>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct ChunkAck {
63    pub chunk_id: uuid::Uuid,
64    pub total_chunks: u32,
65    pub reply_inbox: String,
66}
67
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct WireChangeSet {
70    pub rows: Vec<WireRowChange>,
71    pub edges: Vec<WireEdgeChange>,
72    pub vectors: Vec<WireVectorChange>,
73    pub ddl: Vec<WireDdlChange>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct WireRowChange {
78    pub table: String,
79    pub natural_key: WireNaturalKey,
80    pub values: HashMap<String, Value>,
81    #[serde(default)]
82    pub deleted: bool,
83    pub lsn: u64,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct WireEdgeChange {
88    pub source: uuid::Uuid,
89    pub target: uuid::Uuid,
90    pub edge_type: String,
91    pub properties: HashMap<String, Value>,
92    pub lsn: u64,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct WireVectorChange {
97    pub row_id: u64,
98    pub vector: Vec<f32>,
99    pub lsn: u64,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum WireDdlChange {
104    CreateTable {
105        name: String,
106        columns: Vec<(String, String)>,
107        constraints: Vec<String>,
108    },
109    DropTable {
110        name: String,
111    },
112    AlterTable {
113        name: String,
114        columns: Vec<(String, String)>,
115        constraints: Vec<String>,
116    },
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WireNaturalKey {
121    pub column: String,
122    pub value: Value,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct WireApplyResult {
127    pub applied_rows: usize,
128    pub skipped_rows: usize,
129    pub conflicts: Vec<WireConflict>,
130    pub new_lsn: u64,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct WireConflict {
135    pub natural_key: WireNaturalKey,
136    pub resolution: String,
137    pub reason: Option<String>,
138}
139
140pub fn encode<T: Serialize>(msg_type: MessageType, msg: &T) -> Result<Vec<u8>, SyncError> {
141    let payload = rmp_serde::to_vec(msg).map_err(|e| SyncError::Serde(e.to_string()))?;
142    let envelope = Envelope {
143        version: PROTOCOL_VERSION,
144        message_type: msg_type,
145        payload,
146    };
147    rmp_serde::to_vec(&envelope).map_err(|e| SyncError::Serde(e.to_string()))
148}
149
150pub fn decode(data: &[u8]) -> Result<Envelope, SyncError> {
151    let envelope: Envelope =
152        rmp_serde::from_slice(data).map_err(|e| SyncError::Serde(e.to_string()))?;
153    if envelope.version > PROTOCOL_VERSION {
154        return Err(SyncError::UnsupportedVersion(envelope.version));
155    }
156    Ok(envelope)
157}
158
159impl From<ChangeSet> for WireChangeSet {
160    fn from(value: ChangeSet) -> Self {
161        Self {
162            rows: value.rows.into_iter().map(Into::into).collect(),
163            edges: value.edges.into_iter().map(Into::into).collect(),
164            vectors: value.vectors.into_iter().map(Into::into).collect(),
165            ddl: value.ddl.into_iter().map(Into::into).collect(),
166        }
167    }
168}
169
170impl From<WireChangeSet> for ChangeSet {
171    fn from(value: WireChangeSet) -> Self {
172        Self {
173            rows: value.rows.into_iter().map(Into::into).collect(),
174            edges: value.edges.into_iter().map(Into::into).collect(),
175            vectors: value.vectors.into_iter().map(Into::into).collect(),
176            ddl: value.ddl.into_iter().map(Into::into).collect(),
177        }
178    }
179}
180
181impl From<RowChange> for WireRowChange {
182    fn from(value: RowChange) -> Self {
183        Self {
184            table: value.table,
185            natural_key: value.natural_key.into(),
186            values: value.values,
187            deleted: value.deleted,
188            lsn: value.lsn,
189        }
190    }
191}
192
193impl From<WireRowChange> for RowChange {
194    fn from(value: WireRowChange) -> Self {
195        Self {
196            table: value.table,
197            natural_key: value.natural_key.into(),
198            values: value.values,
199            deleted: value.deleted,
200            lsn: value.lsn,
201        }
202    }
203}
204
205impl From<EdgeChange> for WireEdgeChange {
206    fn from(value: EdgeChange) -> Self {
207        Self {
208            source: value.source,
209            target: value.target,
210            edge_type: value.edge_type,
211            properties: value.properties,
212            lsn: value.lsn,
213        }
214    }
215}
216
217impl From<WireEdgeChange> for EdgeChange {
218    fn from(value: WireEdgeChange) -> Self {
219        Self {
220            source: value.source,
221            target: value.target,
222            edge_type: value.edge_type,
223            properties: value.properties,
224            lsn: value.lsn,
225        }
226    }
227}
228
229impl From<VectorChange> for WireVectorChange {
230    fn from(value: VectorChange) -> Self {
231        Self {
232            row_id: value.row_id,
233            vector: value.vector,
234            lsn: value.lsn,
235        }
236    }
237}
238
239impl From<WireVectorChange> for VectorChange {
240    fn from(value: WireVectorChange) -> Self {
241        Self {
242            row_id: value.row_id,
243            vector: value.vector,
244            lsn: value.lsn,
245        }
246    }
247}
248
249impl From<DdlChange> for WireDdlChange {
250    fn from(value: DdlChange) -> Self {
251        match value {
252            DdlChange::CreateTable {
253                name,
254                columns,
255                constraints,
256            } => Self::CreateTable {
257                name,
258                columns,
259                constraints,
260            },
261            DdlChange::DropTable { name } => Self::DropTable { name },
262            DdlChange::AlterTable {
263                name,
264                columns,
265                constraints,
266            } => Self::AlterTable {
267                name,
268                columns,
269                constraints,
270            },
271        }
272    }
273}
274
275impl From<WireDdlChange> for DdlChange {
276    fn from(value: WireDdlChange) -> Self {
277        match value {
278            WireDdlChange::CreateTable {
279                name,
280                columns,
281                constraints,
282            } => Self::CreateTable {
283                name,
284                columns,
285                constraints,
286            },
287            WireDdlChange::DropTable { name } => Self::DropTable { name },
288            WireDdlChange::AlterTable {
289                name,
290                columns,
291                constraints,
292            } => Self::AlterTable {
293                name,
294                columns,
295                constraints,
296            },
297        }
298    }
299}
300
301impl From<NaturalKey> for WireNaturalKey {
302    fn from(value: NaturalKey) -> Self {
303        Self {
304            column: value.column,
305            value: value.value,
306        }
307    }
308}
309
310impl From<WireNaturalKey> for NaturalKey {
311    fn from(value: WireNaturalKey) -> Self {
312        Self {
313            column: value.column,
314            value: value.value,
315        }
316    }
317}
318
319impl From<ApplyResult> for WireApplyResult {
320    fn from(value: ApplyResult) -> Self {
321        Self {
322            applied_rows: value.applied_rows,
323            skipped_rows: value.skipped_rows,
324            conflicts: value.conflicts.into_iter().map(Into::into).collect(),
325            new_lsn: value.new_lsn,
326        }
327    }
328}
329
330impl From<WireApplyResult> for ApplyResult {
331    fn from(value: WireApplyResult) -> Self {
332        Self {
333            applied_rows: value.applied_rows,
334            skipped_rows: value.skipped_rows,
335            conflicts: value.conflicts.into_iter().map(Into::into).collect(),
336            new_lsn: value.new_lsn,
337        }
338    }
339}
340
341impl From<Conflict> for WireConflict {
342    fn from(value: Conflict) -> Self {
343        Self {
344            natural_key: value.natural_key.into(),
345            resolution: format!("{:?}", value.resolution),
346            reason: value.reason,
347        }
348    }
349}
350
351impl From<WireConflict> for Conflict {
352    fn from(value: WireConflict) -> Self {
353        let resolution = match value.resolution.as_str() {
354            "InsertIfNotExists" => contextdb_engine::sync_types::ConflictPolicy::InsertIfNotExists,
355            "ServerWins" => contextdb_engine::sync_types::ConflictPolicy::ServerWins,
356            "EdgeWins" => contextdb_engine::sync_types::ConflictPolicy::EdgeWins,
357            _ => contextdb_engine::sync_types::ConflictPolicy::LatestWins,
358        };
359        Self {
360            natural_key: value.natural_key.into(),
361            resolution,
362            reason: value.reason,
363        }
364    }
365}