iroh_docs/
engine.rs

//! Handlers and actors for live syncing replicas.
//!
//! A [`crate::Replica`] is also called a document here.

use std::{
    io,
    path::PathBuf,
    str::FromStr,
    sync::{Arc, RwLock},
};

use anyhow::{bail, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh::{Endpoint, NodeAddr, PublicKey};
use iroh_blobs::{
    downloader::Downloader, net_protocol::ProtectCb, store::EntryStatus,
    util::local_pool::LocalPoolHandle, Hash,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;
use tracing::{error, error_span, Instrument};

use self::live::{LiveActor, ToLiveActor};
pub use self::{
    live::SyncEvent,
    state::{Origin, SyncReason},
};
use crate::{
    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
    Entry, NamespaceId,
};

mod gossip;
mod live;
mod state;

/// Capacity of the channel for the [`ToLiveActor`] messages.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity for the channels for [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;

/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers, and a gossip swarm for each syncing document.
#[derive(derive_more::Debug)]
pub struct Engine<D> {
    /// [`Endpoint`] used by the engine.
    pub endpoint: Endpoint,
    /// Handle to the actor thread.
    pub sync: SyncHandle,
    /// The persistent default author for this engine.
    pub default_author: DefaultAuthor,
    to_live_actor: mpsc::Sender<ToLiveActor>,
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    local_pool_handle: LocalPoolHandle,
    blob_store: D,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
    /// Start the sync engine.
    ///
    /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
    /// thread for the [`crate::actor::SyncHandle`].
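    ///
    /// A minimal call sketch (not a compiled doctest); all inputs besides the
    /// [`DefaultAuthorStorage`] are assumed to be constructed elsewhere:
    ///
    /// ```ignore
    /// let engine = Engine::spawn(
    ///     endpoint,
    ///     gossip,
    ///     replica_store,
    ///     bao_store,
    ///     downloader,
    ///     DefaultAuthorStorage::Mem,
    ///     local_pool_handle,
    /// )
    /// .await?;
    /// ```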
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: D,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        local_pool_handle: LocalPoolHandle,
    ) -> anyhow::Result<Self> {
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        let me = endpoint.node_id().fmt_short();

        let content_status_cb = {
            let bao_store = bao_store.clone();
            Arc::new(move |hash| entry_to_content_status(bao_store.entry_status_sync(&hash)))
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            live_actor_tx.clone(),
            sync.metrics().clone(),
        );
        let actor_handle = tokio::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // If loading the default author failed, make sure to shut down the sync actor
                // before returning.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            local_pool_handle,
            blob_store: bao_store,
        })
    }

    /// Return a callback that can be added to blobs to protect the content of
    /// all docs from garbage collection.
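    ///
    /// A hypothetical wiring sketch; `blobs` and its `add_protected` method are
    /// stand-ins for whatever GC-protection hook the blobs instance exposes:
    ///
    /// ```ignore
    /// let cb = engine.protect_cb();
    /// blobs.add_protected(cb); // `add_protected` is a hypothetical registration point
    /// ```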
    pub fn protect_cb(&self) -> ProtectCb {
        let sync = self.sync.clone();
        Box::new(move |live| {
            let sync = sync.clone();
            Box::pin(async move {
                let doc_hashes = match sync.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        tracing::warn!("Error getting doc hashes: {}", err);
                        return;
                    }
                };
                for hash in doc_hashes {
                    match hash {
                        Ok(hash) => {
                            live.insert(hash);
                        }
                        Err(err) => {
                            tracing::error!("Error getting doc hash: {}", err);
                        }
                    }
                }
            })
        })
    }

    /// Get the blob store.
    pub fn blob_store(&self) -> &D {
        &self.blob_store
    }

    /// Returns the metrics tracked for this engine.
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

    /// Start to sync a document.
    ///
    /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
    /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
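    ///
    /// A minimal usage sketch; `engine`, `namespace`, and `peer_addr` are assumed to exist:
    ///
    /// ```ignore
    /// engine.start_sync(namespace, vec![peer_addr]).await?;
    /// ```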
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Stop the live sync for a document and leave the gossip swarm.
    ///
    /// If `kill_subscribers` is true, all existing event subscribers will be dropped. They will
    /// receive `None` and no further events, even if the document is joined again later.
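    ///
    /// A minimal usage sketch; `engine` and `namespace` are assumed to exist:
    ///
    /// ```ignore
    /// // Stop syncing but keep existing subscribers alive.
    /// engine.leave(namespace, false).await?;
    /// ```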
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Subscribe to replica and sync progress events.
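    ///
    /// A minimal consumption sketch; `engine` and `namespace` are assumed to exist, and
    /// `StreamExt::next` comes from `futures_lite`:
    ///
    /// ```ignore
    /// let mut events = engine.subscribe(namespace).await?;
    /// while let Some(event) = events.next().await {
    ///     match event? {
    ///         LiveEvent::InsertRemote { entry, .. } => println!("remote insert: {entry:?}"),
    ///         LiveEvent::SyncFinished(details) => println!("sync finished: {details:?}"),
    ///         _ => {}
    ///     }
    /// }
    /// ```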
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        let content_status_cb = self.content_status_cb.clone();

        // Below we send channel senders to the respective actors and wrap the
        // receivers into event streams.
        let this = self;

        // Subscribe to insert events from the replica.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            this.sync.subscribe(namespace, s).await?;
            Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
        };

        // Subscribe to events from the [`live::Actor`].
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            this.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        // Merge both event streams into a single stream.
        Ok(a.or(b))
    }

    /// Handle an incoming iroh-docs connection.
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Shut down the engine.
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the stored `LocalPoolHandle`.
    pub fn local_pool_handle(&self) -> &LocalPoolHandle {
        &self.local_pool_handle
    }
}

/// Converts an [`EntryStatus`] into a [`ContentStatus`].
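///
/// A minimal sketch of the mapping:
///
/// ```ignore
/// let status = entry_to_content_status(Ok(EntryStatus::Complete));
/// assert!(matches!(status, ContentStatus::Complete));
/// ```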
pub fn entry_to_content_status(entry: io::Result<EntryStatus>) -> ContentStatus {
    match entry {
        Ok(EntryStatus::Complete) => ContentStatus::Complete,
        Ok(EntryStatus::Partial) => ContentStatus::Incomplete,
        Ok(EntryStatus::NotFound) => ContentStatus::Missing,
        Err(cause) => {
            tracing::warn!("Error while checking entry status: {cause:?}");
            ContentStatus::Missing
        }
    }
}

/// Events reporting actions and progress of the live sync.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// A local insertion.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// Received a remote insert.
    InsertRemote {
        /// The peer that sent us the entry.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// Whether the content is available at the local node.
        content_status: ContentStatus,
    },
    /// The content of an entry was downloaded and is now available at the local node.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// All pending content is now ready.
    ///
    /// This event signals that all queued content downloads from the last sync run have either
    /// completed or failed.
    ///
    /// It will only be emitted after a [`Self::SyncFinished`] event, never before.
    ///
    /// Receiving this event does not guarantee that all content in the document is available. If
    /// blobs failed to download, this event will still be emitted after all operations completed.
    PendingContentReady,
    /// We have a new neighbor in the swarm.
    NeighborUp(PublicKey),
    /// We lost a neighbor in the swarm.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync finished.
    SyncFinished(SyncEvent),
}

impl From<live::Event> for LiveEvent {
    fn from(ev: live::Event) -> Self {
        match ev {
            live::Event::ContentReady { hash } => Self::ContentReady { hash },
            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
            live::Event::PendingContentReady => Self::PendingContentReady,
        }
    }
}

impl LiveEvent {
    fn from_replica_event(
        ev: crate::Event,
        content_status_cb: &ContentStatusCallback,
    ) -> Result<Self> {
        Ok(match ev {
            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
                entry: entry.into(),
            },
            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
                content_status: content_status_cb(entry.content_hash()),
                entry: entry.into(),
                from: PublicKey::from_bytes(&from)?,
            },
        })
    }
}

/// Where to persist the default author.
///
/// If set to `Mem`, a new author will be created in the docs store before spawning the sync
/// engine. Changing the default author will not be persisted.
///
/// If set to `Persistent`, the default author will be loaded from and persisted to the specified
/// path (as a hex-encoded string of the author's public key).
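///
/// A minimal construction sketch; the file path is illustrative:
///
/// ```ignore
/// // In-memory: a fresh author per run, never persisted.
/// let mem = DefaultAuthorStorage::Mem;
/// // Persistent: the author id is stored as hex in the given file.
/// let persistent = DefaultAuthorStorage::Persistent("/path/to/default-author".into());
/// ```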
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// Memory storage.
    Mem,
    /// File based persistent storage.
    Persistent(PathBuf),
}

impl DefaultAuthorStorage {
    /// Load the default author from the storage.
    ///
    /// Will create and save a new author if the storage is empty.
    ///
    /// Returns an error if the author can't be parsed or if the author does not exist in the
    /// docs store.
    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
        match self {
            Self::Mem => {
                let author = Author::new(&mut rand::thread_rng());
                let author_id = author.id();
                docs_store.import_author(author).await?;
                Ok(author_id)
            }
            Self::Persistent(ref path) => {
                if path.exists() {
                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
                        format!(
                            "Failed to read the default author file at `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    let author_id = AuthorId::from_str(&data).with_context(|| {
                        format!(
                            "Failed to parse the default author from `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    if docs_store.export_author(author_id).await?.is_none() {
                        bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
                    }
                    Ok(author_id)
                } else {
                    let author = Author::new(&mut rand::thread_rng());
                    let author_id = author.id();
                    docs_store.import_author(author).await?;
                    // Make sure to write the default author to the store
                    // *before* we write the default author ID file.
                    // Otherwise the default author ID file is effectively a dangling reference.
                    docs_store.flush_store().await?;
                    self.persist(author_id).await?;
                    Ok(author_id)
                }
            }
        }
    }

    /// Save a new default author.
    pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
        match self {
            Self::Mem => {
                // Persistence is not possible for the mem storage, so this is a no-op.
            }
            Self::Persistent(ref path) => {
                tokio::fs::write(path, author_id.to_string())
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to write the default author to `{}`",
                            path.to_string_lossy()
                        )
                    })?;
            }
        }
        Ok(())
    }
}

/// Persistent default author for a docs engine.
#[derive(Debug)]
pub struct DefaultAuthor {
    value: RwLock<AuthorId>,
    storage: DefaultAuthorStorage,
}

impl DefaultAuthor {
    /// Load the default author from storage.
    ///
    /// If the storage is empty, a new author is created and persisted.
    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
        let value = storage.load(docs_store).await?;
        Ok(Self {
            value: RwLock::new(value),
            storage,
        })
    }

    /// Get the current default author.
    pub fn get(&self) -> AuthorId {
        *self.value.read().unwrap()
    }

    /// Set the default author.
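    ///
    /// A minimal usage sketch; `default_author`, `author_id`, and `sync_handle` are
    /// assumed to exist, and `author_id` must already be present in the docs store:
    ///
    /// ```ignore
    /// default_author.set(author_id, &sync_handle).await?;
    /// ```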
    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
        if docs_store.export_author(author_id).await?.is_none() {
            bail!("The author does not exist");
        }
        self.storage.persist(author_id).await?;
        *self.value.write().unwrap() = author_id;
        Ok(())
    }
}