use alloc::{collections::BTreeMap, vec::Vec};
use crate::{
build::EdgeRow,
catalog::NodeKey,
error::{PostgresGraphError, SyncError},
overlay::{OverlayEdge, OverlayState},
};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncAction {
InsertEdge {
source: u32,
target: u32,
},
RemoveOverlayEdge {
source: u32,
target: u32,
},
DeleteEdge {
edge_id: u32,
},
DeleteNode {
node_id: u32,
},
TruncateOverlays,
}
impl SyncAction {
pub(crate) fn apply_to(self, overlay: &mut OverlayState) {
match self {
Self::InsertEdge { source, target } => {
overlay.push_edge(OverlayEdge { source, target });
}
Self::RemoveOverlayEdge { source, target } => {
overlay.remove_edge(source, target);
}
Self::DeleteEdge { edge_id } => {
overlay.tombstone_edge(edge_id);
}
Self::DeleteNode { node_id } => {
overlay.tombstone_node(node_id);
}
Self::TruncateOverlays => overlay.clear(),
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncActionWire {
InsertEdge {
source: NodeKey,
target: NodeKey,
},
RemoveOverlayEdge {
source: NodeKey,
target: NodeKey,
},
DeleteEdge {
edge_id: u32,
},
DeleteNode {
node_id: u32,
},
TruncateOverlays,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(i16)]
pub enum SyncActionCodec {
InsertEdge = 1,
DeleteEdge = 2,
DeleteNode = 3,
TruncateOverlays = 4,
RemoveOverlayEdge = 5,
}
impl SyncActionCodec {
#[must_use]
pub const fn carries_node_keys(self) -> bool {
matches!(self, Self::InsertEdge | Self::RemoveOverlayEdge)
}
pub fn decode_wire(
action_type: i16,
arg0: Option<i64>,
arg1: Option<i64>,
) -> Result<SyncActionWire, SyncError> {
match Self::try_from(action_type)? {
Self::InsertEdge => {
let source = decode_node_key(arg0, action_type)?;
let target = decode_node_key(arg1, action_type)?;
Ok(SyncActionWire::InsertEdge { source, target })
}
Self::DeleteEdge => {
let edge_id =
u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
.map_err(|_| SyncError::InvalidActionArgs { action_type })?;
Ok(SyncActionWire::DeleteEdge { edge_id })
}
Self::DeleteNode => {
let node_id =
u32::try_from(arg0.ok_or(SyncError::InvalidActionArgs { action_type })?)
.map_err(|_| SyncError::InvalidActionArgs { action_type })?;
Ok(SyncActionWire::DeleteNode { node_id })
}
Self::TruncateOverlays => Ok(SyncActionWire::TruncateOverlays),
Self::RemoveOverlayEdge => {
let source = decode_node_key(arg0, action_type)?;
let target = decode_node_key(arg1, action_type)?;
Ok(SyncActionWire::RemoveOverlayEdge { source, target })
}
}
}
pub fn decode(
action_type: i16,
arg0: Option<i64>,
arg1: Option<i64>,
node_map: &BTreeMap<NodeKey, u32>,
) -> Result<SyncAction, SyncError> {
resolve_sync_action(Self::decode_wire(action_type, arg0, arg1)?, node_map)
}
}
impl TryFrom<i16> for SyncActionCodec {
type Error = SyncError;
fn try_from(value: i16) -> Result<Self, Self::Error> {
match value {
1 => Ok(Self::InsertEdge),
2 => Ok(Self::DeleteEdge),
3 => Ok(Self::DeleteNode),
4 => Ok(Self::TruncateOverlays),
5 => Ok(Self::RemoveOverlayEdge),
action_type => Err(SyncError::InvalidActionType { action_type }),
}
}
}
pub fn resolve_sync_action(
action: SyncActionWire,
node_map: &BTreeMap<NodeKey, u32>,
) -> Result<SyncAction, SyncError> {
match action {
SyncActionWire::InsertEdge { source, target } => Ok(SyncAction::InsertEdge {
source: lookup_dense(source, node_map)?,
target: lookup_dense(target, node_map)?,
}),
SyncActionWire::RemoveOverlayEdge { source, target } => Ok(SyncAction::RemoveOverlayEdge {
source: lookup_dense(source, node_map)?,
target: lookup_dense(target, node_map)?,
}),
SyncActionWire::DeleteEdge { edge_id } => Ok(SyncAction::DeleteEdge { edge_id }),
SyncActionWire::DeleteNode { node_id } => Ok(SyncAction::DeleteNode { node_id }),
SyncActionWire::TruncateOverlays => Ok(SyncAction::TruncateOverlays),
}
}
pub type RawSyncRow = (u64, i16, Option<i64>, Option<i64>);
pub fn resolve_sync_rows(
edges: &[EdgeRow],
raw_rows: &[RawSyncRow],
) -> Result<Vec<SyncRow>, PostgresGraphError> {
let node_map = dense_node_map_for_sync_resolution(edges, raw_rows)?;
let mut rows = Vec::with_capacity(raw_rows.len());
for (sequence, action_type, arg0, arg1) in raw_rows {
let action = SyncActionCodec::decode(*action_type, *arg0, *arg1, &node_map)?;
rows.push(SyncRow {
sequence: *sequence,
action,
});
}
Ok(rows)
}
pub fn dense_node_map_for_sync_resolution(
edges: &[EdgeRow],
raw_rows: &[RawSyncRow],
) -> Result<BTreeMap<NodeKey, u32>, PostgresGraphError> {
let mut keys = crate::build::distinct_node_keys(edges);
for (_, action_type, arg0, arg1) in raw_rows {
if SyncActionCodec::try_from(*action_type).is_ok_and(SyncActionCodec::carries_node_keys) {
if let Some(key) = node_key_from_i64(*arg0) {
keys.insert(key);
}
if let Some(key) = node_key_from_i64(*arg1) {
keys.insert(key);
}
}
}
Ok(crate::build::dense_node_map_from_keys(keys)?)
}
fn node_key_from_i64(value: Option<i64>) -> Option<NodeKey> {
let value = value?;
if value.is_negative() {
return None;
}
u64::try_from(value).ok().map(NodeKey)
}
fn decode_node_key(value: Option<i64>, action_type: i16) -> Result<NodeKey, SyncError> {
let value = value.ok_or(SyncError::InvalidActionArgs { action_type })?;
if value.is_negative() {
return Err(SyncError::InvalidActionArgs { action_type });
}
let key = u64::try_from(value).map_err(|_| SyncError::InvalidActionArgs { action_type })?;
Ok(NodeKey(key))
}
fn lookup_dense(key: NodeKey, node_map: &BTreeMap<NodeKey, u32>) -> Result<u32, SyncError> {
node_map
.get(&key)
.copied()
.ok_or(SyncError::UnknownNodeKey { key })
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SyncRow {
pub sequence: u64,
pub action: SyncAction,
}
impl SyncRow {
pub fn apply_in_order(
rows: &[Self],
overlay: &mut OverlayState,
) -> Result<usize, PostgresGraphError> {
let mut applied = 0_usize;
let mut last_sequence = None;
for row in rows {
if let Some(previous) = last_sequence
&& row.sequence <= previous
{
return Err(SyncError::NonMonotonicSequence {
sequence: row.sequence,
previous,
}
.into());
}
last_sequence = Some(row.sequence);
row.action.apply_to(overlay);
applied += 1;
}
Ok(applied)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SyncHealth {
pub overlay_edges: usize,
pub tombstoned_edges: usize,
pub tombstoned_nodes: usize,
}