iroh_docs/
actor.rs

1//! This contains an actor spawned on a separate thread to process replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7    thread::JoinHandle,
8    time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use futures_util::FutureExt;
14use iroh_blobs::Hash;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::oneshot, task::JoinSet};
17use tracing::{debug, error, error_span, trace, warn};
18
19use crate::{
20    metrics::Metrics,
21    ranger::Message,
22    store::{
23        fs::{ContentHashesIterator, StoreInstance},
24        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
25    },
26    Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
27    ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo,
28    SignedEntry, SyncOutcome,
29};
30
/// Capacity of the bounded channel that carries [`Action`]s from the [`SyncHandle`]s to the actor.
const ACTION_CAP: usize = 1024;
/// How long the actor loop waits for the next action before it flushes the store.
///
/// The wait is restarted on every iteration of the actor loop.
pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
33
34#[derive(derive_more::Debug, derive_more::Display)]
35enum Action {
36    #[display("NewAuthor")]
37    ImportAuthor {
38        author: Author,
39        #[debug("reply")]
40        reply: oneshot::Sender<Result<AuthorId>>,
41    },
42    #[display("ExportAuthor")]
43    ExportAuthor {
44        author: AuthorId,
45        #[debug("reply")]
46        reply: oneshot::Sender<Result<Option<Author>>>,
47    },
48    #[display("DeleteAuthor")]
49    DeleteAuthor {
50        author: AuthorId,
51        #[debug("reply")]
52        reply: oneshot::Sender<Result<()>>,
53    },
54    #[display("NewReplica")]
55    ImportNamespace {
56        capability: Capability,
57        #[debug("reply")]
58        reply: oneshot::Sender<Result<NamespaceId>>,
59    },
60    #[display("ListAuthors")]
61    ListAuthors {
62        #[debug("reply")]
63        reply: async_channel::Sender<Result<AuthorId>>,
64    },
65    #[display("ListReplicas")]
66    ListReplicas {
67        #[debug("reply")]
68        reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
69    },
70    #[display("ContentHashes")]
71    ContentHashes {
72        #[debug("reply")]
73        reply: oneshot::Sender<Result<ContentHashesIterator>>,
74    },
75    #[display("FlushStore")]
76    FlushStore {
77        #[debug("reply")]
78        reply: oneshot::Sender<Result<()>>,
79    },
80    #[display("Replica({}, {})", _0.fmt_short(), _1)]
81    Replica(NamespaceId, ReplicaAction),
82    #[display("Shutdown")]
83    Shutdown {
84        #[debug("reply")]
85        reply: Option<oneshot::Sender<Store>>,
86    },
87}
88
89#[derive(derive_more::Debug, strum::Display)]
90enum ReplicaAction {
91    Open {
92        #[debug("reply")]
93        reply: oneshot::Sender<Result<()>>,
94        opts: OpenOpts,
95    },
96    Close {
97        #[debug("reply")]
98        reply: oneshot::Sender<Result<bool>>,
99    },
100    GetState {
101        #[debug("reply")]
102        reply: oneshot::Sender<Result<OpenState>>,
103    },
104    SetSync {
105        sync: bool,
106        #[debug("reply")]
107        reply: oneshot::Sender<Result<()>>,
108    },
109    Subscribe {
110        sender: async_channel::Sender<Event>,
111        #[debug("reply")]
112        reply: oneshot::Sender<Result<()>>,
113    },
114    Unsubscribe {
115        sender: async_channel::Sender<Event>,
116        #[debug("reply")]
117        reply: oneshot::Sender<Result<()>>,
118    },
119    InsertLocal {
120        author: AuthorId,
121        key: Bytes,
122        hash: Hash,
123        len: u64,
124        #[debug("reply")]
125        reply: oneshot::Sender<Result<()>>,
126    },
127    DeletePrefix {
128        author: AuthorId,
129        key: Bytes,
130        #[debug("reply")]
131        reply: oneshot::Sender<Result<usize>>,
132    },
133    InsertRemote {
134        entry: SignedEntry,
135        from: PeerIdBytes,
136        content_status: ContentStatus,
137        #[debug("reply")]
138        reply: oneshot::Sender<Result<()>>,
139    },
140    SyncInitialMessage {
141        #[debug("reply")]
142        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
143    },
144    SyncProcessMessage {
145        message: Message<SignedEntry>,
146        from: PeerIdBytes,
147        state: SyncOutcome,
148        #[debug("reply")]
149        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
150    },
151    GetSyncPeers {
152        #[debug("reply")]
153        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
154    },
155    RegisterUsefulPeer {
156        peer: PeerIdBytes,
157        #[debug("reply")]
158        reply: oneshot::Sender<Result<()>>,
159    },
160    GetExact {
161        author: AuthorId,
162        key: Bytes,
163        include_empty: bool,
164        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
165    },
166    GetMany {
167        query: Query,
168        reply: async_channel::Sender<Result<SignedEntry>>,
169    },
170    DropReplica {
171        reply: oneshot::Sender<Result<()>>,
172    },
173    ExportSecretKey {
174        reply: oneshot::Sender<Result<NamespaceSecret>>,
175    },
176    HasNewsForUs {
177        heads: AuthorHeads,
178        #[debug("reply")]
179        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
180    },
181    SetDownloadPolicy {
182        policy: DownloadPolicy,
183        #[debug("reply")]
184        reply: oneshot::Sender<Result<()>>,
185    },
186    GetDownloadPolicy {
187        #[debug("reply")]
188        reply: oneshot::Sender<Result<DownloadPolicy>>,
189    },
190}
191
/// The state for an open replica.
///
/// This is a snapshot produced by the actor in response to [`SyncHandle::get_state`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct OpenState {
    /// Whether to accept sync requests for this replica.
    pub sync: bool,
    /// How many event subscriptions are open.
    pub subscribers: usize,
    /// By how many handles the replica is currently held open.
    pub handles: usize,
}
202
/// Actor-side bookkeeping for a replica that is currently open.
#[derive(Debug)]
struct OpenReplica {
    /// The replica info; handed to `Replica::new` whenever an operation runs on the replica.
    info: ReplicaInfo,
    /// Whether sync is enabled; sync-related actions fail while this is `false`.
    sync: bool,
    /// Handle count, reported as [`OpenState::handles`].
    handles: usize,
}
209
/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
///
/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
/// which processes incoming messages sequentially.
///
/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
/// The thread will finish processing all messages in the channel queue, and then exit.
/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
/// and await its result before dropping the last [`SyncHandle`]. This ensures that
/// waiting for the actor to finish happens in an async context, and therefore that the final
/// [`SyncHandle::drop`] will not block.
#[derive(Debug, Clone)]
pub struct SyncHandle {
    /// Sending side of the action channel that the actor thread reads from.
    tx: async_channel::Sender<Action>,
    /// Join handle of the actor thread, shared by all clones of this handle.
    ///
    /// The clone that is dropped last takes the handle out of the `Option` and joins the thread.
    join_handle: Arc<Option<JoinHandle<()>>>,
    /// Metrics shared with the actor.
    metrics: Arc<Metrics>,
}
228
/// Options when opening a replica.
///
/// The default is `sync: false` and no event subscription.
#[derive(Debug, Default)]
pub struct OpenOpts {
    /// Set to true to set sync state to true.
    pub sync: bool,
    /// Optionally subscribe to replica events.
    pub subscribe: Option<async_channel::Sender<Event>>,
}
237impl OpenOpts {
238    /// Set sync state to true.
239    pub fn sync(mut self) -> Self {
240        self.sync = true;
241        self
242    }
243    /// Subscribe to replica events.
244    pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
245        self.subscribe = Some(subscribe);
246        self
247    }
248}
249
250#[allow(missing_docs)]
251impl SyncHandle {
252    /// Spawn a sync actor and return a handle.
253    pub fn spawn(
254        store: Store,
255        content_status_callback: Option<ContentStatusCallback>,
256        me: String,
257    ) -> SyncHandle {
258        let metrics = Arc::new(Metrics::default());
259        let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
260        let actor = Actor {
261            store,
262            states: Default::default(),
263            action_rx,
264            content_status_callback,
265            tasks: Default::default(),
266            metrics: metrics.clone(),
267        };
268        let join_handle = std::thread::Builder::new()
269            .name("sync-actor".to_string())
270            .spawn(move || {
271                let span = error_span!("sync", %me);
272                let _enter = span.enter();
273
274                if let Err(err) = actor.run() {
275                    error!("Sync actor closed with error: {err:?}");
276                }
277            })
278            .expect("failed to spawn thread");
279        let join_handle = Arc::new(Some(join_handle));
280        SyncHandle {
281            tx: action_tx,
282            join_handle,
283            metrics,
284        }
285    }
286
287    /// Returns the metrics collected in this sync actor.
288    pub fn metrics(&self) -> &Arc<Metrics> {
289        &self.metrics
290    }
291
292    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
293        let (reply, rx) = oneshot::channel();
294        let action = ReplicaAction::Open { reply, opts };
295        self.send_replica(namespace, action).await?;
296        rx.await?
297    }
298
299    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
300        let (reply, rx) = oneshot::channel();
301        self.send_replica(namespace, ReplicaAction::Close { reply })
302            .await?;
303        rx.await?
304    }
305
306    pub async fn subscribe(
307        &self,
308        namespace: NamespaceId,
309        sender: async_channel::Sender<Event>,
310    ) -> Result<()> {
311        let (reply, rx) = oneshot::channel();
312        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
313            .await?;
314        rx.await?
315    }
316
317    pub async fn unsubscribe(
318        &self,
319        namespace: NamespaceId,
320        sender: async_channel::Sender<Event>,
321    ) -> Result<()> {
322        let (reply, rx) = oneshot::channel();
323        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
324            .await?;
325        rx.await?
326    }
327
328    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
329        let (reply, rx) = oneshot::channel();
330        let action = ReplicaAction::SetSync { sync, reply };
331        self.send_replica(namespace, action).await?;
332        rx.await?
333    }
334
335    pub async fn insert_local(
336        &self,
337        namespace: NamespaceId,
338        author: AuthorId,
339        key: Bytes,
340        hash: Hash,
341        len: u64,
342    ) -> Result<()> {
343        let (reply, rx) = oneshot::channel();
344        let action = ReplicaAction::InsertLocal {
345            author,
346            key,
347            hash,
348            len,
349            reply,
350        };
351        self.send_replica(namespace, action).await?;
352        rx.await?
353    }
354
355    pub async fn delete_prefix(
356        &self,
357        namespace: NamespaceId,
358        author: AuthorId,
359        key: Bytes,
360    ) -> Result<usize> {
361        let (reply, rx) = oneshot::channel();
362        let action = ReplicaAction::DeletePrefix { author, key, reply };
363        self.send_replica(namespace, action).await?;
364        rx.await?
365    }
366
367    pub async fn insert_remote(
368        &self,
369        namespace: NamespaceId,
370        entry: SignedEntry,
371        from: PeerIdBytes,
372        content_status: ContentStatus,
373    ) -> Result<()> {
374        let (reply, rx) = oneshot::channel();
375        let action = ReplicaAction::InsertRemote {
376            entry,
377            from,
378            content_status,
379            reply,
380        };
381        self.send_replica(namespace, action).await?;
382        rx.await?
383    }
384
385    pub async fn sync_initial_message(
386        &self,
387        namespace: NamespaceId,
388    ) -> Result<Message<SignedEntry>> {
389        let (reply, rx) = oneshot::channel();
390        let action = ReplicaAction::SyncInitialMessage { reply };
391        self.send_replica(namespace, action).await?;
392        rx.await?
393    }
394
395    pub async fn sync_process_message(
396        &self,
397        namespace: NamespaceId,
398        message: Message<SignedEntry>,
399        from: PeerIdBytes,
400        state: SyncOutcome,
401    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
402        let (reply, rx) = oneshot::channel();
403        let action = ReplicaAction::SyncProcessMessage {
404            reply,
405            message,
406            from,
407            state,
408        };
409        self.send_replica(namespace, action).await?;
410        rx.await?
411    }
412
413    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
414        let (reply, rx) = oneshot::channel();
415        let action = ReplicaAction::GetSyncPeers { reply };
416        self.send_replica(namespace, action).await?;
417        rx.await?
418    }
419
420    pub async fn register_useful_peer(
421        &self,
422        namespace: NamespaceId,
423        peer: PeerIdBytes,
424    ) -> Result<()> {
425        let (reply, rx) = oneshot::channel();
426        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
427        self.send_replica(namespace, action).await?;
428        rx.await?
429    }
430
431    pub async fn has_news_for_us(
432        &self,
433        namespace: NamespaceId,
434        heads: AuthorHeads,
435    ) -> Result<Option<NonZeroU64>> {
436        let (reply, rx) = oneshot::channel();
437        let action = ReplicaAction::HasNewsForUs { reply, heads };
438        self.send_replica(namespace, action).await?;
439        rx.await?
440    }
441
442    pub async fn get_many(
443        &self,
444        namespace: NamespaceId,
445        query: Query,
446        reply: async_channel::Sender<Result<SignedEntry>>,
447    ) -> Result<()> {
448        let action = ReplicaAction::GetMany { query, reply };
449        self.send_replica(namespace, action).await?;
450        Ok(())
451    }
452
453    pub async fn get_exact(
454        &self,
455        namespace: NamespaceId,
456        author: AuthorId,
457        key: Bytes,
458        include_empty: bool,
459    ) -> Result<Option<SignedEntry>> {
460        let (reply, rx) = oneshot::channel();
461        let action = ReplicaAction::GetExact {
462            author,
463            key,
464            include_empty,
465            reply,
466        };
467        self.send_replica(namespace, action).await?;
468        rx.await?
469    }
470
471    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
472        let (reply, rx) = oneshot::channel();
473        let action = ReplicaAction::DropReplica { reply };
474        self.send_replica(namespace, action).await?;
475        rx.await?
476    }
477
478    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
479        let (reply, rx) = oneshot::channel();
480        let action = ReplicaAction::ExportSecretKey { reply };
481        self.send_replica(namespace, action).await?;
482        rx.await?
483    }
484
485    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
486        let (reply, rx) = oneshot::channel();
487        let action = ReplicaAction::GetState { reply };
488        self.send_replica(namespace, action).await?;
489        rx.await?
490    }
491
492    pub async fn shutdown(&self) -> Result<Store> {
493        let (reply, rx) = oneshot::channel();
494        let action = Action::Shutdown { reply: Some(reply) };
495        self.send(action).await?;
496        let store = rx.await?;
497        Ok(store)
498    }
499
500    pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
501        self.send(Action::ListAuthors { reply }).await
502    }
503
504    pub async fn list_replicas(
505        &self,
506        reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
507    ) -> Result<()> {
508        self.send(Action::ListReplicas { reply }).await
509    }
510
511    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
512        let (reply, rx) = oneshot::channel();
513        self.send(Action::ImportAuthor { author, reply }).await?;
514        rx.await?
515    }
516
517    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
518        let (reply, rx) = oneshot::channel();
519        self.send(Action::ExportAuthor { author, reply }).await?;
520        rx.await?
521    }
522
523    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
524        let (reply, rx) = oneshot::channel();
525        self.send(Action::DeleteAuthor { author, reply }).await?;
526        rx.await?
527    }
528
529    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
530        let (reply, rx) = oneshot::channel();
531        self.send(Action::ImportNamespace { capability, reply })
532            .await?;
533        rx.await?
534    }
535
536    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
537        let (reply, rx) = oneshot::channel();
538        let action = ReplicaAction::GetDownloadPolicy { reply };
539        self.send_replica(namespace, action).await?;
540        rx.await?
541    }
542
543    pub async fn set_download_policy(
544        &self,
545        namespace: NamespaceId,
546        policy: DownloadPolicy,
547    ) -> Result<()> {
548        let (reply, rx) = oneshot::channel();
549        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
550        self.send_replica(namespace, action).await?;
551        rx.await?
552    }
553
554    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
555        let (reply, rx) = oneshot::channel();
556        self.send(Action::ContentHashes { reply }).await?;
557        rx.await?
558    }
559
560    /// Makes sure that all pending database operations are persisted to disk.
561    ///
562    /// Otherwise, database operations are batched into bigger transactions for speed.
563    /// Use this if you need to make sure something is written to the database
564    /// before another operation, e.g. to make sure sudden process exits don't corrupt
565    /// your application state.
566    ///
567    /// It's not necessary to call this function before shutdown, as `shutdown` will
568    /// trigger a flush on its own.
569    pub async fn flush_store(&self) -> Result<()> {
570        let (reply, rx) = oneshot::channel();
571        self.send(Action::FlushStore { reply }).await?;
572        rx.await?
573    }
574
575    async fn send(&self, action: Action) -> Result<()> {
576        self.tx
577            .send(action)
578            .await
579            .context("sending to iroh_docs actor failed")?;
580        Ok(())
581    }
582    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
583        self.send(Action::Replica(namespace, action)).await?;
584        Ok(())
585    }
586}
587
588impl Drop for SyncHandle {
589    fn drop(&mut self) {
590        // this means we're dropping the last reference
591        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
592            // this call is the reason tx can not be a tokio mpsc channel.
593            // we have no control about where drop is called, yet tokio send_blocking panics
594            // when called from inside a tokio runtime.
595            self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
596            let handle = handle.take().expect("this can only run once");
597            if let Err(err) = handle.join() {
598                warn!(?err, "Failed to join sync actor");
599            }
600        }
601    }
602}
603
/// The state owned by the actor thread.
struct Actor {
    /// The store that all operations run against; handed back to the caller on shutdown when
    /// a reply channel was provided.
    store: Store,
    /// The replicas that are currently open, keyed by namespace.
    states: OpenReplicas,
    /// Receiving side of the action channel fed by the [`SyncHandle`]s.
    action_rx: async_channel::Receiver<Action>,
    /// Optional callback that is installed on every replica info loaded when opening a replica.
    content_status_callback: Option<ContentStatusCallback>,
    /// Local tasks that stream iterator results into reply channels; aborted on shutdown.
    tasks: JoinSet<()>,
    /// Metrics shared with the [`SyncHandle`]s.
    metrics: Arc<Metrics>,
}
612
impl Actor {
    /// Runs the actor to completion on the current thread.
    ///
    /// Builds a current-thread tokio runtime with the time driver enabled and drives
    /// [`Actor::run_async`] inside a `LocalSet`, which is what allows `spawn_local` on `tasks`.
    ///
    /// # Errors
    ///
    /// Returns an error if the runtime cannot be built.
    fn run(self) -> Result<()> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()?;
        let local_set = tokio::task::LocalSet::new();
        local_set.block_on(&rt, async move { self.run_async().await });
        Ok(())
    }

    /// The actor main loop.
    ///
    /// Processes actions one at a time until a `Shutdown` action arrives or the action channel
    /// is disconnected. Whenever no action arrives within `MAX_COMMIT_DELAY`, the store is
    /// flushed. After the loop, the store is flushed once more, all replicas are closed, all
    /// spawned tasks are aborted and, if the shutdown carried a reply channel, the store is
    /// sent back through it.
    async fn run_async(mut self) {
        let reply = loop {
            // A fresh timer per iteration: it only fires if no action arrives in time.
            let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
            tokio::pin!(timeout);
            let action = tokio::select! {
                _ = &mut timeout => {
                    if let Err(cause) = self.store.flush() {
                        error!(?cause, "failed to flush store");
                    }
                    continue;
                }
                action = self.action_rx.recv() => {
                    match action {
                        Ok(action) => action,
                        Err(async_channel::RecvError) => {
                            // The channel is closed and empty: shut down without a reply.
                            debug!("action channel disconnected");
                            break None;
                        }

                    }
                }
            };
            trace!(%action, "tick");
            self.metrics.actor_tick_main.inc();
            match action {
                Action::Shutdown { reply } => {
                    break reply;
                }
                action => {
                    if self.on_action(action).is_err() {
                        warn!("failed to send reply: receiver dropped");
                    }
                }
            }
        };

        if let Err(cause) = self.store.flush() {
            warn!(?cause, "failed to flush store");
        }
        self.close_all();
        self.tasks.abort_all();
        debug!("docs actor shutdown");
        if let Some(reply) = reply {
            // The requester may be gone already; nothing to do in that case.
            reply.send(self.store).ok();
        }
    }

    /// Handles a single store-level action and sends its result through the reply channel.
    ///
    /// Replica actions are forwarded to [`Actor::on_replica_action`]. The list actions spawn a
    /// local task that streams the results into the reply channel and return right away.
    ///
    /// An error is reported by the caller as a dropped reply receiver.
    ///
    /// # Panics
    ///
    /// Panics on `Action::Shutdown`, which the main loop handles itself.
    fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
        match action {
            Action::Shutdown { .. } => {
                unreachable!("Shutdown is handled in run()")
            }
            Action::ImportAuthor { author, reply } => {
                // Read the id first, `import_author` consumes the author.
                let id = author.id();
                send_reply(reply, self.store.import_author(author).map(|_| id))
            }
            Action::ExportAuthor { author, reply } => {
                send_reply(reply, self.store.get_author(&author))
            }
            Action::DeleteAuthor { author, reply } => {
                send_reply(reply, self.store.delete_author(author))
            }
            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
                let id = capability.id();
                let outcome = this.store.import_namespace(capability.clone())?;
                // If the stored capability was upgraded and the replica is open, bring the
                // open replica's info up to date as well.
                if let ImportNamespaceOutcome::Upgraded = outcome {
                    if let Ok(state) = this.states.get_mut(&id) {
                        state.info.merge_capability(capability)?;
                    }
                }
                Ok(id)
            }),
            Action::ListAuthors { reply } => {
                let iter = self
                    .store
                    .list_authors()
                    .map(|a| a.map(|a| a.map(|a| a.id())));
                self.tasks
                    .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
                Ok(())
            }
            Action::ListReplicas { reply } => {
                let iter = self.store.list_namespaces();
                self.tasks
                    .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
                Ok(())
            }
            Action::ContentHashes { reply } => {
                send_reply_with(reply, self, |this| this.store.content_hashes())
            }
            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
            Action::Replica(namespace, action) => self.on_replica_action(namespace, action),
        }
    }

    /// Handles a single action that targets the replica of `namespace`.
    ///
    /// Actions that go through `states.get_mut`, `states.replica`, `states.replica_if_syncing`
    /// or `states.ensure_open` require the replica to be open; `replica_if_syncing`
    /// additionally requires sync to be enabled. The remaining actions go to the store directly.
    ///
    /// An error is reported by the caller as a dropped reply receiver.
    fn on_replica_action(
        &mut self,
        namespace: NamespaceId,
        action: ReplicaAction,
    ) -> Result<(), SendReplyError> {
        match action {
            ReplicaAction::Open { reply, opts } => {
                tracing::trace!("open in");
                let res = self.open(namespace, opts);
                tracing::trace!("open out");
                send_reply(reply, res)
            }
            ReplicaAction::Close { reply } => {
                let res = self.close(namespace);
                // ignore errors when no receiver is present for close
                reply.send(Ok(res)).ok();
                Ok(())
            }
            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.subscribe(sender);
                Ok(())
            }),
            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.info.unsubscribe(&sender);
                drop(sender);
                Ok(())
            }),
            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
                let state = this.states.get_mut(&namespace)?;
                state.sync = sync;
                Ok(())
            }),
            ReplicaAction::InsertLocal {
                author,
                key,
                hash,
                len,
                reply,
            } => send_reply_with(reply, self, move |this| {
                let author = get_author(&mut this.store, &author)?;
                let mut replica = this.states.replica(namespace, &mut this.store)?;
                replica.insert(&key, &author, hash, len)?;
                // Metrics are only updated once the insert succeeded.
                this.metrics.new_entries_local.inc();
                this.metrics.new_entries_local_size.inc_by(len);
                Ok(())
            }),
            ReplicaAction::DeletePrefix { author, key, reply } => {
                send_reply_with(reply, self, |this| {
                    let author = get_author(&mut this.store, &author)?;
                    let mut replica = this.states.replica(namespace, &mut this.store)?;
                    let res = replica.delete_prefix(&key, &author)?;
                    Ok(res)
                })
            }
            ReplicaAction::InsertRemote {
                entry,
                from,
                content_status,
                reply,
            } => send_reply_with(reply, self, move |this| {
                let mut replica = this
                    .states
                    .replica_if_syncing(&namespace, &mut this.store)?;
                // Read the length first, the entry is moved into the insert call.
                let len = entry.content_len();
                replica.insert_remote_entry(entry, from, content_status)?;
                this.metrics.new_entries_remote.inc();
                this.metrics.new_entries_remote_size.inc_by(len);
                Ok(())
            }),

            ReplicaAction::SyncInitialMessage { reply } => {
                send_reply_with(reply, self, move |this| {
                    let mut replica = this
                        .states
                        .replica_if_syncing(&namespace, &mut this.store)?;
                    let res = replica.sync_initial_message()?;
                    Ok(res)
                })
            }
            ReplicaAction::SyncProcessMessage {
                message,
                from,
                mut state,
                reply,
            } => send_reply_with(reply, self, move |this| {
                let mut replica = this
                    .states
                    .replica_if_syncing(&namespace, &mut this.store)?;
                let res = replica.sync_process_message(message, from, &mut state)?;
                // Hand the updated sync state back together with the response message.
                Ok((res, state))
            }),
            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                let peers = this.store.get_sync_peers(&namespace)?;
                Ok(peers.map(|iter| iter.collect()))
            }),
            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
                let res = self.store.register_useful_peer(namespace, peer);
                send_reply(reply, res)
            }
            ReplicaAction::GetExact {
                author,
                key,
                include_empty,
                reply,
            } => send_reply_with(reply, self, move |this| {
                this.states.ensure_open(&namespace)?;
                this.store.get_exact(namespace, author, key, include_empty)
            }),
            ReplicaAction::GetMany { query, reply } => {
                // An error from `ensure_open` or `get_many` is passed on to the streaming task
                // instead of being returned here.
                let iter = self
                    .states
                    .ensure_open(&namespace)
                    .and_then(|_| self.store.get_many(namespace, query));
                self.tasks
                    .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
                Ok(())
            }
            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
                this.close(namespace);
                this.store.remove_replica(&namespace)
            }),
            ReplicaAction::ExportSecretKey { reply } => {
                let res = self
                    .states
                    .get_mut(&namespace)
                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
                send_reply(reply, res)
            }
            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
                let state = this.states.get_mut(&namespace)?;
                let handles = state.handles;
                let sync = state.sync;
                let subscribers = state.info.subscribers_count();
                Ok(OpenState {
                    handles,
                    sync,
                    subscribers,
                })
            }),
            ReplicaAction::HasNewsForUs { heads, reply } => {
                let res = self.store.has_news_for_us(namespace, &heads);
                send_reply(reply, res)
            }
            ReplicaAction::SetDownloadPolicy { policy, reply } => {
                send_reply(reply, self.store.set_download_policy(&namespace, policy))
            }
            ReplicaAction::GetDownloadPolicy { reply } => {
                send_reply(reply, self.store.get_download_policy(&namespace))
            }
        }
    }

    /// Closes `namespace` in the open-replica state and, if that reports `true`, in the store
    /// as well. Returns what the state reported.
    fn close(&mut self, namespace: NamespaceId) -> bool {
        let res = self.states.close(namespace);
        if res {
            self.store.close_replica(namespace);
        }
        res
    }

    /// Closes every replica returned by `states.close_all()` in the store.
    fn close_all(&mut self) {
        for id in self.states.close_all() {
            self.store.close_replica(id);
        }
    }

    /// Opens `namespace` with `opts`.
    ///
    /// `open_cb` loads the replica info from the store and installs the content status
    /// callback if one is configured; it is handed to `states.open_with`, which decides
    /// whether to call it.
    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
        let open_cb = || {
            let mut info = self.store.load_replica_info(&namespace)?;
            if let Some(cb) = &self.content_status_callback {
                info.set_content_status_callback(Arc::clone(cb));
            }
            Ok(info)
        };
        self.states.open_with(namespace, opts, open_cb)
    }
}
898
/// Map of currently open replicas, keyed by their namespace id.
///
/// Each entry is an `OpenReplica`, which holds the loaded replica info
/// together with a sync flag and a count of open handles.
#[derive(Default)]
struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
901
902impl OpenReplicas {
903    fn replica<'a, 'b>(
904        &'a mut self,
905        namespace: NamespaceId,
906        store: &'b mut Store,
907    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
908        let state = self.get_mut(&namespace)?;
909        Ok(Replica::new(
910            StoreInstance::new(state.info.capability.id(), store),
911            &mut state.info,
912        ))
913    }
914
915    fn replica_if_syncing<'a, 'b>(
916        &'a mut self,
917        namespace: &NamespaceId,
918        store: &'b mut Store,
919    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
920        let state = self.get_mut(namespace)?;
921        anyhow::ensure!(state.sync, "sync is not enabled for replica");
922        Ok(Replica::new(
923            StoreInstance::new(state.info.capability.id(), store),
924            &mut state.info,
925        ))
926    }
927
928    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
929        self.0.get_mut(namespace).context("replica not open")
930    }
931
932    fn is_open(&self, namespace: &NamespaceId) -> bool {
933        self.0.contains_key(namespace)
934    }
935
936    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
937        match self.is_open(namespace) {
938            true => Ok(()),
939            false => Err(anyhow!("replica not open")),
940        }
941    }
942    fn open_with(
943        &mut self,
944        namespace: NamespaceId,
945        opts: OpenOpts,
946        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
947    ) -> Result<()> {
948        match self.0.entry(namespace) {
949            hash_map::Entry::Vacant(e) => {
950                let mut info = open_cb()?;
951                if let Some(sender) = opts.subscribe {
952                    info.subscribe(sender);
953                }
954                debug!(namespace = %namespace.fmt_short(), "open");
955                let state = OpenReplica {
956                    info,
957                    sync: opts.sync,
958                    handles: 1,
959                };
960                e.insert(state);
961            }
962            hash_map::Entry::Occupied(mut e) => {
963                let state = e.get_mut();
964                state.handles += 1;
965                state.sync = state.sync || opts.sync;
966                if let Some(sender) = opts.subscribe {
967                    state.info.subscribe(sender);
968                }
969            }
970        }
971        Ok(())
972    }
973    fn close(&mut self, namespace: NamespaceId) -> bool {
974        match self.0.entry(namespace) {
975            hash_map::Entry::Vacant(_e) => {
976                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
977                true
978            }
979            hash_map::Entry::Occupied(mut e) => {
980                let state = e.get_mut();
981                state.handles = state.handles.wrapping_sub(1);
982                if state.handles == 0 {
983                    let _ = e.remove_entry();
984                    debug!(namespace = %namespace.fmt_short(), "close");
985                    true
986                } else {
987                    false
988                }
989            }
990        }
991    }
992
993    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
994        self.0.drain().map(|(n, _s)| n)
995    }
996}
997
998async fn iter_to_channel_async<T: Send + 'static>(
999    channel: async_channel::Sender<Result<T>>,
1000    iter: Result<impl Iterator<Item = Result<T>>>,
1001) -> Result<(), SendReplyError> {
1002    match iter {
1003        Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
1004        Ok(iter) => {
1005            for item in iter {
1006                channel.send(item).await.map_err(send_reply_error)?;
1007            }
1008        }
1009    }
1010    Ok(())
1011}
1012
1013fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1014    store.get_author(id)?.context("author not found")
1015}
1016
/// Marker error returned when sending a reply over a channel failed.
///
/// It carries no details; the underlying send error is discarded.
#[derive(Debug)]
struct SendReplyError;
1019
1020fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1021    sender.send(value).map_err(send_reply_error)
1022}
1023
1024fn send_reply_with<T>(
1025    sender: oneshot::Sender<Result<T>>,
1026    this: &mut Actor,
1027    f: impl FnOnce(&mut Actor) -> Result<T>,
1028) -> Result<(), SendReplyError> {
1029    sender.send(f(this)).map_err(send_reply_error)
1030}
1031
/// Discards the given error value and returns the [`SendReplyError`] marker.
///
/// Generic over the input so it can be handed to `map_err` regardless of the
/// concrete send error type of the channel in use.
fn send_reply_error<T>(_err: T) -> SendReplyError {
    SendReplyError
}
1035
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store;
    /// Opens a replica, subscribes to it, closes it again, and checks that
    /// the subscriber's receiver then reports an error.
    #[tokio::test]
    async fn open_close() -> anyhow::Result<()> {
        // Spawn the actor on top of an in-memory store.
        let store = store::Store::memory();
        let sync = SyncHandle::spawn(store, None, "foo".into());
        // Create and import a fresh namespace, then open it once.
        let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
        let id = namespace.id();
        sync.import_namespace(namespace.into()).await?;
        sync.open(id, Default::default()).await?;
        let (tx, rx) = async_channel::bounded(10);
        sync.subscribe(id, tx).await?;
        sync.close(id).await?;
        // After the close, receiving must fail rather than yield an event.
        // NOTE(review): presumably the replica state, and with it the
        // subscriber's sender, is dropped when the only handle is closed —
        // confirm against `ReplicaInfo`.
        assert!(rx.recv().await.is_err());
        Ok(())
    }
}