iroh_docs/
engine.rs

//! Handlers and actors for live syncing replicas.
//!
//! Replicas ([`crate::Replica`]) are also called documents here.
4
5use std::{
6    path::PathBuf,
7    str::FromStr,
8    sync::{Arc, RwLock},
9};
10
11use anyhow::{bail, Context, Result};
12use futures_lite::{Stream, StreamExt};
13use iroh::{Endpoint, NodeAddr, PublicKey};
14use iroh_blobs::{
15    api::{blobs::BlobStatus, downloader::Downloader, Store},
16    store::fs::options::{ProtectCb, ProtectOutcome},
17    Hash,
18};
19use iroh_gossip::net::Gossip;
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, oneshot};
22use tokio_util::task::AbortOnDropHandle;
23use tracing::{debug, error, error_span, Instrument};
24
25use self::live::{LiveActor, ToLiveActor};
26pub use self::{
27    live::SyncEvent,
28    state::{Origin, SyncReason},
29};
30use crate::{
31    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
32    Entry, NamespaceId,
33};
34
35mod gossip;
36mod live;
37mod state;
38
/// Buffer size of the channel carrying [`ToLiveActor`] messages to the live actor.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Buffer size of each event channel created by [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;
43
44/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
45/// peers and a gossip swarm for each syncing document.
46#[derive(derive_more::Debug)]
47pub struct Engine {
48    /// [`Endpoint`] used by the engine.
49    pub endpoint: Endpoint,
50    /// Handle to the actor thread.
51    pub sync: SyncHandle,
52    /// The persistent default author for this engine.
53    pub default_author: DefaultAuthor,
54    to_live_actor: mpsc::Sender<ToLiveActor>,
55    #[allow(dead_code)]
56    actor_handle: AbortOnDropHandle<()>,
57    #[debug("ContentStatusCallback")]
58    content_status_cb: ContentStatusCallback,
59    blob_store: iroh_blobs::api::Store,
60    _gc_protect_task: AbortOnDropHandle<()>,
61}
62
63impl Engine {
64    /// Start the sync engine.
65    ///
66    /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
67    /// thread for the actor interacting with doc storage.
68    pub async fn spawn(
69        endpoint: Endpoint,
70        gossip: Gossip,
71        replica_store: crate::store::Store,
72        bao_store: iroh_blobs::api::Store,
73        downloader: Downloader,
74        default_author_storage: DefaultAuthorStorage,
75        protect_cb: Option<ProtectCallbackHandler>,
76    ) -> anyhow::Result<Self> {
77        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
78        let me = endpoint.node_id().fmt_short();
79
80        let content_status_cb: ContentStatusCallback = {
81            let blobs = bao_store.blobs().clone();
82            Arc::new(move |hash: iroh_blobs::Hash| {
83                let blobs = blobs.clone();
84                Box::pin(async move {
85                    let blob_status = blobs.status(hash).await;
86                    entry_to_content_status(blob_status)
87                })
88            })
89        };
90        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());
91
92        let sync2 = sync.clone();
93        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
94            let Some(mut protect_handler) = protect_cb else {
95                return;
96            };
97            while let Some(reply_tx) = protect_handler.0.recv().await {
98                let (tx, rx) = mpsc::channel(64);
99                if let Err(_err) = reply_tx.send(rx) {
100                    continue;
101                }
102                let hashes = match sync2.content_hashes().await {
103                    Ok(hashes) => hashes,
104                    Err(err) => {
105                        debug!("protect task: getting content hashes failed with {err:#}");
106                        if let Err(_err) = tx.send(Err(err)).await {
107                            debug!("protect task: failed to forward error");
108                        }
109                        continue;
110                    }
111                };
112                for hash in hashes {
113                    if let Err(_err) = tx.send(hash).await {
114                        debug!("protect task: failed to forward hash");
115                        break;
116                    }
117                }
118            }
119        }));
120
121        let actor = LiveActor::new(
122            sync.clone(),
123            endpoint.clone(),
124            gossip.clone(),
125            bao_store.clone(),
126            downloader,
127            to_live_actor_recv,
128            live_actor_tx.clone(),
129            sync.metrics().clone(),
130        );
131        let actor_handle = tokio::task::spawn(
132            async move {
133                if let Err(err) = actor.run().await {
134                    error!("sync actor failed: {err:?}");
135                }
136            }
137            .instrument(error_span!("sync", %me)),
138        );
139
140        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
141            Ok(author) => author,
142            Err(err) => {
143                // If loading the default author failed, make sure to shutdown the sync actor before
144                // returning.
145                let _store = sync.shutdown().await.ok();
146                return Err(err);
147            }
148        };
149
150        Ok(Self {
151            endpoint,
152            sync,
153            to_live_actor: live_actor_tx,
154            actor_handle: AbortOnDropHandle::new(actor_handle),
155            content_status_cb,
156            default_author,
157            blob_store: bao_store,
158            _gc_protect_task: gc_protect_task,
159        })
160    }
161
162    /// Get the blob store.
163    pub fn blob_store(&self) -> &Store {
164        &self.blob_store
165    }
166
167    /// Returns the metrics tracked for this engine.
168    pub fn metrics(&self) -> &Arc<Metrics> {
169        self.sync.metrics()
170    }
171
172    /// Start to sync a document.
173    ///
174    /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
175    /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
176    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
177        let (reply, reply_rx) = oneshot::channel();
178        self.to_live_actor
179            .send(ToLiveActor::StartSync {
180                namespace,
181                peers,
182                reply,
183            })
184            .await?;
185        reply_rx.await??;
186        Ok(())
187    }
188
189    /// Stop the live sync for a document and leave the gossip swarm.
190    ///
191    /// If `kill_subscribers` is true, all existing event subscribers will be dropped. This means
192    /// they will receive `None` and no further events in case of rejoining the document.
193    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
194        let (reply, reply_rx) = oneshot::channel();
195        self.to_live_actor
196            .send(ToLiveActor::Leave {
197                namespace,
198                kill_subscribers,
199                reply,
200            })
201            .await?;
202        reply_rx.await??;
203        Ok(())
204    }
205
206    /// Subscribe to replica and sync progress events.
207    pub async fn subscribe(
208        &self,
209        namespace: NamespaceId,
210    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
211        // Create a future that sends channel senders to the respective actors.
212        // We clone `self` so that the future does not capture any lifetimes.
213        let content_status_cb = self.content_status_cb.clone();
214
215        // Subscribe to insert events from the replica.
216        let a = {
217            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
218            self.sync.subscribe(namespace, s).await?;
219            Box::pin(r).then(move |ev| {
220                let content_status_cb = content_status_cb.clone();
221                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
222            })
223        };
224
225        // Subscribe to events from the [`live::Actor`].
226        let b = {
227            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
228            let r = Box::pin(r);
229            let (reply, reply_rx) = oneshot::channel();
230            self.to_live_actor
231                .send(ToLiveActor::Subscribe {
232                    namespace,
233                    sender: s,
234                    reply,
235                })
236                .await?;
237            reply_rx.await??;
238            r.map(|event| Ok(LiveEvent::from(event)))
239        };
240
241        Ok(a.or(b))
242    }
243
244    /// Handle an incoming iroh-docs connection.
245    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
246        self.to_live_actor
247            .send(ToLiveActor::HandleConnection { conn })
248            .await?;
249        Ok(())
250    }
251
252    /// Shutdown the engine.
253    pub async fn shutdown(&self) -> Result<()> {
254        let (reply, reply_rx) = oneshot::channel();
255        self.to_live_actor
256            .send(ToLiveActor::Shutdown { reply })
257            .await?;
258        reply_rx.await?;
259        Ok(())
260    }
261}
262
263/// Converts an [`BlobStatus`] into a ['ContentStatus'].
264fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
265    match entry {
266        Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
267        Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
268        Ok(BlobStatus::NotFound) => ContentStatus::Missing,
269        Err(cause) => {
270            tracing::warn!("Error while checking entry status: {cause:?}");
271            ContentStatus::Missing
272        }
273    }
274}
275
276/// Events informing about actions of the live sync progress.
277#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
278pub enum LiveEvent {
279    /// A local insertion.
280    InsertLocal {
281        /// The inserted entry.
282        entry: Entry,
283    },
284    /// Received a remote insert.
285    InsertRemote {
286        /// The peer that sent us the entry.
287        from: PublicKey,
288        /// The inserted entry.
289        entry: Entry,
290        /// If the content is available at the local node
291        content_status: ContentStatus,
292    },
293    /// The content of an entry was downloaded and is now available at the local node
294    ContentReady {
295        /// The content hash of the newly available entry content
296        hash: Hash,
297    },
298    /// All pending content is now ready.
299    ///
300    /// This event signals that all queued content downloads from the last sync run have either
301    /// completed or failed.
302    ///
303    /// It will only be emitted after a [`Self::SyncFinished`] event, never before.
304    ///
305    /// Receiving this event does not guarantee that all content in the document is available. If
306    /// blobs failed to download, this event will still be emitted after all operations completed.
307    PendingContentReady,
308    /// We have a new neighbor in the swarm.
309    NeighborUp(PublicKey),
310    /// We lost a neighbor in the swarm.
311    NeighborDown(PublicKey),
312    /// A set-reconciliation sync finished.
313    SyncFinished(SyncEvent),
314}
315
316impl From<live::Event> for LiveEvent {
317    fn from(ev: live::Event) -> Self {
318        match ev {
319            live::Event::ContentReady { hash } => Self::ContentReady { hash },
320            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
321            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
322            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
323            live::Event::PendingContentReady => Self::PendingContentReady,
324        }
325    }
326}
327
328impl LiveEvent {
329    async fn from_replica_event(
330        ev: crate::Event,
331        content_status_cb: &ContentStatusCallback,
332    ) -> Result<Self> {
333        Ok(match ev {
334            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
335                entry: entry.into(),
336            },
337            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
338                content_status: content_status_cb(entry.content_hash()).await,
339                entry: entry.into(),
340                from: PublicKey::from_bytes(&from)?,
341            },
342        })
343    }
344}
345
/// Backing storage for the default author.
///
/// With [`DefaultAuthorStorage::Mem`], a fresh author is created in the docs store before the
/// sync engine is spawned, and changes to the default author are not persisted.
///
/// With [`DefaultAuthorStorage::Persistent`], the default author is read from and written to the
/// given file path, stored as a hex encoded string of the author's public key.
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// Keep the default author in memory only.
    Mem,
    /// Persist the default author in the file at this path.
    Persistent(PathBuf),
}
360
361impl DefaultAuthorStorage {
362    /// Load the default author from the storage.
363    ///
364    /// Will create and save a new author if the storage is empty.
365    ///
366    /// Returns an error if the author can't be parsed or if the uathor does not exist in the docs
367    /// store.
368    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
369        match self {
370            Self::Mem => {
371                let author = Author::new(&mut rand::thread_rng());
372                let author_id = author.id();
373                docs_store.import_author(author).await?;
374                Ok(author_id)
375            }
376            Self::Persistent(ref path) => {
377                if path.exists() {
378                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
379                        format!(
380                            "Failed to read the default author file at `{}`",
381                            path.to_string_lossy()
382                        )
383                    })?;
384                    let author_id = AuthorId::from_str(&data).with_context(|| {
385                        format!(
386                            "Failed to parse the default author from `{}`",
387                            path.to_string_lossy()
388                        )
389                    })?;
390                    if docs_store.export_author(author_id).await?.is_none() {
391                        bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
392                    }
393                    Ok(author_id)
394                } else {
395                    let author = Author::new(&mut rand::thread_rng());
396                    let author_id = author.id();
397                    docs_store.import_author(author).await?;
398                    // Make sure to write the default author to the store
399                    // *before* we write the default author ID file.
400                    // Otherwise the default author ID file is effectively a dangling reference.
401                    docs_store.flush_store().await?;
402                    self.persist(author_id).await?;
403                    Ok(author_id)
404                }
405            }
406        }
407    }
408
409    /// Save a new default author.
410    pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
411        match self {
412            Self::Mem => {
413                // persistence is not possible for the mem storage so this is a noop.
414            }
415            Self::Persistent(ref path) => {
416                tokio::fs::write(path, author_id.to_string())
417                    .await
418                    .with_context(|| {
419                        format!(
420                            "Failed to write the default author to `{}`",
421                            path.to_string_lossy()
422                        )
423                    })?;
424            }
425        }
426        Ok(())
427    }
428}
429
430/// Persistent default author for a docs engine.
431#[derive(Debug)]
432pub struct DefaultAuthor {
433    value: RwLock<AuthorId>,
434    storage: DefaultAuthorStorage,
435}
436
437impl DefaultAuthor {
438    /// Load the default author from storage.
439    ///
440    /// If the storage is empty creates a new author and persists it.
441    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
442        let value = storage.load(docs_store).await?;
443        Ok(Self {
444            value: RwLock::new(value),
445            storage,
446        })
447    }
448
449    /// Get the current default author.
450    pub fn get(&self) -> AuthorId {
451        *self.value.read().unwrap()
452    }
453
454    /// Set the default author.
455    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
456        if docs_store.export_author(author_id).await?.is_none() {
457            bail!("The author does not exist");
458        }
459        self.storage.persist(author_id).await?;
460        *self.value.write().unwrap() = author_id;
461        Ok(())
462    }
463}
464
465#[derive(Debug)]
466struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
467
468/// The handler for a blobs protection callback.
469///
470/// See [`ProtectCallbackHandler::new`].
471#[derive(Debug)]
472pub struct ProtectCallbackHandler(
473    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
474);
475
476impl ProtectCallbackHandler {
477    /// Creates a callback and handler to manage blob protection.
478    ///
479    /// The returned [`ProtectCb`] must be passed set in the [`GcConfig`] of the [`iroh_blobs`] store where
480    /// the blobs for hashes in documents are persisted. The [`ProtectCallbackHandler`] must be passed to
481    /// [`Builder::protect_handler`] (or [`Engine::spawn`]). This will then ensure that hashes referenced
482    /// in docs will not be deleted from the blobs store, and will be garbage collected if they no longer appear
483    /// in any doc.
484    ///
485    /// [`Builder::protect_handler`]: crate::protocol::Builder::protect_handler
486    /// [`GcConfig`]: iroh_blobs::store::fs::options::GcConfig
487    pub fn new() -> (Self, ProtectCb) {
488        let (tx, rx) = mpsc::channel(4);
489        let cb = ProtectCallbackSender(tx).into_cb();
490        let handler = ProtectCallbackHandler(rx);
491        (handler, cb)
492    }
493}
494
495impl ProtectCallbackSender {
496    fn into_cb(self) -> ProtectCb {
497        let start_tx = self.0.clone();
498        Arc::new(move |live| {
499            let start_tx = start_tx.clone();
500            Box::pin(async move {
501                let (tx, rx) = oneshot::channel();
502                if let Err(_err) = start_tx.send(tx).await {
503                    tracing::warn!("Failed to get protected hashes from docs: ProtectCallback receiver dropped");
504                    return ProtectOutcome::Abort;
505                }
506                let mut rx = match rx.await {
507                    Ok(rx) => rx,
508                    Err(_err) => {
509                        tracing::warn!("Failed to get protected hashes from docs: ProtectCallback sender dropped");
510                        return ProtectOutcome::Abort;
511                    }
512                };
513                while let Some(res) = rx.recv().await {
514                    match res {
515                        Err(err) => {
516                            tracing::warn!("Getting protected hashes produces error: {err:#}");
517                            return ProtectOutcome::Abort;
518                        }
519                        Ok(hash) => {
520                            live.insert(hash);
521                        }
522                    }
523                }
524                ProtectOutcome::Continue
525            })
526        })
527    }
528}