// signer-daemon 0.3.2
//
// Signer daemon package.
// Documentation
use std::collections::HashMap;

use sea_orm::{EntityTrait as _, QuerySelect as _, prelude::*};

use crate::{
    SignerDaemonCore,
    entity::crdt_event,
    model::{crdt::crdt::CrdtDeltaBox, viewobject::CrdtEventVO},
};

/// Controller for reading and writing CRDT event records through a
/// [`SignerDaemonCore`].
pub struct CrdtEventController {
    /// Daemon core handle; its `db` field is the connection the controller's
    /// queries run against.
    core: SignerDaemonCore,
}

impl CrdtEventController {
    /// Creates a controller that operates on the given daemon core.
    pub fn new(core: SignerDaemonCore) -> Self {
        Self { core }
    }

    /// Applies a single CRDT delta by calling its `insert` with the daemon core.
    ///
    /// # Errors
    ///
    /// Any failure reported by the delta is wrapped in
    /// `DaemonError::Signer(SignerError::Msg(..))` with the prefix
    /// `"insert delta failed: "`.
    pub(crate) async fn insert_delta(&mut self, delta: CrdtDeltaBox) -> crate::DaemonResult<()> {
        delta.insert(&self.core).await.map_err(|e| {
            crate::DaemonError::Signer(crate::SignerError::Msg(format!(
                "insert delta failed: {e}"
            )))
        })?;
        Ok(())
    }

    /// Persists a batch of CRDT events with a single multi-row insert.
    ///
    /// An empty `events` vector is a no-op and returns `Ok(())` without
    /// touching the database.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the insert statement.
    pub async fn insert_events(&mut self, events: Vec<CrdtEventVO>) -> crate::DaemonResult<()> {
        // A multi-row INSERT with zero rows has nothing to write, and some
        // SeaORM versions report it as an error, so return before building it.
        if events.is_empty() {
            return Ok(());
        }

        let rows: Vec<crdt_event::ActiveModel> = events
            .into_iter()
            .map(|vo| {
                let model: crdt_event::Model = vo.into();
                crdt_event::ActiveModel::from(model)
            })
            .collect();

        crdt_event::Entity::insert_many(rows)
            .exec(&self.core.db)
            .await?;

        Ok(())
    }

    /// Returns, for every peer that has stored events, the highest clock
    /// value recorded for that peer.
    ///
    /// The result maps the peer column to `MAX(clock)` of that peer's rows.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the query.
    pub async fn get_event_frontiers(&self) -> crate::DaemonResult<HashMap<String, i32>> {
        // Tuple order follows the selected columns: (max clock, peer).
        let rows: Vec<(i32, String)> = crdt_event::Entity::find()
            .select_only()
            .column_as(crdt_event::Column::Clock.max(), "clock")
            .column(crdt_event::Column::Peer)
            .group_by(crdt_event::Column::Peer)
            .into_tuple()
            .all(&self.core.db)
            .await?;

        Ok(rows
            .into_iter()
            .map(|(clock, peer)| (peer, clock))
            .collect())
    }

    /// Returns the stored events that lie beyond the given frontiers.
    ///
    /// `frontiers` maps a peer to a clock value. An event is selected when
    /// any of the following holds:
    ///
    /// * its peer column is `NULL`;
    /// * its peer is a key of `frontiers` and its clock is strictly greater
    ///   than the value stored for that peer;
    /// * its peer is not a key of `frontiers` at all.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the query.
    pub async fn get_event_from_frontiers(
        &self,
        frontiers: HashMap<String, i32>,
    ) -> crate::DaemonResult<Vec<CrdtEventVO>> {
        // Seed the OR-chain with the NULL-peer clause, then add one
        // "newer than the known clock" clause per listed peer.
        let mut filter = crdt_event::Column::Peer.is_null();
        for (peer, clock) in frontiers.iter() {
            filter = filter.or(crdt_event::Column::Clock
                .gt(*clock)
                .and(crdt_event::Column::Peer.eq(peer)));
        }
        // Peers absent from the map are selected with all their events.
        // NOTE(review): with an empty map this passes an empty list to
        // `is_not_in`; confirm the query builder renders that as always-true.
        filter = filter.or(crdt_event::Column::Peer.is_not_in(frontiers.keys()));

        let events = crdt_event::Entity::find()
            .filter(filter)
            .all(&self.core.db)
            .await?;

        Ok(events.into_iter().map(Into::into).collect())
    }
}