use std::collections::HashMap;
use anyhow::Context as _;
use sea_orm::{EntityTrait as _, QuerySelect as _, prelude::*};
use crate::{
SignerDaemonCore,
entity::crdt_event,
model::{crdt::crdt::CrdtDeltaBox, viewobject::CrdtEventVO},
};
/// Controller that reads and writes CRDT events through the signer daemon core.
pub struct CrdtEventController {
    /// Daemon core handle; its `db` connection is used for every query issued here.
    core: SignerDaemonCore,
}
impl CrdtEventController {
    /// Creates a controller that operates on the given daemon core.
    pub fn new(core: SignerDaemonCore) -> Self {
        Self { core }
    }

    /// Applies a single CRDT delta by calling the delta's own `insert` with the core.
    ///
    /// # Errors
    ///
    /// Returns the underlying failure wrapped with the context
    /// `"insert delta failed"`.
    pub(crate) async fn insert_delta(&mut self, delta: CrdtDeltaBox) -> anyhow::Result<()> {
        delta
            .insert(&self.core)
            .await
            .context("insert delta failed")?;
        Ok(())
    }

    /// Persists a batch of CRDT events with a single multi-row insert.
    ///
    /// An empty batch is a no-op and returns `Ok(())` without touching the
    /// database.
    ///
    /// # Errors
    ///
    /// Returns the database error, wrapped with context, if the insert fails.
    pub async fn insert_events(&mut self, events: Vec<CrdtEventVO>) -> anyhow::Result<()> {
        // Depending on the SeaORM version, executing `insert_many` with zero
        // rows can fail instead of being a no-op, so short-circuit here.
        if events.is_empty() {
            return Ok(());
        }

        let models = events
            .into_iter()
            .map(|event| crdt_event::ActiveModel::from(crdt_event::Model::from(event.into())))
            .collect::<Vec<crdt_event::ActiveModel>>();

        crdt_event::Entity::insert_many(models)
            .exec(&self.core.db)
            .await
            .context("insert events failed")?;
        Ok(())
    }

    /// Returns the highest stored clock for every peer, keyed by peer.
    ///
    /// The result is built from a `MAX(clock) ... GROUP BY peer` query over
    /// the event table.
    ///
    /// # Errors
    ///
    /// Returns the database error, wrapped with context, if the query fails.
    pub async fn get_event_frontiers(&self) -> anyhow::Result<HashMap<String, i32>> {
        // NOTE(review): rows are decoded as `(i32, String)`; if the peer column
        // is nullable, a NULL group would presumably fail to decode — confirm
        // against the schema.
        let rows: Vec<(i32, String)> = crdt_event::Entity::find()
            .select_only()
            .column_as(crdt_event::Column::Clock.max(), "clock")
            .column(crdt_event::Column::Peer)
            .group_by(crdt_event::Column::Peer)
            .into_tuple()
            .all(&self.core.db)
            .await
            .context("query event frontiers failed")?;

        // The query yields (clock, peer); the map is keyed by peer.
        Ok(rows
            .into_iter()
            .map(|(clock, peer)| (peer, clock))
            .collect())
    }

    /// Fetches the events that lie beyond the given frontiers.
    ///
    /// An event is selected when any of the following holds:
    /// - its peer is NULL;
    /// - its peer appears in `frontiers` and its clock is strictly greater
    ///   than the clock recorded for that peer;
    /// - its peer does not appear in `frontiers` at all.
    ///
    /// # Errors
    ///
    /// Returns the database error, wrapped with context, if the query fails.
    pub async fn get_event_from_frontiers(
        &self,
        frontiers: HashMap<String, i32>,
    ) -> anyhow::Result<Vec<CrdtEventVO>> {
        // Seed the OR-chain with the NULL-peer case, then add one
        // "newer than the frontier" clause per known peer.
        let mut filter = crdt_event::Column::Peer.is_null();
        for (peer, clock) in &frontiers {
            filter = filter.or(
                crdt_event::Column::Clock
                    .gt(*clock)
                    .and(crdt_event::Column::Peer.eq(peer)),
            );
        }
        // Peers missing from `frontiers` contribute all of their events.
        filter = filter.or(crdt_event::Column::Peer.is_not_in(frontiers.keys()));

        let events = crdt_event::Entity::find()
            .filter(filter)
            .all(&self.core.db)
            .await
            .context("query events from frontiers failed")?;
        Ok(events.into_iter().map(Into::into).collect())
    }
}