use std::sync::OnceLock;
use async_broadcast::{InactiveReceiver, Sender};
use rapira::Rapira;
use strum::IntoStaticStr;
use crate::{KV, utils::CheckSumVec};
// Raw byte payload type used for the keys and values carried by `SyncChunk::UpdateRange`.
// NOTE(review): unlike the `slice_rapira` / `key_value` modules in this file, this alias is not
// gated behind `feature = "fjall"` — confirm that `fjall` is an unconditional dependency.
type ByteValue = fjall::Slice;
/// Request that opens a replication exchange for one entity.
///
/// Serializable with rapira and, when the `rkyv` feature is enabled, with rkyv as well.
/// NOTE(review): the field descriptions below are inferred from the field names only; the code
/// that produces and consumes this message is not visible here — confirm before relying on them.
#[derive(Clone, Debug, Rapira)]
#[cfg_attr(
feature = "rkyv",
derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)
)]
#[cfg_attr(feature = "rkyv", archive(derive(Debug)))]
pub struct ReplicationInitReq {
/// Name of the entity to replicate.
pub entity_name: String,
/// Identifier of an instance — presumably the sender; TODO confirm.
pub instance_id: u16,
/// Version number — presumably of the entity's schema or of the protocol; TODO confirm.
pub version: u16,
/// 32-bit checksum — presumably over the sender's current data; TODO confirm what it covers.
pub checksum: u32,
/// 64-bit hash — TODO confirm what is hashed and how it differs from `checksum`.
pub hash: u64,
}
/// Outcome of a replication init exchange (presumably the reply to `ReplicationInitReq`).
///
/// NOTE(review): variant meanings are inferred from their names; confirm against the
/// code that produces them.
#[derive(Debug, Rapira, PartialEq, Eq)]
pub enum ReplicationInitRes {
/// No payload; presumably the two sides are already in sync.
Ok,
/// Carries a `CheckSumVec`; presumably the checksums needed to work out what must be synced.
Sync(CheckSumVec),
/// No payload; the request was rejected or failed (no error detail is carried).
Err,
}
/// Rapira (de)serialization helpers for [`fjall::Slice`].
///
/// Every operation is forwarded to `rapira::bytes_rapira`, so a `Slice` is encoded exactly like
/// a plain byte slice. The function set mirrors that of `bytes_rapira` (presumably so the module
/// can be plugged in through rapira's `with` attribute — confirm against the call sites).
#[cfg(feature = "fjall")]
pub mod slice_rapira {
    use core::marker::PhantomData;

    use fjall::Slice;
    use rapira::{LEN_SIZE, Result, bytes_rapira};

    /// The encoded size depends on the payload length, so there is no static size.
    pub const fn static_size<T>(_: PhantomData<T>) -> Option<usize> {
        None
    }

    /// Smallest possible encoding: the `LEN_SIZE`-byte header with an empty payload.
    pub const fn min_size<T>(_: PhantomData<T>) -> usize {
        LEN_SIZE
    }

    /// Encoded size of `s`: the `LEN_SIZE`-byte header plus the payload bytes.
    ///
    /// Uses `LEN_SIZE` rather than a literal so that it always agrees with [`min_size`].
    #[inline]
    pub fn size(s: &Slice) -> usize {
        LEN_SIZE + s.len()
    }

    /// Validates one encoded byte string at the front of `slice` via `bytes_rapira::check_bytes`.
    ///
    /// # Errors
    /// Propagates whatever error `bytes_rapira::check_bytes` reports.
    #[inline]
    pub fn check_bytes<T>(_: PhantomData<T>, slice: &mut &[u8]) -> Result<()> {
        bytes_rapira::check_bytes::<()>(PhantomData, slice)
    }

    /// Decodes one byte string from the front of `slice` and copies it into a new [`Slice`].
    ///
    /// # Errors
    /// Propagates whatever error `bytes_rapira::from_slice` reports.
    #[inline]
    pub fn from_slice(slice: &mut &[u8]) -> Result<Slice> {
        bytes_rapira::from_slice(slice).map(Slice::new)
    }

    /// Same as [`from_slice`]; this implementation deliberately keeps all checks.
    ///
    /// # Safety
    /// This function only calls the safe [`from_slice`], so it places no extra requirements on
    /// the caller. It is `unsafe` only to match the signature of the other rapira adapters.
    #[inline]
    pub unsafe fn from_slice_unchecked(slice: &mut &[u8]) -> Result<Slice> {
        from_slice(slice)
    }

    /// Decodes one byte string through `bytes_rapira::from_slice_unsafe`.
    ///
    /// # Safety
    /// The caller must uphold the contract of `bytes_rapira::from_slice_unsafe`
    /// (presumably: `slice` starts with a well-formed encoding — see the rapira docs).
    #[inline]
    pub unsafe fn from_slice_unsafe(slice: &mut &[u8]) -> Result<Slice> {
        // SAFETY: the caller's obligations are forwarded unchanged to `bytes_rapira`.
        let bytes = unsafe { bytes_rapira::from_slice_unsafe(slice)? };
        Ok(Slice::new(bytes))
    }

    /// Writes `item` into `slice` at `cursor` via `bytes_rapira::convert_to_bytes`.
    #[inline]
    pub fn convert_to_bytes(item: &Slice, slice: &mut [u8], cursor: &mut usize) {
        bytes_rapira::convert_to_bytes(item, slice, cursor);
    }

    /// Fallible counterpart of [`convert_to_bytes`].
    ///
    /// # Errors
    /// Propagates whatever error `bytes_rapira::try_convert_to_bytes` reports.
    #[inline]
    pub fn try_convert_to_bytes(item: &Slice, slice: &mut [u8], cursor: &mut usize) -> Result<()> {
        bytes_rapira::try_convert_to_bytes(item, slice, cursor)
    }
}
/// Rapira (de)serialization helpers for a `(Slice, Slice)` key/value pair.
///
/// The pair is encoded as the key followed by the value, each handled by [`slice_rapira`].
#[cfg(feature = "fjall")]
pub mod key_value {
    use core::marker::PhantomData;

    use fjall::Slice;
    use rapira::{LEN_SIZE, Result};

    use super::slice_rapira;

    type Value = (Slice, Slice);

    /// The encoded size depends on both payload lengths, so there is no static size.
    pub const fn static_size<T>(_: PhantomData<T>) -> Option<usize> {
        None
    }

    /// Smallest possible encoding: two `LEN_SIZE`-byte headers with empty payloads.
    pub const fn min_size<T>(_: PhantomData<T>) -> usize {
        2 * LEN_SIZE
    }

    /// Encoded size of the pair: the encoded key size plus the encoded value size.
    #[inline]
    pub fn size(s: &Value) -> usize {
        let (key, value) = s;
        slice_rapira::size(key) + slice_rapira::size(value)
    }

    /// Validates two consecutive encoded byte strings (key, then value) at the front of `slice`.
    #[inline]
    pub fn check_bytes<T>(_: PhantomData<T>, slice: &mut &[u8]) -> Result<()> {
        slice_rapira::check_bytes::<()>(PhantomData, slice)?;
        slice_rapira::check_bytes::<()>(PhantomData, slice)
    }

    /// Decodes the key and then the value from the front of `slice`.
    #[inline]
    pub fn from_slice(slice: &mut &[u8]) -> Result<Value> {
        let key = slice_rapira::from_slice(slice)?;
        slice_rapira::from_slice(slice).map(|value| (key, value))
    }

    /// Same as [`from_slice`]; this implementation keeps all checks.
    ///
    /// # Safety
    /// Only the safe [`from_slice`] is called, so the caller has no extra obligations.
    #[inline]
    pub unsafe fn from_slice_unchecked(slice: &mut &[u8]) -> Result<Value> {
        from_slice(slice)
    }

    /// Decodes the key and then the value through `slice_rapira::from_slice_unsafe`.
    ///
    /// # Safety
    /// The caller must uphold the contract of `slice_rapira::from_slice_unsafe` for both
    /// consecutive elements.
    #[inline]
    pub unsafe fn from_slice_unsafe(slice: &mut &[u8]) -> Result<Value> {
        // SAFETY: the caller's obligations are forwarded unchanged for each element.
        let key = unsafe { slice_rapira::from_slice_unsafe(slice)? };
        // SAFETY: as above, for the value that follows the key.
        let value = unsafe { slice_rapira::from_slice_unsafe(slice)? };
        Ok((key, value))
    }

    /// Writes the key and then the value into `slice`, advancing `cursor`.
    #[inline]
    pub fn convert_to_bytes(item: &Value, slice: &mut [u8], cursor: &mut usize) {
        let (key, value) = item;
        slice_rapira::convert_to_bytes(key, slice, cursor);
        slice_rapira::convert_to_bytes(value, slice, cursor);
    }

    /// Fallible counterpart of [`convert_to_bytes`]; stops at the first error.
    #[inline]
    pub fn try_convert_to_bytes(item: &Value, slice: &mut [u8], cursor: &mut usize) -> Result<()> {
        let (key, value) = item;
        slice_rapira::try_convert_to_bytes(key, slice, cursor)?;
        slice_rapira::try_convert_to_bytes(value, slice, cursor)
    }
}
/// One message of a sync stream.
///
/// NOTE(review): variant meanings are inferred from names and payload types; the producer and
/// consumer are not visible in this file. Variant order presumably matters for the derived
/// rapira encoding — do not reorder without checking.
#[derive(Debug, Rapira, IntoStaticStr)]
pub enum SyncChunk {
/// No payload; presumably marks the end of the sync stream.
Done,
/// A list of `u32` values — presumably identifiers of ranges to delete; TODO confirm.
DeleteRanges(Vec<u32>),
/// A `u32` (presumably the range identifier) and the raw key/value pairs for that range.
UpdateRange(u32, Vec<KV<ByteValue, ByteValue>>),
}
/// An owned description of a single change to a keyed value.
///
/// This is the message type carried by the `Subscribers` broadcast channel.
#[derive(Debug, Clone, Rapira, IntoStaticStr)]
pub enum ChangeEvent<K, V> {
/// A key/value pair was written (insert or update).
Upsert(KV<K, V>),
/// A key was replaced by another: `(old, new)`, going by how `key()` names the first field.
ChangeId(K, K),
/// The entry stored under this key was removed.
Delete(K),
}
impl<K, V> ChangeEvent<K, V>
where
K: AsRef<[u8]>,
{
pub fn variant(&self) -> &'static str {
self.into()
}
pub fn key(&self) -> &[u8] {
match self {
ChangeEvent::Upsert(kv) => kv.0.as_ref(),
ChangeEvent::ChangeId(old, _) => old.as_ref(),
ChangeEvent::Delete(key) => key.as_ref(),
}
}
pub fn from_kv(key: K, val: Option<V>) -> Self {
match val {
Some(val) => ChangeEvent::Upsert((key, val)),
None => ChangeEvent::Delete(key),
}
}
}
/// Port number used for replication traffic.
/// NOTE(review): presumably the default listening port — confirm transport and configurability.
pub const REPLICATION_PORT: u16 = 9651;
/// Set-once holder for the two ends of an `async_broadcast` channel of [`ChangeEvent`]s.
///
/// The tuple stores the `Sender` together with an `InactiveReceiver`
/// (presumably kept so the channel stays open while nobody is actively listening — see the
/// `async_broadcast` documentation).
pub type Subscribers<K, V> = OnceLock<(
Sender<ChangeEvent<K, V>>,
InactiveReceiver<ChangeEvent<K, V>>,
)>;
/// Borrowed view of a single change, handed by value to a `ReplicationEventHandler`.
///
/// Every variant holds only references with lifetime `'a`; nothing is owned or copied.
pub enum ReplicationEvent<'a, K, V> {
    /// A value was written under `key`.
    Upsert {
        key: &'a K,
        val: &'a V,
        /// Presumably the value previously stored under `key`, if there was one.
        old_val: Option<&'a V>,
    },
    /// The entry under `key` was removed; `val` is presumably the removed value.
    Delete {
        key: &'a K,
        val: &'a V,
    },
    /// The entry moved from `old_key` to `new_key`.
    IdChange {
        old_key: &'a K,
        new_key: &'a K,
        val: &'a V,
    },
}

impl<K, V> ReplicationEvent<'_, K, V> {
    /// Returns the value carried by the event, whichever variant it is.
    ///
    /// No trait bounds are required: the reference is handed out as-is.
    pub fn val(&self) -> &V {
        match self {
            Self::Upsert { val, .. } | Self::Delete { val, .. } | Self::IdChange { val, .. } => val,
        }
    }
}

// Only `K` has to be viewable as bytes; `V` is never inspected here, so it carries no bound and
// the accessors stay usable for arbitrary value types.
impl<K, V> ReplicationEvent<'_, K, V>
where
    K: AsRef<[u8]>,
{
    /// Returns the bytes of the key the event refers to.
    ///
    /// For `IdChange` this is the *new* key.
    /// NOTE(review): `ChangeEvent::key` returns the *old* key for `ChangeId` — confirm that the
    /// difference is intended.
    pub fn key(&self) -> &[u8] {
        match self {
            Self::Upsert { key, .. } | Self::Delete { key, .. } => key.as_ref(),
            Self::IdChange { new_key, .. } => new_key.as_ref(),
        }
    }
}
/// Optional, heap-allocated callback that receives each [`ReplicationEvent`] by value.
///
/// `None` means no handler is installed. The closure must be `Send + Sync` and returns nothing.
pub type ReplicationEventHandler<K, V> =
Option<Box<dyn Fn(ReplicationEvent<'_, K, V>) + Send + Sync>>;