#[cfg(feature = "slatedb")]
pub mod slatedb;
use crate::types::{OperatorId, WorkerId};
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "slatedb")]
pub use slatedb::{object_store, SlateDbBackend, SlateDbClient};
use std::{fmt::Debug, rc::Rc, sync::Mutex};
/// Numeric identifier used to refer to a snapshot version.
pub type SnapshotVersion = u64;
/// Encodes `state` into a byte buffer using `rmp_serde::to_vec`.
///
/// # Panics
///
/// Panics with the message "Error serializing state" if the encoder
/// returns an error.
pub(crate) fn serialize_state<S>(state: &S) -> Vec<u8>
where
    S: Serialize,
{
    let encoded = rmp_serde::to_vec(state);
    encoded.expect("Error serializing state")
}
/// Decodes a value of type `S` from `state` using `rmp_serde::from_slice`.
///
/// The buffer is taken by value and dropped once decoding has finished.
///
/// # Panics
///
/// Panics with the message "Error deserializing state" if the decoder
/// returns an error.
pub(crate) fn deserialize_state<S>(state: Vec<u8>) -> S
where
    S: DeserializeOwned,
{
    let decoded: Result<S, _> = rmp_serde::from_slice(state.as_slice());
    decoded.expect("Error deserializing state")
}
/// Factory for [`PersistenceClient`]s that are tied to a worker and a
/// snapshot version.
pub trait PersistenceBackend: 'static {
/// Client type returned by [`PersistenceBackend::for_version`].
type Client: PersistenceClient;
/// Returns a snapshot version, or `None` if there is none to report.
///
/// NOTE(review): going by the name, this is presumably the most recently
/// committed snapshot version; confirm against the implementors.
fn last_commited(&self) -> Option<SnapshotVersion>;
/// Creates a client for the given worker and snapshot version.
fn for_version(&self, worker_id: WorkerId, snapshot_version: &SnapshotVersion) -> Self::Client;
/// Commits the given snapshot version.
///
/// NOTE(review): presumably this is what makes the version visible through
/// `last_commited`; confirm against the implementors.
fn commit_version(&self, snapshot_version: &SnapshotVersion);
}
/// Reads and writes the encoded state of individual operators.
///
/// The trait is used as a trait object (`Box<dyn PersistenceClient>`), so it
/// has to stay object safe.
pub trait PersistenceClient: 'static {
/// Returns the stored bytes for `operator_id`, or `None` if the client has
/// nothing for that operator.
fn load(&self, operator_id: &OperatorId) -> Option<Vec<u8>>;
/// Stores the already-encoded `state` bytes for `operator_id`.
fn persist(&mut self, state: &[u8], operator_id: &OperatorId);
}
/// Cloneable handle through which operator state is written to a
/// [`PersistenceClient`].
pub struct Barrier {
// Reference-counted so that every clone of a `Barrier` writes through the
// same client; the `Mutex` provides the mutable access `persist` needs.
backend: Rc<Mutex<Box<dyn PersistenceClient>>>,
}
impl Clone for Barrier {
fn clone(&self) -> Self {
Self {
backend: Rc::clone(&self.backend),
}
}
}
impl Debug for Barrier {
    /// Formats the barrier as just its type name.
    ///
    /// The backend field is left out: `PersistenceClient` does not require
    /// `Debug`, so the boxed client cannot be formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Barrier");
        builder.finish()
    }
}
impl Barrier {
    /// Creates a barrier that writes through the given client.
    pub(super) fn new(backend: Box<dyn PersistenceClient>) -> Self {
        let shared = Rc::new(Mutex::new(backend));
        Self { backend: shared }
    }

    /// Serializes `state` and hands the bytes to the underlying client under
    /// `operator_id`.
    ///
    /// # Panics
    ///
    /// Panics if `state` cannot be serialized, or if the internal mutex is
    /// poisoned.
    pub fn persist<S>(&mut self, state: &S, operator_id: &OperatorId)
    where
        S: Serialize + DeserializeOwned,
    {
        let bytes = serialize_state(state);
        // `lock` only fails when the mutex is poisoned, i.e. an earlier
        // holder panicked while the lock was held.
        #[allow(clippy::unwrap_used)]
        let mut client = self.backend.lock().unwrap();
        client.persist(&bytes, operator_id);
    }

    /// Returns how many handles (this one included) currently share the
    /// underlying client.
    pub(super) fn strong_count(&self) -> usize {
        let shared = &self.backend;
        Rc::strong_count(shared)
    }
}
/// Persistence implementation that stores nothing.
///
/// It serves as both backend and client: loads always return `None`, and
/// persisting or committing does nothing.
#[derive(Clone, Debug)]
pub struct NoPersistence;
impl PersistenceBackend for NoPersistence {
type Client = NoPersistence;
fn last_commited(&self) -> Option<SnapshotVersion> {
None
}
fn for_version(&self, _worker_id: WorkerId, _snapshot_version: &SnapshotVersion) -> Self {
NoPersistence {}
}
fn commit_version(&self, _snapshot_version: &SnapshotVersion) {}
}
impl PersistenceClient for NoPersistence {
    /// Always `None`: this client never holds any state.
    fn load(&self, _operator_id: &OperatorId) -> Option<Vec<u8>> {
        None
    }

    /// Discards the given state.
    fn persist(&mut self, _state: &[u8], _operator_id: &OperatorId) {
        // Intentionally empty: nothing is stored.
    }
}
#[cfg(test)]
mod test {
    use super::PersistenceClient;

    /// Compile-time guard: this only builds while `PersistenceClient` can be
    /// used as a trait object.
    #[test]
    fn is_object_safe() {
        // Naming `dyn PersistenceClient` in a signature is enough to make the
        // compiler check this; the function is never called.
        fn _accepts_trait_object(_client: Box<dyn PersistenceClient>) {}
    }
}