use sea_orm::{
ActiveModelTrait as _,
ActiveValue::Set,
ColumnTrait, DatabaseTransaction,
EntityTrait,
QueryFilter,
QueryOrder,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use crate::{
ReconcileError, SignerMeta,
crdt::errors::CrdtError,
delta::{chat_do::ChatDO, message_do::MessageDO, user_do::UserDO},
entity::crdt_event,
};
/// A single change to a CRDT-managed record of type `T`.
///
/// `CrdtDelta::apply` dispatches each variant to the matching [`CrdtType`]
/// operation. The type derives `Serialize`/`Deserialize` with no serde
/// attributes, so it uses serde's default enum representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDelta<T: CrdtType> {
/// Carries a full `T` value; applied through [`CrdtType::put`].
Put(T),
/// Carries only the record's primary key; applied through [`CrdtType::del`].
Del(T::PrimaryKey),
/// No-op: applying it calls nothing and yields `Skip` again.
Skip,
}
impl<T: CrdtType> CrdtDelta<T> {
    /// Executes this delta inside the transaction `tx`.
    ///
    /// `Put` forwards to [`CrdtType::put`], `Del` forwards to
    /// [`CrdtType::del`], and `Skip` performs no work.
    ///
    /// # Returns
    /// The delta produced by the underlying operation; `Skip` yields `Skip`.
    ///
    /// # Errors
    /// Propagates whatever [`CrdtError`] the `put`/`del` implementation returns.
    async fn apply(
        &self,
        core: &SignerMeta,
        tx: &DatabaseTransaction,
    ) -> Result<CrdtDelta<T>, CrdtError> {
        match self {
            Self::Skip => Ok(Self::Skip),
            Self::Put(record) => record.put(core, tx).await,
            // `pk` is already a reference, so it is passed through as-is.
            Self::Del(pk) => T::del(core, pk, tx).await,
        }
    }
}
/// Storage operations a record type must provide to be carried in a
/// [`CrdtDelta`].
///
/// Both operations run against a caller-supplied transaction and return a
/// `Send` future resolving to another [`CrdtDelta`]. `crdt_event::Model::apply`
/// stores that returned delta in the event's `revert` column.
/// NOTE(review): the returned delta is presumably the inverse of the operation
/// just performed; confirm against the implementors.
pub trait CrdtType: Sized {
    /// Key type carried by [`CrdtDelta::Del`] and passed to [`CrdtType::del`].
    type PrimaryKey;

    /// Handles a [`CrdtDelta::Put`] carrying `self`, using transaction `tx`.
    fn put(
        &self,
        core: &SignerMeta,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = Result<CrdtDelta<Self>, CrdtError>> + Send;

    /// Handles a [`CrdtDelta::Del`] for the record identified by `key`, using
    /// transaction `tx`.
    fn del(
        core: &SignerMeta,
        key: &Self::PrimaryKey,
        tx: &DatabaseTransaction,
    ) -> impl std::future::Future<Output = Result<CrdtDelta<Self>, CrdtError>> + Send;
}
/// Closed set of the concrete [`CrdtDelta`] kinds handled by this module.
///
/// This is the type that is serialized with `serde_json` into the `payload`
/// column of a `crdt_event` row (see `insert`) and, after being applied, into
/// its `revert` column (see `crdt_event::Model::apply`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CrdtDeltaBox {
/// Delta over a [`UserDO`] record.
User(CrdtDelta<UserDO>),
/// Delta over a [`ChatDO`] record.
Chat(CrdtDelta<ChatDO>),
/// Delta over a [`MessageDO`] record.
Message(CrdtDelta<MessageDO>),
}
impl CrdtDeltaBox {
    /// Applies the wrapped delta inside `tx` and re-wraps the resulting delta
    /// in the same variant.
    ///
    /// # Errors
    /// Propagates the [`CrdtError`] returned by the inner `CrdtDelta::apply`.
    async fn apply(&self, core: &SignerMeta, tx: &DatabaseTransaction) -> Result<Self, CrdtError> {
        Ok(match self {
            Self::User(delta) => Self::User(delta.apply(core, tx).await?),
            Self::Chat(delta) => Self::Chat(delta.apply(core, tx).await?),
            Self::Message(delta) => Self::Message(delta.apply(core, tx).await?),
        })
    }

    /// Records this delta as a new, not-yet-applied row in `crdt_event`.
    ///
    /// The new row gets:
    /// * `clock`: one more than the highest clock currently stored, or `1`
    ///   when the table is empty;
    /// * `peer`: this node's public key;
    /// * `payload`: the JSON serialization of `self`;
    /// * `revert`: `None`;
    /// * `deps`: a JSON array holding the clock of this peer's most recent
    ///   event whose `revert` is set, or an empty array if there is none.
    ///
    /// All reads and the insert run directly on `core.conn`, not inside a
    /// transaction.
    ///
    /// # Errors
    /// Returns a [`CrdtError`] if a query, the serialization, or the insert
    /// fails.
    pub async fn insert(&self, core: &SignerMeta) -> Result<crdt_event::Model, CrdtError> {
        // Highest clock stored so far, regardless of peer.
        let newest = crdt_event::Entity::find()
            .order_by_desc(crdt_event::Column::Clock)
            .one(&core.conn)
            .await?;
        let clock = newest.map_or(1, |latest| latest.clock + 1);

        // Latest event authored by this peer that already carries a revert record.
        let last_applied = crdt_event::Entity::find()
            .filter(crdt_event::Column::Peer.eq(&core.keys.pub_key))
            .filter(crdt_event::Column::Revert.is_not_null())
            .order_by_desc(crdt_event::Column::Clock)
            .one(&core.conn)
            .await?;
        let deps: serde_json::Value = last_applied.map_or_else(
            || serde_json::json!([]),
            |event| serde_json::json!([event.clock]),
        );

        let payload = serde_json::to_string(self)?;
        let row = crdt_event::ActiveModel {
            clock: Set(clock),
            peer: Set(core.keys.pub_key.clone()),
            payload: Set(payload),
            revert: Set(None),
            deps: Set(Some(deps)),
        };
        let stored = row.insert(&core.conn).await?;
        Ok(stored)
    }
}
impl crdt_event::Model {
    /// Applies this event's payload and stores the resulting delta as the
    /// event's revert record.
    ///
    /// The payload is decoded into a [`CrdtDeltaBox`] *before* a transaction is
    /// opened, so a malformed payload is rejected without starting one. The
    /// delta is then applied and the row's `revert` column is updated with the
    /// JSON of the delta returned by the apply step, all in one transaction.
    /// If any step after `begin` fails, the function returns early and the
    /// transaction is dropped without being committed.
    ///
    /// # Errors
    /// * a serde_json-derived error if the payload or the resulting delta
    ///   cannot be (de)serialized;
    /// * `CrdtError::TransactionStartFailed` if the transaction cannot be started;
    /// * any error raised while applying the delta, updating the row, or
    ///   committing.
    pub async fn apply(&self, core: &SignerMeta) -> Result<(), crate::crdt::errors::CrdtError> {
        // Validate input first: no reason to open a transaction for bad data.
        let delta_box: CrdtDeltaBox = serde_json::from_str(&self.payload)?;

        let tx = core.conn.begin().await.map_err(|e| {
            crate::crdt::errors::CrdtError::TransactionStartFailed(format!(
                "start transaction failed: {}",
                e
            ))
        })?;

        let out_box = delta_box.apply(core, &tx).await?;
        let revert = serde_json::to_string(&out_box)?;

        // Only `revert` is marked as changed on the active model.
        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(Some(revert));
        m.update(&tx).await?;

        tx.commit().await?;
        Ok(())
    }

    /// Undoes this event by applying its stored revert delta and clearing the
    /// `revert` column.
    ///
    /// The revert record is checked and decoded *before* a transaction is
    /// opened, so an event with nothing to revert (or with a corrupt revert
    /// record) is rejected without starting one.
    ///
    /// # Errors
    /// * `CrdtError::InvalidParams` if `revert` is `None`;
    /// * a serde_json-derived error if the revert record cannot be decoded;
    /// * any error raised while starting the transaction, applying the delta,
    ///   updating the row, or committing.
    pub async fn revert(&self, core: &SignerMeta) -> Result<(), crate::crdt::errors::CrdtError> {
        let Some(revert) = &self.revert else {
            return Err(crate::crdt::errors::CrdtError::InvalidParams(
                "revert field is None".to_string(),
            ));
        };
        let delta_box: CrdtDeltaBox = serde_json::from_str(revert)?;

        let tx = core.conn.begin().await?;
        delta_box.apply(core, &tx).await?;

        // Clearing `revert` marks the event as not applied again.
        let mut m = crdt_event::ActiveModel::from(self.clone());
        m.revert = Set(None);
        m.update(&tx).await?;

        tx.commit().await?;
        Ok(())
    }
}
/// Applies every pending CRDT event whose dependencies have been applied.
///
/// An event counts as *applied* when its `revert` column is set and as
/// *pending* when it is null. Pending events are visited in ascending
/// `(clock, peer)` order so that the order in which ready events are applied
/// is reproducible instead of depending on the database's unspecified row
/// order. The pending list is swept repeatedly; each sweep applies the events
/// whose dependency clocks are all in the applied set, and the sweeps stop as
/// soon as one makes no progress.
///
/// Dependency handling:
/// * `deps` is read as a JSON array of `i32` clocks; a missing or undecodable
///   value is treated as "no dependencies";
/// * a dependency clock `<= 0` is always considered satisfied.
///
/// Events whose dependencies never become satisfied are left pending and the
/// function still returns `Ok(())`.
///
/// # Errors
/// Returns a [`ReconcileError`] if either query fails or if applying an event
/// fails; events applied before the failure stay applied.
pub async fn reconcile(meta: &SignerMeta) -> Result<(), ReconcileError> {
    use crate::entity::crdt_event;
    use sea_orm::prelude::*;
    use std::collections::HashSet;

    // Explicit ordering: without it the sweep order (and therefore the order
    // in which concurrently-ready events are applied) is up to the database.
    let mut pending: Vec<crdt_event::Model> = crdt_event::Entity::find()
        .filter(crdt_event::Column::Revert.is_null())
        .order_by_asc(crdt_event::Column::Clock)
        .order_by_asc(crdt_event::Column::Peer)
        .all(&meta.conn)
        .await?;

    let applied_events: Vec<crdt_event::Model> = crdt_event::Entity::find()
        .filter(crdt_event::Column::Revert.is_not_null())
        .all(&meta.conn)
        .await?;
    let mut applied_clocks: HashSet<i32> = applied_events.iter().map(|e| e.clock).collect();

    loop {
        let mut progressed = false;
        let mut deferred = Vec::new();

        for event in pending {
            let deps_clocks: Vec<i32> = match &event.deps {
                // Undecodable deps deliberately fall back to "no dependencies".
                Some(raw) => serde_json::from_value(raw.clone()).unwrap_or_default(),
                None => Vec::new(),
            };

            let ready = deps_clocks
                .iter()
                .all(|&dep| dep <= 0 || applied_clocks.contains(&dep));

            if ready {
                event.apply(meta).await?;
                applied_clocks.insert(event.clock);
                progressed = true;
            } else {
                // Keeps relative order, so later sweeps stay clock-ordered.
                deferred.push(event);
            }
        }

        pending = deferred;
        if !progressed {
            break;
        }
    }

    Ok(())
}
#[cfg(test)]
mod tests {
// Placeholder test: the body is empty, so it asserts nothing about
// `reconcile` at runtime.
// NOTE(review): presumably kept only so the test target builds this module;
// confirm and replace with real coverage.
#[test]
fn test_reconcile_compiles() {
}
}