nodedb-types 0.3.0

Portable type definitions shared between NodeDB Origin and NodeDB-Lite
Documentation
// SPDX-License-Identifier: Apache-2.0

//! Primitive Calvin scheduling types shared between `nodedb-physical`
//! (the physical-plan IR layer) and `nodedb-cluster` (the distributed
//! Calvin sequencer / scheduler).
//!
//! Provides [`SortedVec`], [`EngineKeySet`], and [`PassiveReadKey`] —
//! the building blocks of Calvin read/write sets. `DependentReadSpec`
//! and other scheduler-internal aggregates stay in `nodedb-cluster`.

use serde::{Deserialize, Serialize};

/// A newtype over `Vec<T>` that guarantees sorted, deduplicated contents.
///
/// Constructed via [`SortedVec::new`], which sorts and deduplicates at
/// construction time. This property is load-bearing for byte-determinism:
/// two `SortedVec`s built from the same logical set (in any insertion order)
/// produce identical serialized bytes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SortedVec<T>(Vec<T>);

impl<T: zerompk::ToMessagePack> zerompk::ToMessagePack for SortedVec<T> {
    fn write<W: zerompk::Write>(&self, writer: &mut W) -> zerompk::Result<()> {
        self.0.write(writer)
    }
}

impl<'de, T> zerompk::FromMessagePack<'de> for SortedVec<T>
where
    T: zerompk::FromMessagePack<'de> + Ord + Clone,
{
    fn read<R: zerompk::Read<'de>>(reader: &mut R) -> zerompk::Result<Self> {
        let v = Vec::<T>::read(reader)?;
        Ok(Self::new(v))
    }
}

impl<T: Ord + Clone> SortedVec<T> {
    /// Build from any slice. Sorts and deduplicates in place.
    pub fn new(mut items: Vec<T>) -> Self {
        items.sort();
        items.dedup();
        Self(items)
    }

    pub fn as_slice(&self) -> &[T] {
        &self.0
    }

    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    pub fn len(&self) -> usize {
        self.0.len()
    }

    pub fn iter(&self) -> std::slice::Iter<'_, T> {
        self.0.iter()
    }
}

impl<T: Ord + Clone> From<Vec<T>> for SortedVec<T> {
    fn from(v: Vec<T>) -> Self {
        Self::new(v)
    }
}

/// A typed key set for one engine within a read or write set.
///
/// Keys are normalized to surrogates (or byte keys for KV) at admission, so
/// all engine-specific naming is resolved upstream of the sequencer.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub enum EngineKeySet {
    /// Document engine (schemaless or strict): identified by surrogate.
    Document {
        collection: String,
        surrogates: SortedVec<u32>,
    },
    /// Vector engine: identified by surrogate.
    Vector {
        collection: String,
        surrogates: SortedVec<u32>,
    },
    /// Key-Value engine: identified by raw byte keys.
    Kv {
        collection: String,
        keys: SortedVec<Vec<u8>>,
    },
    /// Graph edge engine: identified by (src_surrogate, dst_surrogate) pairs.
    Edge {
        collection: String,
        edges: SortedVec<(u32, u32)>,
    },
}

impl EngineKeySet {
    /// O(1) estimate of the serialized byte size of this key set.
    ///
    /// Used by the dependent-read cap check at sequencer admission to bound
    /// the total bytes that would be Raft-replicated in a `CalvinReadResult`
    /// entry.  This is an estimate, not an exact count; do NOT use it as a
    /// correctness check — only as a pre-flight guard.
    pub fn serialized_size_hint(&self) -> usize {
        match self {
            // u32 surrogates: 4 bytes each.
            Self::Document { surrogates, .. } | Self::Vector { surrogates, .. } => {
                surrogates.len() * 4
            }
            // KV keys: sum of key byte lengths.
            Self::Kv { keys, .. } => keys.iter().map(|k| k.len()).sum(),
            // Edge: two u32 per edge = 8 bytes each.
            Self::Edge { edges, .. } => edges.len() * 8,
        }
    }

    /// The collection this key set belongs to.
    pub fn collection(&self) -> &str {
        match self {
            Self::Document { collection, .. }
            | Self::Vector { collection, .. }
            | Self::Kv { collection, .. }
            | Self::Edge { collection, .. } => collection,
        }
    }

    /// Returns `true` if this key set contains no keys.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Document { surrogates, .. } => surrogates.is_empty(),
            Self::Vector { surrogates, .. } => surrogates.is_empty(),
            Self::Kv { keys, .. } => keys.is_empty(),
            Self::Edge { edges, .. } => edges.is_empty(),
        }
    }
}

/// A single key that a passive participant must read and broadcast.
///
/// Wraps an [`EngineKeySet`]; per the dependent-read protocol each
/// `PassiveReadKey` contains a single-element (or small) key set.  The
/// sequencer does not enforce single-element sets; the scheduler enforces the
/// total byte budget via `DependentReadSpec::total_bytes()` (which lives in
/// `nodedb-cluster`).
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct PassiveReadKey {
    /// The engine key set to read on the passive vshard.
    pub engine_key: EngineKeySet,
}