1#![allow(missing_docs)]
2
3use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use futures_lite::FutureExt;
10use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
11use iroh_blobs::{
12 api::{
13 blobs::BlobStatus,
14 downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
15 Store,
16 },
17 Hash, HashAndFormat,
18};
19use iroh_gossip::net::Gossip;
20use n0_future::{task::JoinSet, time::SystemTime};
21use serde::{Deserialize, Serialize};
22use tokio::sync::{self, mpsc, oneshot};
23use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
24
25use super::state::{NamespaceStates, Origin, SyncReason};
27use crate::{
28 actor::{OpenOpts, SyncHandle},
29 engine::gossip::GossipState,
30 metrics::Metrics,
31 net::{
32 connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
33 SyncFinished,
34 },
35 AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
36};
37
/// An operation gossiped to other peers participating in a document.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// The content for an entry hash is now fully available on the sender.
    ContentReady(Hash),
    /// A peer reports the author heads it holds after a completed sync.
    SyncReport(SyncReport),
}
50
/// Report of a completed sync, broadcast to neighbors over gossip.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// The document this report is about.
    namespace: NamespaceId,
    /// Encoded author heads (produced by `AuthorHeads::encode`, decoded with
    /// `AuthorHeads::decode` in `on_sync_report`).
    heads: Vec<u8>,
}
58
59#[derive(derive_more::Debug, strum::Display)]
61pub enum ToLiveActor {
62 StartSync {
63 namespace: NamespaceId,
64 peers: Vec<EndpointAddr>,
65 #[debug("onsehot::Sender")]
66 reply: sync::oneshot::Sender<anyhow::Result<()>>,
67 },
68 Leave {
69 namespace: NamespaceId,
70 kill_subscribers: bool,
71 #[debug("onsehot::Sender")]
72 reply: sync::oneshot::Sender<anyhow::Result<()>>,
73 },
74 Shutdown {
75 reply: sync::oneshot::Sender<()>,
76 },
77 Subscribe {
78 namespace: NamespaceId,
79 #[debug("sender")]
80 sender: async_channel::Sender<Event>,
81 #[debug("oneshot::Sender")]
82 reply: sync::oneshot::Sender<Result<()>>,
83 },
84 HandleConnection {
85 conn: iroh::endpoint::Connection,
86 },
87 AcceptSyncRequest {
88 namespace: NamespaceId,
89 peer: PublicKey,
90 #[debug("oneshot::Sender")]
91 reply: sync::oneshot::Sender<AcceptOutcome>,
92 },
93
94 IncomingSyncReport {
95 from: PublicKey,
96 report: SyncReport,
97 },
98 NeighborContentReady {
99 namespace: NamespaceId,
100 node: PublicKey,
101 hash: Hash,
102 },
103 NeighborUp {
104 namespace: NamespaceId,
105 peer: PublicKey,
106 },
107 NeighborDown {
108 namespace: NamespaceId,
109 peer: PublicKey,
110 },
111}
112
/// Events emitted to subscribers about the live sync progress of a document.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available locally.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// We have a new direct neighbor in the gossip swarm.
    NeighborUp(PublicKey),
    /// We lost a direct neighbor in the gossip swarm.
    NeighborDown(PublicKey),
    /// A sync run with a peer finished (successfully or not).
    SyncFinished(SyncEvent),
    /// All content downloads that were queued for this document completed.
    PendingContentReady,
}
136
/// Outcome of an outgoing (dialing) sync task, tagged with its target and reason.
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
/// Outcome of an incoming (accepting) sync task.
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
/// Outcome of a content download task: namespace, content hash, and result.
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
145
/// The actor that drives live sync for documents: gossip membership, direct
/// sync connections, and content downloads.
pub struct LiveActor {
    /// Receiver for messages sent to this actor.
    inbox: mpsc::Receiver<ToLiveActor>,
    /// Handle to the sync (replica) actor.
    sync: SyncHandle,
    /// Endpoint used to dial peers and accept connections.
    endpoint: Endpoint,
    /// Blob store used to check local content availability.
    bao_store: Store,
    /// Downloader used to fetch missing content from provider nodes.
    downloader: Downloader,
    /// In-memory address book, registered with the endpoint's address lookup.
    memory_lookup: MemoryLookup,
    /// Sender for replica insert events (handed to opened replicas).
    replica_events_tx: async_channel::Sender<crate::Event>,
    /// Receiver side of the replica insert events channel.
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Sender to our own inbox, cloned into tasks that message the actor back.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    /// Gossip state (join/quit/broadcast per namespace).
    gossip: GossipState,

    /// In-flight outgoing sync tasks.
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// In-flight incoming sync tasks.
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// In-flight content download tasks.
    download_tasks: JoinSet<DownloadRes>,
    /// Hashes known to be missing locally with no download currently queued.
    missing_hashes: HashSet<Hash>,
    /// Hashes with a queued download, indexed by hash and by namespace.
    queued_hashes: QueuedHashes,
    /// Known provider nodes per hash, consulted by the downloader.
    hash_providers: ProviderNodes,

    /// Event subscribers per namespace.
    subscribers: SubscribersMap,

    /// Per-namespace sync state (running syncs, pending events).
    state: NamespaceStates,
    /// Metrics counters for the actor loop.
    metrics: Arc<Metrics>,
}
184impl LiveActor {
185 #[allow(clippy::too_many_arguments)]
    /// Creates a new live actor.
    ///
    /// Sets up the replica-event channel, the gossip state, and registers a
    /// fresh in-memory address book with the endpoint's address lookup.
    ///
    /// # Errors
    ///
    /// Fails if the endpoint's address lookup is not available.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Result<Self> {
        // Bounded channel over which opened replicas report insert events.
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let memory_lookup = MemoryLookup::new();
        // Register our address book so dials can resolve peers we learned about.
        endpoint.address_lookup()?.add(memory_lookup.clone());
        Ok(Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            memory_lookup,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        })
    }
223
224 pub async fn run(mut self) -> Result<()> {
226 let shutdown_reply = self.run_inner().await;
227 if let Err(err) = self.shutdown().await {
228 error!(?err, "Error during shutdown");
229 }
230 drop(self);
231 match shutdown_reply {
232 Ok(reply) => {
233 reply.send(()).ok();
234 Ok(())
235 }
236 Err(err) => Err(err),
237 }
238 }
239
    /// The actor's main loop.
    ///
    /// Returns the `Shutdown` reply sender on a clean exit, or an error if one
    /// of the internal channels closed unexpectedly.
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        // Tick counter, only used for trace logging.
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                // `biased` gives inbox messages (incl. Shutdown) priority over
                // task completions, in declaration order.
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        // Shutdown is handled here (not in on_actor_message) so
                        // the loop can break and return the reply sender.
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    // Replica event failures are logged, not fatal to the loop.
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                // Drive pending gossip work; failures are logged only.
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }
296
    /// Dispatches a single actor message to its handler.
    ///
    /// `Shutdown` must never reach this function; it is intercepted in
    /// [`Self::run_inner`] so the loop can break with the reply sender.
    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                // A new neighbor is an opportunity to sync right away.
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                // Requester may have gone away; ignore a failed reply send.
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }
363
    /// Spawns an outgoing sync with `peer` for `namespace`, unless the state
    /// tracker says a sync with this peer is already running or not wanted.
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        // Deduplicates concurrent sync attempts per (namespace, peer).
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            // Tag the result so the completion handler knows its context.
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }
386
    /// Shuts the actor's subsystems down: drops all subscribers, then shuts
    /// down gossip and the sync handle concurrently.
    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // Drop subscribers first so no further events are emitted.
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            self.gossip.shutdown(),
            self.sync.shutdown()
        );
        // Only the gossip shutdown result is propagated; the sync shutdown
        // result (the store) is intentionally ignored here.
        gossip_shutdown_res?;
        Ok(())
    }
401
    /// Starts to sync `namespace` with the given `peers`, plus any peers that
    /// were previously recorded as useful for this document.
    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // Open the replica with sync enabled and subscribe to its events,
        // but only on the first start for this namespace.
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // Add previously-useful peers from the store to the join set.
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // No stored peers for this document.
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            // A stored peer id that does not decode is dropped.
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // Non-fatal: continue with the explicitly provided peers.
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }
443
    /// Stops syncing `namespace`: disables sync, unsubscribes from replica
    /// events, closes the replica and quits gossip. Optionally also drops all
    /// event subscribers for the namespace.
    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // Only tear down if we were actually syncing this namespace.
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        // Subscribers may be dropped even if we were not syncing.
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }
463
464 async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
465 let mut peer_ids = Vec::new();
466
467 for peer in peers.into_iter() {
469 let peer_id = peer.id;
470 if !peer.is_empty() {
473 self.memory_lookup.add_endpoint_info(peer);
474 }
475 peer_ids.push(peer_id);
476 }
477
478 self.gossip.join(namespace, peer_ids.clone()).await?;
480
481 if !peer_ids.is_empty() {
482 for peer in peer_ids {
484 self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
485 }
486 }
487 Ok(())
488 }
489
    /// Handles completion of an outgoing sync task.
    ///
    /// A remote abort because the peer is already syncing with us is expected
    /// (both sides dialed each other) and silently ignored; everything else is
    /// forwarded to [`Self::on_sync_finished`].
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }
513
    /// Handles completion of an incoming sync task.
    ///
    /// Our own `AlreadySyncing` abort is expected and ignored. Other errors
    /// are forwarded to [`Self::on_sync_finished`] if the peer and namespace
    /// are known; errors that occurred before the first message only carry
    /// partial information and are just logged.
    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }
540
    /// Common post-processing for any finished sync (connect or accept side):
    /// records the peer as useful, broadcasts a sync report if we received
    /// entries, updates sync state, and emits subscriber events.
    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // Remember this peer as useful for the document; failure to
                // persist is not fatal.
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // If we received entries, tell our neighbors about the new
                // author heads so they can decide to sync with us.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        // Convert for the subscriber event before `result` is moved into
        // `state.finish` below.
        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        // `finish` returns None if this sync was not tracked (e.g. the
        // namespace was left meanwhile); then no events are emitted.
        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // If downloads are still queued for this namespace, defer the
        // PendingContentReady event until they complete; otherwise emit now.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        // The state tracker may request an immediate re-sync with this peer.
        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }
627
    /// Broadcasts `op` to our direct gossip neighbors for `namespace`.
    ///
    /// Does nothing if we are not syncing the namespace; serialization
    /// failures are logged and swallowed (best-effort broadcast).
    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message:");
                return;
            }
        };
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }
645
    /// Handles completion of a content download task.
    ///
    /// On success, emits `ContentReady` and announces the content to
    /// neighbors; on failure, re-marks the hash as missing. For every
    /// namespace whose download queue just drained, emits
    /// `PendingContentReady` if that namespace was flagged for it.
    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        // Remove the hash from the queue; returns namespaces that now have no
        // queued hashes left.
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Tell neighbors we now provide this content.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            // Failed download: the hash is still missing locally.
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }
672
    /// A neighbor announced that it has the content for `hash`: start a
    /// download from that node, but only if we previously marked the hash as
    /// missing (`only_if_missing = true`).
    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }
681
    /// Handles a sync report received from a neighbor via gossip.
    ///
    /// Decodes the reported author heads and, if they contain news compared
    /// to our state, starts a sync with the reporting peer.
    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        // Ignore reports for namespaces we are not syncing.
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }
708
    /// Handles an insert event reported by an open replica.
    ///
    /// Local inserts are gossiped to the swarm; remote inserts trigger a
    /// content download when the sender reports the content as complete,
    /// otherwise the hash is recorded as missing.
    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // Only gossip entries for namespaces we actively sync.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        // The sender has the full content: download from it.
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        // Content not complete anywhere known yet; wait for a
                        // ContentReady announcement.
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }
744
    /// Queues a download of `hash` for `namespace` from `node`.
    ///
    /// Skips the download if the blob is already complete locally. Records
    /// `node` as a provider either way. If a download for the hash is already
    /// queued, only the namespace association is added. With
    /// `only_if_missing`, a new download is started only for hashes
    /// previously recorded in `missing_hashes`.
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        // Already fully available locally: nothing to download.
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        // Record the node as a provider for this hash (consulted by the
        // downloader's content discovery).
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            // A download is already running; just associate the namespace.
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            // No longer "missing without a download": a download now runs.
            self.missing_hashes.remove(&hash);
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }
785
    /// Spawns a task handling an incoming sync connection.
    ///
    /// The accept callback routes the accept/reject decision back through the
    /// actor (via `AcceptSyncRequest`) so it is made with current state.
    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        // Actor gone or dropped the reply: fail closed.
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }
821
    /// Decides whether to accept an incoming sync request from `peer` for
    /// `namespace`, delegating to the per-namespace state tracker.
    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
830}
831
/// Details of a finished sync run, emitted as [`Event::SyncFinished`].
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with.
    pub peer: PublicKey,
    /// Whether we initiated the sync or accepted it.
    pub origin: Origin,
    /// Timestamp when the sync finished.
    pub finished: SystemTime,
    /// Timestamp when the sync started.
    pub started: SystemTime,
    /// Sync outcome; errors are stringified for serializability.
    pub result: std::result::Result<SyncDetails, String>,
}
846
/// Entry counts of a successful sync run.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries we received from the peer.
    pub entries_received: usize,
    /// Number of entries we sent to the peer.
    pub entries_sent: usize,
}
854
impl From<&SyncFinished> for SyncDetails {
    /// Extracts the entry counters from a finished sync outcome.
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}
863
/// Event subscribers, grouped per namespace.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
866
867impl SubscribersMap {
868 fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
869 self.0.entry(namespace).or_default().subscribe(sender);
870 }
871
872 async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
873 debug!(namespace=%namespace.fmt_short(), %event, "emit event");
874 let Some(subscribers) = self.0.get_mut(namespace) else {
875 return false;
876 };
877
878 if !subscribers.send(event).await {
879 self.0.remove(namespace);
880 }
881 true
882 }
883
884 fn remove(&mut self, namespace: &NamespaceId) {
885 self.0.remove(namespace);
886 }
887
888 fn clear(&mut self) {
889 self.0.clear();
890 }
891}
892
/// Queued content downloads, indexed both by hash and by namespace so that
/// per-namespace completion can be detected when a hash finishes.
#[derive(Debug, Default)]
struct QueuedHashes {
    // Which namespaces requested each queued hash.
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    // Which hashes are still queued per namespace.
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
898
/// Shared map of known provider nodes per content hash, used as the
/// downloader's content discovery source.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
901
impl ContentDiscovery for ProviderNodes {
    /// Returns the currently known provider nodes for `hash` as a finite stream.
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
        // Snapshot the provider set under the lock, then stream the copy so
        // the lock is not held while the stream is consumed.
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}
916
917impl QueuedHashes {
918 fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
919 self.by_hash.entry(hash).or_default().insert(namespace);
920 self.by_namespace.entry(namespace).or_default().insert(hash);
921 }
922
923 fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
927 let namespaces = self.by_hash.remove(hash).unwrap_or_default();
928 let mut removed_namespaces = vec![];
929 for namespace in namespaces {
930 if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
931 hashes.remove(hash);
932 if hashes.is_empty() {
933 self.by_namespace.remove(&namespace);
934 removed_namespaces.push(namespace);
935 }
936 }
937 }
938 removed_namespaces
939 }
940
941 fn contains_hash(&self, hash: &Hash) -> bool {
942 self.by_hash.contains_key(hash)
943 }
944
945 fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
946 self.by_namespace.contains_key(namespace)
947 }
948}
949
/// A list of event channel senders for a single namespace.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
952
953impl Subscribers {
954 fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
955 self.0.push(sender)
956 }
957
958 async fn send(&mut self, event: Event) -> bool {
959 let futs = self.0.iter().map(|sender| sender.send(event.clone()));
960 let res = futures_buffered::join_all(futs).await;
961 for (i, res) in res.into_iter().enumerate().rev() {
963 if res.is_err() {
964 self.0.remove(i);
965 }
966 }
967 !self.0.is_empty()
968 }
969}
970
971fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
972 match res {
973 Ok(res) => res.peer.fmt_short().to_string(),
974 Err(err) => err
975 .peer()
976 .map(|x| x.fmt_short().to_string())
977 .unwrap_or_else(|| "unknown".to_string()),
978 }
979}
980
981fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
982 match res {
983 Ok(res) => res.namespace.fmt_short(),
984 Err(err) => err
985 .namespace()
986 .map(|x| x.fmt_short())
987 .unwrap_or_else(|| "unknown".to_string()),
988 }
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending to subscribers whose receivers were dropped must prune them
    /// without panicking (exercises the reverse-index removal in
    /// `Subscribers::send`).
    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        // Close both receivers so both sends fail.
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
}