1use crate::error::SyncError;
2use contextdb_core::Value;
3use contextdb_engine::sync_types::{
4 ApplyResult, ChangeSet, Conflict, DdlChange, EdgeChange, NaturalKey, RowChange, VectorChange,
5};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8
/// Wire protocol version written into every [`Envelope`] by [`encode`].
///
/// [`decode`] rejects any envelope whose `version` is greater than this value.
const PROTOCOL_VERSION: u8 = 1;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Envelope {
13 pub version: u8,
14 pub message_type: MessageType,
15 pub payload: Vec<u8>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum MessageType {
20 PushRequest,
21 PushResponse,
22 PullRequest,
23 PullResponse,
24 Chunk,
25 ChunkAck,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct PushRequest {
30 pub changeset: WireChangeSet,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PushResponse {
35 pub result: Option<WireApplyResult>,
36 pub error: Option<String>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct PullRequest {
41 pub since_lsn: u64,
42 pub max_entries: Option<u32>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct PullResponse {
47 pub changeset: WireChangeSet,
48 pub has_more: bool,
49 pub cursor: Option<u64>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ChunkMessage {
54 pub chunk_id: uuid::Uuid,
55 pub sequence: u32,
56 pub total_chunks: u32,
57 #[serde(with = "serde_bytes")]
58 pub payload: Vec<u8>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct ChunkAck {
63 pub chunk_id: uuid::Uuid,
64 pub total_chunks: u32,
65 pub reply_inbox: String,
66}
67
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct WireChangeSet {
70 pub rows: Vec<WireRowChange>,
71 pub edges: Vec<WireEdgeChange>,
72 pub vectors: Vec<WireVectorChange>,
73 pub ddl: Vec<WireDdlChange>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct WireRowChange {
78 pub table: String,
79 pub natural_key: WireNaturalKey,
80 pub values: HashMap<String, Value>,
81 #[serde(default)]
82 pub deleted: bool,
83 pub lsn: u64,
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct WireEdgeChange {
88 pub source: uuid::Uuid,
89 pub target: uuid::Uuid,
90 pub edge_type: String,
91 pub properties: HashMap<String, Value>,
92 pub lsn: u64,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct WireVectorChange {
97 pub row_id: u64,
98 pub vector: Vec<f32>,
99 pub lsn: u64,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum WireDdlChange {
104 CreateTable {
105 name: String,
106 columns: Vec<(String, String)>,
107 constraints: Vec<String>,
108 },
109 DropTable {
110 name: String,
111 },
112 AlterTable {
113 name: String,
114 columns: Vec<(String, String)>,
115 constraints: Vec<String>,
116 },
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct WireNaturalKey {
121 pub column: String,
122 pub value: Value,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct WireApplyResult {
127 pub applied_rows: usize,
128 pub skipped_rows: usize,
129 pub conflicts: Vec<WireConflict>,
130 pub new_lsn: u64,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134pub struct WireConflict {
135 pub natural_key: WireNaturalKey,
136 pub resolution: String,
137 pub reason: Option<String>,
138}
139
140pub fn encode<T: Serialize>(msg_type: MessageType, msg: &T) -> Result<Vec<u8>, SyncError> {
141 let payload = rmp_serde::to_vec(msg).map_err(|e| SyncError::Serde(e.to_string()))?;
142 let envelope = Envelope {
143 version: PROTOCOL_VERSION,
144 message_type: msg_type,
145 payload,
146 };
147 rmp_serde::to_vec(&envelope).map_err(|e| SyncError::Serde(e.to_string()))
148}
149
150pub fn decode(data: &[u8]) -> Result<Envelope, SyncError> {
151 let envelope: Envelope =
152 rmp_serde::from_slice(data).map_err(|e| SyncError::Serde(e.to_string()))?;
153 if envelope.version > PROTOCOL_VERSION {
154 return Err(SyncError::UnsupportedVersion(envelope.version));
155 }
156 Ok(envelope)
157}
158
159impl From<ChangeSet> for WireChangeSet {
160 fn from(value: ChangeSet) -> Self {
161 Self {
162 rows: value.rows.into_iter().map(Into::into).collect(),
163 edges: value.edges.into_iter().map(Into::into).collect(),
164 vectors: value.vectors.into_iter().map(Into::into).collect(),
165 ddl: value.ddl.into_iter().map(Into::into).collect(),
166 }
167 }
168}
169
170impl From<WireChangeSet> for ChangeSet {
171 fn from(value: WireChangeSet) -> Self {
172 Self {
173 rows: value.rows.into_iter().map(Into::into).collect(),
174 edges: value.edges.into_iter().map(Into::into).collect(),
175 vectors: value.vectors.into_iter().map(Into::into).collect(),
176 ddl: value.ddl.into_iter().map(Into::into).collect(),
177 }
178 }
179}
180
181impl From<RowChange> for WireRowChange {
182 fn from(value: RowChange) -> Self {
183 Self {
184 table: value.table,
185 natural_key: value.natural_key.into(),
186 values: value.values,
187 deleted: value.deleted,
188 lsn: value.lsn,
189 }
190 }
191}
192
193impl From<WireRowChange> for RowChange {
194 fn from(value: WireRowChange) -> Self {
195 Self {
196 table: value.table,
197 natural_key: value.natural_key.into(),
198 values: value.values,
199 deleted: value.deleted,
200 lsn: value.lsn,
201 }
202 }
203}
204
205impl From<EdgeChange> for WireEdgeChange {
206 fn from(value: EdgeChange) -> Self {
207 Self {
208 source: value.source,
209 target: value.target,
210 edge_type: value.edge_type,
211 properties: value.properties,
212 lsn: value.lsn,
213 }
214 }
215}
216
217impl From<WireEdgeChange> for EdgeChange {
218 fn from(value: WireEdgeChange) -> Self {
219 Self {
220 source: value.source,
221 target: value.target,
222 edge_type: value.edge_type,
223 properties: value.properties,
224 lsn: value.lsn,
225 }
226 }
227}
228
229impl From<VectorChange> for WireVectorChange {
230 fn from(value: VectorChange) -> Self {
231 Self {
232 row_id: value.row_id,
233 vector: value.vector,
234 lsn: value.lsn,
235 }
236 }
237}
238
239impl From<WireVectorChange> for VectorChange {
240 fn from(value: WireVectorChange) -> Self {
241 Self {
242 row_id: value.row_id,
243 vector: value.vector,
244 lsn: value.lsn,
245 }
246 }
247}
248
249impl From<DdlChange> for WireDdlChange {
250 fn from(value: DdlChange) -> Self {
251 match value {
252 DdlChange::CreateTable {
253 name,
254 columns,
255 constraints,
256 } => Self::CreateTable {
257 name,
258 columns,
259 constraints,
260 },
261 DdlChange::DropTable { name } => Self::DropTable { name },
262 DdlChange::AlterTable {
263 name,
264 columns,
265 constraints,
266 } => Self::AlterTable {
267 name,
268 columns,
269 constraints,
270 },
271 }
272 }
273}
274
275impl From<WireDdlChange> for DdlChange {
276 fn from(value: WireDdlChange) -> Self {
277 match value {
278 WireDdlChange::CreateTable {
279 name,
280 columns,
281 constraints,
282 } => Self::CreateTable {
283 name,
284 columns,
285 constraints,
286 },
287 WireDdlChange::DropTable { name } => Self::DropTable { name },
288 WireDdlChange::AlterTable {
289 name,
290 columns,
291 constraints,
292 } => Self::AlterTable {
293 name,
294 columns,
295 constraints,
296 },
297 }
298 }
299}
300
301impl From<NaturalKey> for WireNaturalKey {
302 fn from(value: NaturalKey) -> Self {
303 Self {
304 column: value.column,
305 value: value.value,
306 }
307 }
308}
309
310impl From<WireNaturalKey> for NaturalKey {
311 fn from(value: WireNaturalKey) -> Self {
312 Self {
313 column: value.column,
314 value: value.value,
315 }
316 }
317}
318
319impl From<ApplyResult> for WireApplyResult {
320 fn from(value: ApplyResult) -> Self {
321 Self {
322 applied_rows: value.applied_rows,
323 skipped_rows: value.skipped_rows,
324 conflicts: value.conflicts.into_iter().map(Into::into).collect(),
325 new_lsn: value.new_lsn,
326 }
327 }
328}
329
330impl From<WireApplyResult> for ApplyResult {
331 fn from(value: WireApplyResult) -> Self {
332 Self {
333 applied_rows: value.applied_rows,
334 skipped_rows: value.skipped_rows,
335 conflicts: value.conflicts.into_iter().map(Into::into).collect(),
336 new_lsn: value.new_lsn,
337 }
338 }
339}
340
341impl From<Conflict> for WireConflict {
342 fn from(value: Conflict) -> Self {
343 Self {
344 natural_key: value.natural_key.into(),
345 resolution: format!("{:?}", value.resolution),
346 reason: value.reason,
347 }
348 }
349}
350
351impl From<WireConflict> for Conflict {
352 fn from(value: WireConflict) -> Self {
353 let resolution = match value.resolution.as_str() {
354 "InsertIfNotExists" => contextdb_engine::sync_types::ConflictPolicy::InsertIfNotExists,
355 "ServerWins" => contextdb_engine::sync_types::ConflictPolicy::ServerWins,
356 "EdgeWins" => contextdb_engine::sync_types::ConflictPolicy::EdgeWins,
357 _ => contextdb_engine::sync_types::ConflictPolicy::LatestWins,
358 };
359 Self {
360 natural_key: value.natural_key.into(),
361 resolution,
362 reason: value.reason,
363 }
364 }
365}