signer-daemon 0.2.3

Signer daemon package.
Documentation
use anyhow::{Context, bail};
use sea_orm::{
  ActiveModelTrait as _, ActiveValue::Set, DatabaseTransaction, EntityTrait,
  QueryOrder, TransactionTrait,
};
use serde::{Deserialize, Serialize};

use crate::{
  SignerDaemon,
  entity::crdt_event,
  model::deltaobject::{
    chat_do::ChatDO, message_do::MessageDO, oper_log_do::OperLogDO,
    server_do::ServerDO, user_do::UserDO,
  },
};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum CrdtDelta<T: CrdtType> {
  Put(T),
  Del(T::PrimaryKey),
  Skip,
}

impl<T> CrdtDelta<T>
where
  T: CrdtType,
{
  async fn apply(
    &self,
    daemon: &SignerDaemon,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<CrdtDelta<T>> {
    match self {
      CrdtDelta::Put(value) => value.put(daemon, tx).await,
      CrdtDelta::Del(key) => T::del(daemon, &key, tx).await,
      CrdtDelta::Skip => Ok(CrdtDelta::Skip),
    }
  }
}

pub(crate) trait CrdtType
where
  Self: Sized,
{
  type PrimaryKey;

  async fn put(
    &self,
    daemon: &SignerDaemon,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<CrdtDelta<Self>>;
  async fn del(
    daemon: &SignerDaemon,
    key: &Self::PrimaryKey,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<CrdtDelta<Self>>;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum CrdtDeltaBox {
  User(CrdtDelta<UserDO>),
  OperLog(CrdtDelta<OperLogDO>),
  Server(CrdtDelta<ServerDO>),
  Chat(CrdtDelta<ChatDO>),
  Message(CrdtDelta<MessageDO>),
}

impl CrdtDeltaBox {
  async fn apply(
    &self,
    daemon: &SignerDaemon,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<Self> {
    let out = match self {
      CrdtDeltaBox::User(crdt_delta) => {
        CrdtDeltaBox::User(crdt_delta.apply(daemon, tx).await?)
      }
      CrdtDeltaBox::OperLog(crdt_delta) => {
        CrdtDeltaBox::OperLog(crdt_delta.apply(daemon, tx).await?)
      }
      CrdtDeltaBox::Server(crdt_delta) => {
        CrdtDeltaBox::Server(crdt_delta.apply(daemon, tx).await?)
      }
      CrdtDeltaBox::Chat(crdt_delta) => {
        CrdtDeltaBox::Chat(crdt_delta.apply(daemon, tx).await?)
      }
      CrdtDeltaBox::Message(crdt_delta) => {
        CrdtDeltaBox::Message(crdt_delta.apply(daemon, tx).await?)
      }
    };
    Ok(out)
  }

  pub async fn insert(
    &self,
    daemon: &SignerDaemon,
  ) -> anyhow::Result<crdt_event::Model> {
    let r = crdt_event::Entity::find()
      .order_by_desc(crdt_event::Column::Clock)
      .one(&daemon.db)
      .await?;
    let clock = match r {
      Some(r) => r.clock + 1,
      None => 1,
    };

    let payload = serde_json::to_string(&self)?;

    let m = crdt_event::ActiveModel {
      clock: Set(clock),
      peer: Set(daemon.peer.clone()),
      payload: Set(payload),
      revert: Set(None),
    };

    let r = m.insert(&daemon.db).await?;
    tracing::debug!("INSERT CrdtDeltaBox {:?}", self);

    Ok(r)
  }
}

impl crdt_event::Model {
  pub async fn apply(&self, daemon: &SignerDaemon) -> anyhow::Result<()> {
    let tx = daemon
      .db
      .begin()
      .await
      .context("start transcation failed")?;

    let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;
    let out_box = delta_box
      .apply(daemon, &tx)
      .await
      .context("apply delta failed")?;

    let revert = serde_json::to_string(&out_box)?;

    let mut m = crdt_event::ActiveModel::from(self.clone());
    m.revert = Set(Some(revert));
    m.update(&tx).await?;

    tx.commit().await?;

    Ok(())
  }

  pub async fn revert(&self, daemon: &SignerDaemon) -> anyhow::Result<()> {
    let tx = daemon.db.begin().await?;

    let revert = match &self.revert {
      None => bail!("revert field is None"),
      Some(val) => val,
    };

    let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;
    delta_box.apply(daemon, &tx).await?;

    let mut m = crdt_event::ActiveModel::from(self.clone());
    m.revert = Set(None);
    m.update(&tx).await?;

    tx.commit().await?;

    Ok(())
  }
}