iroh_docs/engine/live.rs

#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::SystemTime,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
use iroh_blobs::{
    api::{
        blobs::BlobStatus,
        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
        Store,
    },
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{self, mpsc, oneshot},
    task::JoinSet,
};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

// use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}
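
// An illustrative sketch (not part of this module) of how an `Op` travels over
// gossip: the actor encodes it with `postcard::to_stdvec` before broadcasting (see
// `broadcast_neighbors` and `on_replica_event` below), and the receiving side would
// decode it with the matching `postcard::from_bytes`. Variable names are placeholders.
//
//     let op = Op::ContentReady(hash);
//     let bytes = postcard::to_stdvec(&op)?;           // encode for the gossip broadcast
//     let decoded: Op = postcard::from_bytes(&bytes)?;  // decode on the receiving peer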

/// Report of a successful sync with the new heads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}
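
// Hedged sketch of how the `heads` field is produced and consumed elsewhere in this
// file: `AuthorHeads::encode` when building a report in `on_sync_finished`, and
// `AuthorHeads::decode` when handling one in `on_sync_report`. `author_heads`,
// `namespace`, and `max_size` are placeholders.
//
//     let heads = author_heads.encode(Some(max_size))?;
//     let report = SyncReport { namespace, heads };
//     let decoded = AuthorHeads::decode(&report.heads)?;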

/// Messages to the sync actor
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<EndpointAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}
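
// Minimal usage sketch, assuming a handle `actor_tx: mpsc::Sender<ToLiveActor>` to the
// running actor (the engine owns this in practice): request sync for a document and
// wait for the oneshot acknowledgement.
//
//     let (reply, reply_rx) = tokio::sync::oneshot::channel();
//     actor_tx
//         .send(ToLiveActor::StartSync { namespace, peers, reply })
//         .await?;
//     reply_rx.await??;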

/// Events informing about the progress of the live sync.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}
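
// Hedged sketch of consuming these events, assuming the same `actor_tx` handle as
// above. Subscriptions are registered per namespace with an `async_channel` sender;
// the capacity of 64 is an arbitrary choice for the example.
//
//     let (tx, rx) = async_channel::bounded(64);
//     let (reply, reply_rx) = tokio::sync::oneshot::channel();
//     actor_tx
//         .send(ToLiveActor::Subscribe { namespace, sender: tx, reply })
//         .await?;
//     reply_rx.await??;
//     while let Ok(event) = rx.recv().await {
//         match event {
//             Event::ContentReady { hash } => { /* content for `hash` is now local */ }
//             Event::SyncFinished(ev) => { /* inspect ev.result */ }
//             Event::PendingContentReady => { /* all queued downloads finished */ }
//             _ => {}
//         }
//     }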

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);

// Currently peers might double-sync in both directions.
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: Store,
    downloader: Downloader,
    memory_lookup: MemoryLookup,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to have a hash
    hash_providers: ProviderNodes,

    /// Subscribers to actor events
    subscribers: SubscribersMap,

    /// Sync state per replica and peer
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl LiveActor {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let memory_lookup = MemoryLookup::new();
        endpoint.address_lookup().add(memory_lookup.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            memory_lookup,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        }
    }
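
    // Hedged construction sketch: the surrounding engine creates the inbox channel,
    // builds the actor, and spawns `run` on the runtime. The channel capacity and the
    // use of `tokio::task::spawn` are illustrative assumptions, not this crate's API.
    //
    //     let (tx, rx) = tokio::sync::mpsc::channel(64);
    //     let actor = LiveActor::new(sync, endpoint, gossip, store, downloader, rx, tx.clone(), metrics);
    //     let actor_task = tokio::task::spawn(actor.run());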

    /// Run the actor loop.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes, don't fail the operation if they can't be
                    // decoded: simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.id;
            // adding a node address without any addressing info fails with an error,
            // but we still want to include those peers because endpoint address lookup might find addresses for them
            if !peer.is_empty() {
                self.memory_lookup.add_endpoint_info(peer);
            }
            peer_ids.push(peer_id);
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace as eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        // TODO: We should likely debounce and merge these neighbor announcements.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
}

/// Event emitted when a sync operation completes.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);

impl ContentDiscovery for ProviderNodes {
    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
        let nodes = self
            .0
            .lock()
            .expect("poisoned")
            .get(&hash.hash)
            .into_iter()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        Box::pin(n0_future::stream::iter(nodes))
    }
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}
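
// Illustrative behavior of `QueuedHashes` (values are placeholders): a namespace is
// reported as complete by `remove_hash` only once its last queued hash is removed.
//
//     let mut queued = QueuedHashes::default();
//     queued.insert(hash_a, ns);
//     queued.insert(hash_b, ns);
//     assert!(queued.remove_hash(&hash_a).is_empty());   // `ns` still waits on `hash_b`
//     assert_eq!(queued.remove_hash(&hash_b), vec![ns]); // `ns` is now complete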

#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // reverse the order so removing does not shift remaining indices
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short().to_string(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short().to_string())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
}