1#![allow(missing_docs)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6 time::SystemTime,
7};
8
9use anyhow::{Context, Result};
10use futures_lite::FutureExt;
11use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
12use iroh_blobs::{
13 downloader::{DownloadError, DownloadRequest, Downloader},
14 get::Stats,
15 store::EntryStatus,
16 Hash, HashAndFormat,
17};
18use iroh_gossip::net::Gossip;
19use serde::{Deserialize, Serialize};
20use tokio::{
21 sync::{self, mpsc, oneshot},
22 task::JoinSet,
23};
24use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
25
26use super::state::{NamespaceStates, Origin, SyncReason};
28use crate::{
29 actor::{OpenOpts, SyncHandle},
30 engine::gossip::GossipState,
31 metrics::Metrics,
32 net::{
33 connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
34 SyncFinished,
35 },
36 AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
37};
38
39const SOURCE_NAME: &str = "docs_engine";
41
42#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
46pub enum Op {
47 Put(SignedEntry),
49 ContentReady(Hash),
51 SyncReport(SyncReport),
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct SyncReport {
58 namespace: NamespaceId,
59 heads: Vec<u8>,
61}
62
63#[derive(derive_more::Debug, strum::Display)]
65pub enum ToLiveActor {
66 StartSync {
67 namespace: NamespaceId,
68 peers: Vec<NodeAddr>,
69 #[debug("onsehot::Sender")]
70 reply: sync::oneshot::Sender<anyhow::Result<()>>,
71 },
72 Leave {
73 namespace: NamespaceId,
74 kill_subscribers: bool,
75 #[debug("onsehot::Sender")]
76 reply: sync::oneshot::Sender<anyhow::Result<()>>,
77 },
78 Shutdown {
79 reply: sync::oneshot::Sender<()>,
80 },
81 Subscribe {
82 namespace: NamespaceId,
83 #[debug("sender")]
84 sender: async_channel::Sender<Event>,
85 #[debug("oneshot::Sender")]
86 reply: sync::oneshot::Sender<Result<()>>,
87 },
88 HandleConnection {
89 conn: iroh::endpoint::Connection,
90 },
91 AcceptSyncRequest {
92 namespace: NamespaceId,
93 peer: PublicKey,
94 #[debug("oneshot::Sender")]
95 reply: sync::oneshot::Sender<AcceptOutcome>,
96 },
97
98 IncomingSyncReport {
99 from: PublicKey,
100 report: SyncReport,
101 },
102 NeighborContentReady {
103 namespace: NamespaceId,
104 node: PublicKey,
105 hash: Hash,
106 },
107 NeighborUp {
108 namespace: NamespaceId,
109 peer: PublicKey,
110 },
111 NeighborDown {
112 namespace: NamespaceId,
113 peer: PublicKey,
114 },
115}
116
117#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
119pub enum Event {
120 ContentReady {
122 hash: Hash,
124 },
125 NeighborUp(PublicKey),
127 NeighborDown(PublicKey),
129 SyncFinished(SyncEvent),
131 PendingContentReady,
139}
140
141type SyncConnectRes = (
142 NamespaceId,
143 PublicKey,
144 SyncReason,
145 Result<SyncFinished, ConnectError>,
146);
147type SyncAcceptRes = Result<SyncFinished, AcceptError>;
148type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);
149
150pub struct LiveActor<B: iroh_blobs::store::Store> {
152 inbox: mpsc::Receiver<ToLiveActor>,
154 sync: SyncHandle,
155 endpoint: Endpoint,
156 bao_store: B,
157 downloader: Downloader,
158 replica_events_tx: async_channel::Sender<crate::Event>,
159 replica_events_rx: async_channel::Receiver<crate::Event>,
160
161 sync_actor_tx: mpsc::Sender<ToLiveActor>,
165 gossip: GossipState,
166
167 running_sync_connect: JoinSet<SyncConnectRes>,
169 running_sync_accept: JoinSet<SyncAcceptRes>,
171 download_tasks: JoinSet<DownloadRes>,
173 missing_hashes: HashSet<Hash>,
175 queued_hashes: QueuedHashes,
177
178 subscribers: SubscribersMap,
180
181 state: NamespaceStates,
183 metrics: Arc<Metrics>,
184}
185impl<B: iroh_blobs::store::Store> LiveActor<B> {
186 #[allow(clippy::too_many_arguments)]
188 pub fn new(
189 sync: SyncHandle,
190 endpoint: Endpoint,
191 gossip: Gossip,
192 bao_store: B,
193 downloader: Downloader,
194 inbox: mpsc::Receiver<ToLiveActor>,
195 sync_actor_tx: mpsc::Sender<ToLiveActor>,
196 metrics: Arc<Metrics>,
197 ) -> Self {
198 let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
199 let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
200 Self {
201 inbox,
202 sync,
203 replica_events_rx,
204 replica_events_tx,
205 endpoint,
206 gossip: gossip_state,
207 bao_store,
208 downloader,
209 sync_actor_tx,
210 running_sync_connect: Default::default(),
211 running_sync_accept: Default::default(),
212 subscribers: Default::default(),
213 download_tasks: Default::default(),
214 state: Default::default(),
215 missing_hashes: Default::default(),
216 queued_hashes: Default::default(),
217 metrics,
218 }
219 }
220
221 pub async fn run(mut self) -> Result<()> {
223 let shutdown_reply = self.run_inner().await;
224 if let Err(err) = self.shutdown().await {
225 error!(?err, "Error during shutdown");
226 }
227 drop(self);
228 match shutdown_reply {
229 Ok(reply) => {
230 reply.send(()).ok();
231 Ok(())
232 }
233 Err(err) => Err(err),
234 }
235 }
236
237 async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
238 let mut i = 0;
239 loop {
240 i += 1;
241 trace!(?i, "tick wait");
242 self.metrics.doc_live_tick_main.inc();
243 tokio::select! {
244 biased;
245 msg = self.inbox.recv() => {
246 let msg = msg.context("to_actor closed")?;
247 trace!(?i, %msg, "tick: to_actor");
248 self.metrics.doc_live_tick_actor.inc();
249 match msg {
250 ToLiveActor::Shutdown { reply } => {
251 break Ok(reply);
252 }
253 msg => {
254 self.on_actor_message(msg).await.context("on_actor_message")?;
255 }
256 }
257 }
258 event = self.replica_events_rx.recv() => {
259 trace!(?i, "tick: replica_event");
260 self.metrics.doc_live_tick_replica_event.inc();
261 let event = event.context("replica_events closed")?;
262 if let Err(err) = self.on_replica_event(event).await {
263 error!(?err, "Failed to process replica event");
264 }
265 }
266 Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
267 trace!(?i, "tick: running_sync_connect");
268 self.metrics.doc_live_tick_running_sync_connect.inc();
269 let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
270 self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
271
272 }
273 Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
274 trace!(?i, "tick: running_sync_accept");
275 self.metrics.doc_live_tick_running_sync_accept.inc();
276 let res = res.context("running_sync_accept closed")?;
277 self.on_sync_via_accept_finished(res).await;
278 }
279 Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
280 trace!(?i, "tick: pending_downloads");
281 self.metrics.doc_live_tick_pending_downloads.inc();
282 let (namespace, hash, res) = res.context("pending_downloads closed")?;
283 self.on_download_ready(namespace, hash, res).await;
284 }
285 res = self.gossip.progress(), if !self.gossip.is_empty() => {
286 if let Err(error) = res {
287 warn!(?error, "gossip state failed");
288 }
289 }
290 }
291 }
292 }
293
294 async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
295 match msg {
296 ToLiveActor::Shutdown { .. } => {
297 unreachable!("handled in run");
298 }
299 ToLiveActor::IncomingSyncReport { from, report } => {
300 self.on_sync_report(from, report).await
301 }
302 ToLiveActor::NeighborUp { namespace, peer } => {
303 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
304 self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
305 self.subscribers
306 .send(&namespace, Event::NeighborUp(peer))
307 .await;
308 }
309 ToLiveActor::NeighborDown { namespace, peer } => {
310 debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
311 self.subscribers
312 .send(&namespace, Event::NeighborDown(peer))
313 .await;
314 }
315 ToLiveActor::StartSync {
316 namespace,
317 peers,
318 reply,
319 } => {
320 let res = self.start_sync(namespace, peers).await;
321 reply.send(res).ok();
322 }
323 ToLiveActor::Leave {
324 namespace,
325 kill_subscribers,
326 reply,
327 } => {
328 let res = self.leave(namespace, kill_subscribers).await;
329 reply.send(res).ok();
330 }
331 ToLiveActor::Subscribe {
332 namespace,
333 sender,
334 reply,
335 } => {
336 self.subscribers.subscribe(namespace, sender);
337 reply.send(Ok(())).ok();
338 }
339 ToLiveActor::HandleConnection { conn } => {
340 self.handle_connection(conn).await;
341 }
342 ToLiveActor::AcceptSyncRequest {
343 namespace,
344 peer,
345 reply,
346 } => {
347 let outcome = self.accept_sync_request(namespace, peer);
348 reply.send(outcome).ok();
349 }
350 ToLiveActor::NeighborContentReady {
351 namespace,
352 node,
353 hash,
354 } => {
355 self.on_neighbor_content_ready(namespace, node, hash).await;
356 }
357 };
358 Ok(true)
359 }
360
361 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
362 fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
363 if !self.state.start_connect(&namespace, peer, reason) {
364 return;
365 }
366 let endpoint = self.endpoint.clone();
367 let sync = self.sync.clone();
368 let metrics = self.metrics.clone();
369 let fut = async move {
370 let res = connect_and_sync(
371 &endpoint,
372 &sync,
373 namespace,
374 NodeAddr::new(peer),
375 Some(&metrics),
376 )
377 .await;
378 (namespace, peer, reason, res)
379 }
380 .instrument(Span::current());
381 self.running_sync_connect.spawn(fut);
382 }
383
384 async fn shutdown(&mut self) -> anyhow::Result<()> {
385 self.subscribers.clear();
387 let (gossip_shutdown_res, _store) = tokio::join!(
388 self.gossip.shutdown(),
390 self.sync.shutdown()
392 );
393 gossip_shutdown_res?;
394 Ok(())
397 }
398
399 async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
400 debug!(?namespace, peers = peers.len(), "start sync");
401 if !self.state.is_syncing(&namespace) {
403 let opts = OpenOpts::default()
404 .sync()
405 .subscribe(self.replica_events_tx.clone());
406 self.sync.open(namespace, opts).await?;
407 self.state.insert(namespace);
408 }
409 match self.sync.get_sync_peers(namespace).await {
411 Ok(None) => {
412 }
414 Ok(Some(known_useful_peers)) => {
415 let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
416 match PublicKey::from_bytes(&peer_id_bytes) {
419 Ok(public_key) => Some(NodeAddr::new(public_key)),
420 Err(_signing_error) => {
421 warn!("potential db corruption: peers per doc can't be decoded");
422 None
423 }
424 }
425 });
426 peers.extend(as_node_addr);
427 }
428 Err(e) => {
429 warn!(%e, "db error reading peers per document")
431 }
432 }
433 self.join_peers(namespace, peers).await?;
434 Ok(())
435 }
436
437 async fn leave(
438 &mut self,
439 namespace: NamespaceId,
440 kill_subscribers: bool,
441 ) -> anyhow::Result<()> {
442 if self.state.remove(&namespace) {
444 self.sync.set_sync(namespace, false).await?;
445 self.sync
446 .unsubscribe(namespace, self.replica_events_tx.clone())
447 .await?;
448 self.sync.close(namespace).await?;
449 self.gossip.quit(&namespace);
450 }
451 if kill_subscribers {
452 self.subscribers.remove(&namespace);
453 }
454 Ok(())
455 }
456
457 async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
458 let mut peer_ids = Vec::new();
459
460 for peer in peers.into_iter() {
462 let peer_id = peer.node_id;
463 if peer.is_empty() {
466 peer_ids.push(peer_id)
467 } else {
468 match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
469 Ok(()) => {
470 peer_ids.push(peer_id);
471 }
472 Err(err) => {
473 warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
474 }
475 }
476 }
477 }
478
479 self.gossip.join(namespace, peer_ids.clone()).await?;
481
482 if !peer_ids.is_empty() {
483 for peer in peer_ids {
485 self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
486 }
487 }
488 Ok(())
489 }
490
491 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
492 async fn on_sync_via_connect_finished(
493 &mut self,
494 namespace: NamespaceId,
495 peer: PublicKey,
496 reason: SyncReason,
497 result: Result<SyncFinished, ConnectError>,
498 ) {
499 match result {
500 Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
501 debug!(?reason, "remote abort, already syncing");
502 }
503 res => {
504 self.on_sync_finished(
505 namespace,
506 peer,
507 Origin::Connect(reason),
508 res.map_err(Into::into),
509 )
510 .await
511 }
512 }
513 }
514
515 #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
516 async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
517 match res {
518 Ok(state) => {
519 self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
520 .await
521 }
522 Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
523 debug!(?reason, "aborted by us");
525 }
526 Err(err) => {
527 if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
528 self.on_sync_finished(
529 namespace,
530 peer,
531 Origin::Accept,
532 Err(anyhow::Error::from(err)),
533 )
534 .await;
535 } else {
536 debug!(?err, "failed before reading the first message");
537 }
538 }
539 }
540 }
541
542 async fn on_sync_finished(
543 &mut self,
544 namespace: NamespaceId,
545 peer: PublicKey,
546 origin: Origin,
547 result: Result<SyncFinished>,
548 ) {
549 match &result {
550 Err(ref err) => {
551 warn!(?origin, ?err, "sync failed");
552 }
553 Ok(ref details) => {
554 info!(
555 sent = %details.outcome.num_sent,
556 recv = %details.outcome.num_recv,
557 t_connect = ?details.timings.connect,
558 t_process = ?details.timings.process,
559 "sync finished",
560 );
561
562 if let Err(e) = self
564 .sync
565 .register_useful_peer(namespace, *peer.as_bytes())
566 .await
567 {
568 debug!(%e, "failed to register peer for document")
569 }
570
571 if details.outcome.num_recv > 0 {
573 info!("broadcast sync report to neighbors");
574 match details
575 .outcome
576 .heads_received
577 .encode(Some(self.gossip.max_message_size()))
578 {
579 Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
580 Ok(heads) => {
581 let report = SyncReport { namespace, heads };
582 self.broadcast_neighbors(namespace, &Op::SyncReport(report))
583 .await;
584 }
585 }
586 }
587 }
588 };
589
590 let result_for_event = match &result {
591 Ok(details) => Ok(details.into()),
592 Err(err) => Err(err.to_string()),
593 };
594
595 let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
596 return;
597 };
598
599 let ev = SyncEvent {
600 peer,
601 origin,
602 result: result_for_event,
603 finished: SystemTime::now(),
604 started,
605 };
606 self.subscribers
607 .send(&namespace, Event::SyncFinished(ev))
608 .await;
609
610 if self.queued_hashes.contains_namespace(&namespace) {
616 self.state.set_may_emit_ready(&namespace, true);
617 } else {
618 self.subscribers
619 .send(&namespace, Event::PendingContentReady)
620 .await;
621 self.state.set_may_emit_ready(&namespace, false);
622 }
623
624 if resync {
625 self.sync_with_peer(namespace, peer, SyncReason::Resync);
626 }
627 }
628
629 async fn broadcast_neighbors(&self, namespace: NamespaceId, op: &Op) {
630 if !self.state.is_syncing(&namespace) {
631 return;
632 }
633
634 let msg = match postcard::to_stdvec(op) {
635 Ok(msg) => msg,
636 Err(err) => {
637 error!(?err, ?op, "Failed to serialize message:");
638 return;
639 }
640 };
641 self.gossip
643 .broadcast_neighbors(&namespace, msg.into())
644 .await;
645 }
646
647 async fn on_download_ready(
648 &mut self,
649 namespace: NamespaceId,
650 hash: Hash,
651 res: Result<Stats, DownloadError>,
652 ) {
653 let completed_namespaces = self.queued_hashes.remove_hash(&hash);
654 debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
655 if res.is_ok() {
656 self.subscribers
657 .send(&namespace, Event::ContentReady { hash })
658 .await;
659 self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
661 .await;
662 } else {
663 self.missing_hashes.insert(hash);
664 }
665 for namespace in completed_namespaces.iter() {
666 if let Some(true) = self.state.may_emit_ready(namespace) {
667 self.subscribers
668 .send(namespace, Event::PendingContentReady)
669 .await;
670 }
671 }
672 }
673
674 async fn on_neighbor_content_ready(
675 &mut self,
676 namespace: NamespaceId,
677 node: NodeId,
678 hash: Hash,
679 ) {
680 self.start_download(namespace, hash, node, true).await;
681 }
682
683 #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
684 async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
685 let namespace = report.namespace;
686 if !self.state.is_syncing(&namespace) {
687 return;
688 }
689 let heads = match AuthorHeads::decode(&report.heads) {
690 Ok(heads) => heads,
691 Err(err) => {
692 warn!(?err, "failed to decode AuthorHeads");
693 return;
694 }
695 };
696 match self.sync.has_news_for_us(report.namespace, heads).await {
697 Ok(Some(updated_authors)) => {
698 info!(%updated_authors, "news reported: sync now");
699 self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
700 }
701 Ok(None) => {
702 debug!("no news reported: nothing to do");
703 }
704 Err(err) => {
705 warn!("sync actor error: {err:?}");
706 }
707 }
708 }
709
710 async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
711 match event {
712 crate::Event::LocalInsert { namespace, entry } => {
713 debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
714 if self.state.is_syncing(&namespace) {
716 let op = Op::Put(entry.clone());
717 let message = postcard::to_stdvec(&op)?.into();
718 self.gossip.broadcast(&namespace, message).await;
719 }
720 }
721 crate::Event::RemoteInsert {
722 namespace,
723 entry,
724 from,
725 should_download,
726 remote_content_status,
727 } => {
728 debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
729 if should_download {
732 let hash = entry.content_hash();
733 if matches!(remote_content_status, ContentStatus::Complete) {
734 let node_id = PublicKey::from_bytes(&from)?;
735 self.start_download(namespace, hash, node_id, false).await;
736 } else {
737 self.missing_hashes.insert(hash);
738 }
739 }
740 }
741 }
742
743 Ok(())
744 }
745
746 async fn start_download(
747 &mut self,
748 namespace: NamespaceId,
749 hash: Hash,
750 node: PublicKey,
751 only_if_missing: bool,
752 ) {
753 let entry_status = self.bao_store.entry_status(&hash).await;
754 if matches!(entry_status, Ok(EntryStatus::Complete)) {
755 self.missing_hashes.remove(&hash);
756 return;
757 }
758 if self.queued_hashes.contains_hash(&hash) {
759 self.queued_hashes.insert(hash, namespace);
760 self.downloader.nodes_have(hash, vec![node]).await;
761 } else if !only_if_missing || self.missing_hashes.contains(&hash) {
762 let req = DownloadRequest::new(HashAndFormat::raw(hash), vec![node]);
763 let handle = self.downloader.queue(req).await;
764
765 self.queued_hashes.insert(hash, namespace);
766 self.missing_hashes.remove(&hash);
767 self.download_tasks
768 .spawn(async move { (namespace, hash, handle.await) });
769 }
770 }
771
772 #[instrument("accept", skip_all)]
773 pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
774 let to_actor_tx = self.sync_actor_tx.clone();
775 let accept_request_cb = move |namespace, peer| {
776 let to_actor_tx = to_actor_tx.clone();
777 async move {
778 let (reply_tx, reply_rx) = oneshot::channel();
779 to_actor_tx
780 .send(ToLiveActor::AcceptSyncRequest {
781 namespace,
782 peer,
783 reply: reply_tx,
784 })
785 .await
786 .ok();
787 match reply_rx.await {
788 Ok(outcome) => outcome,
789 Err(err) => {
790 warn!(
791 "accept request callback failed to retrieve reply from actor: {err:?}"
792 );
793 AcceptOutcome::Reject(AbortReason::InternalServerError)
794 }
795 }
796 }
797 .boxed()
798 };
799 debug!("incoming connection");
800 let sync = self.sync.clone();
801 let metrics = self.metrics.clone();
802 self.running_sync_accept.spawn(
803 async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
804 .instrument(Span::current()),
805 );
806 }
807
808 pub fn accept_sync_request(
809 &mut self,
810 namespace: NamespaceId,
811 peer: PublicKey,
812 ) -> AcceptOutcome {
813 self.state
814 .accept_request(&self.endpoint.node_id(), &namespace, peer)
815 }
816}
817
818#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
820pub struct SyncEvent {
821 pub peer: PublicKey,
823 pub origin: Origin,
825 pub finished: SystemTime,
827 pub started: SystemTime,
829 pub result: std::result::Result<SyncDetails, String>,
831}
832
833#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
834pub struct SyncDetails {
835 pub entries_received: usize,
837 pub entries_sent: usize,
839}
840
841impl From<&SyncFinished> for SyncDetails {
842 fn from(value: &SyncFinished) -> Self {
843 Self {
844 entries_received: value.outcome.num_recv,
845 entries_sent: value.outcome.num_sent,
846 }
847 }
848}
849
850#[derive(Debug, Default)]
851struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
852
853impl SubscribersMap {
854 fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
855 self.0.entry(namespace).or_default().subscribe(sender);
856 }
857
858 async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
859 debug!(namespace=%namespace.fmt_short(), %event, "emit event");
860 let Some(subscribers) = self.0.get_mut(namespace) else {
861 return false;
862 };
863
864 if !subscribers.send(event).await {
865 self.0.remove(namespace);
866 }
867 true
868 }
869
870 fn remove(&mut self, namespace: &NamespaceId) {
871 self.0.remove(namespace);
872 }
873
874 fn clear(&mut self) {
875 self.0.clear();
876 }
877}
878
879#[derive(Debug, Default)]
880struct QueuedHashes {
881 by_hash: HashMap<Hash, HashSet<NamespaceId>>,
882 by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
883}
884
885impl QueuedHashes {
886 fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
887 self.by_hash.entry(hash).or_default().insert(namespace);
888 self.by_namespace.entry(namespace).or_default().insert(hash);
889 }
890
891 fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
895 let namespaces = self.by_hash.remove(hash).unwrap_or_default();
896 let mut removed_namespaces = vec![];
897 for namespace in namespaces {
898 if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
899 hashes.remove(hash);
900 if hashes.is_empty() {
901 self.by_namespace.remove(&namespace);
902 removed_namespaces.push(namespace);
903 }
904 }
905 }
906 removed_namespaces
907 }
908
909 fn contains_hash(&self, hash: &Hash) -> bool {
910 self.by_hash.contains_key(hash)
911 }
912
913 fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
914 self.by_namespace.contains_key(namespace)
915 }
916}
917
918#[derive(Debug, Default)]
919struct Subscribers(Vec<async_channel::Sender<Event>>);
920
921impl Subscribers {
922 fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
923 self.0.push(sender)
924 }
925
926 async fn send(&mut self, event: Event) -> bool {
927 let futs = self.0.iter().map(|sender| sender.send(event.clone()));
928 let res = futures_buffered::join_all(futs).await;
929 for (i, res) in res.into_iter().enumerate().rev() {
931 if res.is_err() {
932 self.0.remove(i);
933 }
934 }
935 !self.0.is_empty()
936 }
937}
938
939fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
940 match res {
941 Ok(res) => res.peer.fmt_short(),
942 Err(err) => err
943 .peer()
944 .map(|x| x.fmt_short())
945 .unwrap_or_else(|| "unknown".to_string()),
946 }
947}
948
949fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
950 match res {
951 Ok(res) => res.namespace.fmt_short(),
952 Err(err) => err
953 .namespace()
954 .map(|x| x.fmt_short())
955 .unwrap_or_else(|| "unknown".to_string()),
956 }
957}
958
959#[cfg(test)]
960mod tests {
961 use super::*;
962
963 #[tokio::test]
964 async fn test_sync_remove() {
965 let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
966 let (a_tx, a_rx) = async_channel::unbounded();
967 let (b_tx, b_rx) = async_channel::unbounded();
968 let mut subscribers = Subscribers::default();
969 subscribers.subscribe(a_tx);
970 subscribers.subscribe(b_tx);
971 drop(a_rx);
972 drop(b_rx);
973 subscribers.send(Event::NeighborUp(pk)).await;
974 }
975}