// callysto 0.1.9
//
// Stream processing framework.
// Documentation
use crate::errors::*;
use crate::kafka::ctopic::CTP;
use crate::prelude::{CTable, Context, Service, ServiceState};
use crate::stores::store::Store;
use async_trait::*;
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use lever::prelude::LOTable;
use lever::sync::atomics::AtomicBox;
use rdkafka::message::OwnedMessage;
use rdkafka::Message;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tracing::info;
use url::Url;

/// Reserved 19-byte key under which a partition database keeps its persisted offset
/// (read by `persisted_offset`, written by `set_persisted_offset`).
/// The literal contains an embedded NUL byte between `__callysto` and `offset__`.
const CALLYSTO_OFFSET_KEY: &[u8; 19] = b"__callysto\0offset__";

/// Table holding the entries of a single partition: byte keys mapped to byte values.
pub type InMemoryDb = LOTable<Vec<u8>, Vec<u8>>;

/// Store backend that keeps every partition's data in an in-process [`InMemoryDb`].
#[derive(Clone)]
pub struct InMemoryStore {
    /// Application name; used as the prefix of the service label.
    app_name: String,
    /// Storage URL handed to the constructor. It is only stored; nothing in this
    /// file reads it.
    storage_url: Url,
    /// Table name; used to build the short service label (`inmemory:<table_name>`).
    table_name: String,
    /// Current lifecycle state of the service, shared behind an `Arc`.
    service_state: Arc<AtomicBox<ServiceState>>,
    /// Per-partition databases, keyed by partition number.
    dbs: LOTable<usize, InMemoryDb>,
}

impl InMemoryStore {
    pub fn new(app_name: String, storage_url: Url, table_name: String) -> Self {
        Self {
            app_name,
            storage_url,
            table_name,
            service_state: Arc::new(AtomicBox::new(ServiceState::PreStart)),
            dbs: LOTable::default(),
        }
    }

    fn db_for_partition(&self, partition: usize) -> Result<Arc<InMemoryDb>> {
        match self.dbs.get(&partition) {
            Some(x) => Ok(Arc::from(x)),
            _ => {
                let db = LOTable::default();
                let _ = self.dbs.insert(partition, db.clone());
                Ok(Arc::from(db))
            }
        }
    }
}

/// Service lifecycle implementation for the in-memory backend.
#[async_trait]
impl<State> Service<State> for InMemoryStore
where
    State: Clone + Send + Sync + 'static,
{
    /// Not implemented for this backend; calling it panics.
    async fn call(&self, st: Context<State>) -> Result<State> {
        unimplemented!()
    }

    /// Builds the start-up future for this backend.
    ///
    /// The returned future logs a start message and switches the service state
    /// to `ServiceState::Running`. Nothing happens until that future is awaited.
    async fn start(&self) -> Result<BoxFuture<'_, ()>> {
        let closure = async move {
            info!("InMemory backend is started.");
            self.service_state.replace_with(|_| ServiceState::Running);
        };

        Ok(closure.boxed())
    }

    /// Stops the service and then starts it again.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `stop` or `start`.
    async fn restart(&self) -> Result<()> {
        let start_fut = <Self as Service<State>>::stop(self)
            .and_then(|_| <Self as Service<State>>::start(self))
            .await?;

        // `start` only hands back a lazy future; it has to be awaited for the
        // state to actually become `Running`.
        start_fut.await;

        Ok(())
    }

    /// Marks the service as `ServiceState::Crashed`.
    async fn crash(&self) {
        <Self as Service<State>>::service_state(self)
            .await
            .replace_with(|_| ServiceState::Crashed);
    }

    /// Not implemented yet; calling it panics.
    async fn wait_until_stopped(&self) {
        todo!()
    }

    /// Returns `true` when the current state is `ServiceState::Running`.
    async fn started(&self) -> bool {
        *<Self as Service<State>>::service_state(self).await.get() == ServiceState::Running
    }

    /// Returns `true` when the current state is `ServiceState::Stopped`.
    async fn stopped(&self) -> bool {
        *<Self as Service<State>>::service_state(self).await.get() == ServiceState::Stopped
    }

    /// Returns `true` when the current state is `ServiceState::Crashed`.
    async fn crashed(&self) -> bool {
        *<Self as Service<State>>::service_state(self).await.get() == ServiceState::Crashed
    }

    /// Not implemented for this backend; calling it panics.
    async fn state(&self) -> String {
        unimplemented!()
    }

    /// Full label in the form `<app_name>@<shortlabel>`.
    async fn label(&self) -> String {
        format!(
            "{}@{}",
            self.app_name,
            <Self as Service<State>>::shortlabel(self).await
        )
    }

    /// Short label in the form `inmemory:<table_name>`.
    async fn shortlabel(&self) -> String {
        format!("inmemory:{}", self.table_name)
    }

    /// Returns a handle to the shared service state.
    async fn service_state(&self) -> Arc<AtomicBox<ServiceState>> {
        self.service_state.clone()
    }
}

#[async_trait]
impl<State> Store<State> for InMemoryStore
where
    State: Clone + Send + Sync + 'static,
{
    fn get(&self, serialized_key: Vec<u8>, msg: OwnedMessage) -> Result<Option<Vec<u8>>> {
        let partition: usize = msg.partition() as _;
        let db = self.db_for_partition(partition)?;
        match db.get(&serialized_key) {
            Some(x) => Ok(Some(bincode::deserialize(x.as_slice())?)),
            _ => Ok(None),
        }
    }

    fn set(
        &self,
        serialized_key: Vec<u8>,
        serialized_val: Vec<u8>,
        msg: OwnedMessage,
    ) -> Result<()> {
        let partition: usize = msg.partition() as _;
        let db = self.db_for_partition(partition)?;
        let _ = db
            .insert(serialized_key, serialized_val)
            .map_err(|e| CallystoError::GeneralError(format!("{}", e)))?;
        Ok(())
    }

    fn del(&self, serialized_key: Vec<u8>, msg: OwnedMessage) -> Result<()> {
        let partition: usize = msg.partition() as _;
        let db = self.db_for_partition(partition)?;
        let _ = db
            .remove(&serialized_key)
            .map_err(|e| CallystoError::GeneralError(format!("{}", e)))?;
        Ok(())
    }

    fn table(&self) -> CTable<State> {
        unimplemented!("Table needs to be implemented on top of Storage.")
    }

    fn persisted_offset(&self, tp: CTP) -> Result<Option<usize>> {
        let offset = self
            .db_for_partition(tp.partition)?
            .get(&CALLYSTO_OFFSET_KEY.to_vec())
            .ok_or_else(|| CallystoError::GeneralError("Offset fetch failed.".into()))
            .map_or(None, |e| {
                Option::from(usize::from_ne_bytes(e.as_slice().try_into().unwrap()))
            });
        Ok(offset)
    }

    fn set_persisted_offset(&self, tp: CTP, offset: usize) -> Result<()> {
        let _ = self
            .db_for_partition(tp.partition)?
            .insert(
                CALLYSTO_OFFSET_KEY.to_vec(),
                Vec::from(offset.to_ne_bytes()),
            )
            .map_err(|e| CallystoError::GeneralError(format!("{}", e)))?;
        Ok(())
    }

    fn apply_changelog_batch(&self, events: Vec<OwnedMessage>) -> Result<()> {
        let mut tp_offsets: HashMap<CTP, usize> = HashMap::with_capacity(events.len());
        events.iter().for_each(|e| {
            let tp = CTP::new(e.topic().into(), e.partition() as _);
            let offset: usize = e.offset() as _;
            tp_offsets
                .entry(tp)
                .and_modify(|o| *o = offset.max(*o))
                .or_insert(offset);

            match e.payload() {
                Some(p) => {
                    let db = self
                        .db_for_partition(e.partition() as _)
                        .map_err(|e| {
                            CallystoError::GeneralError("Partition number fetch error".into())
                        })
                        .unwrap();
                    db.insert(e.key().unwrap().to_vec(), p.to_vec())
                        .map_err(|e| CallystoError::GeneralError(format!("{}", e)))
                        .unwrap();
                }
                _ => {
                    let db = self
                        .db_for_partition(e.partition() as _)
                        .map_err(|e| {
                            CallystoError::GeneralError("Partition number fetch error".into())
                        })
                        .unwrap();
                    db.remove(&e.key().unwrap().to_vec())
                        .map_err(|e| CallystoError::GeneralError(format!("{}", e)))
                        .unwrap();
                }
            }
        });

        tp_offsets.into_iter().try_for_each(|(tp, offset)| {
            <Self as Store<State>>::set_persisted_offset(self, tp, offset)
        })?;

        Ok(())
    }

    fn reset_state(&self) -> Result<()> {
        let _ = self.dbs.clear();
        Ok(())
    }

    async fn on_rebalance(
        &self,
        assigned: Vec<CTP>,
        revoked: Vec<CTP>,
        newly_assigned: Vec<CTP>,
        generation_id: usize,
    ) -> Result<()> {
        // TODO: On Rebalance
        todo!()
    }

    async fn on_recovery_completed(
        &self,
        active_tps: Vec<CTP>,
        standby_tps: Vec<CTP>,
    ) -> Result<()> {
        todo!()
    }

    fn into_service(&self) -> &dyn Service<State> {
        self
    }
}