iroh_docs/engine/live.rs

#![allow(missing_docs)]

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::SystemTime,
};

use anyhow::{Context, Result};
use futures_lite::FutureExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
use iroh_blobs::{
    downloader::{DownloadError, DownloadRequest, Downloader},
    get::Stats,
    store::EntryStatus,
    Hash, HashAndFormat,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{self, mpsc, oneshot},
    task::JoinSet,
};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

// use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};
use crate::{
    actor::{OpenOpts, SyncHandle},
    engine::gossip::GossipState,
    metrics::Metrics,
    net::{
        connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
        SyncFinished,
    },
    AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

/// Name used for logging when new node addresses are added from the docs engine.
const SOURCE_NAME: &str = "docs_engine";

/// An iroh-docs operation
///
/// This is the message that is broadcast over iroh-gossip.
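/// Ops are postcard-encoded before being broadcast (see `broadcast_neighbors`
/// and `on_replica_event` below). A minimal round-trip sketch:
///
/// ```ignore
/// // Sketch only: assumes a `SignedEntry` named `entry` is in scope.
/// let op = Op::Put(entry);
/// let bytes = postcard::to_stdvec(&op)?;
/// let decoded: Op = postcard::from_bytes(&bytes)?;
/// ```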
#[derive(Debug, Clone, Serialize, Deserialize, strum::Display)]
pub enum Op {
    /// A new entry was inserted into the document.
    Put(SignedEntry),
    /// A peer now has content available for a hash.
    ContentReady(Hash),
    /// We synced with another peer, here's the news.
    SyncReport(SyncReport),
}

/// Report of a successful sync with the new heads.
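///
/// The `heads` field carries encoded [`AuthorHeads`]. A sketch of the
/// encode/decode pair as used in `on_sync_finished` and `on_sync_report`
/// (`author_heads` and `max_message_size` are assumed to be in scope):
///
/// ```ignore
/// let heads: Vec<u8> = author_heads.encode(Some(max_message_size))?;
/// let decoded = AuthorHeads::decode(&heads)?;
/// ```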
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncReport {
    namespace: NamespaceId,
    /// Encoded [`AuthorHeads`]
    heads: Vec<u8>,
}

/// Messages to the sync actor
#[derive(derive_more::Debug, strum::Display)]
pub enum ToLiveActor {
    StartSync {
        namespace: NamespaceId,
        peers: Vec<NodeAddr>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Leave {
        namespace: NamespaceId,
        kill_subscribers: bool,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<anyhow::Result<()>>,
    },
    Shutdown {
        reply: sync::oneshot::Sender<()>,
    },
    Subscribe {
        namespace: NamespaceId,
        #[debug("sender")]
        sender: async_channel::Sender<Event>,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<Result<()>>,
    },
    HandleConnection {
        conn: iroh::endpoint::Connection,
    },
    AcceptSyncRequest {
        namespace: NamespaceId,
        peer: PublicKey,
        #[debug("oneshot::Sender")]
        reply: sync::oneshot::Sender<AcceptOutcome>,
    },

    IncomingSyncReport {
        from: PublicKey,
        report: SyncReport,
    },
    NeighborContentReady {
        namespace: NamespaceId,
        node: PublicKey,
        hash: Hash,
    },
    NeighborUp {
        namespace: NamespaceId,
        peer: PublicKey,
    },
    NeighborDown {
        namespace: NamespaceId,
        peer: PublicKey,
    },
}

/// Events informing about the progress of the live sync.
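///
/// These are delivered over the channel registered via
/// [`ToLiveActor::Subscribe`]. A minimal consumption sketch (`actor_tx`,
/// `namespace`, and `reply` are assumed to be in scope):
///
/// ```ignore
/// let (tx, rx) = async_channel::bounded(64);
/// actor_tx
///     .send(ToLiveActor::Subscribe { namespace, sender: tx, reply })
///     .await?;
/// while let Ok(event) = rx.recv().await {
///     // react to NeighborUp, SyncFinished, ContentReady, ...
/// }
/// ```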
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum Event {
    /// The content of an entry was downloaded and is now available at the local node
    ContentReady {
        /// The content hash of the newly available entry content
        hash: Hash,
    },
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
    /// All pending content is now ready.
    ///
    /// This event is only emitted after a sync completed and `Self::SyncFinished` was emitted at
    /// least once. It signals that all currently pending downloads have been completed.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
}

type SyncConnectRes = (
    NamespaceId,
    PublicKey,
    SyncReason,
    Result<SyncFinished, ConnectError>,
);
type SyncAcceptRes = Result<SyncFinished, AcceptError>;
type DownloadRes = (NamespaceId, Hash, Result<Stats, DownloadError>);

// Currently peers might double-sync in both directions.
pub struct LiveActor<B: iroh_blobs::store::Store> {
    /// Receiver for actor messages.
    inbox: mpsc::Receiver<ToLiveActor>,
    sync: SyncHandle,
    endpoint: Endpoint,
    bao_store: B,
    downloader: Downloader,
    replica_events_tx: async_channel::Sender<crate::Event>,
    replica_events_rx: async_channel::Receiver<crate::Event>,

    /// Send messages to self.
    /// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
    /// Only clone into newly spawned tasks.
    sync_actor_tx: mpsc::Sender<ToLiveActor>,
    gossip: GossipState,

    /// Running sync futures (from connect).
    running_sync_connect: JoinSet<SyncConnectRes>,
    /// Running sync futures (from accept).
    running_sync_accept: JoinSet<SyncAcceptRes>,
    /// Running download futures.
    download_tasks: JoinSet<DownloadRes>,
    /// Content hashes which are wanted but not yet queued because no provider was found.
    missing_hashes: HashSet<Hash>,
    /// Content hashes queued in downloader.
    queued_hashes: QueuedHashes,

    /// Subscribers to actor events
    subscribers: SubscribersMap,

    /// Sync state per replica and peer
    state: NamespaceStates,
    metrics: Arc<Metrics>,
}

impl<B: iroh_blobs::store::Store> LiveActor<B> {
    /// Create the live actor.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        sync: SyncHandle,
        endpoint: Endpoint,
        gossip: Gossip,
        bao_store: B,
        downloader: Downloader,
        inbox: mpsc::Receiver<ToLiveActor>,
        sync_actor_tx: mpsc::Sender<ToLiveActor>,
        metrics: Arc<Metrics>,
    ) -> Self {
        let (replica_events_tx, replica_events_rx) = async_channel::bounded(1024);
        let gossip_state = GossipState::new(gossip, sync.clone(), sync_actor_tx.clone());
        Self {
            inbox,
            sync,
            replica_events_rx,
            replica_events_tx,
            endpoint,
            gossip: gossip_state,
            bao_store,
            downloader,
            sync_actor_tx,
            running_sync_connect: Default::default(),
            running_sync_accept: Default::default(),
            subscribers: Default::default(),
            download_tasks: Default::default(),
            state: Default::default(),
            missing_hashes: Default::default(),
            queued_hashes: Default::default(),
            metrics,
        }
    }

    /// Run the actor loop.
    pub async fn run(mut self) -> Result<()> {
        let shutdown_reply = self.run_inner().await;
        if let Err(err) = self.shutdown().await {
            error!(?err, "Error during shutdown");
        }
        drop(self);
        match shutdown_reply {
            Ok(reply) => {
                reply.send(()).ok();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
        let mut i = 0;
        loop {
            i += 1;
            trace!(?i, "tick wait");
            self.metrics.doc_live_tick_main.inc();
            tokio::select! {
                biased;
                msg = self.inbox.recv() => {
                    let msg = msg.context("to_actor closed")?;
                    trace!(?i, %msg, "tick: to_actor");
                    self.metrics.doc_live_tick_actor.inc();
                    match msg {
                        ToLiveActor::Shutdown { reply } => {
                            break Ok(reply);
                        }
                        msg => {
                            self.on_actor_message(msg).await.context("on_actor_message")?;
                        }
                    }
                }
                event = self.replica_events_rx.recv() => {
                    trace!(?i, "tick: replica_event");
                    self.metrics.doc_live_tick_replica_event.inc();
                    let event = event.context("replica_events closed")?;
                    if let Err(err) = self.on_replica_event(event).await {
                        error!(?err, "Failed to process replica event");
                    }
                }
                Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
                    trace!(?i, "tick: running_sync_connect");
                    self.metrics.doc_live_tick_running_sync_connect.inc();
                    let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
                    self.on_sync_via_connect_finished(namespace, peer, reason, res).await;
                }
                Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
                    trace!(?i, "tick: running_sync_accept");
                    self.metrics.doc_live_tick_running_sync_accept.inc();
                    let res = res.context("running_sync_accept closed")?;
                    self.on_sync_via_accept_finished(res).await;
                }
                Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
                    trace!(?i, "tick: pending_downloads");
                    self.metrics.doc_live_tick_pending_downloads.inc();
                    let (namespace, hash, res) = res.context("pending_downloads closed")?;
                    self.on_download_ready(namespace, hash, res).await;
                }
                res = self.gossip.progress(), if !self.gossip.is_empty() => {
                    if let Err(error) = res {
                        warn!(?error, "gossip state failed");
                    }
                }
            }
        }
    }

    async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
        match msg {
            ToLiveActor::Shutdown { .. } => {
                unreachable!("handled in run");
            }
            ToLiveActor::IncomingSyncReport { from, report } => {
                self.on_sync_report(from, report).await
            }
            ToLiveActor::NeighborUp { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor up");
                self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
                self.subscribers
                    .send(&namespace, Event::NeighborUp(peer))
                    .await;
            }
            ToLiveActor::NeighborDown { namespace, peer } => {
                debug!(peer = %peer.fmt_short(), namespace = %namespace.fmt_short(), "neighbor down");
                self.subscribers
                    .send(&namespace, Event::NeighborDown(peer))
                    .await;
            }
            ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            } => {
                let res = self.start_sync(namespace, peers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            } => {
                let res = self.leave(namespace, kill_subscribers).await;
                reply.send(res).ok();
            }
            ToLiveActor::Subscribe {
                namespace,
                sender,
                reply,
            } => {
                self.subscribers.subscribe(namespace, sender);
                reply.send(Ok(())).ok();
            }
            ToLiveActor::HandleConnection { conn } => {
                self.handle_connection(conn).await;
            }
            ToLiveActor::AcceptSyncRequest {
                namespace,
                peer,
                reply,
            } => {
                let outcome = self.accept_sync_request(namespace, peer);
                reply.send(outcome).ok();
            }
            ToLiveActor::NeighborContentReady {
                namespace,
                node,
                hash,
            } => {
                self.on_neighbor_content_ready(namespace, node, hash).await;
            }
        };
        Ok(true)
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    fn sync_with_peer(&mut self, namespace: NamespaceId, peer: PublicKey, reason: SyncReason) {
        if !self.state.start_connect(&namespace, peer, reason) {
            return;
        }
        let endpoint = self.endpoint.clone();
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        let fut = async move {
            let res = connect_and_sync(
                &endpoint,
                &sync,
                namespace,
                NodeAddr::new(peer),
                Some(&metrics),
            )
            .await;
            (namespace, peer, reason, res)
        }
        .instrument(Span::current());
        self.running_sync_connect.spawn(fut);
    }

    async fn shutdown(&mut self) -> anyhow::Result<()> {
        // cancel all subscriptions
        self.subscribers.clear();
        let (gossip_shutdown_res, _store) = tokio::join!(
            // quit the gossip topics and task loops.
            self.gossip.shutdown(),
            // shutdown sync thread
            self.sync.shutdown()
        );
        gossip_shutdown_res?;
        // TODO: abort_all and join_next all JoinSets to catch panics
        // (they are aborted on drop, but that swallows panics)
        Ok(())
    }

    async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
        debug!(?namespace, peers = peers.len(), "start sync");
        // update state to allow sync
        if !self.state.is_syncing(&namespace) {
            let opts = OpenOpts::default()
                .sync()
                .subscribe(self.replica_events_tx.clone());
            self.sync.open(namespace, opts).await?;
            self.state.insert(namespace);
        }
        // add the peers stored for this document
        match self.sync.get_sync_peers(namespace).await {
            Ok(None) => {
                // no peers for this document
            }
            Ok(Some(known_useful_peers)) => {
                let as_node_addr = known_useful_peers.into_iter().filter_map(|peer_id_bytes| {
                    // peers are stored as bytes, don't fail the operation if they can't be
                    // decoded: simply ignore the peer
                    match PublicKey::from_bytes(&peer_id_bytes) {
                        Ok(public_key) => Some(NodeAddr::new(public_key)),
                        Err(_signing_error) => {
                            warn!("potential db corruption: peers per doc can't be decoded");
                            None
                        }
                    }
                });
                peers.extend(as_node_addr);
            }
            Err(e) => {
                // try to continue if peers per doc can't be read since they are not vital for sync
                warn!(%e, "db error reading peers per document")
            }
        }
        self.join_peers(namespace, peers).await?;
        Ok(())
    }

    async fn leave(
        &mut self,
        namespace: NamespaceId,
        kill_subscribers: bool,
    ) -> anyhow::Result<()> {
        // self.subscribers.remove(&namespace);
        if self.state.remove(&namespace) {
            self.sync.set_sync(namespace, false).await?;
            self.sync
                .unsubscribe(namespace, self.replica_events_tx.clone())
                .await?;
            self.sync.close(namespace).await?;
            self.gossip.quit(&namespace);
        }
        if kill_subscribers {
            self.subscribers.remove(&namespace);
        }
        Ok(())
    }

    async fn join_peers(&mut self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let mut peer_ids = Vec::new();

        // add addresses of peers to our endpoint address book
        for peer in peers.into_iter() {
            let peer_id = peer.node_id;
            // adding a node address without any addressing info fails with an error,
            // but we still want to include those peers because node discovery might find addresses for them
            if peer.is_empty() {
                peer_ids.push(peer_id)
            } else {
                match self.endpoint.add_node_addr_with_source(peer, SOURCE_NAME) {
                    Ok(()) => {
                        peer_ids.push(peer_id);
                    }
                    Err(err) => {
                        warn!(peer = %peer_id.fmt_short(), "failed to add known addrs: {err:?}");
                    }
                }
            }
        }

        // tell gossip to join
        self.gossip.join(namespace, peer_ids.clone()).await?;

        if !peer_ids.is_empty() {
            // trigger initial sync with initial peers
            for peer in peer_ids {
                self.sync_with_peer(namespace, peer, SyncReason::DirectJoin);
            }
        }
        Ok(())
    }

    #[instrument("connect", skip_all, fields(peer = %peer.fmt_short(), namespace = %namespace.fmt_short()))]
    async fn on_sync_via_connect_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        reason: SyncReason,
        result: Result<SyncFinished, ConnectError>,
    ) {
        match result {
            Err(ConnectError::RemoteAbort(AbortReason::AlreadySyncing)) => {
                debug!(?reason, "remote abort, already syncing");
            }
            res => {
                self.on_sync_finished(
                    namespace,
                    peer,
                    Origin::Connect(reason),
                    res.map_err(Into::into),
                )
                .await
            }
        }
    }

    #[instrument("accept", skip_all, fields(peer = %fmt_accept_peer(&res), namespace = %fmt_accept_namespace(&res)))]
    async fn on_sync_via_accept_finished(&mut self, res: Result<SyncFinished, AcceptError>) {
        match res {
            Ok(state) => {
                self.on_sync_finished(state.namespace, state.peer, Origin::Accept, Ok(state))
                    .await
            }
            Err(AcceptError::Abort { reason, .. }) if reason == AbortReason::AlreadySyncing => {
                // In case we aborted the sync: do nothing (our outgoing sync is in progress)
                debug!(?reason, "aborted by us");
            }
            Err(err) => {
                if let (Some(peer), Some(namespace)) = (err.peer(), err.namespace()) {
                    self.on_sync_finished(
                        namespace,
                        peer,
                        Origin::Accept,
                        Err(anyhow::Error::from(err)),
                    )
                    .await;
                } else {
                    debug!(?err, "failed before reading the first message");
                }
            }
        }
    }

    async fn on_sync_finished(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
        origin: Origin,
        result: Result<SyncFinished>,
    ) {
        match &result {
            Err(ref err) => {
                warn!(?origin, ?err, "sync failed");
            }
            Ok(ref details) => {
                info!(
                    sent = %details.outcome.num_sent,
                    recv = %details.outcome.num_recv,
                    t_connect = ?details.timings.connect,
                    t_process = ?details.timings.process,
                    "sync finished",
                );

                // register the peer as useful for the document
                if let Err(e) = self
                    .sync
                    .register_useful_peer(namespace, *peer.as_bytes())
                    .await
                {
                    debug!(%e, "failed to register peer for document")
                }

                // broadcast a sync report to our neighbors, but only if we received new entries.
                if details.outcome.num_recv > 0 {
                    info!("broadcast sync report to neighbors");
                    match details
                        .outcome
                        .heads_received
                        .encode(Some(self.gossip.max_message_size()))
                    {
                        Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
                        Ok(heads) => {
                            let report = SyncReport { namespace, heads };
                            self.broadcast_neighbors(namespace, &Op::SyncReport(report))
                                .await;
                        }
                    }
                }
            }
        };

        let result_for_event = match &result {
            Ok(details) => Ok(details.into()),
            Err(err) => Err(err.to_string()),
        };

        let Some((started, resync)) = self.state.finish(&namespace, peer, &origin, result) else {
            return;
        };

        let ev = SyncEvent {
            peer,
            origin,
            result: result_for_event,
            finished: SystemTime::now(),
            started,
        };
        self.subscribers
            .send(&namespace, Event::SyncFinished(ev))
            .await;

        // Check if there are queued pending content hashes for this namespace.
        // If hashes are pending, mark this namespace as eligible for a PendingContentReady event once all
        // pending hashes have completed downloading.
        // If no hashes are pending, emit the PendingContentReady event right away. The next
        // PendingContentReady event may then only be emitted after the next sync completes.
        if self.queued_hashes.contains_namespace(&namespace) {
            self.state.set_may_emit_ready(&namespace, true);
        } else {
            self.subscribers
                .send(&namespace, Event::PendingContentReady)
                .await;
            self.state.set_may_emit_ready(&namespace, false);
        }

        if resync {
            self.sync_with_peer(namespace, peer, SyncReason::Resync);
        }
    }

    async fn broadcast_neighbors(&self, namespace: NamespaceId, op: &Op) {
        if !self.state.is_syncing(&namespace) {
            return;
        }

        let msg = match postcard::to_stdvec(op) {
            Ok(msg) => msg,
            Err(err) => {
                error!(?err, ?op, "Failed to serialize message");
                return;
            }
        };
        // TODO: We should likely debounce and merge these neighbor announcements.
        self.gossip
            .broadcast_neighbors(&namespace, msg.into())
            .await;
    }

    async fn on_download_ready(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        res: Result<Stats, DownloadError>,
    ) {
        let completed_namespaces = self.queued_hashes.remove_hash(&hash);
        debug!(namespace=%namespace.fmt_short(), success=res.is_ok(), completed_namespaces=completed_namespaces.len(), "download ready");
        if res.is_ok() {
            self.subscribers
                .send(&namespace, Event::ContentReady { hash })
                .await;
            // Inform our neighbors that we have new content ready.
            self.broadcast_neighbors(namespace, &Op::ContentReady(hash))
                .await;
        } else {
            self.missing_hashes.insert(hash);
        }
        for namespace in completed_namespaces.iter() {
            if let Some(true) = self.state.may_emit_ready(namespace) {
                self.subscribers
                    .send(namespace, Event::PendingContentReady)
                    .await;
            }
        }
    }

    async fn on_neighbor_content_ready(
        &mut self,
        namespace: NamespaceId,
        node: NodeId,
        hash: Hash,
    ) {
        self.start_download(namespace, hash, node, true).await;
    }

    #[instrument("on_sync_report", skip_all, fields(peer = %from.fmt_short(), namespace = %report.namespace.fmt_short()))]
    async fn on_sync_report(&mut self, from: PublicKey, report: SyncReport) {
        let namespace = report.namespace;
        if !self.state.is_syncing(&namespace) {
            return;
        }
        let heads = match AuthorHeads::decode(&report.heads) {
            Ok(heads) => heads,
            Err(err) => {
                warn!(?err, "failed to decode AuthorHeads");
                return;
            }
        };
        match self.sync.has_news_for_us(report.namespace, heads).await {
            Ok(Some(updated_authors)) => {
                info!(%updated_authors, "news reported: sync now");
                self.sync_with_peer(report.namespace, from, SyncReason::SyncReport);
            }
            Ok(None) => {
                debug!("no news reported: nothing to do");
            }
            Err(err) => {
                warn!("sync actor error: {err:?}");
            }
        }
    }

    async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
        match event {
            crate::Event::LocalInsert { namespace, entry } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
                // A new entry was inserted locally. Broadcast a gossip message.
                if self.state.is_syncing(&namespace) {
                    let op = Op::Put(entry.clone());
                    let message = postcard::to_stdvec(&op)?.into();
                    self.gossip.broadcast(&namespace, message).await;
                }
            }
            crate::Event::RemoteInsert {
                namespace,
                entry,
                from,
                should_download,
                remote_content_status,
            } => {
                debug!(namespace=%namespace.fmt_short(), "replica event: RemoteInsert");
                // A new entry was inserted from initial sync or gossip. Queue downloading the
                // content.
                if should_download {
                    let hash = entry.content_hash();
                    if matches!(remote_content_status, ContentStatus::Complete) {
                        let node_id = PublicKey::from_bytes(&from)?;
                        self.start_download(namespace, hash, node_id, false).await;
                    } else {
                        self.missing_hashes.insert(hash);
                    }
                }
            }
        }

        Ok(())
    }

    async fn start_download(
        &mut self,
        namespace: NamespaceId,
        hash: Hash,
        node: PublicKey,
        only_if_missing: bool,
    ) {
        let entry_status = self.bao_store.entry_status(&hash).await;
        if matches!(entry_status, Ok(EntryStatus::Complete)) {
            self.missing_hashes.remove(&hash);
            return;
        }
        if self.queued_hashes.contains_hash(&hash) {
            self.queued_hashes.insert(hash, namespace);
            self.downloader.nodes_have(hash, vec![node]).await;
        } else if !only_if_missing || self.missing_hashes.contains(&hash) {
            let req = DownloadRequest::new(HashAndFormat::raw(hash), vec![node]);
            let handle = self.downloader.queue(req).await;

            self.queued_hashes.insert(hash, namespace);
            self.missing_hashes.remove(&hash);
            self.download_tasks
                .spawn(async move { (namespace, hash, handle.await) });
        }
    }

    #[instrument("accept", skip_all)]
    pub async fn handle_connection(&mut self, conn: iroh::endpoint::Connection) {
        let to_actor_tx = self.sync_actor_tx.clone();
        let accept_request_cb = move |namespace, peer| {
            let to_actor_tx = to_actor_tx.clone();
            async move {
                let (reply_tx, reply_rx) = oneshot::channel();
                to_actor_tx
                    .send(ToLiveActor::AcceptSyncRequest {
                        namespace,
                        peer,
                        reply: reply_tx,
                    })
                    .await
                    .ok();
                match reply_rx.await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(
                            "accept request callback failed to retrieve reply from actor: {err:?}"
                        );
                        AcceptOutcome::Reject(AbortReason::InternalServerError)
                    }
                }
            }
            .boxed()
        };
        debug!("incoming connection");
        let sync = self.sync.clone();
        let metrics = self.metrics.clone();
        self.running_sync_accept.spawn(
            async move { handle_connection(sync, conn, accept_request_cb, Some(&metrics)).await }
                .instrument(Span::current()),
        );
    }

    pub fn accept_sync_request(
        &mut self,
        namespace: NamespaceId,
        peer: PublicKey,
    ) -> AcceptOutcome {
        self.state
            .accept_request(&self.endpoint.node_id(), &namespace, peer)
    }
}

/// Event emitted when a sync operation completes
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncEvent {
    /// Peer we synced with
    pub peer: PublicKey,
    /// Origin of the sync exchange
    pub origin: Origin,
    /// Timestamp when the sync finished
    pub finished: SystemTime,
    /// Timestamp when the sync started
    pub started: SystemTime,
    /// Result of the sync operation
    pub result: std::result::Result<SyncDetails, String>,
}

/// Details of a completed sync operation.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
    /// Number of entries received
    pub entries_received: usize,
    /// Number of entries sent
    pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
    fn from(value: &SyncFinished) -> Self {
        Self {
            entries_received: value.outcome.num_recv,
            entries_sent: value.outcome.num_sent,
        }
    }
}

#[derive(Debug, Default)]
struct SubscribersMap(HashMap<NamespaceId, Subscribers>);

impl SubscribersMap {
    fn subscribe(&mut self, namespace: NamespaceId, sender: async_channel::Sender<Event>) {
        self.0.entry(namespace).or_default().subscribe(sender);
    }

    async fn send(&mut self, namespace: &NamespaceId, event: Event) -> bool {
        debug!(namespace=%namespace.fmt_short(), %event, "emit event");
        let Some(subscribers) = self.0.get_mut(namespace) else {
            return false;
        };

        if !subscribers.send(event).await {
            self.0.remove(namespace);
        }
        true
    }

    fn remove(&mut self, namespace: &NamespaceId) {
        self.0.remove(namespace);
    }

    fn clear(&mut self) {
        self.0.clear();
    }
}

#[derive(Debug, Default)]
struct QueuedHashes {
    by_hash: HashMap<Hash, HashSet<NamespaceId>>,
    by_namespace: HashMap<NamespaceId, HashSet<Hash>>,
}

impl QueuedHashes {
    fn insert(&mut self, hash: Hash, namespace: NamespaceId) {
        self.by_hash.entry(hash).or_default().insert(namespace);
        self.by_namespace.entry(namespace).or_default().insert(hash);
    }

    /// Remove a hash from the set of queued hashes.
    ///
    /// Returns a list of namespaces that are now complete (have no queued hashes anymore).
    fn remove_hash(&mut self, hash: &Hash) -> Vec<NamespaceId> {
        let namespaces = self.by_hash.remove(hash).unwrap_or_default();
        let mut removed_namespaces = vec![];
        for namespace in namespaces {
            if let Some(hashes) = self.by_namespace.get_mut(&namespace) {
                hashes.remove(hash);
                if hashes.is_empty() {
                    self.by_namespace.remove(&namespace);
                    removed_namespaces.push(namespace);
                }
            }
        }
        removed_namespaces
    }

    fn contains_hash(&self, hash: &Hash) -> bool {
        self.by_hash.contains_key(hash)
    }

    fn contains_namespace(&self, namespace: &NamespaceId) -> bool {
        self.by_namespace.contains_key(namespace)
    }
}

#[derive(Debug, Default)]
struct Subscribers(Vec<async_channel::Sender<Event>>);

impl Subscribers {
    fn subscribe(&mut self, sender: async_channel::Sender<Event>) {
        self.0.push(sender)
    }

    async fn send(&mut self, event: Event) -> bool {
        let futs = self.0.iter().map(|sender| sender.send(event.clone()));
        let res = futures_buffered::join_all(futs).await;
        // reverse the order so removing does not shift remaining indices
        for (i, res) in res.into_iter().enumerate().rev() {
            if res.is_err() {
                self.0.remove(i);
            }
        }
        !self.0.is_empty()
    }
}

fn fmt_accept_peer(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.peer.fmt_short(),
        Err(err) => err
            .peer()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

fn fmt_accept_namespace(res: &Result<SyncFinished, AcceptError>) -> String {
    match res {
        Ok(res) => res.namespace.fmt_short(),
        Err(err) => err
            .namespace()
            .map(|x| x.fmt_short())
            .unwrap_or_else(|| "unknown".to_string()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_sync_remove() {
        let pk = PublicKey::from_bytes(&[1; 32]).unwrap();
        let (a_tx, a_rx) = async_channel::unbounded();
        let (b_tx, b_rx) = async_channel::unbounded();
        let mut subscribers = Subscribers::default();
        subscribers.subscribe(a_tx);
        subscribers.subscribe(b_tx);
        drop(a_rx);
        drop(b_rx);
        subscribers.send(Event::NeighborUp(pk)).await;
    }
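
    /// Sketch-level unit test for [`QueuedHashes`] bookkeeping: a namespace is
    /// reported as complete only once its last queued hash is removed.
    /// (Assumes `NamespaceId: From<[u8; 32]>` for constructing a test id.)
    #[test]
    fn test_queued_hashes_remove() {
        let mut queued = QueuedHashes::default();
        let namespace = NamespaceId::from([0u8; 32]);
        let hash_a = Hash::new(b"a");
        let hash_b = Hash::new(b"b");
        queued.insert(hash_a, namespace);
        queued.insert(hash_b, namespace);
        assert!(queued.contains_namespace(&namespace));
        // One hash is still queued, so the namespace is not complete yet.
        assert!(queued.remove_hash(&hash_a).is_empty());
        // Removing the last queued hash reports the namespace as complete.
        assert_eq!(queued.remove_hash(&hash_b), vec![namespace]);
        assert!(!queued.contains_namespace(&namespace));
    }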
}