// signer-daemon 0.2.1
//
// Signer daemon package.
// Documentation
use std::sync::Arc;

use crate::{model::crdt::crdt::CrdtDeltaBox, swallow_error::SwallowError};

use super::SignerDaemon;

/// Events exchanged over the daemon's internal event channel.
///
/// Events are published with `SignerDaemon::emit` and consumed by the task
/// spawned in `SignerDaemon::start_watch_event`; the per-variant notes below
/// describe what that task does for each event.
#[derive(Debug, Clone)]
// NOTE(review): presumably some variants are never constructed inside the
// crate, hence the lint suppression — confirm and narrow it if possible.
#[allow(dead_code)]
pub(crate) enum SignerDaemonEvent {
  /// Insert the carried delta into the daemon (`delta.insert`), then emit
  /// `ApplyEvent` and `SyncEventAll`.
  InsertDelta(CrdtDeltaBox),
  /// Run `crdt_pull` and then `crdt_push` with the carried string (treated
  /// as an address by the handler), then emit `ApplyEvent`.
  SyncEvent(String),
  /// Call `sync_crdt_event` on every entry in the daemon's `connections`.
  SyncEventAll,
  /// Run `crdt_sync_user_public`.
  SyncUser,
  /// Run `crdt_apply_all`, then emit `SyncUser`. This is also the event
  /// `SignerDaemon::wait_apply` waits for.
  ApplyEvent,
  /// Run `update_connections`.
  UpdateConnections,
  /// Run `crdt_apply_all` without emitting any follow-up event.
  ForceReapplyAllEvent,
}

impl SignerDaemon {
  pub(crate) fn emit(&self, event: SignerDaemonEvent) -> anyhow::Result<()> {
    self.event_sender.send(event)?;
    Ok(())
  }

  pub async fn wait_apply(&self) -> anyhow::Result<()> {
    let mut rx = self.event_receiver.subscribe();
    loop {
      match rx.recv().await? {
        SignerDaemonEvent::ApplyEvent => break,
        _ => continue,
      }
    }

    Ok(())
  }

  pub(crate) fn start_watch_event(&self) {
    let daemon = Arc::new(self.clone());
    let mut rx = self.event_sender.subscribe();

    tokio::spawn(async move {
      loop {
        let ev = match rx.recv().await {
          Ok(ev) => ev,
          Err(_) => break,
        };

        match &ev {
          SignerDaemonEvent::InsertDelta(delta) => {
            delta
              .insert(&daemon)
              .await
              .swallow_error("insert delta failed");
            daemon
              .emit(SignerDaemonEvent::ApplyEvent)
              .expect("emit apply event failed");
            daemon
              .emit(SignerDaemonEvent::SyncEventAll)
              .expect("emit sync event all failed");
          }
          SignerDaemonEvent::SyncEvent(addr) => {
            daemon
              .crdt_pull(&addr)
              .await
              .swallow_error("crdt pull failed");
            daemon
              .crdt_push(&addr)
              .await
              .swallow_error("crdt push failed");
            daemon
              .emit(SignerDaemonEvent::ApplyEvent)
              .expect("emit apply event failed");
          }
          SignerDaemonEvent::SyncEventAll => {
            for (addr, conn) in daemon.connections.lock().await.iter() {
              conn.sync_crdt_event().await.swallow_error(&format!(
                "sync event all failed: sync crdt event from {} failed",
                addr
              ));
            }
          }
          SignerDaemonEvent::SyncUser => {
            daemon
              .crdt_sync_user_public()
              .await
              .swallow_error("crdt sync user public failed");
          }
          SignerDaemonEvent::ApplyEvent => {
            daemon
              .crdt_apply_all()
              .await
              .swallow_error("crdt apply all failed");
            daemon
              .emit(SignerDaemonEvent::SyncUser)
              .expect("emit sync user failed");
          }
          SignerDaemonEvent::UpdateConnections => {
            daemon
              .update_connections()
              .await
              .swallow_error("update connections failed");
          }
          SignerDaemonEvent::ForceReapplyAllEvent => {
            daemon
              .crdt_apply_all()
              .await
              .swallow_error("crdt apply all failed");
          }
        };

        let _ = daemon.event_receiver.send(ev);
      }
    });
  }
}