use sea_orm::{
ActiveModelTrait as _, ActiveValue::Set, DatabaseTransaction, EntityTrait, QueryOrder,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use crate::{
SignerDaemonCore,
entity::crdt_event,
model::deltaobject::{
chat_do::ChatDO, message_do::MessageDO, server_do::ServerDO, user_do::UserDO,
},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDelta<T: CrdtType> {
Put(T),
Del(T::PrimaryKey),
Skip,
}
impl<T> CrdtDelta<T>
where
T: CrdtType,
{
async fn apply(
&self,
core: &SignerDaemonCore,
tx: &DatabaseTransaction,
) -> crate::DaemonResult<CrdtDelta<T>> {
match self {
CrdtDelta::Put(value) => value.put(core, tx).await,
CrdtDelta::Del(key) => T::del(core, &key, tx).await,
CrdtDelta::Skip => Ok(CrdtDelta::Skip),
}
}
}
pub trait CrdtType
where
Self: Sized,
{
type PrimaryKey;
fn put(
&self,
core: &SignerDaemonCore,
tx: &DatabaseTransaction,
) -> impl std::future::Future<Output = crate::DaemonResult<CrdtDelta<Self>>> + Send;
fn del(
core: &SignerDaemonCore,
key: &Self::PrimaryKey,
tx: &DatabaseTransaction,
) -> impl std::future::Future<Output = crate::DaemonResult<CrdtDelta<Self>>> + Send;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDeltaBox {
User(CrdtDelta<UserDO>),
Server(CrdtDelta<ServerDO>),
Chat(CrdtDelta<ChatDO>),
Message(CrdtDelta<MessageDO>),
}
impl CrdtDeltaBox {
async fn apply(
&self,
core: &SignerDaemonCore,
tx: &DatabaseTransaction,
) -> crate::DaemonResult<Self> {
let out = match self {
CrdtDeltaBox::User(crdt_delta) => CrdtDeltaBox::User(crdt_delta.apply(core, tx).await?),
CrdtDeltaBox::Server(crdt_delta) => {
CrdtDeltaBox::Server(crdt_delta.apply(core, tx).await?)
}
CrdtDeltaBox::Chat(crdt_delta) => CrdtDeltaBox::Chat(crdt_delta.apply(core, tx).await?),
CrdtDeltaBox::Message(crdt_delta) => {
CrdtDeltaBox::Message(crdt_delta.apply(core, tx).await?)
}
};
Ok(out)
}
pub async fn insert(&self, core: &SignerDaemonCore) -> crate::DaemonResult<crdt_event::Model> {
let r = crdt_event::Entity::find()
.order_by_desc(crdt_event::Column::Clock)
.one(&core.db)
.await?;
let clock = match r {
Some(r) => r.clock + 1,
None => 1,
};
let payload = serde_json::to_string(&self)?;
let m = crdt_event::ActiveModel {
clock: Set(clock),
peer: Set(core.peer.clone()),
payload: Set(payload),
revert: Set(None),
};
let r = m.insert(&core.db).await?;
tracing::debug!("INSERT CrdtDeltaBox {:?}", self);
Ok(r)
}
}
impl crdt_event::Model {
pub async fn apply(&self, core: &SignerDaemonCore) -> crate::DaemonResult<()> {
let tx = core.db.begin().await.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!(
"start transcation failed: {}",
e
)))
})?;
let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;
let out_box = delta_box.apply(core, &tx).await.map_err(|e| {
crate::DaemonError::Signer(crate::SignerError::Msg(format!(
"apply delta failed: {}",
e
)))
})?;
let revert = serde_json::to_string(&out_box)?;
let mut m = crdt_event::ActiveModel::from(self.clone());
m.revert = Set(Some(revert));
m.update(&tx).await?;
tx.commit().await?;
Ok(())
}
pub async fn revert(&self, core: &SignerDaemonCore) -> crate::DaemonResult<()> {
let tx = core.db.begin().await?;
let revert = match &self.revert {
None => {
return Err(crate::DaemonError::Signer(crate::SignerError::Msg(
"revert field is None".to_string(),
)));
}
Some(val) => val,
};
let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;
delta_box.apply(core, &tx).await?;
let mut m = crdt_event::ActiveModel::from(self.clone());
m.revert = Set(None);
m.update(&tx).await?;
tx.commit().await?;
Ok(())
}
}