use anyhow::{Context, bail};
use sea_orm::{
ActiveModelTrait as _, ActiveValue::Set, DatabaseTransaction, EntityTrait,
QueryOrder, TransactionTrait,
};
use serde::{Deserialize, Serialize};
use crate::{
SignerDaemon,
entity::crdt_event,
model::deltaobject::{
chat_do::ChatDO, message_do::MessageDO, oper_log_do::OperLogDO,
server_do::ServerDO, user_do::UserDO,
},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum CrdtDelta<T: CrdtType> {
Put(T),
Del(T::PrimaryKey),
Skip,
}
impl<T> CrdtDelta<T>
where
T: CrdtType,
{
async fn apply(
&self,
daemon: &SignerDaemon,
tx: &DatabaseTransaction,
) -> anyhow::Result<CrdtDelta<T>> {
match self {
CrdtDelta::Put(value) => value.put(daemon, tx).await,
CrdtDelta::Del(key) => T::del(daemon, &key, tx).await,
CrdtDelta::Skip => Ok(CrdtDelta::Skip),
}
}
}
pub(crate) trait CrdtType
where
Self: Sized,
{
type PrimaryKey;
async fn put(
&self,
daemon: &SignerDaemon,
tx: &DatabaseTransaction,
) -> anyhow::Result<CrdtDelta<Self>>;
async fn del(
daemon: &SignerDaemon,
key: &Self::PrimaryKey,
tx: &DatabaseTransaction,
) -> anyhow::Result<CrdtDelta<Self>>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum CrdtDeltaBox {
User(CrdtDelta<UserDO>),
OperLog(CrdtDelta<OperLogDO>),
Server(CrdtDelta<ServerDO>),
Chat(CrdtDelta<ChatDO>),
Message(CrdtDelta<MessageDO>),
}
impl CrdtDeltaBox {
async fn apply(
&self,
daemon: &SignerDaemon,
tx: &DatabaseTransaction,
) -> anyhow::Result<Self> {
let out = match self {
CrdtDeltaBox::User(crdt_delta) => {
CrdtDeltaBox::User(crdt_delta.apply(daemon, tx).await?)
}
CrdtDeltaBox::OperLog(crdt_delta) => {
CrdtDeltaBox::OperLog(crdt_delta.apply(daemon, tx).await?)
}
CrdtDeltaBox::Server(crdt_delta) => {
CrdtDeltaBox::Server(crdt_delta.apply(daemon, tx).await?)
}
CrdtDeltaBox::Chat(crdt_delta) => {
CrdtDeltaBox::Chat(crdt_delta.apply(daemon, tx).await?)
}
CrdtDeltaBox::Message(crdt_delta) => {
CrdtDeltaBox::Message(crdt_delta.apply(daemon, tx).await?)
}
};
Ok(out)
}
pub async fn insert(
&self,
daemon: &SignerDaemon,
) -> anyhow::Result<crdt_event::Model> {
let r = crdt_event::Entity::find()
.order_by_desc(crdt_event::Column::Clock)
.one(&daemon.db)
.await?;
let clock = match r {
Some(r) => r.clock + 1,
None => 1,
};
let payload = serde_json::to_string(&self)?;
let m = crdt_event::ActiveModel {
clock: Set(clock),
peer: Set(daemon.peer.clone()),
payload: Set(payload),
revert: Set(None),
};
let r = m.insert(&daemon.db).await?;
tracing::debug!("INSERT CrdtDeltaBox {:?}", self);
Ok(r)
}
}
impl crdt_event::Model {
pub async fn apply(&self, daemon: &SignerDaemon) -> anyhow::Result<()> {
let tx = daemon
.db
.begin()
.await
.context("start transcation failed")?;
let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;
let out_box = delta_box
.apply(daemon, &tx)
.await
.context("apply delta failed")?;
let revert = serde_json::to_string(&out_box)?;
let mut m = crdt_event::ActiveModel::from(self.clone());
m.revert = Set(Some(revert));
m.update(&tx).await?;
tx.commit().await?;
Ok(())
}
pub async fn revert(&self, daemon: &SignerDaemon) -> anyhow::Result<()> {
let tx = daemon.db.begin().await?;
let revert = match &self.revert {
None => bail!("revert field is None"),
Some(val) => val,
};
let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;
delta_box.apply(daemon, &tx).await?;
let mut m = crdt_event::ActiveModel::from(self.clone());
m.revert = Set(None);
m.update(&tx).await?;
tx.commit().await?;
Ok(())
}
}