use std::sync::Arc;
use crate::{model::crdt::crdt::CrdtDeltaBox, swallow_error::SwallowError};
use super::SignerDaemon;
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) enum SignerDaemonEvent {
InsertDelta(CrdtDeltaBox),
SyncEvent(String),
SyncEventAll,
SyncUser,
ApplyEvent,
UpdateConnections,
ForceReapplyAllEvent,
}
impl SignerDaemon {
pub(crate) fn emit(&self, event: SignerDaemonEvent) -> anyhow::Result<()> {
self.event_sender.send(event)?;
Ok(())
}
pub async fn wait_apply(&self) -> anyhow::Result<()> {
let mut rx = self.event_receiver.subscribe();
loop {
let ev = tokio::select! {
ev = rx.recv() => ev,
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
tracing::warn!("wait apply over 3 seconds, break");
break;
}
};
match ev {
Ok(ev) => match ev {
SignerDaemonEvent::ApplyEvent => break,
_ => continue,
},
Err(_) => break,
}
}
Ok(())
}
pub(crate) fn start_watch_event(&self) {
let daemon = Arc::new(self.clone());
let mut rx = self.event_sender.subscribe();
tokio::spawn(async move {
loop {
let ev = match rx.recv().await {
Ok(ev) => ev,
Err(_) => break,
};
match &ev {
SignerDaemonEvent::InsertDelta(delta) => {
delta
.insert(&daemon)
.await
.swallow_error("insert delta failed");
daemon
.emit(SignerDaemonEvent::ApplyEvent)
.expect("emit apply event failed");
daemon
.emit(SignerDaemonEvent::SyncEventAll)
.expect("emit sync event all failed");
}
SignerDaemonEvent::SyncEvent(addr) => {
daemon
.crdt_pull(&addr)
.await
.swallow_error("crdt pull failed");
daemon
.crdt_push(&addr)
.await
.swallow_error("crdt push failed");
daemon
.emit(SignerDaemonEvent::ApplyEvent)
.expect("emit apply event failed");
}
SignerDaemonEvent::SyncEventAll => {
let conns = {
daemon
.connections
.lock()
.await
.values()
.map(|i| i.clone())
.collect::<Vec<_>>()
};
for conn in conns {
conn
.sync_crdt_event()
.await
.swallow_error("sync event all failed");
}
}
SignerDaemonEvent::SyncUser => {
daemon
.crdt_sync_user_public()
.await
.swallow_error("crdt sync user public failed");
}
SignerDaemonEvent::ApplyEvent => {
daemon
.crdt_apply_all()
.await
.swallow_error("crdt apply all failed");
daemon
.emit(SignerDaemonEvent::SyncUser)
.expect("emit sync user failed");
}
SignerDaemonEvent::UpdateConnections => {
daemon
.update_connections()
.await
.swallow_error("update connections failed");
}
SignerDaemonEvent::ForceReapplyAllEvent => {
daemon
.crdt_apply_all()
.await
.swallow_error("crdt apply all failed");
}
};
let _ = daemon.event_receiver.send(ev);
}
});
}
}