use async_trait::async_trait;
use futures_core::Stream;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::pin::Pin;
use crate::prelude::*;
/// A single CRDT update: an opaque byte payload plus optional metadata.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CrdtUpdate {
    /// Raw bytes of the update. The encoding is not specified by this type.
    pub data: Vec<u8>,
    /// Optional identifier of the client the update is attributed to.
    pub client_id: Option<Box<str>>,
    /// Optional sequence number of the update.
    ///
    /// Marked `#[serde(skip)]`: it is never written when serializing, and a
    /// deserialized value gets the `Option` default, i.e. `None`.
    // NOTE(review): presumably assigned by the storage backend rather than by
    // the sender — confirm against the adapter implementations.
    #[serde(skip)]
    pub seq: Option<u64>,
}
impl CrdtUpdate {
    /// Creates an update that carries `data` and nothing else: both
    /// `client_id` and `seq` are left as `None`.
    pub fn new(data: Vec<u8>) -> Self {
        Self {
            client_id: None,
            seq: None,
            data,
        }
    }

    /// Creates an update that carries `data` and is attributed to
    /// `client_id`. The sequence number is left as `None`.
    pub fn with_client(data: Vec<u8>, client_id: impl Into<Box<str>>) -> Self {
        let client_id = Some(client_id.into());
        Self {
            data,
            client_id,
            seq: None,
        }
    }
}
/// An update paired with the id of the document it belongs to.
///
/// This is the item type of the stream returned by `CrdtAdapter::subscribe`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CrdtChangeEvent {
    /// Identifier of the document the update applies to.
    pub doc_id: Box<str>,
    /// The update itself.
    pub update: CrdtUpdate,
}
/// Parameters for opening a subscription on a single document.
#[derive(Clone, Debug)]
pub struct CrdtSubscriptionOptions {
    /// Identifier of the document to subscribe to.
    pub doc_id: Box<str>,
    /// Whether a snapshot is requested in addition to updates.
    // NOTE(review): presumably the snapshot is delivered before live updates;
    // the exact behaviour is up to the adapter — confirm.
    pub send_snapshot: bool,
}

impl CrdtSubscriptionOptions {
    /// Options for `doc_id` with `send_snapshot` set to `true`.
    pub fn with_snapshot(doc_id: impl Into<Box<str>>) -> Self {
        let doc_id: Box<str> = doc_id.into();
        Self {
            doc_id,
            send_snapshot: true,
        }
    }

    /// Options for `doc_id` with `send_snapshot` set to `false`.
    pub fn updates_only(doc_id: impl Into<Box<str>>) -> Self {
        let doc_id: Box<str> = doc_id.into();
        Self {
            doc_id,
            send_snapshot: false,
        }
    }
}
/// Summary figures for a single stored document.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CrdtDocStats {
    /// Identifier of the document the figures describe.
    pub doc_id: Box<str>,
    /// Size of the document in bytes. The default `CrdtAdapter::stats`
    /// computes this as the sum of the `data` lengths of all stored updates.
    pub size_bytes: u64,
    /// Number of updates stored for the document.
    pub update_count: u32,
}
#[async_trait]
pub trait CrdtAdapter: Debug + Send + Sync {
async fn get_updates(&self, tn_id: TnId, doc_id: &str) -> ClResult<Vec<CrdtUpdate>>;
async fn store_update(&self, tn_id: TnId, doc_id: &str, update: CrdtUpdate) -> ClResult<()>;
async fn subscribe(
&self,
tn_id: TnId,
opts: CrdtSubscriptionOptions,
) -> ClResult<Pin<Box<dyn Stream<Item = CrdtChangeEvent> + Send>>>;
async fn stats(&self, tn_id: TnId, doc_id: &str) -> ClResult<CrdtDocStats> {
let updates = self.get_updates(tn_id, doc_id).await?;
let update_count = u32::try_from(updates.len()).unwrap_or_default();
let size_bytes: u64 = updates.iter().map(|u| u.data.len() as u64).sum();
Ok(CrdtDocStats { doc_id: doc_id.into(), size_bytes, update_count })
}
async fn compact_updates(
&self,
tn_id: TnId,
doc_id: &str,
remove_seqs: &[u64],
replacement: CrdtUpdate,
) -> ClResult<()>;
async fn delete_doc(&self, tn_id: TnId, doc_id: &str) -> ClResult<()>;
async fn close_doc(&self, _tn_id: TnId, _doc_id: &str) -> ClResult<()> {
Ok(())
}
async fn list_docs(&self, tn_id: TnId) -> ClResult<Vec<Box<str>>>;
async fn delete_tenant_documents(&self, tn_id: TnId) -> ClResult<()>;
}