use std::{
    path::PathBuf,
    str::FromStr,
    sync::{Arc, RwLock},
};

use anyhow::{bail, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh::{Endpoint, NodeAddr, PublicKey};
use iroh_blobs::{
    api::{blobs::BlobStatus, downloader::Downloader, Store},
    store::fs::options::{ProtectCb, ProtectOutcome},
    Hash,
};
use iroh_gossip::net::Gossip;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, Instrument};

use self::live::{LiveActor, ToLiveActor};
pub use self::{
    live::SyncEvent,
    state::{Origin, SyncReason},
};
use crate::{
    actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
    Entry, NamespaceId,
};

mod gossip;
mod live;
mod state;

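/// Capacity of the channel for messages sent to the live sync actor.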
const ACTOR_CHANNEL_CAP: usize = 64;
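/// Capacity of the event channels handed out by [`Engine::subscribe`].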
const SUBSCRIBE_CHANNEL_CAP: usize = 256;

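/// The sync engine ties together the docs store, the blob store, gossip, and the live sync
/// actor, and is the entry point for starting and controlling document sync.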
#[derive(derive_more::Debug)]
pub struct Engine {
    /// The [`Endpoint`] used by the engine.
    pub endpoint: Endpoint,
    /// Handle to the docs actor.
    pub sync: SyncHandle,
    /// The default author for this engine.
    pub default_author: DefaultAuthor,
    to_live_actor: mpsc::Sender<ToLiveActor>,
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    blob_store: iroh_blobs::api::Store,
    _gc_protect_task: AbortOnDropHandle<()>,
}

impl Engine {
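    /// Starts the sync engine.
    ///
    /// This spawns the live sync actor and, if a [`ProtectCallbackHandler`] is given, a task
    /// that answers garbage-collection protection requests with all content hashes known to
    /// the docs store.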
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: iroh_blobs::api::Store,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        protect_cb: Option<ProtectCallbackHandler>,
    ) -> anyhow::Result<Self> {
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        let me = endpoint.node_id().fmt_short();

        let content_status_cb: ContentStatusCallback = {
            let blobs = bao_store.blobs().clone();
            Arc::new(move |hash: iroh_blobs::Hash| {
                let blobs = blobs.clone();
                Box::pin(async move {
                    let blob_status = blobs.status(hash).await;
                    entry_to_content_status(blob_status)
                })
            })
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        let sync2 = sync.clone();
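        // Answer GC protection requests from the blobs store: for each request, stream all
        // content hashes currently referenced by the docs store back to the requester.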
        let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
            let Some(mut protect_handler) = protect_cb else {
                return;
            };
            while let Some(reply_tx) = protect_handler.0.recv().await {
                let (tx, rx) = mpsc::channel(64);
                if let Err(_err) = reply_tx.send(rx) {
                    continue;
                }
                let hashes = match sync2.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        debug!("protect task: getting content hashes failed with {err:#}");
                        if let Err(_err) = tx.send(Err(err)).await {
                            debug!("protect task: failed to forward error");
                        }
                        continue;
                    }
                };
                for hash in hashes {
                    if let Err(_err) = tx.send(hash).await {
                        debug!("protect task: failed to forward hash");
                        break;
                    }
                }
            }
        }));

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            live_actor_tx.clone(),
            sync.metrics().clone(),
        );
        let actor_handle = tokio::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // Loading the default author failed: shut down the docs actor before returning.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            blob_store: bao_store,
            _gc_protect_task: gc_protect_task,
        })
    }

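    /// Returns the blob store used by this engine.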
    pub fn blob_store(&self) -> &Store {
        &self.blob_store
    }

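    /// Returns the metrics tracked by the docs actor.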
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

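    /// Starts to sync a document with the given set of peers.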
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

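    /// Stops the live sync for a document.
    ///
    /// If `kill_subscribers` is true, event subscribers for this document are dropped as well.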
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

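    /// Subscribes to replica insert events and live sync progress events for a document.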
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        let content_status_cb = self.content_status_cb.clone();

        // Subscribe to insert events from the replica.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            self.sync.subscribe(namespace, s).await?;
            Box::pin(r).then(move |ev| {
                let content_status_cb = content_status_cb.clone();
                Box::pin(async move { LiveEvent::from_replica_event(ev, &content_status_cb).await })
            })
        };

        // Subscribe to events from the live sync actor.
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            self.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        Ok(a.or(b))
    }

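    /// Handles an incoming connection by forwarding it to the live sync actor.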
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

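    /// Shuts down the live sync actor and waits for it to stop.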
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }
}

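/// Maps a blob status reply to a [`ContentStatus`], treating lookup errors as missing content.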
fn entry_to_content_status(entry: irpc::Result<BlobStatus>) -> ContentStatus {
    match entry {
        Ok(BlobStatus::Complete { .. }) => ContentStatus::Complete,
        Ok(BlobStatus::Partial { .. }) => ContentStatus::Incomplete,
        Ok(BlobStatus::NotFound) => ContentStatus::Missing,
        Err(cause) => {
            tracing::warn!("Error while checking entry status: {cause:?}");
            ContentStatus::Missing
        }
    }
}

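/// Events informing about the progress of the live sync for a document.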
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// A local insertion.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// Received a remote insert.
    InsertRemote {
        /// The peer that sent us the entry.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// The content status of the entry at the local node.
        content_status: ContentStatus,
    },
    /// The content of an entry was downloaded and is now available at the local node.
    ContentReady {
        /// The content hash of the newly available entry content.
        hash: Hash,
    },
    /// All pending content downloads for this document have completed or failed.
    PendingContentReady,
    /// A peer was added as a neighbor in the gossip swarm for this document.
    NeighborUp(PublicKey),
    /// A peer was removed as a neighbor in the gossip swarm for this document.
    NeighborDown(PublicKey),
    /// A set-reconciliation sync with a peer finished.
    SyncFinished(SyncEvent),
}

impl From<live::Event> for LiveEvent {
    fn from(ev: live::Event) -> Self {
        match ev {
            live::Event::ContentReady { hash } => Self::ContentReady { hash },
            live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
            live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
            live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
            live::Event::PendingContentReady => Self::PendingContentReady,
        }
    }
}

impl LiveEvent {
    async fn from_replica_event(
        ev: crate::Event,
        content_status_cb: &ContentStatusCallback,
    ) -> Result<Self> {
        Ok(match ev {
            crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
                entry: entry.into(),
            },
            crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
                content_status: content_status_cb(entry.content_hash()).await,
                entry: entry.into(),
                from: PublicKey::from_bytes(&from)?,
            },
        })
    }
}

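/// Where to persist the default author.
///
/// If set to `Mem`, a new author is created in the docs store on startup and changes to the
/// default author are not persisted. If set to `Persistent`, the default author is loaded from
/// and written to the given file path.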
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// In-memory storage.
    Mem,
    /// Persistent storage in a file at the given path.
    Persistent(PathBuf),
}

impl DefaultAuthorStorage {
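    /// Loads the default author, creating and persisting a new one if none exists yet.
    ///
    /// Fails if a persisted author cannot be read or parsed, or if it is missing from the docs
    /// store.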
    pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
        match self {
            Self::Mem => {
                let author = Author::new(&mut rand::thread_rng());
                let author_id = author.id();
                docs_store.import_author(author).await?;
                Ok(author_id)
            }
            Self::Persistent(ref path) => {
                if path.exists() {
                    let data = tokio::fs::read_to_string(path).await.with_context(|| {
                        format!(
                            "Failed to read the default author file at `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    let author_id = AuthorId::from_str(&data).with_context(|| {
                        format!(
                            "Failed to parse the default author from `{}`",
                            path.to_string_lossy()
                        )
                    })?;
                    if docs_store.export_author(author_id).await?.is_none() {
                        bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
                    }
                    Ok(author_id)
                } else {
                    let author = Author::new(&mut rand::thread_rng());
                    let author_id = author.id();
                    docs_store.import_author(author).await?;
                    docs_store.flush_store().await?;
                    // Only persist the author id after it was flushed to the docs store, so the
                    // file never points to an author that does not exist in the store.
                    self.persist(author_id).await?;
                    Ok(author_id)
                }
            }
        }
    }

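    /// Persists the default author id to storage.
    ///
    /// For [`Self::Mem`] this is a no-op; for [`Self::Persistent`] the author id is written to
    /// the file.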
    pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
        match self {
            Self::Mem => {
                // Nothing to persist for in-memory storage.
            }
            Self::Persistent(ref path) => {
                tokio::fs::write(path, author_id.to_string())
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to write the default author to `{}`",
                            path.to_string_lossy()
                        )
                    })?;
            }
        }
        Ok(())
    }
}

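/// The default author of a docs engine, cached in memory and persisted via
/// [`DefaultAuthorStorage`].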
#[derive(Debug)]
pub struct DefaultAuthor {
    value: RwLock<AuthorId>,
    storage: DefaultAuthorStorage,
}

impl DefaultAuthor {
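    /// Loads the default author from storage, creating and persisting a new one if needed.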
    pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
        let value = storage.load(docs_store).await?;
        Ok(Self {
            value: RwLock::new(value),
            storage,
        })
    }

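    /// Returns the current default author id.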
    pub fn get(&self) -> AuthorId {
        *self.value.read().unwrap()
    }

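    /// Sets the default author to an author that already exists in the docs store and persists
    /// the change.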
    pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
        if docs_store.export_author(author_id).await?.is_none() {
            bail!("The author does not exist");
        }
        self.storage.persist(author_id).await?;
        *self.value.write().unwrap() = author_id;
        Ok(())
    }
}

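/// Sending side used by the [`ProtectCb`] to request the set of protected hashes from the docs
/// engine.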
#[derive(Debug)]
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);

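/// Receiving side of the garbage-collection protect callback, to be passed to [`Engine::spawn`].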
#[derive(Debug)]
pub struct ProtectCallbackHandler(
    pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
);

impl ProtectCallbackHandler {
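    /// Creates a [`ProtectCb`] together with the handler that answers it.
    ///
    /// The returned [`ProtectCb`] should be installed in the blobs store's garbage-collection
    /// options, and the [`ProtectCallbackHandler`] must be passed to [`Engine::spawn`]. The
    /// engine then reports all content hashes referenced by docs, so that their blobs are not
    /// garbage collected.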
    pub fn new() -> (Self, ProtectCb) {
        let (tx, rx) = mpsc::channel(4);
        let cb = ProtectCallbackSender(tx).into_cb();
        let handler = ProtectCallbackHandler(rx);
        (handler, cb)
    }
}

impl ProtectCallbackSender {
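    /// Converts the sender into a [`ProtectCb`] that requests the protected hashes from the docs
    /// engine and inserts them into the live set, aborting the GC run on any error.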
    fn into_cb(self) -> ProtectCb {
        let start_tx = self.0.clone();
        Arc::new(move |live| {
            let start_tx = start_tx.clone();
            Box::pin(async move {
                let (tx, rx) = oneshot::channel();
                if let Err(_err) = start_tx.send(tx).await {
                    tracing::warn!("Failed to get protected hashes from docs: ProtectCallback receiver dropped");
                    return ProtectOutcome::Abort;
                }
                let mut rx = match rx.await {
                    Ok(rx) => rx,
                    Err(_err) => {
                        tracing::warn!("Failed to get protected hashes from docs: ProtectCallback sender dropped");
                        return ProtectOutcome::Abort;
                    }
                };
                while let Some(res) = rx.recv().await {
                    match res {
                        Err(err) => {
                            tracing::warn!("Getting protected hashes produced an error: {err:#}");
                            return ProtectOutcome::Abort;
                        }
                        Ok(hash) => {
                            live.insert(hash);
                        }
                    }
                }
                ProtectOutcome::Continue
            })
        })
    }
}