Skip to main content

oxgraph_postgres/
sync.rs

1//! Sync log replay into overlay buffers.
2
3use alloc::{
4    collections::{BTreeMap, BTreeSet},
5    vec::Vec,
6};
7
8use crate::{
9    build::EdgeRow,
10    catalog::NodeKey,
11    error::{BuildError, PostgresGraphError, SyncError},
12    overlay::{OverlayEdge, OverlayState},
13};
14
/// A sync-log entry expressed in dense engine coordinates, ready for replay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncAction {
    /// Adds an overlay edge from `source` to `target`.
    InsertEdge {
        /// Dense id of the source node.
        source: u32,
        /// Dense id of the target node.
        target: u32,
    },
    /// Drops an overlay edge that an earlier insert added.
    RemoveOverlayEdge {
        /// Dense id of the source node.
        source: u32,
        /// Dense id of the target node.
        target: u32,
    },
    /// Marks a base edge as deleted (tombstoned).
    DeleteEdge {
        /// Edge id in the base CSR.
        edge_id: u32,
    },
    /// Marks a node as deleted (tombstoned) so it is hidden from query results.
    DeleteNode {
        /// Dense id of the node.
        node_id: u32,
    },
    /// Clears every overlay buffer (maintenance prelude).
    TruncateOverlays,
}
45
46impl SyncAction {
47    /// Applies this action to `overlay`.
48    ///
49    /// # Performance
50    ///
51    /// This method is `O(log t)` for indexed overlay mutations.
52    pub(crate) fn apply_to(self, overlay: &mut OverlayState) {
53        match self {
54            Self::InsertEdge { source, target } => {
55                overlay.push_edge(OverlayEdge { source, target });
56            }
57            Self::RemoveOverlayEdge { source, target } => {
58                overlay.remove_edge(source, target);
59            }
60            Self::DeleteEdge { edge_id } => {
61                overlay.tombstoned_edges.insert(edge_id);
62            }
63            Self::DeleteNode { node_id } => {
64                overlay.tombstoned_nodes.insert(node_id);
65            }
66            Self::TruncateOverlays => overlay.clear(),
67        }
68    }
69}
70
71/// Persisted sync-log action before dense-node resolution.
72#[derive(Clone, Copy, Debug, PartialEq, Eq)]
73pub enum SyncActionWire {
74    /// Insert an overlay edge keyed by registered `(table_id, primary_key)` pairs.
75    InsertEdge {
76        /// Source node key from the sync log.
77        source: NodeKey,
78        /// Target node key from the sync log.
79        target: NodeKey,
80    },
81    /// Remove an overlay edge keyed by registered node keys.
82    RemoveOverlayEdge {
83        /// Source node key from the sync log.
84        source: NodeKey,
85        /// Target node key from the sync log.
86        target: NodeKey,
87    },
88    /// Tombstone a base CSR edge id (already dense).
89    DeleteEdge {
90        /// Base CSR edge id.
91        edge_id: u32,
92    },
93    /// Tombstone a dense node id (already dense).
94    DeleteNode {
95        /// Dense node id.
96        node_id: u32,
97    },
98    /// Truncate all overlay buffers.
99    TruncateOverlays,
100}
101
/// Numeric action type ids stored in the sync log and shared with trigger SQL.
///
/// The discriminants are part of the persisted format: changing one changes
/// how existing rows decode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i16)]
pub enum SyncActionCodec {
    /// Id for [`SyncActionWire::InsertEdge`].
    InsertEdge = 1,
    /// Id for [`SyncActionWire::DeleteEdge`].
    DeleteEdge = 2,
    /// Id for [`SyncActionWire::DeleteNode`].
    DeleteNode = 3,
    /// Id for [`SyncActionWire::TruncateOverlays`].
    TruncateOverlays = 4,
    /// Id for [`SyncActionWire::RemoveOverlayEdge`].
    RemoveOverlayEdge = 5,
}
117
118impl SyncActionCodec {
119    /// Decodes a persisted sync row into a wire action.
120    ///
121    /// # Errors
122    ///
123    /// Returns [`SyncError`] when the action type or arguments are invalid.
124    ///
125    /// # Performance
126    ///
127    /// This method is `O(1)`.
128    pub fn decode_wire(
129        action_type: i16,
130        arg0: Option<i64>,
131        arg1: Option<i64>,
132    ) -> Result<SyncActionWire, SyncError> {
133        match action_type {
134            1 => {
135                let source = decode_node_key(arg0, action_type)?;
136                let target = decode_node_key(arg1, action_type)?;
137                Ok(SyncActionWire::InsertEdge { source, target })
138            }
139            2 => {
140                let edge_id =
141                    u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
142                        .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
143                Ok(SyncActionWire::DeleteEdge { edge_id })
144            }
145            3 => {
146                let node_id =
147                    u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
148                        .map_err(|_| SyncError::InvalidActionArgs { action_type })?;
149                Ok(SyncActionWire::DeleteNode { node_id })
150            }
151            4 => Ok(SyncActionWire::TruncateOverlays),
152            5 => {
153                let source = decode_node_key(arg0, action_type)?;
154                let target = decode_node_key(arg1, action_type)?;
155                Ok(SyncActionWire::RemoveOverlayEdge { source, target })
156            }
157            _ => Err(SyncError::InvalidActionType { action_type }),
158        }
159    }
160
161    /// Decodes a persisted sync row into a dense [`SyncAction`].
162    ///
163    /// Keyed insert/remove actions require `node_map` built from the current
164    /// relational edge scan.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`SyncError`] when decoding or dense resolution fails.
169    ///
170    /// # Performance
171    ///
172    /// This method is `O(1)` per row excluding map lookup.
173    pub fn decode(
174        action_type: i16,
175        arg0: Option<i64>,
176        arg1: Option<i64>,
177        node_map: &BTreeMap<NodeKey, u32>,
178    ) -> Result<SyncAction, SyncError> {
179        resolve_sync_action(Self::decode_wire(action_type, arg0, arg1)?, node_map)
180    }
181}
182
183/// Resolves one wire sync action into dense engine coordinates.
184///
185/// # Errors
186///
187/// Returns [`SyncError::UnknownNodeKey`] when a keyed action references an
188/// unassigned node key.
189///
190/// # Performance
191///
192/// This function is `O(log n)` for keyed actions.
193pub fn resolve_sync_action(
194    action: SyncActionWire,
195    node_map: &BTreeMap<NodeKey, u32>,
196) -> Result<SyncAction, SyncError> {
197    match action {
198        SyncActionWire::InsertEdge { source, target } => Ok(SyncAction::InsertEdge {
199            source: lookup_dense(source, node_map)?,
200            target: lookup_dense(target, node_map)?,
201        }),
202        SyncActionWire::RemoveOverlayEdge { source, target } => Ok(SyncAction::RemoveOverlayEdge {
203            source: lookup_dense(source, node_map)?,
204            target: lookup_dense(target, node_map)?,
205        }),
206        SyncActionWire::DeleteEdge { edge_id } => Ok(SyncAction::DeleteEdge { edge_id }),
207        SyncActionWire::DeleteNode { node_id } => Ok(SyncAction::DeleteNode { node_id }),
208        SyncActionWire::TruncateOverlays => Ok(SyncAction::TruncateOverlays),
209    }
210}
211
/// Raw sync-log row scanned from SPI before dense resolution.
///
/// Fields, in order: `(sequence, action_type, arg0, arg1)`. `action_type` is
/// the persisted action id and `arg0` / `arg1` are its nullable arguments,
/// whose meaning depends on the action type.
pub type RawSyncRow = (u64, i16, Option<i64>, Option<i64>);
214
215/// Resolves persisted sync rows using the current relational edge scan.
216///
217/// # Errors
218///
219/// Returns [`PostgresGraphError::Build`] when dense assignment fails, or
220/// [`PostgresGraphError::Sync`] when a row cannot be decoded or resolved.
221///
222/// # Performance
223///
224/// This function is `O(n log n + m + r log r)` for edge scan size and row count `r`.
225pub fn resolve_sync_rows(
226    edges: &[EdgeRow],
227    raw_rows: &[RawSyncRow],
228) -> Result<Vec<SyncRow>, PostgresGraphError> {
229    let node_map = dense_node_map_for_sync_resolution(edges, raw_rows)?;
230    let mut rows = Vec::with_capacity(raw_rows.len());
231    for (sequence, action_type, arg0, arg1) in raw_rows {
232        let action = SyncActionCodec::decode(*action_type, *arg0, *arg1, &node_map)?;
233        rows.push(SyncRow {
234            sequence: *sequence,
235            action,
236        });
237    }
238    Ok(rows)
239}
240
241/// Builds the dense node assignment used when replaying sync rows.
242///
243/// Keys come from the current relational edge scan plus any keyed sync-log
244/// arguments so deletes remain resolvable after base edge rows disappear.
245///
246/// # Errors
247///
248/// Returns [`PostgresGraphError::Build`] when dense assignment fails.
249///
250/// # Performance
251///
252/// This function is `O(n log n + m + r)` for edge count `m` and sync row count `r`.
253pub fn dense_node_map_for_sync_resolution(
254    edges: &[EdgeRow],
255    raw_rows: &[RawSyncRow],
256) -> Result<BTreeMap<NodeKey, u32>, PostgresGraphError> {
257    let mut keys = BTreeSet::new();
258    for edge in edges {
259        keys.insert(edge.source);
260        keys.insert(edge.target);
261    }
262    for (_, action_type, arg0, arg1) in raw_rows {
263        if matches!(*action_type, 1 | 5) {
264            if let Some(key) = node_key_from_i64(*arg0) {
265                keys.insert(key);
266            }
267            if let Some(key) = node_key_from_i64(*arg1) {
268                keys.insert(key);
269            }
270        }
271    }
272    let mut map = BTreeMap::new();
273    for (index, key) in keys.into_iter().enumerate() {
274        let dense = u32::try_from(index).map_err(|_| BuildError::NodeCountOverflow)?;
275        map.insert(key, dense);
276    }
277    Ok(map)
278}
279
280/// Parses a non-negative sync-log node key when present.
281fn node_key_from_i64(value: Option<i64>) -> Option<NodeKey> {
282    let value = value?;
283    if value.is_negative() {
284        return None;
285    }
286    u64::try_from(value).ok().map(NodeKey)
287}
288
289/// Decodes a persisted node-key argument from the sync log.
290fn decode_node_key(value: Option<i64>, action_type: i16) -> Result<NodeKey, SyncError> {
291    let value = value.ok_or(SyncError::InvalidActionArgs { action_type })?;
292    if value.is_negative() {
293        return Err(SyncError::InvalidActionArgs { action_type });
294    }
295    let key = u64::try_from(value).map_err(|_| SyncError::InvalidActionArgs { action_type })?;
296    Ok(NodeKey(key))
297}
298
299/// Maps a registered node key to its dense id using the current edge scan.
300fn lookup_dense(key: NodeKey, node_map: &BTreeMap<NodeKey, u32>) -> Result<u32, SyncError> {
301    node_map
302        .get(&key)
303        .copied()
304        .ok_or(SyncError::UnknownNodeKey { key })
305}
306
307/// One sync log row with monotonic sequence metadata.
308#[derive(Clone, Copy, Debug, PartialEq, Eq)]
309pub struct SyncRow {
310    /// Monotonic sequence number assigned by the extension.
311    pub sequence: u64,
312    /// Action to replay.
313    pub action: SyncAction,
314}
315
316impl SyncRow {
317    /// Applies rows to `overlay` in strictly increasing sequence order.
318    ///
319    /// # Errors
320    ///
321    /// Returns [`PostgresGraphError::Sync`] when rows are out of order.
322    ///
323    /// # Performance
324    ///
325    /// This method is `O(r log r + a)` where `r` is row count and `a` is applied actions.
326    pub fn apply_in_order(
327        rows: &[Self],
328        overlay: &mut OverlayState,
329    ) -> Result<usize, PostgresGraphError> {
330        if rows.is_empty() {
331            return Ok(0);
332        }
333        let mut ordered = rows.to_vec();
334        ordered.sort_by_key(|row| row.sequence);
335        let mut applied = 0_usize;
336        let mut last_sequence = None;
337        for row in ordered {
338            if let Some(previous) = last_sequence
339                && row.sequence <= previous
340            {
341                return Err(SyncError::NonMonotonicSequence {
342                    sequence: row.sequence,
343                    previous,
344                }
345                .into());
346            }
347            last_sequence = Some(row.sequence);
348            row.action.apply_to(overlay);
349            applied += 1;
350        }
351        Ok(applied)
352    }
353}
354
/// Coarse health summary for sync surfaces, expressed as three counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncHealth {
    /// Number of overlay edge insertions currently buffered.
    pub overlay_edges: usize,
    /// Number of tombstoned base edges.
    pub tombstoned_edges: usize,
    /// Number of tombstoned nodes.
    pub tombstoned_nodes: usize,
}