Skip to main content

iroh_docs/
engine.rs

1//! Handlers and actors for live syncing replicas.
2//!
3//! Replicas ([`crate::Replica`]) are also called documents here.
4
5use std::sync::{Arc, RwLock};
6
7use anyhow::{bail, Result};
8use futures_lite::{Stream, StreamExt};
9use iroh::{Endpoint, EndpointAddr, PublicKey};
10use iroh_blobs::{
11    api::{blobs::BlobStatus, downloader::Downloader, Store},
12    store::{ProtectCb, ProtectOutcome},
13    Hash,
14};
15use iroh_gossip::net::Gossip;
16use n0_future::task::AbortOnDropHandle;
17use serde::{Deserialize, Serialize};
18use tokio::sync::{mpsc, oneshot};
19use tracing::{debug, error, error_span, Instrument};
20
21use self::live::{LiveActor, ToLiveActor};
22pub use self::{
23    live::SyncEvent,
24    state::{Origin, SyncReason},
25};
26use crate::{
27    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
28    Entry, NamespaceId,
29};
30
31mod gossip;
32mod live;
33mod state;
34
35/// Capacity of the channel for the [`ToLiveActor`] messages.
36const ACTOR_CHANNEL_CAP: usize = 64;
37/// Capacity for the channels for [`Engine::subscribe`].
38const SUBSCRIBE_CHANNEL_CAP: usize = 256;
39
40/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
41/// peers and a gossip swarm for each syncing document.
42#[derive(derive_more::Debug)]
43pub struct Engine {
44    /// [`Endpoint`] used by the engine.
45    pub endpoint: Endpoint,
46    /// Handle to the actor thread.
47    pub sync: SyncHandle,
48    /// The persistent default author for this engine.
49    pub default_author: DefaultAuthor,
50    to_live_actor: mpsc::Sender<ToLiveActor>,
51    #[allow(dead_code)]
52    actor_handle: AbortOnDropHandle<()>,
53    #[debug("ContentStatusCallback")]
54    content_status_cb: ContentStatusCallback,
55    blob_store: iroh_blobs::api::Store,
56    _gc_protect_task: AbortOnDropHandle<()>,
57}
58
59impl Engine {
60    /// Start the sync engine.
61    ///
62    /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
63    /// thread for the actor interacting with doc storage.
64    pub async fn spawn(
65        endpoint: Endpoint,
66        gossip: Gossip,
67        replica_store: crate::store::Store,
68        bao_store: iroh_blobs::api::Store,
69        downloader: Downloader,
70        default_author_storage: DefaultAuthorStorage,
71        protect_cb: Option<ProtectCallbackHandler>,
72    ) -> anyhow::Result<Self> {
73        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
74        let me = endpoint.id().fmt_short().to_string();
75
76        let content_status_cb: ContentStatusCallback = {
77            let blobs = bao_store.blobs().clone();
78            Arc::new(move |hash: iroh_blobs::Hash| {
79                let blobs = blobs.clone();
80                Box::pin(async move {
81                    let blob_status = blobs.status(hash).await;
82                    entry_to_content_status(blob_status)
83                })
84            })
85        };
86        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());
87
88        let sync2 = sync.clone();
89        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
90            let Some(mut protect_handler) = protect_cb else {
91                return;
92            };
93            while let Some(reply_tx) = protect_handler.0.recv().await {
94                let (tx, rx) = mpsc::channel(64);
95                if let Err(_err) = reply_tx.send(rx) {
96                    continue;
97                }
98                let hashes = match sync2.content_hashes().await {
99                    Ok(hashes) => hashes,
100                    Err(err) => {
101                        debug!("protect task: getting content hashes failed with {err:#}");
102                        if let Err(_err) = tx.send(Err(err)).await {
103                            debug!("protect task: failed to forward error");
104                        }
105                        continue;
106                    }
107                };
108                for hash in hashes {
109                    if let Err(_err) = tx.send(hash).await {
110                        debug!("protect task: failed to forward hash");
111                        break;
112                    }
113                }
114            }
115        }));
116
117        let actor = LiveActor::new(
118            sync.clone(),
119            endpoint.clone(),
120            gossip.clone(),
121            bao_store.clone(),
122            downloader,
123            to_live_actor_recv,
124            live_actor_tx.clone(),
125            sync.metrics().clone(),
126        )?;
127        let actor_handle = n0_future::task::spawn(
128            async move {
129                if let Err(err) = actor.run().await {
130                    error!("sync actor failed: {err:?}");
131                }
132            }
133            .instrument(error_span!("sync", %me)),
134        );
135
136        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
137            Ok(author) => author,
138            Err(err) => {
139                // If loading the default author failed, make sure to shutdown the sync actor before
140                // returning.
141                let _store = sync.shutdown().await.ok();
142                return Err(err);
143            }
144        };
145
146        Ok(Self {
147            endpoint,
148            sync,
149            to_live_actor: live_actor_tx,
150            actor_handle: AbortOnDropHandle::new(actor_handle),
151            content_status_cb,
152            default_author,
153            blob_store: bao_store,
154            _gc_protect_task: gc_protect_task,
155        })
156    }
157
158    /// Get the blob store.
159    pub fn blob_store(&self) -> &Store {
160        &self.blob_store
161    }
162
163    /// Returns the metrics tracked for this engine.
164    pub fn metrics(&self) -> &Arc<Metrics> {
165        self.sync.metrics()
166    }
167
168    /// Start to sync a document.
169    ///
170    /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
171    /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
172    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<EndpointAddr>) -> Result<()> {
173        let (reply, reply_rx) = oneshot::channel();
174        self.to_live_actor
175            .send(ToLiveActor::StartSync {
176                namespace,
177                peers,
178                reply,
179            })
180            .await?;
181        reply_rx.await??;
182        Ok(())
183    }
184
185    /// Stop the live sync for a document and leave the gossip swarm.
186    ///
187    /// If `kill_subscribers` is true, all existing event subscribers will be dropped. This means
188    /// they will receive `None` and no further events in case of rejoining the document.
189    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
190        let (reply, reply_rx) = oneshot::channel();
191        self.to_live_actor
192            .send(ToLiveActor::Leave {
193                namespace,
194                kill_subscribers,
195                reply,
196            })
197            .await?;
198        reply_rx.await??;
199        Ok(())
200    }
201
202    /// Subscribe to replica and sync progress events.
203    pub async fn subscribe(
204        &self,
205        namespace: NamespaceId,
206    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
207        // Create a future that sends channel senders to the respective actors.
208        // We clone `self` so that the future does not capture any lifetimes.
209        let content_status_cb = self.content_status_cb.clone();
210
211        // Subscribe to insert events from the replica.
212        let a = {
213            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
214            self.sync.subscribe(namespace, s).await?;
215            Box::pin(r).then(move |ev| {
216                let content_status_cb = content_status_cb.clone();
217                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
218            })
219        };
220
221        // Subscribe to events from the [`live::Actor`].
222        let b = {
223            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
224            let r = Box::pin(r);
225            let (reply, reply_rx) = oneshot::channel();
226            self.to_live_actor
227                .send(ToLiveActor::Subscribe {
228                    namespace,
229                    sender: s,
230                    reply,
231                })
232                .await?;
233            reply_rx.await??;
234            r.map(|event| Ok(LiveEvent::from(event)))
235        };
236
237        Ok(a.or(b))
238    }
239
240    /// Handle an incoming iroh-docs connection.
241    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
242        self.to_live_actor
243            .send(ToLiveActor::HandleConnection { conn })
244            .await?;
245        Ok(())
246    }
247
248    /// Shutdown the engine.
249    pub async fn shutdown(&self) -> Result<()> {
250        let (reply, reply_rx) = oneshot::channel();
251        self.to_live_actor
252            .send(ToLiveActor::Shutdown { reply })
253            .await?;
254        reply_rx.await?;
255        Ok(())
256    }
257}
258
259/// Converts an [`BlobStatus`] into a ['ContentStatus'].
260fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
261    match entry {
262        Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
263        Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
264        Ok(BlobStatus::NotFound) => ContentStatus::Missing,
265        Err(cause) => {
266            tracing::warn!("Error while checking entry status: {cause:?}");
267            ContentStatus::Missing
268        }
269    }
270}
271
272/// Events informing about actions of the live sync progress.
273#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
274pub enum LiveEvent {
275    /// A local insertion.
276    InsertLocal {
277        /// The inserted entry.
278        entry: Entry,
279    },
280    /// Received a remote insert.
281    InsertRemote {
282        /// The peer that sent us the entry.
283        from: PublicKey,
284        /// The inserted entry.
285        entry: Entry,
286        /// If the content is available at the local node
287        content_status: ContentStatus,
288    },
289    /// The content of an entry was downloaded and is now available at the local node
290    ContentReady {
291        /// The content hash of the newly available entry content
292        hash: Hash,
293    },
294    /// All pending content is now ready.
295    ///
296    /// This event signals that all queued content downloads from the last sync run have either
297    /// completed or failed.
298    ///
299    /// It will only be emitted after a [`Self::SyncFinished`] event, never before.
300    ///
301    /// Receiving this event does not guarantee that all content in the document is available. If
302    /// blobs failed to download, this event will still be emitted after all operations completed.
303    PendingContentReady,
304    /// We have a new neighbor in the swarm.
305    NeighborUp(PublicKey),
306    /// We lost a neighbor in the swarm.
307    NeighborDown(PublicKey),
308    /// A set-reconciliation sync finished.
309    SyncFinished(SyncEvent),
310}
311
312impl From<live::Event> for LiveEvent {
313    fn from(ev: live::Event) -> Self {
314        match ev {
315            live::Event::ContentReady { hash } => Self::ContentReady { hash },
316            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
317            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
318            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
319            live::Event::PendingContentReady => Self::PendingContentReady,
320        }
321    }
322}
323
324impl LiveEvent {
325    async fn from_replica_event(
326        ev: crate::Event,
327        content_status_cb: &ContentStatusCallback,
328    ) -> Result<Self> {
329        Ok(match ev {
330            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
331                entry: entry.into(),
332            },
333            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
334                content_status: content_status_cb(entry.content_hash()).await,
335                entry: entry.into(),
336                from: PublicKey::from_bytes(&from)?,
337            },
338        })
339    }
340}
341
342/// Where to persist the default author.
343///
344/// If set to `Mem`, a new author will be created in the docs store before spawning the sync
345/// engine. Changing the default author will not be persisted.
346///
347/// If set to `Persistent`, the default author will be loaded from and persisted to the specified
348/// path (as hex encoded string of the author's public key).
349#[derive(Debug)]
350pub enum DefaultAuthorStorage {
351    /// Memory storage.
352    Mem,
353    /// File based persistent storage.
354    #[cfg(feature = "fs-store")]
355    Persistent(std::path::PathBuf),
356}
357
358impl DefaultAuthorStorage {
359    /// Load the default author from the storage.
360    ///
361    /// Will create and save a new author if the storage is empty.
362    ///
363    /// Returns an error if the author can't be parsed or if the uathor does not exist in the docs
364    /// store.
365    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
366        match self {
367            Self::Mem => {
368                let author = Author::new(&mut rand::rng());
369                let author_id = author.id();
370                docs_store.import_author(author).await?;
371                Ok(author_id)
372            }
373            #[cfg(feature = "fs-store")]
374            Self::Persistent(ref path) => {
375                use std::str::FromStr;
376
377                use anyhow::Context;
378                if path.exists() {
379                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
380                        format!(
381                            "Failed to read the default author file at `{}`",
382                            path.to_string_lossy()
383                        )
384                    })?;
385                    let author_id = AuthorId::from_str(&data).with_context(|| {
386                        format!(
387                            "Failed to parse the default author from `{}`",
388                            path.to_string_lossy()
389                        )
390                    })?;
391                    if docs_store.export_author(author_id).await?.is_none() {
392                        bail!(
393                            "The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.",
394                            path.to_string_lossy()
395                        )
396                    }
397                    Ok(author_id)
398                } else {
399                    let author = Author::new(&mut rand::rng());
400                    let author_id = author.id();
401                    docs_store.import_author(author).await?;
402                    // Make sure to write the default author to the store
403                    // *before* we write the default author ID file.
404                    // Otherwise the default author ID file is effectively a dangling reference.
405                    docs_store.flush_store().await?;
406                    self.persist(author_id).await?;
407                    Ok(author_id)
408                }
409            }
410        }
411    }
412
413    /// Save a new default author.
414    pub async fn persist(&self, #[allow(unused)] author_id: AuthorId) -> anyhow::Result<()> {
415        match self {
416            Self::Mem => {
417                // persistence is not possible for the mem storage so this is a noop.
418            }
419            #[cfg(feature = "fs-store")]
420            Self::Persistent(ref path) => {
421                use anyhow::Context;
422                tokio::fs::write(path, author_id.to_string())
423                    .await
424                    .with_context(|| {
425                        format!(
426                            "Failed to write the default author to `{}`",
427                            path.to_string_lossy()
428                        )
429                    })?;
430            }
431        }
432        Ok(())
433    }
434}
435
436/// Persistent default author for a docs engine.
437#[derive(Debug)]
438pub struct DefaultAuthor {
439    value: RwLock<AuthorId>,
440    storage: DefaultAuthorStorage,
441}
442
443impl DefaultAuthor {
444    /// Load the default author from storage.
445    ///
446    /// If the storage is empty creates a new author and persists it.
447    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
448        let value = storage.load(docs_store).await?;
449        Ok(Self {
450            value: RwLock::new(value),
451            storage,
452        })
453    }
454
455    /// Get the current default author.
456    pub fn get(&self) -> AuthorId {
457        *self.value.read().unwrap()
458    }
459
460    /// Set the default author.
461    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
462        if docs_store.export_author(author_id).await?.is_none() {
463            bail!("The author does not exist");
464        }
465        self.storage.persist(author_id).await?;
466        *self.value.write().unwrap() = author_id;
467        Ok(())
468    }
469}
470
471#[derive(Debug)]
472struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
473
474/// The handler for a blobs protection callback.
475///
476/// See [`ProtectCallbackHandler::new`].
477#[derive(Debug)]
478pub struct ProtectCallbackHandler(
479    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
480);
481
482impl ProtectCallbackHandler {
483    /// Creates a callback and handler to manage blob protection.
484    ///
485    /// The returned [`ProtectCb`] must be passed set in the [`GcConfig`] of the [`iroh_blobs`] store where
486    /// the blobs for hashes in documents are persisted. The [`ProtectCallbackHandler`] must be passed to
487    /// [`Builder::protect_handler`] (or [`Engine::spawn`]). This will then ensure that hashes referenced
488    /// in docs will not be deleted from the blobs store, and will be garbage collected if they no longer appear
489    /// in any doc.
490    ///
491    /// [`Builder::protect_handler`]: crate::protocol::Builder::protect_handler
492    /// [`GcConfig`]: iroh_blobs::store::GcConfig
493    pub fn new() -> (Self, ProtectCb) {
494        let (tx, rx) = mpsc::channel(4);
495        let cb = ProtectCallbackSender(tx).into_cb();
496        let handler = ProtectCallbackHandler(rx);
497        (handler, cb)
498    }
499}
500
501impl ProtectCallbackSender {
502    fn into_cb(self) -> ProtectCb {
503        let start_tx = self.0.clone();
504        Arc::new(move |live| {
505            let start_tx = start_tx.clone();
506            Box::pin(async move {
507                let (tx, rx) = oneshot::channel();
508                if let Err(_err) = start_tx.send(tx).await {
509                    tracing::warn!(
510                        "Failed to get protected hashes from docs: ProtectCallback receiver dropped"
511                    );
512                    return ProtectOutcome::Abort;
513                }
514                let mut rx = match rx.await {
515                    Ok(rx) => rx,
516                    Err(_err) => {
517                        tracing::warn!(
518                            "Failed to get protected hashes from docs: ProtectCallback sender dropped"
519                        );
520                        return ProtectOutcome::Abort;
521                    }
522                };
523                while let Some(res) = rx.recv().await {
524                    match res {
525                        Err(err) => {
526                            tracing::warn!("Getting protected hashes produces error: {err:#}");
527                            return ProtectOutcome::Abort;
528                        }
529                        Ok(hash) => {
530                            live.insert(hash);
531                        }
532                    }
533                }
534                ProtectOutcome::Continue
535            })
536        })
537    }
538}