armour 0.30.27

DDL and serialization for key-value storage
Documentation
use std::sync::OnceLock;

use async_broadcast::{InactiveReceiver, Sender};
use rapira::Rapira;
use strum::IntoStaticStr;

use crate::{KV, utils::CheckSumVec};

type ByteValue = fjall::Slice;

#[derive(Clone, Debug, Rapira)]
#[cfg_attr(
    feature = "rkyv",
    derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[cfg_attr(feature = "rkyv", archive(derive(Debug)))]
pub struct ReplicationInitReq {
    pub entity_name: String,
    pub instance_id: u16,
    pub version: u16,
    pub checksum: u32,
    pub hash: u64,
}

#[derive(Debug, Rapira, PartialEq, Eq)]
pub enum ReplicationInitRes {
    Ok,
    Sync(CheckSumVec),
    Err,
}

#[cfg(feature = "fjall")]
pub mod slice_rapira {
    use core::marker::PhantomData;

    use fjall::Slice;
    use rapira::{LEN_SIZE, Result, bytes_rapira};

    pub const fn static_size<T>(_: PhantomData<T>) -> Option<usize> {
        None
    }

    pub const fn min_size<T>(_: PhantomData<T>) -> usize {
        LEN_SIZE
    }

    #[inline]
    pub fn size(s: &Slice) -> usize {
        4 + s.len()
    }

    #[inline]
    pub fn check_bytes<T>(_: PhantomData<T>, slice: &mut &[u8]) -> Result<()> {
        bytes_rapira::check_bytes::<()>(core::marker::PhantomData, slice)
    }

    #[inline]
    pub fn from_slice(slice: &mut &[u8]) -> Result<Slice> {
        let bytes = bytes_rapira::from_slice(slice)?;
        Ok(Slice::new(bytes))
    }

    /// # Safety
    ///
    /// see funcs::deser_unchecked
    #[inline]
    pub unsafe fn from_slice_unchecked(slice: &mut &[u8]) -> Result<Slice> {
        from_slice(slice)
    }

    /// ...
    ///
    /// # Errors
    ///
    /// This function will return an error if ...
    ///
    /// # Safety
    ///
    /// this is unsafe
    #[inline]
    pub unsafe fn from_slice_unsafe(slice: &mut &[u8]) -> Result<Slice> {
        let bytes = unsafe { bytes_rapira::from_slice_unsafe(slice)? };
        Ok(Slice::new(bytes))
    }

    #[inline]
    pub fn convert_to_bytes(item: &Slice, slice: &mut [u8], cursor: &mut usize) {
        bytes_rapira::convert_to_bytes(item, slice, cursor);
    }

    #[inline]
    pub fn try_convert_to_bytes(item: &Slice, slice: &mut [u8], cursor: &mut usize) -> Result<()> {
        bytes_rapira::try_convert_to_bytes(item, slice, cursor)
    }
}

#[cfg(feature = "fjall")]
pub mod key_value {
    use core::marker::PhantomData;

    use fjall::Slice;
    use rapira::{LEN_SIZE, Result, bytes_rapira};

    use super::slice_rapira;

    type Value = (Slice, Slice);

    pub const fn static_size<T>(_: PhantomData<T>) -> Option<usize> {
        None
    }

    pub const fn min_size<T>(_: PhantomData<T>) -> usize {
        LEN_SIZE * 2
    }

    #[inline]
    pub fn size(s: &Value) -> usize {
        8 + s.0.len() + s.1.len()
    }

    #[inline]
    pub fn check_bytes<T>(_: PhantomData<T>, slice: &mut &[u8]) -> Result<()> {
        bytes_rapira::check_bytes::<()>(core::marker::PhantomData, slice)?;
        bytes_rapira::check_bytes::<()>(core::marker::PhantomData, slice)?;
        Ok(())
    }

    #[inline]
    pub fn from_slice(slice: &mut &[u8]) -> Result<Value> {
        let key = slice_rapira::from_slice(slice)?;
        let value = slice_rapira::from_slice(slice)?;
        Ok((key, value))
    }

    /// # Safety
    ///
    /// see funcs::deser_unchecked
    #[inline]
    pub unsafe fn from_slice_unchecked(slice: &mut &[u8]) -> Result<Value> {
        from_slice(slice)
    }

    /// ...
    ///
    /// # Errors
    ///
    /// This function will return an error if ...
    ///
    /// # Safety
    ///
    /// this is unsafe
    #[inline]
    pub unsafe fn from_slice_unsafe(slice: &mut &[u8]) -> Result<Value> {
        let value = unsafe {
            let key = slice_rapira::from_slice_unsafe(slice)?;
            let value = slice_rapira::from_slice_unsafe(slice)?;
            (key, value)
        };
        Ok(value)
    }

    #[inline]
    pub fn convert_to_bytes(item: &Value, slice: &mut [u8], cursor: &mut usize) {
        bytes_rapira::convert_to_bytes(&item.0, slice, cursor);
        bytes_rapira::convert_to_bytes(&item.1, slice, cursor);
    }

    #[inline]
    pub fn try_convert_to_bytes(item: &Value, slice: &mut [u8], cursor: &mut usize) -> Result<()> {
        bytes_rapira::try_convert_to_bytes(&item.0, slice, cursor)?;
        bytes_rapira::try_convert_to_bytes(&item.1, slice, cursor)?;
        Ok(())
    }
}

#[derive(Debug, Rapira, IntoStaticStr)]
pub enum SyncChunk {
    Done,
    /// delete range with prefix
    DeleteRanges(Vec<u32>),
    /// update range with prefix
    UpdateRange(u32, Vec<KV<ByteValue, ByteValue>>),
}

#[derive(Debug, Clone, Rapira, IntoStaticStr)]
pub enum ChangeEvent<K, V> {
    Upsert(KV<K, V>),
    /// old id, new id
    ChangeId(K, K),
    /// delete by id
    Delete(K),
}

impl<K, V> ChangeEvent<K, V>
where
    K: AsRef<[u8]>,
{
    pub fn variant(&self) -> &'static str {
        self.into()
    }

    /// old key
    pub fn key(&self) -> &[u8] {
        match self {
            ChangeEvent::Upsert(kv) => kv.0.as_ref(),
            ChangeEvent::ChangeId(old, _) => old.as_ref(),
            ChangeEvent::Delete(key) => key.as_ref(),
        }
    }

    pub fn from_kv(key: K, val: Option<V>) -> Self {
        match val {
            Some(val) => ChangeEvent::Upsert((key, val)),
            None => ChangeEvent::Delete(key),
        }
    }
}

pub const REPLICATION_PORT: u16 = 9651;

pub type Subscribers<K, V> = OnceLock<(
    Sender<ChangeEvent<K, V>>,
    InactiveReceiver<ChangeEvent<K, V>>,
)>;

pub enum ReplicationEvent<'a, K, V> {
    Upsert {
        key: &'a K,
        val: &'a V,
        old_val: Option<&'a V>,
    },
    Delete {
        key: &'a K,
        val: &'a V,
    },
    IdChange {
        old_key: &'a K,
        new_key: &'a K,
        val: &'a V,
    },
}

impl<K, V> ReplicationEvent<'_, K, V>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    pub fn key(&self) -> &[u8] {
        match self {
            ReplicationEvent::Upsert { key, .. } => key.as_ref(),
            ReplicationEvent::Delete { key, .. } => key.as_ref(),
            ReplicationEvent::IdChange { new_key, .. } => new_key.as_ref(),
        }
    }

    pub fn val(&self) -> &V {
        match self {
            ReplicationEvent::Upsert { val, .. } => val,
            ReplicationEvent::Delete { val, .. } => val,
            ReplicationEvent::IdChange { val, .. } => val,
        }
    }
}

pub type ReplicationEventHandler<K, V> =
    Option<Box<dyn Fn(ReplicationEvent<'_, K, V>) + Send + Sync>>;