Skip to main content

oxgraph_postgres/
sync.rs

1//! Sync log replay into overlay buffers.
2
3use alloc::{collections::BTreeMap, vec::Vec};
4
5use crate::{
6    build::EdgeRow,
7    catalog::NodeKey,
8    error::{PostgresGraphError, SyncError},
9    overlay::{OverlayEdge, OverlayState},
10};
11
12/// One durable sync row interpreted by the library.
13#[derive(Clone, Copy, Debug, PartialEq, Eq)]
14pub enum SyncAction {
15    /// Insert a new overlay edge between dense node ids.
16    InsertEdge {
17        /// Source node id.
18        source: u32,
19        /// Target node id.
20        target: u32,
21    },
22    /// Remove a previously inserted overlay edge between dense node ids.
23    RemoveOverlayEdge {
24        /// Source node id.
25        source: u32,
26        /// Target node id.
27        target: u32,
28    },
29    /// Tombstone a base edge id.
30    DeleteEdge {
31        /// Base CSR edge id.
32        edge_id: u32,
33    },
34    /// Tombstone a node id from query results.
35    DeleteNode {
36        /// Dense node id.
37        node_id: u32,
38    },
39    /// Truncate all overlays (maintenance prelude).
40    TruncateOverlays,
41}
42
43impl SyncAction {
44    /// Applies this action to `overlay`.
45    ///
46    /// # Performance
47    ///
48    /// This method is `O(log t)` for indexed overlay mutations.
49    pub(crate) fn apply_to(self, overlay: &mut OverlayState) {
50        match self {
51            Self::InsertEdge { source, target } => {
52                overlay.push_edge(OverlayEdge { source, target });
53            }
54            Self::RemoveOverlayEdge { source, target } => {
55                overlay.remove_edge(source, target);
56            }
57            Self::DeleteEdge { edge_id } => {
58                overlay.tombstone_edge(edge_id);
59            }
60            Self::DeleteNode { node_id } => {
61                overlay.tombstone_node(node_id);
62            }
63            Self::TruncateOverlays => overlay.clear(),
64        }
65    }
66}
67
68/// Persisted sync-log action before dense-node resolution.
69#[derive(Clone, Copy, Debug, PartialEq, Eq)]
70pub enum SyncActionWire {
71    /// Insert an overlay edge keyed by registered `(table_id, primary_key)` pairs.
72    InsertEdge {
73        /// Source node key from the sync log.
74        source: NodeKey,
75        /// Target node key from the sync log.
76        target: NodeKey,
77    },
78    /// Remove an overlay edge keyed by registered node keys.
79    RemoveOverlayEdge {
80        /// Source node key from the sync log.
81        source: NodeKey,
82        /// Target node key from the sync log.
83        target: NodeKey,
84    },
85    /// Tombstone a base CSR edge id (already dense).
86    DeleteEdge {
87        /// Base CSR edge id.
88        edge_id: u32,
89    },
90    /// Tombstone a dense node id (already dense).
91    DeleteNode {
92        /// Dense node id.
93        node_id: u32,
94    },
95    /// Truncate all overlay buffers.
96    TruncateOverlays,
97}
98
99/// Persisted sync-log action type ids shared with trigger SQL.
100#[derive(Clone, Copy, Debug, PartialEq, Eq)]
101#[repr(i16)]
102pub enum SyncActionCodec {
103    /// [`SyncActionWire::InsertEdge`]
104    InsertEdge = 1,
105    /// [`SyncActionWire::DeleteEdge`]
106    DeleteEdge = 2,
107    /// [`SyncActionWire::DeleteNode`]
108    DeleteNode = 3,
109    /// [`SyncActionWire::TruncateOverlays`]
110    TruncateOverlays = 4,
111    /// [`SyncActionWire::RemoveOverlayEdge`]
112    RemoveOverlayEdge = 5,
113}
114
115impl SyncActionCodec {
116    /// Returns whether this action carries registered node-key arguments.
117    ///
118    /// Insert and remove-overlay actions reference source/target node keys;
119    /// the others carry dense ids or nothing. Both [`Self::decode_wire`] and the
120    /// dense-map key harvest consult this so the keyed-action set lives in one
121    /// place instead of being a magic literal pair.
122    ///
123    /// # Performance
124    ///
125    /// This method is `O(1)`.
126    #[must_use]
127    pub const fn carries_node_keys(self) -> bool {
128        matches!(self, Self::InsertEdge | Self::RemoveOverlayEdge)
129    }
130
131    /// Decodes a persisted sync row into a wire action.
132    ///
133    /// # Errors
134    ///
135    /// Returns [`SyncError`] when the action type or arguments are invalid.
136    ///
137    /// # Performance
138    ///
139    /// This method is `O(1)`.
140    pub fn decode_wire(
141        action_type: i16,
142        arg0: Option<i64>,
143        arg1: Option<i64>,
144    ) -> Result<SyncActionWire, SyncError> {
145        match Self::try_from(action_type)? {
146            Self::InsertEdge => {
147                let source = decode_node_key(arg0, action_type)?;
148                let target = decode_node_key(arg1, action_type)?;
149                Ok(SyncActionWire::InsertEdge { source, target })
150            }
151            Self::DeleteEdge => {
152                let edge_id =
153                    u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
154                        .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
155                Ok(SyncActionWire::DeleteEdge { edge_id })
156            }
157            Self::DeleteNode => {
158                let node_id =
159                    u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
160                        .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
161                Ok(SyncActionWire::DeleteNode { node_id })
162            }
163            Self::TruncateOverlays => Ok(SyncActionWire::TruncateOverlays),
164            Self::RemoveOverlayEdge => {
165                let source = decode_node_key(arg0, action_type)?;
166                let target = decode_node_key(arg1, action_type)?;
167                Ok(SyncActionWire::RemoveOverlayEdge { source, target })
168            }
169        }
170    }
171
172    /// Decodes a persisted sync row into a dense [`SyncAction`].
173    ///
174    /// Keyed insert/remove actions require `node_map` built from the current
175    /// relational edge scan.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`SyncError`] when decoding or dense resolution fails.
180    ///
181    /// # Performance
182    ///
183    /// This method is `O(1)` per row excluding map lookup.
184    pub fn decode(
185        action_type: i16,
186        arg0: Option<i64>,
187        arg1: Option<i64>,
188        node_map: &BTreeMap<NodeKey, u32>,
189    ) -> Result<SyncAction, SyncError> {
190        resolve_sync_action(Self::decode_wire(action_type, arg0, arg1)?, node_map)
191    }
192}
193
194impl TryFrom<i16> for SyncActionCodec {
195    type Error = SyncError;
196
197    /// Maps a persisted action-type id to its codec variant.
198    ///
199    /// # Errors
200    ///
201    /// Returns [`SyncError::InvalidActionType`] for an unknown id.
202    ///
203    /// # Performance
204    ///
205    /// This function is `O(1)`.
206    fn try_from(value: i16) -> Result<Self, Self::Error> {
207        match value {
208            1 => Ok(Self::InsertEdge),
209            2 => Ok(Self::DeleteEdge),
210            3 => Ok(Self::DeleteNode),
211            4 => Ok(Self::TruncateOverlays),
212            5 => Ok(Self::RemoveOverlayEdge),
213            action_type => Err(SyncError::InvalidActionType { action_type }),
214        }
215    }
216}
217
218/// Resolves one wire sync action into dense engine coordinates.
219///
220/// # Errors
221///
222/// Returns [`SyncError::UnknownNodeKey`] when a keyed action references an
223/// unassigned node key.
224///
225/// # Performance
226///
227/// This function is `O(log n)` for keyed actions.
228pub fn resolve_sync_action(
229    action: SyncActionWire,
230    node_map: &BTreeMap<NodeKey, u32>,
231) -> Result<SyncAction, SyncError> {
232    match action {
233        SyncActionWire::InsertEdge { source, target } => Ok(SyncAction::InsertEdge {
234            source: lookup_dense(source, node_map)?,
235            target: lookup_dense(target, node_map)?,
236        }),
237        SyncActionWire::RemoveOverlayEdge { source, target } => Ok(SyncAction::RemoveOverlayEdge {
238            source: lookup_dense(source, node_map)?,
239            target: lookup_dense(target, node_map)?,
240        }),
241        SyncActionWire::DeleteEdge { edge_id } => Ok(SyncAction::DeleteEdge { edge_id }),
242        SyncActionWire::DeleteNode { node_id } => Ok(SyncAction::DeleteNode { node_id }),
243        SyncActionWire::TruncateOverlays => Ok(SyncAction::TruncateOverlays),
244    }
245}
246
247/// Raw sync-log row scanned from SPI before dense resolution.
248pub type RawSyncRow = (u64, i16, Option<i64>, Option<i64>);
249
250/// Resolves persisted sync rows using the current relational edge scan.
251///
252/// # Errors
253///
254/// Returns [`PostgresGraphError::Build`] when dense assignment fails, or
255/// [`PostgresGraphError::Sync`] when a row cannot be decoded or resolved.
256///
257/// # Performance
258///
259/// This function is `O(n log n + m + r log r)` for edge scan size and row count `r`.
260pub fn resolve_sync_rows(
261    edges: &[EdgeRow],
262    raw_rows: &[RawSyncRow],
263) -> Result<Vec<SyncRow>, PostgresGraphError> {
264    let node_map = dense_node_map_for_sync_resolution(edges, raw_rows)?;
265    let mut rows = Vec::with_capacity(raw_rows.len());
266    for (sequence, action_type, arg0, arg1) in raw_rows {
267        let action = SyncActionCodec::decode(*action_type, *arg0, *arg1, &node_map)?;
268        rows.push(SyncRow {
269            sequence: *sequence,
270            action,
271        });
272    }
273    Ok(rows)
274}
275
276/// Builds the dense node assignment used when replaying sync rows.
277///
278/// Keys come from the current relational edge scan plus any keyed sync-log
279/// arguments so deletes remain resolvable after base edge rows disappear.
280///
281/// # Errors
282///
283/// Returns [`PostgresGraphError::Build`] when dense assignment fails.
284///
285/// # Performance
286///
287/// This function is `O(n log n + m + r)` for edge count `m` and sync row count `r`.
288pub fn dense_node_map_for_sync_resolution(
289    edges: &[EdgeRow],
290    raw_rows: &[RawSyncRow],
291) -> Result<BTreeMap<NodeKey, u32>, PostgresGraphError> {
292    let mut keys = crate::build::distinct_node_keys(edges);
293    for (_, action_type, arg0, arg1) in raw_rows {
294        if SyncActionCodec::try_from(*action_type).is_ok_and(SyncActionCodec::carries_node_keys) {
295            if let Some(key) = node_key_from_i64(*arg0) {
296                keys.insert(key);
297            }
298            if let Some(key) = node_key_from_i64(*arg1) {
299                keys.insert(key);
300            }
301        }
302    }
303    Ok(crate::build::dense_node_map_from_keys(keys)?)
304}
305
306/// Parses a non-negative sync-log node key when present.
307fn node_key_from_i64(value: Option<i64>) -> Option<NodeKey> {
308    let value = value?;
309    if value.is_negative() {
310        return None;
311    }
312    u64::try_from(value).ok().map(NodeKey)
313}
314
315/// Decodes a persisted node-key argument from the sync log.
316fn decode_node_key(value: Option<i64>, action_type: i16) -> Result<NodeKey, SyncError> {
317    let value = value.ok_or(SyncError::InvalidActionArgs { action_type })?;
318    if value.is_negative() {
319        return Err(SyncError::InvalidActionArgs { action_type });
320    }
321    let key = u64::try_from(value).map_err(|_| SyncError::InvalidActionArgs { action_type })?;
322    Ok(NodeKey(key))
323}
324
325/// Maps a registered node key to its dense id using the current edge scan.
326fn lookup_dense(key: NodeKey, node_map: &BTreeMap<NodeKey, u32>) -> Result<u32, SyncError> {
327    node_map
328        .get(&key)
329        .copied()
330        .ok_or(SyncError::UnknownNodeKey { key })
331}
332
333/// One sync log row with monotonic sequence metadata.
334#[derive(Clone, Copy, Debug, PartialEq, Eq)]
335pub struct SyncRow {
336    /// Monotonic sequence number assigned by the extension.
337    pub sequence: u64,
338    /// Action to replay.
339    pub action: SyncAction,
340}
341
342impl SyncRow {
343    /// Applies caller-ordered rows to `overlay`, rejecting any that are not in
344    /// strictly increasing sequence order.
345    ///
346    /// Rows must arrive already sorted by `sequence`; this method validates that
347    /// contract rather than silently normalizing it, so genuinely out-of-order
348    /// input (e.g. `[5, 3, 1]`) is rejected, not just duplicates. The extension
349    /// emits rows in sequence order, so this is a cheap defensive check.
350    ///
351    /// # Errors
352    ///
353    /// Returns [`PostgresGraphError::Sync`] when a row's sequence is not strictly
354    /// greater than its predecessor.
355    ///
356    /// # Performance
357    ///
358    /// This method is `O(r + a)` where `r` is row count and `a` is applied actions.
359    pub fn apply_in_order(
360        rows: &[Self],
361        overlay: &mut OverlayState,
362    ) -> Result<usize, PostgresGraphError> {
363        let mut applied = 0_usize;
364        let mut last_sequence = None;
365        for row in rows {
366            if let Some(previous) = last_sequence
367                && row.sequence <= previous
368            {
369                return Err(SyncError::NonMonotonicSequence {
370                    sequence: row.sequence,
371                    previous,
372                }
373                .into());
374            }
375            last_sequence = Some(row.sequence);
376            row.action.apply_to(overlay);
377            applied += 1;
378        }
379        Ok(applied)
380    }
381}
382
383/// Returns a coarse health summary for sync surfaces.
384#[derive(Clone, Copy, Debug, PartialEq, Eq)]
385pub struct SyncHealth {
386    /// Overlay edge insertions currently buffered.
387    pub overlay_edges: usize,
388    /// Tombstoned base edges.
389    pub tombstoned_edges: usize,
390    /// Tombstoned nodes.
391    pub tombstoned_nodes: usize,
392}