1#![allow(missing_docs)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6 time::SystemTime,
7};
8
9use anyhow::{Context, Result};
10use futures_lite::FutureExt;
11use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
12use iroh_blobs::{
13 api::{
14 blobs::BlobStatus,
15 downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
16 Store,
17 },
18 Hash, HashAndFormat,
19};
20use iroh_gossip::net::Gossip;
21use serde::{Deserialize, Serialize};
22use tokio::{
23 sync::{self, mpsc, oneshot},
24 task::JoinSet,
25};
26use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
27
28use super::state::{NamespaceStates, Origin, SyncReason};
30use crate::{
31 actor::{OpenOpts, SyncHandle},
32 engine::gossip::GossipState,
33 metrics::Metrics,
34 net::{
35 connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
36 SyncFinished,
37 },
38 AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
39};
40
/// Messages broadcast over the gossip swarm of a document.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new signed entry was inserted into the document.
    Put(SignedEntry),
    /// The content for the given hash is now fully available on the sender.
    ContentReady(Hash),
    /// A sync finished and produced new heads; neighbors may want to sync too.
    SyncReport(SyncReport),
}
53
/// Summary of a completed sync, gossiped to neighbors via [`Op::SyncReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// The document this report is about.
    namespace: NamespaceId,
    /// Encoded author heads (see `AuthorHeads::encode` / `AuthorHeads::decode`).
    heads: Vec<u8>,
}
61
62#[derive(derive_more::Debug, strum::Display)]
64pub enum ToLiveActor {
65 StartSync {
66 namespace: NamespaceId,
67 peers: Vec<EndpointAddr>,
68 #[debug("onsehot::Sender")]
69 reply: sync::oneshot::Sender<anyhow::Result<()>>,
70 },
71 Leave {
72 namespace: NamespaceId,
73 kill_subscribers: bool,
74 #[debug("onsehot::Sender")]
75 reply: sync::oneshot::Sender<anyhow::Result<()>>,
76 },
77 Shutdown {
78 reply: sync::oneshot::Sender<()>,
79 },
80 Subscribe {
81 namespace: NamespaceId,
82 #[debug("sender")]
83 sender: async_channel::Sender<Event>,
84 #[debug("oneshot::Sender")]
85 reply: sync::oneshot::Sender<Result<()>>,
86 },
87 HandleConnection {
88 conn: iroh::endpoint::Connection,
89 },
90 AcceptSyncRequest {
91 namespace: NamespaceId,
92 peer: PublicKey,
93 #[debug("oneshot::Sender")]
94 reply: sync::oneshot::Sender<AcceptOutcome>,
95 },
96
97 IncomingSyncReport {
98 from: PublicKey,
99 report: SyncReport,
100 },
101 NeighborContentReady {
102 namespace: NamespaceId,
103 node: PublicKey,
104 hash: Hash,
105 },
106 NeighborUp {
107 namespace: NamespaceId,
108 peer: PublicKey,
109 },
110 NeighborDown {
111 namespace: NamespaceId,
112 peer: PublicKey,
113 },
114}
115
/// Events emitted to subscribers of a live document.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content for an entry became fully available locally.
    ContentReady {
        /// Hash of the now-available content.
        hash: Hash,
    },
    /// A peer was added as gossip neighbor for this document.
    NeighborUp(PublicKey),
    /// A gossip neighbor for this document disconnected.
    NeighborDown(PublicKey),
    /// A sync run with a peer finished (successfully or not).
    SyncFinished(SyncEvent),
    /// All downloads that were pending when the last sync finished are done.
    PendingContentReady,
}
139
/// Outcome of an outgoing sync attempt: which document and peer, why we
/// connected, and how the sync went.
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
/// Outcome of handling one accepted (incoming) sync connection.
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
/// Outcome of one content download task.
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
148
/// Actor that drives live sync for a set of documents: gossip membership,
/// outgoing/incoming sync runs, and content downloads.
pub struct LiveActor {
    /// Inbox for control messages (see [`ToLiveActor`]).
    inbox: mpsc::Receiver<ToLiveActor>,
    /// Handle to the sync (replica) actor.
    sync: SyncHandle,
    endpoint: Endpoint,
    /// Blob store used to check local content availability.
    bao_store: Store,
    downloader: Downloader,
    /// In-memory address book; peer addresses are added here before dialing.
    memory_lookup: MemoryLookup,
    /// Channel pair through which replica insert events arrive.
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Sender for our own inbox, cloned into callbacks and spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// In-flight outgoing sync tasks.
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// In-flight incoming sync tasks.
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// In-flight content download tasks.
    download_tasks: JoinSet<DownloadRes>,
    /// Hashes we know about but could not download yet.
    missing_hashes: HashSet<Hash>,
    /// Hashes currently queued for download, indexed both ways.
    queued_hashes: QueuedHashes,
    /// Known provider nodes per hash, shared with the downloader.
    hash_providers: ProviderNodes,

    /// Event subscribers per document.
    subscribers: SubscribersMap,

    /// Per-document sync state machine.
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}
187impl LiveActor {
188 #[allow(clippy::too_many_arguments)]
190 pub fn new(
191 sync: SyncHandle,
192 endpoint: Endpoint,
193 gossip: Gossip,
194 bao_store: Store,
195 downloader: Downloader,
196 inbox: mpsc::Receiver<ToLiveActor>,
197 sync_actor_tx: mpsc::Sender<ToLiveActor>,
198 metrics: Arc<Metrics>,
199 ) -> Self {
200 let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
201 let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
202 let memory_lookup = MemoryLookup::new();
203 endpoint.address_lookup().add(memory_lookup.clone());
204 Self {
205 inbox,
206 sync,
207 replica_events_rx,
208 replica_events_tx,
209 endpoint,
210 memory_lookup,
211 gossip: gossip_state,
212 bao_store,
213 downloader,
214 sync_actor_tx,
215 running_sync_connect: Default::default(),
216 running_sync_accept: Default::default(),
217 subscribers: Default::default(),
218 download_tasks: Default::default(),
219 state: Default::default(),
220 missing_hashes: Default::default(),
221 queued_hashes: Default::default(),
222 hash_providers: Default::default(),
223 metrics,
224 }
225 }
226
227 pub async fn run(mut self) -> Result<()> {
229 let shutdown_reply = self.run_inner().await;
230 if let Err(err) = self.shutdown().await {
231 error!(?err, "Error during shutdown");
232 }
233 drop(self);
234 match shutdown_reply {
235 Ok(reply) => {
236 reply.send(()).ok();
237 Ok(())
238 }
239 Err(err) => Err(err),
240 }
241 }
242
    /// Main event loop.
    ///
    /// Returns the reply channel of the `Shutdown` message that terminated the
    /// loop, or an error if one of the actor's channels closed unexpectedly.
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        // Tick counter, only used for trace logging.
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            // `biased` makes the select poll branches in order: control messages
            // first, then replica events, then task completions.
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        // Shutdown is handled here so the loop can terminate;
                        // everything else is dispatched.
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                // The `is_empty` guards keep the select from polling empty
                // JoinSets (which would resolve immediately with None).
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }
299
    /// Dispatches a single control message.
    ///
    /// `Shutdown` is never passed here (it terminates the loop in
    /// [`Self::run_inner`]). Currently always returns `Ok(true)`.
    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                // A new neighbor is a reason to sync right away.
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                // Ignore a dropped reply receiver; the work is done regardless.
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }
366
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    /// Spawns an outgoing sync task for `namespace` with `peer`.
    ///
    /// No-op if the state machine refuses the connection (e.g. a sync with this
    /// peer is already in progress — see `NamespaceStates::start_connect`).
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        // The task's result is picked up in run_inner via running_sync_connect.
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }
389
    /// Shuts down gossip and the sync actor; drops all subscribers first so no
    /// further events are emitted during teardown.
    async fn shutdown(&mut self) -> anyhow::Result<()> {
        self.subscribers.clear();
        // Shut down both concurrently; only the gossip result is propagated.
        let (gossip_shutdown_res, _store) = tokio::join!(
            self.gossip.shutdown(),
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        Ok(())
    }
404
    /// Starts live sync for `namespace` with an initial set of peers.
    ///
    /// Opens the replica (subscribing to its events) if it is not yet syncing,
    /// merges in peers previously persisted as useful for this document, then
    /// joins gossip and triggers direct syncs via [`Self::join_peers`].
    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // Merge stored "useful peers" for this document into the peer list.
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        // Undecodable entries are skipped, not fatal.
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            // DB read failures are logged but do not abort the start.
            Err(e) => {
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }
446
    /// Stops live sync for `namespace`.
    ///
    /// If the document was syncing: disables sync, unsubscribes from replica
    /// events, closes the replica, and leaves the gossip swarm — in that order.
    /// Optionally drops all event subscribers as well.
    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // `remove` returns false if the namespace was not syncing; teardown is
        // then skipped, but subscriber removal below still applies.
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }
466
467 async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
468 let mut peer_ids = Vec::new();
469
470 for peer in peers.into_iter() {
472 let peer_id = peer.id;
473 if !peer.is_empty() {
476 self.memory_lookup.add_endpoint_info(peer);
477 }
478 peer_ids.push(peer_id);
479 }
480
481 self.gossip.join(namespace, peer_ids.clone()).await?;
483
484 if !peer_ids.is_empty() {
485 for peer in peer_ids {
487 self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
488 }
489 }
490 Ok(())
491 }
492
493 #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
494 async fn on_sync_via_connect_finished(
495 &mut self,
496 namespace: NamespaceId,
497 peer: PublicKey,
498 reason: SyncReason,
499 result: Result<SyncFinished, ConnectError>,
500 ) {
501 match result {
502 Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
503 debug!(?reason, "remote abort, already syncing");
504 }
505 res => {
506 self.on_sync_finished(
507 namespace,
508 peer,
509 Origin::Connect(reason),
510 res.map_err(Into::into),
511 )
512 .await
513 }
514 }
515 }
516
    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    /// Handles the result of an accepted (incoming) sync task.
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            // We aborted because a sync was already running — not an error.
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                // Only report a failed sync if we got far enough to know which
                // peer and namespace were involved.
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }
543
    /// Common post-processing for any finished sync (connect or accept).
    ///
    /// Logs the outcome, persists the peer as useful, broadcasts a sync report
    /// if we received entries, notifies subscribers, and optionally re-syncs.
    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // Remember this peer for future syncs of this document;
                // failure to persist is non-fatal.
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // If we received entries, tell our neighbors so they can sync
                // with us in turn.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        // Build the subscriber-facing result before `result` is consumed by
        // `state.finish` below.
        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        // `finish` returns None if this sync run is no longer tracked; then
        // nothing is emitted.
        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // If downloads are still queued for this namespace, defer the
        // PendingContentReady event until they complete; otherwise emit it now.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }
630
631 async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
632 if !self.state.is_syncing(&namespace) {
633 return;
634 }
635
636 let msg = match postcard::to_stdvec(op) {
637 Ok(msg) => msg,
638 Err(err) => {
639 error!(?err, ?op, "Failed to serialize message:");
640 return;
641 }
642 };
643 self.gossip
645 .broadcast_neighbors(&namespace, msg.into())
646 .await;
647 }
648
    /// Handles completion of a download task for `hash`.
    ///
    /// On success, emits `ContentReady` and announces the content to
    /// neighbors; on failure, the hash goes back into `missing_hashes` so a
    /// later provider announcement can retry it. Namespaces whose download
    /// queue just drained may get their deferred `PendingContentReady`.
    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        // Namespaces that now have no queued hashes left.
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform neighbors that the content is now available from us.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            // Only emitted if a finished sync deferred it (see on_sync_finished).
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }
675
    /// A neighbor announced it has the content for `hash`: try to download it
    /// from them, but only if the hash is currently marked missing
    /// (`only_if_missing = true`).
    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }
684
    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    /// Handles a gossiped sync report: if the reported heads contain news for
    /// us, start a sync with the reporting peer.
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        // Ignore reports for documents we are not live-syncing.
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }
711
    /// Handles an event emitted by a replica.
    ///
    /// Local inserts are gossiped to the swarm; remote inserts may trigger a
    /// content download (if the remote claims the content is complete) or are
    /// recorded as missing otherwise.
    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        // Remote doesn't have the full content yet; remember the
                        // hash so a later ContentReady announcement triggers the
                        // download.
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }
747
    /// Queues a download of `hash` for `namespace` from `node`.
    ///
    /// Skips the download if the blob is already complete locally. Always
    /// records `node` as a provider for `hash`. If a download for the hash is
    /// already queued, only the namespace association is added. With
    /// `only_if_missing`, a new download is started only for hashes previously
    /// marked missing.
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        // Record the node as a provider; the downloader reads this map via
        // the ContentDiscovery impl on ProviderNodes.
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            // Download already in flight: just associate the namespace too.
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            // Completion is picked up in run_inner via download_tasks.
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }
788
    #[instrument("accept", skip_all)]
    /// Spawns a task to serve an incoming sync connection.
    ///
    /// The accept decision is routed back through the actor's own inbox
    /// (`AcceptSyncRequest`) so it is made with access to the actor state.
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        // Callback invoked by the protocol handler once it knows which
        // namespace and peer the request is for.
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    // If the actor is gone, reject rather than hang.
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        // Result is picked up in run_inner via running_sync_accept.
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }
824
825 pub fn accept_sync_request(
826 &mut self,
827 namespace: NamespaceId,
828 peer: PublicKey,
829 ) -> AcceptOutcome {
830 self.state
831 .accept_request(&self.endpoint.id(), &namespace, peer)
832 }
833}
834
/// Subscriber-facing description of a finished sync run.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// The peer we synced with.
    pub peer: PublicKey,
    /// Whether we initiated the sync or accepted it.
    pub origin: Origin,
    /// Timestamp when the sync finished.
    pub finished: SystemTime,
    /// Timestamp when the sync started.
    pub started: SystemTime,
    /// Details on success, or the error message on failure.
    pub result: std::result::Result<SyncDetails, String>,
}
849
/// Entry counts for a successful sync run.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries we received from the peer.
    pub entries_received: usize,
    /// Number of entries we sent to the peer.
    pub entries_sent: usize,
}
857
858impl From<&SyncFinished> for SyncDetails {
859 fn from(value: &SyncFinished) -> Self {
860 Self {
861 entries_received: value.outcome.num_recv,
862 entries_sent: value.outcome.num_sent,
863 }
864 }
865}
866
/// Event subscribers, grouped by document.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
869
870impl SubscribersMap {
871 fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
872 self.0.entry(namespace).or_default().subscribe(sender);
873 }
874
875 async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
876 debug!(namespace=%namespace.fmt_short(), %event, "emit event");
877 let Some(subscribers) = self.0.get_mut(namespace) else {
878 return false;
879 };
880
881 if !subscribers.send(event).await {
882 self.0.remove(namespace);
883 }
884 true
885 }
886
887 fn remove(&mut self, namespace: &NamespaceId) {
888 self.0.remove(namespace);
889 }
890
891 fn clear(&mut self) {
892 self.0.clear();
893 }
894}
895
/// Hashes currently queued for download, indexed in both directions so we can
/// answer "which namespaces wait for this hash" and "does this namespace still
/// have pending downloads".
#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
901
/// Known provider nodes per hash, shared between the actor (writer) and the
/// downloader's content discovery (reader) behind a mutex.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
904
905impl ContentDiscovery for ProviderNodes {
906 fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
907 let nodes = self
908 .0
909 .lock()
910 .expect("poisoned")
911 .get(&hash.hash)
912 .into_iter()
913 .flatten()
914 .cloned()
915 .collect::<Vec<_>>();
916 Box::pin(n0_future::stream::iter(nodes))
917 }
918}
919
920impl QueuedHashes {
921 fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
922 self.by_hash.entry(hash).or_default().insert(namespace);
923 self.by_namespace.entry(namespace).or_default().insert(hash);
924 }
925
926 fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
930 let namespaces = self.by_hash.remove(hash).unwrap_or_default();
931 let mut removed_namespaces = vec![];
932 for namespace in namespaces {
933 if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
934 hashes.remove(hash);
935 if hashes.is_empty() {
936 self.by_namespace.remove(&namespace);
937 removed_namespaces.push(namespace);
938 }
939 }
940 }
941 removed_namespaces
942 }
943
944 fn contains_hash(&self, hash: &Hash) -> bool {
945 self.by_hash.contains_key(hash)
946 }
947
948 fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
949 self.by_namespace.contains_key(namespace)
950 }
951}
952
/// The list of event subscribers for a single document.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
955
956impl Subscribers {
957 fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
958 self.0.push(sender)
959 }
960
961 async fn send(&mut self, event: Event) -> bool {
962 let futs = self.0.iter().map(|sender| sender.send(event.clone()));
963 let res = futures_buffered::join_all(futs).await;
964 for (i, res) in res.into_iter().enumerate().rev() {
966 if res.is_err() {
967 self.0.remove(i);
968 }
969 }
970 !self.0.is_empty()
971 }
972}
973
974fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
975 match res {
976 Ok(res) => res.peer.fmt_short().to_string(),
977 Err(err) => err
978 .peer()
979 .map(|x| x.fmt_short().to_string())
980 .unwrap_or_else(|| "unknown".to_string()),
981 }
982}
983
984fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
985 match res {
986 Ok(res) => res.namespace.fmt_short(),
987 Err(err) => err
988 .namespace()
989 .map(|x| x.fmt_short())
990 .unwrap_or_else(|| "unknown".to_string()),
991 }
992}
993
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to subscribers whose receivers were dropped must prune every
    /// sender without panicking.
    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let mut subscribers = Subscribers::default();
        // Register two subscribers and immediately drop their receiving ends.
        for _ in 0..2 {
            let (tx, rx) = async_channel::unbounded();
            subscribers.subscribe(tx);
            drop(rx);
        }
        subscribers.send(Event::NeighborUp(pk)).await;
    }
}