use anyhow::Context as _;
use sea_orm::{prelude::*, QueryOrder as _};
use crate::{entity::crdt_event, SignerDaemonCore};
pub struct CrdtAdapterController {
core: SignerDaemonCore,
}
impl CrdtAdapterController {
pub fn new(core: SignerDaemonCore) -> Self {
Self { core }
}
pub async fn apply_all(&mut self) -> anyhow::Result<()> {
let anchor = crdt_event::Entity::find()
.filter(crdt_event::Column::Revert.is_null())
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.one(&self.core.db)
.await
.context("find anchor failed")?;
if let Some(anchor) = &anchor {
let revert_vec = crdt_event::Entity::find()
.filter(
crdt_event::Column::Revert.is_not_null().and(
crdt_event::Column::Clock.gt(anchor.clock).or(
crdt_event::Column::Clock
.eq(anchor.clock)
.and(crdt_event::Column::Peer.gt(&anchor.peer)),
),
),
)
.order_by_desc(crdt_event::Column::Clock)
.order_by_desc(crdt_event::Column::Peer)
.all(&self.core.db)
.await
.context("find revert events failed")?;
for revert in revert_vec {
revert
.revert(&self.core)
.await
.context(format!("apply event {:?} failed", revert))?;
}
}
let apply_vec = match anchor {
Some(anchor) => crdt_event::Entity::find()
.filter(
crdt_event::Column::Clock.gt(anchor.clock).or(
crdt_event::Column::Clock
.eq(anchor.clock)
.and(crdt_event::Column::Peer.gte(anchor.peer)),
),
)
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.all(&self.core.db)
.await
.context("find apply events failed")?,
None => crdt_event::Entity::find()
.order_by_asc(crdt_event::Column::Clock)
.order_by_asc(crdt_event::Column::Peer)
.all(&self.core.db)
.await
.context("find apply events failed")?,
};
for apply in apply_vec {
apply
.apply(&self.core)
.await
.context(format!("apply event {:?} failed", apply))?;
}
Ok(())
}
}