signer-daemon 0.2.5

Signer daemon package.
Documentation
use anyhow::Context;
use sea_orm::{
  ActiveModelTrait,
  ActiveValue::{NotSet, Set},
  ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter,
  sqlx::types::chrono::{self, TimeZone as _},
};
use serde::{Deserialize, Serialize};

use crate::{
  SignerDaemon,
  entity::message,
  model::{
    deltaobject::{deltaobject::DeltaField, message_do::MessageDO},
    viewobject::MessageVO,
  },
};

use super::{CrdtType, crdt::CrdtDelta};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageKey {
  pub id: String,
  pub user_key: String,
  pub chat_key: String,
  pub chat_variant: String,
}

impl CrdtType for MessageDO {
  // (chat_key, chat_variant, user_key, id)
  type PrimaryKey = MessageKey;

  async fn put(
    &self,
    daemon: &SignerDaemon,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<CrdtDelta<Self>> {
    let chat_key = self.chat.chat_key(daemon).await?;
    let chat_variant = self.chat.chat_variant();

    let r = message::Entity::find()
      .filter(
        message::Column::Id
          .eq(&self.id)
          .and(message::Column::ChatKey.eq(&chat_key))
          .and(message::Column::ChatVariant.eq(&chat_variant))
          .and(message::Column::UserKey.eq(&self.user_key)),
      )
      .one(tx)
      .await
      .context("find message failed")?;
    if let Some(val) = &r {
      let vo = MessageVO::from_model(tx, val)
        .await
        .context("create vo from model failed")?;
      if !self.has_changed(&vo) {
        return Ok(CrdtDelta::Skip);
      }
    }

    let m = message::ActiveModel {
      id: Set(self.id.clone()),
      chat_variant: Set(self.chat.chat_variant()),
      chat_key: Set(
        self
          .chat
          .chat_key(daemon)
          .await
          .context("get chat key failed")?,
      ),
      user_key: Set(self.user_key.clone()),
      parent_id: self.parent_id.clone().into(),
      parent_user_key: self.parent_user_key.clone().into(),
      content: match &self.content {
        DeltaField::Set(val) => Set(serde_json::to_string(val)?),
        DeltaField::NotSet => NotSet,
      },
      content_type: match &self.content_type {
        DeltaField::Set(val) => Set(val.clone()),
        DeltaField::NotSet => NotSet,
      },
      receiver_keys: match &self.receiver_keys {
        DeltaField::Set(val) => Set(val.join(",")),
        DeltaField::NotSet => NotSet,
      },
      create_time: match &self.create_time {
        DeltaField::Set(val) => {
          Set(chrono::Utc.timestamp_millis_opt(*val).unwrap().naive_utc())
        }
        DeltaField::NotSet => NotSet,
      },
    };

    match r {
      Some(val) => {
        let e = m.update(tx).await?;
        let message_do = MessageDO::new(
          &MessageVO::from_model(tx, &e).await?,
          &MessageVO::from_model(tx, &val).await?,
        )?;
        Ok(CrdtDelta::Put(message_do))
      }
      None => {
        let e = m.insert(tx).await?;
        Ok(CrdtDelta::Del(MessageKey {
          chat_key: e.chat_key,
          chat_variant: e.chat_variant,
          user_key: e.user_key,
          id: e.id,
        }))
      }
    }
  }

  async fn del(
    _daemon: &SignerDaemon,
    key: &Self::PrimaryKey,
    tx: &DatabaseTransaction,
  ) -> anyhow::Result<CrdtDelta<Self>> {
    let r = message::Entity::find()
      .filter(
        message::Column::Id.eq(&key.id).and(
          message::Column::UserKey
            .eq(&key.user_key)
            .and(message::Column::ChatKey.eq(&key.chat_key))
            .and(message::Column::ChatVariant.eq(&key.chat_variant)),
        ),
      )
      .one(tx)
      .await?;
    if let Some(val) = r {
      let message_do = MessageDO::from(MessageVO::from_model(tx, &val).await?);
      val.delete(tx).await?;
      Ok(CrdtDelta::Put(message_do))
    } else {
      Ok(CrdtDelta::Skip)
    }
  }
}