// iroh_docs/engine/live.rs

1#![allow(missing_docs)]
2
3use std::{
4    collections::{HashMap, HashSet},
5    sync::Arc,
6};
7
8use anyhow::{Context, Result};
9use futures_lite::FutureExt;
10use iroh::{address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, EndpointId, PublicKey};
11use iroh_blobs::{
12    api::{
13        blobs::BlobStatus,
14        downloader::{ContentDiscovery, DownloadRequest, Downloader, SplitStrategy},
15        Store,
16    },
17    Hash, HashAndFormat,
18};
19use iroh_gossip::net::Gossip;
20use n0_future::{task::JoinSet, time::SystemTime};
21use serde::{Deserialize, Serialize};
22use tokio::sync::{self, mpsc, oneshot};
23use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
24
25// use super::gossip::{GossipActor, ToGossipActor};
26use super::state::{NamespaceStates, Origin, SyncReason};
27use crate::{
28    actor::{OpenOpts, SyncHandle},
29    engine::gossip::GossipState,
30    metrics::Metrics,
31    net::{
32        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
33        SyncFinished,
34    },
35    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
36};
37
/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}
50
/// Report of a successful sync with the new heads.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    /// The namespace (document) that was synced.
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}
58
59/// Messages to the sync actor
60#[derive(derive_more::Debug, strum::Display)]
61pub enum ToLiveActor {
62    StartSync {
63        namespace: NamespaceId,
64        peers: Vec<EndpointAddr>,
65        #[debug("onsehot::Sender")]
66        reply: sync::oneshot::Sender<anyhow::Result<()>>,
67    },
68    Leave {
69        namespace: NamespaceId,
70        kill_subscribers: bool,
71        #[debug("onsehot::Sender")]
72        reply: sync::oneshot::Sender<anyhow::Result<()>>,
73    },
74    Shutdown {
75        reply: sync::oneshot::Sender<()>,
76    },
77    Subscribe {
78        namespace: NamespaceId,
79        #[debug("sender")]
80        sender: async_channel::Sender<Event>,
81        #[debug("oneshot::Sender")]
82        reply: sync::oneshot::Sender<Result<()>>,
83    },
84    HandleConnection {
85        conn: iroh::endpoint::Connection,
86    },
87    AcceptSyncRequest {
88        namespace: NamespaceId,
89        peer: PublicKey,
90        #[debug("oneshot::Sender")]
91        reply: sync::oneshot::Sender<AcceptOutcome>,
92    },
93
94    IncomingSyncReport {
95        from: PublicKey,
96        report: SyncReport,
97    },
98    NeighborContentReady {
99        namespace: NamespaceId,
100        node: PublicKey,
101        hash: Hash,
102    },
103    NeighborUp {
104        namespace: NamespaceId,
105        peer: PublicKey,
106    },
107    NeighborDown {
108        namespace: NamespaceId,
109        peer: PublicKey,
110    },
111}
112
/// Events informing about actions of the live sync progress.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}
136
/// Outcome of a sync we initiated: (namespace, peer, reason for syncing, result).
type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
/// Outcome of a sync initiated by a remote peer (accept side).
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
/// Outcome of a single content download: (namespace, hash, result).
type DownloadRes = (NamespaceId, Hash, Result<(), anyhow::Error>);
145
// Currently peers might double-sync in both directions.
/// Actor that drives live synchronization of documents: it reacts to replica
/// events, gossip messages and incoming connections, spawns sync and download
/// tasks, and forwards progress as [`Event`]s to subscribers.
pub struct LiveActor {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    /// Handle to the sync actor (replica store operations).
    sync: SyncHandle,
    /// Endpoint used for dialing peers and accepting connections.
    endpoint: Endpoint,
    /// Blob store used to check local availability of content.
    bao_store: Store,
    /// Downloader used to fetch missing content from providers.
    downloader: Downloader,
    /// In-memory address book; registered with the endpoint's address lookup.
    memory_lookup: MemoryLookup,
    /// Sender handed to replicas so their events arrive in this actor.
    replica_events_tx: async_channel::Sender<crate::Event>,
    /// Receiving end of the replica event channel, polled in the main loop.
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    /// Gossip state (joined topics, broadcast, neighbor tracking).
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,
    /// Nodes known to have a hash
    hash_providers: ProviderNodes,

    /// Subscribers to actor events
    subscribers: SubscribersMap,

    /// Sync state per replica and peer
    state: NamespaceStates,
    /// Metrics counters for the actor loop.
    metrics: Arc<Metrics>,
}
impl LiveActor {
    /// Create the live actor.
    ///
    /// Wires up the replica event channel and the gossip state, and registers
    /// an in-memory address lookup with the endpoint so that addresses learned
    /// via [`Self::join_peers`] can be used when dialing.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: Store,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Result<Self> {
        // Replica events flow from the sync handle into this actor's main loop.
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        let memory_lookup = MemoryLookup::new();
        endpoint.address_lookup()?.add(memory_lookup.clone());
        Ok(Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            memory_lookup,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            hash_providers: Default::default(),
            metrics,
        })
    }

    /// Run the actor loop.
    ///
    /// Runs until a `Shutdown` message arrives or an input channel closes,
    /// then performs cleanup before acknowledging the shutdown request.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                // Reply only after cleanup completed, so the requester can
                // rely on the actor being fully stopped.
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    /// Main event loop.
    ///
    /// Returns the reply channel of the `Shutdown` message that terminated the
    /// loop, or an error if one of the input channels closed unexpectedly.
    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        // Tick counter, only used for tracing.
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            // `biased` makes actor messages take priority over task results.
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    /// Dispatch a single actor message. `Shutdown` is handled in `run_inner`
    /// and must not reach this method. Currently always returns `Ok(true)`.
    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    /// Spawn a task that connects to `peer` and syncs `namespace`, unless the
    /// state tracker says a sync with this peer is already in flight.
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                EndpointAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    /// Clean up on shutdown: drop subscribers and shut down gossip and the
    /// sync handle concurrently.
    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    /// Start live sync for a namespace: open the replica (subscribed to our
    /// event channel), merge in previously-known useful peers from the store,
    /// and join them via gossip.
    async fn start_sync(
        &mut self,
        namespace: NamespaceId,
        mut peers: Vec<EndpointAddr>,
    ) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes, don't fail the operation if they can't be
                    // decoded: simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(EndpointAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    /// Stop live sync for a namespace: disable sync, unsubscribe our event
    /// channel, close the replica and quit the gossip topic. Subscribers are
    /// only dropped when `kill_subscribers` is set.
    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    /// Record the peers' addresses, join them on the gossip topic, and kick
    /// off an initial direct sync with each of them.
    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.id;
            // adding a node address without any addressing info fails with an error,
            // but we still want to include those peers because endpoint address lookup might find addresses for them
            if !peer.is_empty() {
                self.memory_lookup.add_endpoint_info(peer);
            }
            peer_ids.push(peer_id);
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    /// Handle the result of a sync we initiated. A remote abort because the
    /// peer is already syncing with us is expected and ignored.
    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    /// Handle the result of a sync initiated by a remote peer. Errors that
    /// occur before peer/namespace are known can only be logged.
    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    /// Common post-sync handling for both connect and accept sides: register
    /// the peer as useful, broadcast a sync report when we received entries,
    /// update sync state, emit events, and optionally trigger a resync.
    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        // Convert to a stringly-typed result before `result` is moved into
        // `state.finish` below.
        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace to be eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    /// Serialize `op` and broadcast it to our direct gossip neighbors for the
    /// namespace. No-op when the namespace is not currently syncing.
    async fn broadcast_neighbors(&mut self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message:");
                return;
            }
        };
        // TODO: We should debounce and merge these neighbor announcements likely.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    /// Handle completion of a download task: on success, notify subscribers
    /// and neighbors; on failure, remember the hash as missing. Emits
    /// `PendingContentReady` for namespaces that have no more queued hashes.
    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<(), anyhow::Error>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    /// A neighbor announced content availability: try to download the hash
    /// from that node, but only if we previously marked it as missing.
    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: EndpointId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    /// Handle a sync report received over gossip: if the reported heads
    /// contain news for us, trigger a sync with the reporting peer.
    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    /// React to a replica event: gossip local inserts to the swarm, and queue
    /// content downloads for remote inserts.
    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    /// Queue a download of `hash` from `node` for `namespace`.
    ///
    /// Skips hashes that are already complete locally. Records `node` as a
    /// provider either way. If the hash is already queued, only the namespace
    /// association is added; otherwise a download task is spawned — unless
    /// `only_if_missing` is set and the hash was not marked missing.
    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.blobs().status(hash).await;
        if matches!(entry_status, Ok(BlobStatus::Complete { .. })) {
            self.missing_hashes.remove(&hash);
            return;
        }
        // Remember this node as a provider for the hash, so the downloader's
        // content discovery can find it.
        self.hash_providers
            .0
            .lock()
            .expect("poisoned")
            .entry(hash)
            .or_default()
            .insert(node);
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(
                HashAndFormat::raw(hash),
                self.hash_providers.clone(),
                SplitStrategy::None,
            );
            let handle = self.downloader.download_with_opts(req);

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks.spawn(async move {
                (
                    namespace,
                    hash,
                    handle.await.map_err(|e| anyhow::anyhow!(e)),
                )
            });
        }
    }

    /// Accept an incoming sync connection.
    ///
    /// The accept callback routes the decision back through this actor via
    /// [`ToLiveActor::AcceptSyncRequest`]; the connection itself is handled
    /// in a spawned task.
    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    /// Decide whether to accept a sync request for `namespace` from `peer`,
    /// based on the current per-namespace sync state.
    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.id(), &namespace, peer)
    }
}
831
/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}
846
/// Counters describing what was exchanged during a successful sync.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}
854
855impl From<&SyncFinished> for SyncDetails {
856    fn from(value: &SyncFinished) -> Self {
857        Self {
858            entries_received: value.outcome.num_recv,
859            entries_sent: value.outcome.num_sent,
860        }
861    }
862}
863
/// Per-namespace registry of event subscribers.
#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);
866
867impl SubscribersMap {
868    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
869        self.0.entry(namespace).or_default().subscribe(sender);
870    }
871
872    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
873        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
874        let Some(subscribers) = self.0.get_mut(namespace) else {
875            return false;
876        };
877
878        if !subscribers.send(event).await {
879            self.0.remove(namespace);
880        }
881        true
882    }
883
884    fn remove(&mut self, namespace: &NamespaceId) {
885        self.0.remove(namespace);
886    }
887
888    fn clear(&mut self) {
889        self.0.clear();
890    }
891}
892
/// Bidirectional index of hashes queued for download and the namespaces
/// waiting on them.
#[derive(Debug, Default)]
struct QueuedHashes {
    // All namespaces that requested a download of this hash.
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    // All hashes still pending download for this namespace.
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}
898
/// Shared map from content hash to the set of endpoints known to provide it.
///
/// Cloning is cheap: clones share the same underlying map via the `Arc`.
#[derive(Debug, Clone, Default)]
struct ProviderNodes(Arc<std::sync::Mutex<HashMap<Hash, HashSet<EndpointId>>>>);
901
902impl ContentDiscovery for ProviderNodes {
903    fn find_providers(&self, hash: HashAndFormat) -> n0_future::stream::Boxed<EndpointId> {
904        let nodes = self
905            .0
906            .lock()
907            .expect("poisoned")
908            .get(&hash.hash)
909            .into_iter()
910            .flatten()
911            .cloned()
912            .collect::<Vec<_>>();
913        Box::pin(n0_future::stream::iter(nodes))
914    }
915}
916
917impl QueuedHashes {
918    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
919        self.by_hash.entry(hash).or_default().insert(namespace);
920        self.by_namespace.entry(namespace).or_default().insert(hash);
921    }
922
923    /// Remove a hash from the set of queued hashes.
924    ///
925    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
926    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
927        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
928        let mut removed_namespaces = vec![];
929        for namespace in namespaces {
930            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
931                hashes.remove(hash);
932                if hashes.is_empty() {
933                    self.by_namespace.remove(&namespace);
934                    removed_namespaces.push(namespace);
935                }
936            }
937        }
938        removed_namespaces
939    }
940
941    fn contains_hash(&self, hash: &Hash) -> bool {
942        self.by_hash.contains_key(hash)
943    }
944
945    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
946        self.by_namespace.contains_key(namespace)
947    }
948}
949
/// Collection of event senders for a single namespace.
#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);
952
953impl Subscribers {
954    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
955        self.0.push(sender)
956    }
957
958    async fn send(&mut self, event: Event) -> bool {
959        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
960        let res = futures_buffered::join_all(futs).await;
961        // reverse the order so removing does not shift remaining indices
962        for (i, res) in res.into_iter().enumerate().rev() {
963            if res.is_err() {
964                self.0.remove(i);
965            }
966        }
967        !self.0.is_empty()
968    }
969}
970
971fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
972    match res {
973        Ok(res) => res.peer.fmt_short().to_string(),
974        Err(err) => err
975            .peer()
976            .map(|x| x.fmt_short().to_string())
977            .unwrap_or_else(|| "unknown".to_string()),
978    }
979}
980
981fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
982    match res {
983        Ok(res) => res.namespace.fmt_short(),
984        Err(err) => err
985            .namespace()
986            .map(|x| x.fmt_short())
987            .unwrap_or_else(|| "unknown".to_string()),
988    }
989}
990
#[cfg(test)]
mod tests {
    use super::*;

    /// Senders whose receivers were dropped must be pruned on `send`, and
    /// `send` must report `false` once no subscribers remain.
    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        // Both receivers are gone: both sends fail, both senders are removed,
        // so `send` must return false. (Previously this test asserted nothing.)
        assert!(!subscribers.send(Event::NeighborUp(pk)).await);
    }
}