1use alloc::{
4 collections::{BTreeMap, BTreeSet},
5 vec::Vec,
6};
7
8use crate::{
9 build::EdgeRow,
10 catalog::NodeKey,
11 error::{BuildError, PostgresGraphError, SyncError},
12 overlay::{OverlayEdge, OverlayState},
13};
14
15#[derive(Clone, Copy, Debug, PartialEq, Eq)]
17pub enum SyncAction {
18 InsertEdge {
20 source: u32,
22 target: u32,
24 },
25 RemoveOverlayEdge {
27 source: u32,
29 target: u32,
31 },
32 DeleteEdge {
34 edge_id: u32,
36 },
37 DeleteNode {
39 node_id: u32,
41 },
42 TruncateOverlays,
44}
45
46impl SyncAction {
47 pub(crate) fn apply_to(self, overlay: &mut OverlayState) {
53 match self {
54 Self::InsertEdge { source, target } => {
55 overlay.push_edge(OverlayEdge { source, target });
56 }
57 Self::RemoveOverlayEdge { source, target } => {
58 overlay.remove_edge(source, target);
59 }
60 Self::DeleteEdge { edge_id } => {
61 overlay.tombstoned_edges.insert(edge_id);
62 }
63 Self::DeleteNode { node_id } => {
64 overlay.tombstoned_nodes.insert(node_id);
65 }
66 Self::TruncateOverlays => overlay.clear(),
67 }
68 }
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq)]
73pub enum SyncActionWire {
74 InsertEdge {
76 source: NodeKey,
78 target: NodeKey,
80 },
81 RemoveOverlayEdge {
83 source: NodeKey,
85 target: NodeKey,
87 },
88 DeleteEdge {
90 edge_id: u32,
92 },
93 DeleteNode {
95 node_id: u32,
97 },
98 TruncateOverlays,
100}
101
102#[derive(Clone, Copy, Debug, PartialEq, Eq)]
104#[repr(i16)]
105pub enum SyncActionCodec {
106 InsertEdge = 1,
108 DeleteEdge = 2,
110 DeleteNode = 3,
112 TruncateOverlays = 4,
114 RemoveOverlayEdge = 5,
116}
117
118impl SyncActionCodec {
119 pub fn decode_wire(
129 action_type: i16,
130 arg0: Option<i64>,
131 arg1: Option<i64>,
132 ) -> Result<SyncActionWire, SyncError> {
133 match action_type {
134 1 => {
135 let source = decode_node_key(arg0, action_type)?;
136 let target = decode_node_key(arg1, action_type)?;
137 Ok(SyncActionWire::InsertEdge { source, target })
138 }
139 2 => {
140 let edge_id =
141 u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
142 .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
143 Ok(SyncActionWire::DeleteEdge { edge_id })
144 }
145 3 => {
146 let node_id =
147 u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
148 .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
149 Ok(SyncActionWire::DeleteNode { node_id })
150 }
151 4 => Ok(SyncActionWire::TruncateOverlays),
152 5 => {
153 let source = decode_node_key(arg0, action_type)?;
154 let target = decode_node_key(arg1, action_type)?;
155 Ok(SyncActionWire::RemoveOverlayEdge { source, target })
156 }
157 _ => Err(SyncError::InvalidActionType { action_type }),
158 }
159 }
160
161 pub fn decode(
174 action_type: i16,
175 arg0: Option<i64>,
176 arg1: Option<i64>,
177 node_map: &BTreeMap<NodeKey, u32>,
178 ) -> Result<SyncAction, SyncError> {
179 resolve_sync_action(Self::decode_wire(action_type, arg0, arg1)?, node_map)
180 }
181}
182
183pub fn resolve_sync_action(
194 action: SyncActionWire,
195 node_map: &BTreeMap<NodeKey, u32>,
196) -> Result<SyncAction, SyncError> {
197 match action {
198 SyncActionWire::InsertEdge { source, target } => Ok(SyncAction::InsertEdge {
199 source: lookup_dense(source, node_map)?,
200 target: lookup_dense(target, node_map)?,
201 }),
202 SyncActionWire::RemoveOverlayEdge { source, target } => Ok(SyncAction::RemoveOverlayEdge {
203 source: lookup_dense(source, node_map)?,
204 target: lookup_dense(target, node_map)?,
205 }),
206 SyncActionWire::DeleteEdge { edge_id } => Ok(SyncAction::DeleteEdge { edge_id }),
207 SyncActionWire::DeleteNode { node_id } => Ok(SyncAction::DeleteNode { node_id }),
208 SyncActionWire::TruncateOverlays => Ok(SyncAction::TruncateOverlays),
209 }
210}
211
212pub type RawSyncRow = (u64, i16, Option<i64>, Option<i64>);
214
215pub fn resolve_sync_rows(
226 edges: &[EdgeRow],
227 raw_rows: &[RawSyncRow],
228) -> Result<Vec<SyncRow>, PostgresGraphError> {
229 let node_map = dense_node_map_for_sync_resolution(edges, raw_rows)?;
230 let mut rows = Vec::with_capacity(raw_rows.len());
231 for (sequence, action_type, arg0, arg1) in raw_rows {
232 let action = SyncActionCodec::decode(*action_type, *arg0, *arg1, &node_map)?;
233 rows.push(SyncRow {
234 sequence: *sequence,
235 action,
236 });
237 }
238 Ok(rows)
239}
240
241pub fn dense_node_map_for_sync_resolution(
254 edges: &[EdgeRow],
255 raw_rows: &[RawSyncRow],
256) -> Result<BTreeMap<NodeKey, u32>, PostgresGraphError> {
257 let mut keys = BTreeSet::new();
258 for edge in edges {
259 keys.insert(edge.source);
260 keys.insert(edge.target);
261 }
262 for (_, action_type, arg0, arg1) in raw_rows {
263 if matches!(*action_type, 1 | 5) {
264 if let Some(key) = node_key_from_i64(*arg0) {
265 keys.insert(key);
266 }
267 if let Some(key) = node_key_from_i64(*arg1) {
268 keys.insert(key);
269 }
270 }
271 }
272 let mut map = BTreeMap::new();
273 for (index, key) in keys.into_iter().enumerate() {
274 let dense = u32::try_from(index).map_err(|_| BuildError::NodeCountOverflow)?;
275 map.insert(key, dense);
276 }
277 Ok(map)
278}
279
280fn node_key_from_i64(value: Option<i64>) -> Option<NodeKey> {
282 let value = value?;
283 if value.is_negative() {
284 return None;
285 }
286 u64::try_from(value).ok().map(NodeKey)
287}
288
289fn decode_node_key(value: Option<i64>, action_type: i16) -> Result<NodeKey, SyncError> {
291 let value = value.ok_or(SyncError::InvalidActionArgs { action_type })?;
292 if value.is_negative() {
293 return Err(SyncError::InvalidActionArgs { action_type });
294 }
295 let key = u64::try_from(value).map_err(|_| SyncError::InvalidActionArgs { action_type })?;
296 Ok(NodeKey(key))
297}
298
299fn lookup_dense(key: NodeKey, node_map: &BTreeMap<NodeKey, u32>) -> Result<u32, SyncError> {
301 node_map
302 .get(&key)
303 .copied()
304 .ok_or(SyncError::UnknownNodeKey { key })
305}
306
307#[derive(Clone, Copy, Debug, PartialEq, Eq)]
309pub struct SyncRow {
310 pub sequence: u64,
312 pub action: SyncAction,
314}
315
316impl SyncRow {
317 pub fn apply_in_order(
327 rows: &[Self],
328 overlay: &mut OverlayState,
329 ) -> Result<usize, PostgresGraphError> {
330 if rows.is_empty() {
331 return Ok(0);
332 }
333 let mut ordered = rows.to_vec();
334 ordered.sort_by_key(|row| row.sequence);
335 let mut applied = 0_usize;
336 let mut last_sequence = None;
337 for row in ordered {
338 if let Some(previous) = last_sequence
339 && row.sequence <= previous
340 {
341 return Err(SyncError::NonMonotonicSequence {
342 sequence: row.sequence,
343 previous,
344 }
345 .into());
346 }
347 last_sequence = Some(row.sequence);
348 row.action.apply_to(overlay);
349 applied += 1;
350 }
351 Ok(applied)
352 }
353}
354
355#[derive(Clone, Copy, Debug, PartialEq, Eq)]
357pub struct SyncHealth {
358 pub overlay_edges: usize,
360 pub tombstoned_edges: usize,
362 pub tombstoned_nodes: usize,
364}