signer-daemon 0.2.5

Signer daemon package.
Documentation
use std::sync::Arc;

use crate::{model::crdt::crdt::CrdtDeltaBox, swallow_error::SwallowError};

use super::SignerDaemon;

#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum SignerDaemonEvent {
  InsertDelta(CrdtDeltaBox),
  SyncEvent(String),
  SyncEventAll,
  SyncUser,
  ApplyEvent,
  UpdateConnections,
  ForceReapplyAllEvent,
}

impl SignerDaemon {
  pub(crate) fn emit(&self, event: SignerDaemonEvent) -> anyhow::Result<()> {
    self.event_sender.send(event)?;
    Ok(())
  }

  pub async fn wait_apply(&self) -> anyhow::Result<()> {
    let mut rx = self.event_receiver.subscribe();
    loop {
      let ev = tokio::select! {
        ev = rx.recv() => ev,
        _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
          tracing::warn!("wait apply over 3 seconds, break");
          break;
        }
      };

      match ev {
        Ok(ev) => match ev {
          SignerDaemonEvent::ApplyEvent => break,
          _ => continue,
        },
        Err(_) => break,
      }
    }

    Ok(())
  }

  pub(crate) fn start_watch_event(&self) {
    let daemon = Arc::new(self.clone());
    let mut rx = self.event_sender.subscribe();

    tokio::spawn(async move {
      loop {
        let ev = match rx.recv().await {
          Ok(ev) => ev,
          Err(_) => break,
        };

        match &ev {
          SignerDaemonEvent::InsertDelta(delta) => {
            delta
              .insert(&daemon)
              .await
              .swallow_error("insert delta failed");
            daemon
              .emit(SignerDaemonEvent::ApplyEvent)
              .expect("emit apply event failed");
            daemon
              .emit(SignerDaemonEvent::SyncEventAll)
              .expect("emit sync event all failed");
          }
          SignerDaemonEvent::SyncEvent(addr) => {
            daemon
              .crdt_pull(&addr)
              .await
              .swallow_error("crdt pull failed");
            daemon
              .crdt_push(&addr)
              .await
              .swallow_error("crdt push failed");
            daemon
              .emit(SignerDaemonEvent::ApplyEvent)
              .expect("emit apply event failed");
          }
          SignerDaemonEvent::SyncEventAll => {
            let conns = {
              daemon
                .connections
                .lock()
                .await
                .values()
                .map(|i| i.clone())
                .collect::<Vec<_>>()
            };
            for conn in conns {
              conn
                .sync_crdt_event()
                .await
                .swallow_error("sync event all failed");
            }
          }
          SignerDaemonEvent::SyncUser => {
            daemon
              .crdt_sync_user_public()
              .await
              .swallow_error("crdt sync user public failed");
          }
          SignerDaemonEvent::ApplyEvent => {
            daemon
              .crdt_apply_all()
              .await
              .swallow_error("crdt apply all failed");
            daemon
              .emit(SignerDaemonEvent::SyncUser)
              .expect("emit sync user failed");
          }
          SignerDaemonEvent::UpdateConnections => {
            daemon
              .update_connections()
              .await
              .swallow_error("update connections failed");
          }
          SignerDaemonEvent::ForceReapplyAllEvent => {
            daemon
              .crdt_apply_all()
              .await
              .swallow_error("crdt apply all failed");
          }
        };

        let _ = daemon.event_receiver.send(ev);
      }
    });
  }
}