signer-daemon 0.3.2

Signer daemon package.
Documentation
use sea_orm::{
    ActiveModelTrait as _, ActiveValue::Set, DatabaseTransaction, EntityTrait, QueryOrder,
    TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::{
    SignerDaemonCore,
    entity::crdt_event,
    model::deltaobject::{
        chat_do::ChatDO, message_do::MessageDO, server_do::ServerDO, user_do::UserDO,
    },
};

/// A single change to one [`CrdtType`] value.
///
/// Applying a delta produces another `CrdtDelta` of the same type; within this
/// file that result is serialized and stored in the event's `revert` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDelta<T: CrdtType> {
    /// Carries a full value; applying it calls [`CrdtType::put`] on that value.
    Put(T),
    /// Carries only a primary key; applying it calls [`CrdtType::del`] with that key.
    Del(T::PrimaryKey),
    /// No-op; applying it performs no work and yields `Skip` again.
    Skip,
}

impl<T> CrdtDelta<T>
where
    T: CrdtType,
{
    /// Applies this delta inside the transaction `tx`.
    ///
    /// * `Put(value)` is forwarded to [`CrdtType::put`].
    /// * `Del(key)` is forwarded to [`CrdtType::del`].
    /// * `Skip` performs no work and returns `Skip`.
    ///
    /// Returns the delta produced by the underlying operation.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `put` or `del`.
    async fn apply(
        &self,
        core: &SignerDaemonCore,
        tx: &DatabaseTransaction,
    ) -> crate::DaemonResult<CrdtDelta<T>> {
        match self {
            Self::Put(value) => value.put(core, tx).await,
            // `self` is matched by reference, so `key` is already `&T::PrimaryKey`;
            // borrowing it again would only create a `&&` that must be coerced back.
            Self::Del(key) => T::del(core, key, tx).await,
            Self::Skip => Ok(Self::Skip),
        }
    }
}

/// Behaviour a type must provide so that it can be carried by a [`CrdtDelta`].
///
/// Both operations run against a caller-supplied transaction and resolve to a
/// `CrdtDelta<Self>`. Callers in this file persist that returned delta as the
/// event's revert data, so implementations are presumably expected to return
/// the delta that undoes the operation — confirm against the implementors.
pub trait CrdtType: Sized {
    /// Key type used to address a value when deleting it.
    type PrimaryKey;

    /// Stores `self` using `tx`, resolving to the resulting delta.
    fn put(
        &self,
        core: &SignerDaemonCore,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = crate::DaemonResult<CrdtDelta<Self>>> + Send;

    /// Removes the value addressed by `key` using `tx`, resolving to the
    /// resulting delta.
    fn del(
        core: &SignerDaemonCore,
        key: &Self::PrimaryKey,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = crate::DaemonResult<CrdtDelta<Self>>> + Send;
}

/// Closed set of every concrete [`CrdtDelta`] the daemon handles, one variant
/// per delta-object type.
///
/// This is the value that gets serialized to JSON for the `payload` and
/// `revert` fields of a `crdt_event` row.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDeltaBox {
    /// Delta over a [`UserDO`].
    User(CrdtDelta<UserDO>),
    /// Delta over a [`ServerDO`].
    Server(CrdtDelta<ServerDO>),
    /// Delta over a [`ChatDO`].
    Chat(CrdtDelta<ChatDO>),
    /// Delta over a [`MessageDO`].
    Message(CrdtDelta<MessageDO>),
}

impl CrdtDeltaBox {
    /// Applies the wrapped delta inside `tx` and re-wraps the resulting delta
    /// in the same variant.
    ///
    /// # Errors
    ///
    /// Propagates any error from the inner [`CrdtDelta`] application.
    async fn apply(
        &self,
        core: &SignerDaemonCore,
        tx: &DatabaseTransaction,
    ) -> crate::DaemonResult<Self> {
        Ok(match self {
            Self::User(delta) => Self::User(delta.apply(core, tx).await?),
            Self::Server(delta) => Self::Server(delta.apply(core, tx).await?),
            Self::Chat(delta) => Self::Chat(delta.apply(core, tx).await?),
            Self::Message(delta) => Self::Message(delta.apply(core, tx).await?),
        })
    }

    /// Records this delta as a new `crdt_event` row and returns the inserted
    /// model.
    ///
    /// The new row's clock is one more than the highest clock currently stored
    /// (or `1` when the table is empty), its peer is `core.peer`, its payload
    /// is this delta serialized as JSON, and its revert field is empty.
    ///
    /// # Errors
    ///
    /// Fails if the clock lookup, the JSON serialization, or the insert fails.
    pub async fn insert(&self, core: &SignerDaemonCore) -> crate::DaemonResult<crdt_event::Model> {
        // Fetch the event with the greatest clock, if any exists.
        let newest = crdt_event::Entity::find()
            .order_by_desc(crdt_event::Column::Clock)
            .one(&core.db)
            .await?;
        let next_clock = newest.map_or(1, |event| event.clock + 1);

        let payload = serde_json::to_string(self)?;

        let event = crdt_event::ActiveModel {
            clock: Set(next_clock),
            peer: Set(core.peer.clone()),
            payload: Set(payload),
            revert: Set(None),
        };

        let inserted = event.insert(&core.db).await?;
        tracing::debug!("INSERT CrdtDeltaBox {:?}", self);

        Ok(inserted)
    }
}

impl crdt_event::Model {
    /// Applies the delta stored in `payload` and records its revert data.
    ///
    /// The payload is decoded first. Then, inside one transaction, the delta is
    /// applied, the delta it returns is serialized into this row's `revert`
    /// field, and the row is updated. The transaction is committed only if
    /// every step succeeds.
    ///
    /// # Errors
    ///
    /// Fails if the payload is not valid JSON for a [`CrdtDeltaBox`], if the
    /// transaction cannot be started, if applying the delta fails (reported as
    /// a `SignerError::Msg` prefixed with `"apply delta failed"`), or if the
    /// serialization, row update, or commit fails.
    pub async fn apply(&self, core: &SignerDaemonCore) -> crate::DaemonResult<()> {
        // Decode before touching the database so a malformed payload never
        // opens a transaction that would only be thrown away.
        let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;

        let tx = core.db.begin().await.map_err(|e| {
            crate::DaemonError::Signer(crate::SignerError::Msg(format!(
                "start transaction failed: {e}"
            )))
        })?;

        let out_box = delta_box.apply(core, &tx).await.map_err(|e| {
            crate::DaemonError::Signer(crate::SignerError::Msg(format!(
                "apply delta failed: {e}"
            )))
        })?;

        let revert = serde_json::to_string(&out_box)?;

        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(Some(revert));
        m.update(&tx).await?;

        tx.commit().await?;

        Ok(())
    }

    /// Undoes a previously applied event using its stored `revert` data.
    ///
    /// The revert delta is decoded first. Then, inside one transaction, it is
    /// applied and this row's `revert` field is cleared. The transaction is
    /// committed only if every step succeeds.
    ///
    /// # Errors
    ///
    /// Returns a `SignerError::Msg("revert field is None")` when the event has
    /// no revert data. Otherwise fails if the revert data is not valid JSON
    /// for a [`CrdtDeltaBox`], or if starting the transaction, applying the
    /// delta, updating the row, or committing fails.
    pub async fn revert(&self, core: &SignerDaemonCore) -> crate::DaemonResult<()> {
        // Validate everything that needs no database access before opening a
        // transaction, so an unrevertable event never begins one.
        let Some(revert) = &self.revert else {
            return Err(crate::DaemonError::Signer(crate::SignerError::Msg(
                "revert field is None".to_string(),
            )));
        };
        let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;

        let tx = core.db.begin().await?;

        // The delta returned here is not stored; the revert field is cleared below.
        delta_box.apply(core, &tx).await?;

        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(None);
        m.update(&tx).await?;

        tx.commit().await?;

        Ok(())
    }
}