use std::collections::HashMap;
use sea_orm::{EntityTrait as _, QuerySelect as _, prelude::*};
use crate::{
SignerDaemonCore,
entity::crdt_event,
model::{crdt::crdt::CrdtDeltaBox, viewobject::CrdtEventVO},
};
/// Controller for reading and writing `crdt_event` rows.
///
/// Its methods insert CRDT deltas/events and query per-peer clock frontiers
/// through the wrapped [`SignerDaemonCore`].
pub struct CrdtEventController {
/// Daemon core; its `db` handle is used for all `crdt_event` queries, and the
/// core itself is passed to `CrdtDeltaBox::insert`.
core: SignerDaemonCore,
}
impl CrdtEventController {
    /// Creates a controller operating on the given daemon core.
    pub fn new(core: SignerDaemonCore) -> Self {
        Self { core }
    }

    /// Applies a single CRDT delta by calling `delta.insert` with this controller's core.
    ///
    /// # Errors
    ///
    /// Any error returned by the delta is converted into
    /// `DaemonError::Signer(SignerError::Msg(..))` with the message prefix
    /// `insert delta failed: `.
    pub(crate) async fn insert_delta(&mut self, delta: CrdtDeltaBox) -> crate::DaemonResult<()> {
        delta.insert(&self.core).await.map_err(|e| {
            crate::DaemonError::Signer(crate::SignerError::Msg(format!(
                "insert delta failed: {}",
                e
            )))
        })?;
        Ok(())
    }

    /// Persists a batch of events into the `crdt_event` table with one bulk insert.
    ///
    /// An empty `events` vector is a no-op and returns `Ok(())` without touching
    /// the database.
    ///
    /// # Errors
    ///
    /// Propagates the database error if the bulk insert fails.
    pub async fn insert_events(&mut self, events: Vec<CrdtEventVO>) -> crate::DaemonResult<()> {
        // Nothing to persist: return before building an INSERT with no rows.
        // sea-orm does not guarantee that an empty `insert_many` succeeds
        // (that is what `on_empty_do_nothing` exists for).
        if events.is_empty() {
            return Ok(());
        }

        let models = events
            .into_iter()
            .map(|i| crdt_event::ActiveModel::from(crdt_event::Model::from(i.into())))
            .collect::<Vec<crdt_event::ActiveModel>>();

        crdt_event::Entity::insert_many(models)
            .exec(&self.core.db)
            .await?;
        Ok(())
    }

    /// Returns the stored frontier: for every peer present in `crdt_event`,
    /// the maximum `clock` value recorded for that peer.
    ///
    /// # Errors
    ///
    /// Propagates the database error if the aggregate query fails.
    pub async fn get_event_frontiers(&self) -> crate::DaemonResult<HashMap<String, i32>> {
        // SELECT MAX(clock) AS clock, peer ... GROUP BY peer
        let rows: Vec<(i32, String)> = crdt_event::Entity::find()
            .select_only()
            .column_as(crdt_event::Column::Clock.max(), "clock")
            .column(crdt_event::Column::Peer)
            .group_by(crdt_event::Column::Peer)
            .into_tuple()
            .all(&self.core.db)
            .await?;

        // Rows arrive as (max_clock, peer); the map is keyed by peer.
        Ok(rows
            .into_iter()
            .map(|(clock, peer)| (peer, clock))
            .collect())
    }

    /// Fetches the events that lie beyond the given `frontiers`.
    ///
    /// A row is selected when any of the following holds:
    /// - its `peer` is NULL;
    /// - its `peer` is a key of `frontiers` and its `clock` is strictly greater
    ///   than the value stored for that peer;
    /// - its `peer` is not a key of `frontiers` at all.
    ///
    /// # Errors
    ///
    /// Propagates the database error if the query fails.
    pub async fn get_event_from_frontiers(
        &self,
        frontiers: HashMap<String, i32>,
    ) -> crate::DaemonResult<Vec<CrdtEventVO>> {
        // Seed the OR-chain with the NULL-peer condition.
        let mut filter = crdt_event::Column::Peer.is_null();

        // One `(clock > frontier AND peer = key)` clause per known peer.
        for (peer, clock) in frontiers.iter() {
            filter = filter.or(crdt_event::Column::Clock
                .gt(*clock)
                .and(crdt_event::Column::Peer.eq(peer)));
        }

        // Peers missing from `frontiers` contribute all of their events.
        filter = filter.or(crdt_event::Column::Peer.is_not_in(frontiers.keys()));

        let events = crdt_event::Entity::find()
            .filter(filter)
            .all(&self.core.db)
            .await?;
        Ok(events.into_iter().map(Into::into).collect())
    }
}