1use alloc::{collections::BTreeMap, vec::Vec};
4
5use crate::{
6 build::EdgeRow,
7 catalog::NodeKey,
8 error::{PostgresGraphError, SyncError},
9 overlay::{OverlayEdge, OverlayState},
10};
11
/// A sync action whose edge endpoints are already expressed as dense `u32`
/// node identifiers, ready to be applied to an [`OverlayState`].
///
/// Values of this type are produced by [`resolve_sync_action`] from a
/// [`SyncActionWire`] and consumed by `SyncAction::apply_to`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncAction {
    /// Push a `source -> target` edge onto the overlay.
    InsertEdge {
        /// Dense identifier of the edge's source node.
        source: u32,
        /// Dense identifier of the edge's target node.
        target: u32,
    },
    /// Remove the `source -> target` edge from the overlay.
    RemoveOverlayEdge {
        /// Dense identifier of the edge's source node.
        source: u32,
        /// Dense identifier of the edge's target node.
        target: u32,
    },
    /// Tombstone the edge with the given identifier.
    DeleteEdge {
        /// Identifier of the edge to tombstone.
        edge_id: u32,
    },
    /// Tombstone the node with the given identifier.
    DeleteNode {
        /// Identifier of the node to tombstone.
        node_id: u32,
    },
    /// Clear the overlay state.
    TruncateOverlays,
}
42
43impl SyncAction {
44 pub(crate) fn apply_to(self, overlay: &mut OverlayState) {
50 match self {
51 Self::InsertEdge { source, target } => {
52 overlay.push_edge(OverlayEdge { source, target });
53 }
54 Self::RemoveOverlayEdge { source, target } => {
55 overlay.remove_edge(source, target);
56 }
57 Self::DeleteEdge { edge_id } => {
58 overlay.tombstone_edge(edge_id);
59 }
60 Self::DeleteNode { node_id } => {
61 overlay.tombstone_node(node_id);
62 }
63 Self::TruncateOverlays => overlay.clear(),
64 }
65 }
66}
67
/// A decoded sync action whose edge endpoints are still expressed as
/// [`NodeKey`]s rather than dense `u32` identifiers.
///
/// Produced by [`SyncActionCodec::decode_wire`]; [`resolve_sync_action`]
/// turns it into a [`SyncAction`] by looking the keys up in a node map.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncActionWire {
    /// Insert a `source -> target` edge.
    InsertEdge {
        /// Key of the edge's source node.
        source: NodeKey,
        /// Key of the edge's target node.
        target: NodeKey,
    },
    /// Remove a `source -> target` overlay edge.
    RemoveOverlayEdge {
        /// Key of the edge's source node.
        source: NodeKey,
        /// Key of the edge's target node.
        target: NodeKey,
    },
    /// Delete an edge; the identifier is passed through resolution unchanged.
    DeleteEdge {
        /// Identifier of the edge to delete.
        edge_id: u32,
    },
    /// Delete a node; the identifier is passed through resolution unchanged.
    DeleteNode {
        /// Identifier of the node to delete.
        node_id: u32,
    },
    /// Truncate the overlays; carries no arguments.
    TruncateOverlays,
}
98
/// Numeric action-type tags for sync rows.
///
/// The `i16` discriminants are the values accepted by the
/// `TryFrom<i16>` implementation and by [`SyncActionCodec::decode_wire`].
/// Note that `RemoveOverlayEdge` carries tag `5` even though it is an
/// edge-keyed action like `InsertEdge`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(i16)]
pub enum SyncActionCodec {
    /// Tag `1`: insert an edge; both arguments are node keys.
    InsertEdge = 1,
    /// Tag `2`: delete an edge; the first argument is the edge id.
    DeleteEdge = 2,
    /// Tag `3`: delete a node; the first argument is the node id.
    DeleteNode = 3,
    /// Tag `4`: truncate the overlays; arguments are ignored.
    TruncateOverlays = 4,
    /// Tag `5`: remove an overlay edge; both arguments are node keys.
    RemoveOverlayEdge = 5,
}
114
115impl SyncActionCodec {
116 #[must_use]
127 pub const fn carries_node_keys(self) -> bool {
128 matches!(self, Self::InsertEdge | Self::RemoveOverlayEdge)
129 }
130
131 pub fn decode_wire(
141 action_type: i16,
142 arg0: Option<i64>,
143 arg1: Option<i64>,
144 ) -> Result<SyncActionWire, SyncError> {
145 match Self::try_from(action_type)? {
146 Self::InsertEdge => {
147 let source = decode_node_key(arg0, action_type)?;
148 let target = decode_node_key(arg1, action_type)?;
149 Ok(SyncActionWire::InsertEdge { source, target })
150 }
151 Self::DeleteEdge => {
152 let edge_id =
153 u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
154 .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
155 Ok(SyncActionWire::DeleteEdge { edge_id })
156 }
157 Self::DeleteNode => {
158 let node_id =
159 u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
160 .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
161 Ok(SyncActionWire::DeleteNode { node_id })
162 }
163 Self::TruncateOverlays => Ok(SyncActionWire::TruncateOverlays),
164 Self::RemoveOverlayEdge => {
165 let source = decode_node_key(arg0, action_type)?;
166 let target = decode_node_key(arg1, action_type)?;
167 Ok(SyncActionWire::RemoveOverlayEdge { source, target })
168 }
169 }
170 }
171
172 pub fn decode(
185 action_type: i16,
186 arg0: Option<i64>,
187 arg1: Option<i64>,
188 node_map: &BTreeMap<NodeKey, u32>,
189 ) -> Result<SyncAction, SyncError> {
190 resolve_sync_action(Self::decode_wire(action_type, arg0, arg1)?, node_map)
191 }
192}
193
194impl TryFrom<i16> for SyncActionCodec {
195 type Error = SyncError;
196
197 fn try_from(value: i16) -> Result<Self, Self::Error> {
207 match value {
208 1 => Ok(Self::InsertEdge),
209 2 => Ok(Self::DeleteEdge),
210 3 => Ok(Self::DeleteNode),
211 4 => Ok(Self::TruncateOverlays),
212 5 => Ok(Self::RemoveOverlayEdge),
213 action_type => Err(SyncError::InvalidActionType { action_type }),
214 }
215 }
216}
217
218pub fn resolve_sync_action(
229 action: SyncActionWire,
230 node_map: &BTreeMap<NodeKey, u32>,
231) -> Result<SyncAction, SyncError> {
232 match action {
233 SyncActionWire::InsertEdge { source, target } => Ok(SyncAction::InsertEdge {
234 source: lookup_dense(source, node_map)?,
235 target: lookup_dense(target, node_map)?,
236 }),
237 SyncActionWire::RemoveOverlayEdge { source, target } => Ok(SyncAction::RemoveOverlayEdge {
238 source: lookup_dense(source, node_map)?,
239 target: lookup_dense(target, node_map)?,
240 }),
241 SyncActionWire::DeleteEdge { edge_id } => Ok(SyncAction::DeleteEdge { edge_id }),
242 SyncActionWire::DeleteNode { node_id } => Ok(SyncAction::DeleteNode { node_id }),
243 SyncActionWire::TruncateOverlays => Ok(SyncAction::TruncateOverlays),
244 }
245}
246
/// An undecoded sync row: `(sequence, action_type, arg0, arg1)`.
///
/// [`resolve_sync_rows`] destructures the tuple in exactly this order and
/// feeds the last three elements to [`SyncActionCodec::decode`].
pub type RawSyncRow = (u64, i16, Option<i64>, Option<i64>);
249
250pub fn resolve_sync_rows(
261 edges: &[EdgeRow],
262 raw_rows: &[RawSyncRow],
263) -> Result<Vec<SyncRow>, PostgresGraphError> {
264 let node_map = dense_node_map_for_sync_resolution(edges, raw_rows)?;
265 let mut rows = Vec::with_capacity(raw_rows.len());
266 for (sequence, action_type, arg0, arg1) in raw_rows {
267 let action = SyncActionCodec::decode(*action_type, *arg0, *arg1, &node_map)?;
268 rows.push(SyncRow {
269 sequence: *sequence,
270 action,
271 });
272 }
273 Ok(rows)
274}
275
276pub fn dense_node_map_for_sync_resolution(
289 edges: &[EdgeRow],
290 raw_rows: &[RawSyncRow],
291) -> Result<BTreeMap<NodeKey, u32>, PostgresGraphError> {
292 let mut keys = crate::build::distinct_node_keys(edges);
293 for (_, action_type, arg0, arg1) in raw_rows {
294 if SyncActionCodec::try_from(*action_type).is_ok_and(SyncActionCodec::carries_node_keys) {
295 if let Some(key) = node_key_from_i64(*arg0) {
296 keys.insert(key);
297 }
298 if let Some(key) = node_key_from_i64(*arg1) {
299 keys.insert(key);
300 }
301 }
302 }
303 Ok(crate::build::dense_node_map_from_keys(keys)?)
304}
305
306fn node_key_from_i64(value: Option<i64>) -> Option<NodeKey> {
308 let value = value?;
309 if value.is_negative() {
310 return None;
311 }
312 u64::try_from(value).ok().map(NodeKey)
313}
314
315fn decode_node_key(value: Option<i64>, action_type: i16) -> Result<NodeKey, SyncError> {
317 let value = value.ok_or(SyncError::InvalidActionArgs { action_type })?;
318 if value.is_negative() {
319 return Err(SyncError::InvalidActionArgs { action_type });
320 }
321 let key = u64::try_from(value).map_err(|_| SyncError::InvalidActionArgs { action_type })?;
322 Ok(NodeKey(key))
323}
324
325fn lookup_dense(key: NodeKey, node_map: &BTreeMap<NodeKey, u32>) -> Result<u32, SyncError> {
327 node_map
328 .get(&key)
329 .copied()
330 .ok_or(SyncError::UnknownNodeKey { key })
331}
332
/// A resolved sync row: an action paired with its sequence number.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SyncRow {
    /// Sequence number of the row; `SyncRow::apply_in_order` requires these
    /// to be strictly increasing across a batch.
    pub sequence: u64,
    /// The action to apply to the overlay.
    pub action: SyncAction,
}
341
342impl SyncRow {
343 pub fn apply_in_order(
360 rows: &[Self],
361 overlay: &mut OverlayState,
362 ) -> Result<usize, PostgresGraphError> {
363 let mut applied = 0_usize;
364 let mut last_sequence = None;
365 for row in rows {
366 if let Some(previous) = last_sequence
367 && row.sequence <= previous
368 {
369 return Err(SyncError::NonMonotonicSequence {
370 sequence: row.sequence,
371 previous,
372 }
373 .into());
374 }
375 last_sequence = Some(row.sequence);
376 row.action.apply_to(overlay);
377 applied += 1;
378 }
379 Ok(applied)
380 }
381}
382
/// Summary counters describing overlay/sync state.
///
/// NOTE(review): nothing in this part of the file populates this struct;
/// the field descriptions below are inferred from the field names and
/// should be confirmed against the code that constructs it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SyncHealth {
    /// Presumably the number of edges currently held in the overlay.
    pub overlay_edges: usize,
    /// Presumably the number of tombstoned edges.
    pub tombstoned_edges: usize,
    /// Presumably the number of tombstoned nodes.
    pub tombstoned_nodes: usize,
}