1use std::{
6 io,
7 path::PathBuf,
8 str::FromStr,
9 sync::{Arc, RwLock},
10};
11
12use anyhow::{bail, Context, Result};
13use futures_lite::{Stream, StreamExt};
14use iroh::{Endpoint, NodeAddr, PublicKey};
15use iroh_blobs::{
16 downloader::Downloader, net_protocol::ProtectCb, store::EntryStatus,
17 util::local_pool::LocalPoolHandle, Hash,
18};
19use iroh_gossip::net::Gossip;
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, oneshot};
22use tokio_util::task::AbortOnDropHandle;
23use tracing::{error, error_span, Instrument};
24
25use self::live::{LiveActor, ToLiveActor};
26pub use self::{
27 live::SyncEvent,
28 state::{Origin, SyncReason},
29};
30use crate::{
31 actor::SyncHandle, metrics::Metrics, Author, AuthorId, ContentStatus, ContentStatusCallback,
32 Entry, NamespaceId,
33};
34
35mod gossip;
36mod live;
37mod state;
38
/// Capacity of the channel used to send messages to the [`LiveActor`].
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity of the per-subscription event channels created in [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;
43
/// The sync engine ties together the docs actor ([`SyncHandle`]), the live sync actor
/// ([`LiveActor`]) and a blob store, and exposes the public API to start/stop syncing,
/// subscribe to events, and hand off incoming connections.
#[derive(derive_more::Debug)]
pub struct Engine<D> {
    /// The [`Endpoint`] this engine was spawned with.
    pub endpoint: Endpoint,
    /// Handle to the docs actor.
    pub sync: SyncHandle,
    /// The default author loaded (or created) at spawn time.
    pub default_author: DefaultAuthor,
    // Channel for sending commands to the live sync actor.
    to_live_actor: mpsc::Sender<ToLiveActor>,
    // Kept only so the actor task is aborted when the engine is dropped.
    #[allow(dead_code)]
    actor_handle: AbortOnDropHandle<()>,
    // Resolves an entry's content hash to a [`ContentStatus`] via the blob store.
    #[debug("ContentStatusCallback")]
    content_status_cb: ContentStatusCallback,
    local_pool_handle: LocalPoolHandle,
    blob_store: D,
}
62
impl<D: iroh_blobs::store::Store> Engine<D> {
    /// Starts the sync engine.
    ///
    /// Spawns the docs actor and the live sync actor, wires them together, and loads
    /// (or creates) the default author. The live actor task is aborted when the
    /// returned [`Engine`] is dropped (via [`AbortOnDropHandle`]).
    pub async fn spawn(
        endpoint: Endpoint,
        gossip: Gossip,
        replica_store: crate::store::Store,
        bao_store: D,
        downloader: Downloader,
        default_author_storage: DefaultAuthorStorage,
        local_pool_handle: LocalPoolHandle,
    ) -> anyhow::Result<Self> {
        let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
        // Short node id, used as the tracing span field below.
        let me = endpoint.node_id().fmt_short();

        // Resolve an entry's content status by querying the blob store synchronously.
        let content_status_cb = {
            let bao_store = bao_store.clone();
            Arc::new(move |hash| entry_to_content_status(bao_store.entry_status_sync(&hash)))
        };
        let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());

        let actor = LiveActor::new(
            sync.clone(),
            endpoint.clone(),
            gossip.clone(),
            bao_store.clone(),
            downloader,
            to_live_actor_recv,
            // The actor also gets a sender to itself (e.g. for internally generated messages).
            live_actor_tx.clone(),
            sync.metrics().clone(),
        );
        let actor_handle = tokio::task::spawn(
            async move {
                if let Err(err) = actor.run().await {
                    error!("sync actor failed: {err:?}");
                }
            }
            .instrument(error_span!("sync", %me)),
        );

        let default_author = match DefaultAuthor::load(default_author_storage, &sync).await {
            Ok(author) => author,
            Err(err) => {
                // Loading the default author failed: shut the docs actor down (best
                // effort) before bailing, so we don't leave a dangling actor thread.
                let _store = sync.shutdown().await.ok();
                return Err(err);
            }
        };

        Ok(Self {
            endpoint,
            sync,
            to_live_actor: live_actor_tx,
            actor_handle: AbortOnDropHandle::new(actor_handle),
            content_status_cb,
            default_author,
            local_pool_handle,
            blob_store: bao_store,
        })
    }

    /// Returns a protect callback for the blob store that marks every content hash
    /// referenced by any document as live, protecting it from garbage collection.
    pub fn protect_cb(&self) -> ProtectCb {
        let sync = self.sync.clone();
        Box::new(move |live| {
            let sync = sync.clone();
            Box::pin(async move {
                let doc_hashes = match sync.content_hashes().await {
                    Ok(hashes) => hashes,
                    Err(err) => {
                        // Best effort: if we cannot enumerate hashes, protect nothing.
                        tracing::warn!("Error getting doc hashes: {}", err);
                        return;
                    }
                };
                for hash in doc_hashes {
                    match hash {
                        Ok(hash) => {
                            live.insert(hash);
                        }
                        Err(err) => {
                            // Skip the failed hash but keep protecting the rest.
                            tracing::error!("Error getting doc hash: {}", err);
                        }
                    }
                }
            })
        })
    }

    /// Returns a reference to the blob store this engine was spawned with.
    pub fn blob_store(&self) -> &D {
        &self.blob_store
    }

    /// Returns the metrics collected by the docs actor.
    pub fn metrics(&self) -> &Arc<Metrics> {
        self.sync.metrics()
    }

    /// Starts syncing the given `namespace`, optionally joining the given `peers`.
    ///
    /// Waits for the live actor to acknowledge the command.
    pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::StartSync {
                namespace,
                peers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Stops syncing the given `namespace`.
    ///
    /// If `kill_subscribers` is true, the live actor also terminates event
    /// subscriptions for this namespace. Waits for the actor's acknowledgement.
    pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Leave {
                namespace,
                kill_subscribers,
                reply,
            })
            .await?;
        reply_rx.await??;
        Ok(())
    }

    /// Subscribes to events for the given `namespace`.
    ///
    /// The returned stream merges two sources: replica (insert) events from the docs
    /// actor, converted via [`LiveEvent::from_replica_event`], and live sync events
    /// from the [`LiveActor`].
    pub async fn subscribe(
        &self,
        namespace: NamespaceId,
    ) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
        let content_status_cb = self.content_status_cb.clone();

        let this = self;

        // Stream of replica events from the docs actor.
        let a = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            this.sync.subscribe(namespace, s).await?;
            Box::pin(r).map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb))
        };

        // Stream of events from the live sync actor; registration is acknowledged
        // via the oneshot before we return.
        let b = {
            let (s, r) = async_channel::bounded(SUBSCRIBE_CHANNEL_CAP);
            let r = Box::pin(r);
            let (reply, reply_rx) = oneshot::channel();
            this.to_live_actor
                .send(ToLiveActor::Subscribe {
                    namespace,
                    sender: s,
                    reply,
                })
                .await?;
            reply_rx.await??;
            r.map(|event| Ok(LiveEvent::from(event)))
        };

        // Merge both event streams into one.
        Ok(a.or(b))
    }

    /// Hands an incoming sync connection over to the live actor (fire-and-forget).
    pub async fn handle_connection(&self, conn: iroh::endpoint::Connection) -> anyhow::Result<()> {
        self.to_live_actor
            .send(ToLiveActor::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Shuts down the live actor and waits for it to acknowledge the shutdown.
    pub async fn shutdown(&self) -> Result<()> {
        let (reply, reply_rx) = oneshot::channel();
        self.to_live_actor
            .send(ToLiveActor::Shutdown { reply })
            .await?;
        reply_rx.await?;
        Ok(())
    }

    /// Returns the handle to the local thread pool this engine was spawned with.
    pub fn local_pool_handle(&self) -> &LocalPoolHandle {
        &self.local_pool_handle
    }
}
259
260pub fn entry_to_content_status(entry: io::Result<EntryStatus>) -> ContentStatus {
262 match entry {
263 Ok(EntryStatus::Complete) => ContentStatus::Complete,
264 Ok(EntryStatus::Partial) => ContentStatus::Incomplete,
265 Ok(EntryStatus::NotFound) => ContentStatus::Missing,
266 Err(cause) => {
267 tracing::warn!("Error while checking entry status: {cause:?}");
268 ContentStatus::Missing
269 }
270 }
271}
272
/// Events emitted from the subscription streams returned by [`Engine::subscribe`].
///
/// `InsertLocal`/`InsertRemote` originate from replica events (see
/// [`LiveEvent::from_replica_event`]); the remaining variants are forwarded from
/// [`live::Event`] (see the `From<live::Event>` impl below).
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, strum::Display)]
pub enum LiveEvent {
    /// An entry was inserted locally.
    InsertLocal {
        /// The inserted entry.
        entry: Entry,
    },
    /// An entry was received from a remote peer.
    InsertRemote {
        /// The peer the entry came from.
        from: PublicKey,
        /// The inserted entry.
        entry: Entry,
        /// Local availability of the entry's content, resolved via the engine's
        /// content status callback at event time.
        content_status: ContentStatus,
    },
    /// Forwarded from [`live::Event::ContentReady`].
    ContentReady {
        /// The content hash the event refers to.
        hash: Hash,
    },
    /// Forwarded from [`live::Event::PendingContentReady`].
    PendingContentReady,
    /// Forwarded from [`live::Event::NeighborUp`].
    NeighborUp(PublicKey),
    /// Forwarded from [`live::Event::NeighborDown`].
    NeighborDown(PublicKey),
    /// Forwarded from [`live::Event::SyncFinished`].
    SyncFinished(SyncEvent),
}
312
313impl From<live::Event> for LiveEvent {
314 fn from(ev: live::Event) -> Self {
315 match ev {
316 live::Event::ContentReady { hash } => Self::ContentReady { hash },
317 live::Event::NeighborUp(peer) => Self::NeighborUp(peer),
318 live::Event::NeighborDown(peer) => Self::NeighborDown(peer),
319 live::Event::SyncFinished(ev) => Self::SyncFinished(ev),
320 live::Event::PendingContentReady => Self::PendingContentReady,
321 }
322 }
323}
324
325impl LiveEvent {
326 fn from_replica_event(
327 ev: crate::Event,
328 content_status_cb: &ContentStatusCallback,
329 ) -> Result<Self> {
330 Ok(match ev {
331 crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
332 entry: entry.into(),
333 },
334 crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
335 content_status: content_status_cb(entry.content_hash()),
336 entry: entry.into(),
337 from: PublicKey::from_bytes(&from)?,
338 },
339 })
340 }
341}
342
/// Where to store the default author of a docs engine.
#[derive(Debug)]
pub enum DefaultAuthorStorage {
    /// In-memory only: a fresh author is generated on every load.
    Mem,
    /// Persist the default author's id as text in the given file.
    Persistent(PathBuf),
}
357
358impl DefaultAuthorStorage {
359 pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
366 match self {
367 Self::Mem => {
368 let author = Author::new(&mut rand::thread_rng());
369 let author_id = author.id();
370 docs_store.import_author(author).await?;
371 Ok(author_id)
372 }
373 Self::Persistent(ref path) => {
374 if path.exists() {
375 let data = tokio::fs::read_to_string(path).await.with_context(|| {
376 format!(
377 "Failed to read the default author file at `{}`",
378 path.to_string_lossy()
379 )
380 })?;
381 let author_id = AuthorId::from_str(&data).with_context(|| {
382 format!(
383 "Failed to parse the default author from `{}`",
384 path.to_string_lossy()
385 )
386 })?;
387 if docs_store.export_author(author_id).await?.is_none() {
388 bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
389 }
390 Ok(author_id)
391 } else {
392 let author = Author::new(&mut rand::thread_rng());
393 let author_id = author.id();
394 docs_store.import_author(author).await?;
395 docs_store.flush_store().await?;
399 self.persist(author_id).await?;
400 Ok(author_id)
401 }
402 }
403 }
404 }
405
406 pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
408 match self {
409 Self::Mem => {
410 }
412 Self::Persistent(ref path) => {
413 tokio::fs::write(path, author_id.to_string())
414 .await
415 .with_context(|| {
416 format!(
417 "Failed to write the default author to `{}`",
418 path.to_string_lossy()
419 )
420 })?;
421 }
422 }
423 Ok(())
424 }
425}
426
/// The default author of a docs engine: the current id plus the storage it is
/// persisted to.
#[derive(Debug)]
pub struct DefaultAuthor {
    // Current default author id; RwLock because `get` is sync while `set` replaces it.
    value: RwLock<AuthorId>,
    // Backing storage used to persist changes made via `set`.
    storage: DefaultAuthorStorage,
}
433
434impl DefaultAuthor {
435 pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
439 let value = storage.load(docs_store).await?;
440 Ok(Self {
441 value: RwLock::new(value),
442 storage,
443 })
444 }
445
446 pub fn get(&self) -> AuthorId {
448 *self.value.read().unwrap()
449 }
450
451 pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
453 if docs_store.export_author(author_id).await?.is_none() {
454 bail!("The author does not exist");
455 }
456 self.storage.persist(author_id).await?;
457 *self.value.write().unwrap() = author_id;
458 Ok(())
459 }
460}