iroh_docs/
actor.rs

1//! This contains an actor spawned on a separate thread to process replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7    thread::JoinHandle,
8    time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use futures_util::FutureExt;
14use iroh_blobs::Hash;
15use irpc::channel::mpsc;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::oneshot, task::JoinSet};
18use tracing::{debug, error, error_span, trace, warn};
19
20use crate::{
21    api::{
22        protocol::{AuthorListResponse, ListResponse},
23        RpcError, RpcResult,
24    },
25    metrics::Metrics,
26    ranger::Message,
27    store::{
28        fs::{ContentHashesIterator, StoreInstance},
29        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
30    },
31    Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
32    NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
33};
34
35const ACTION_CAP: usize = 1024;
36pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
37
38#[derive(derive_more::Debug, derive_more::Display)]
39enum Action {
40    #[display("NewAuthor")]
41    ImportAuthor {
42        author: Author,
43        #[debug("reply")]
44        reply: oneshot::Sender<Result<AuthorId>>,
45    },
46    #[display("ExportAuthor")]
47    ExportAuthor {
48        author: AuthorId,
49        #[debug("reply")]
50        reply: oneshot::Sender<Result<Option<Author>>>,
51    },
52    #[display("DeleteAuthor")]
53    DeleteAuthor {
54        author: AuthorId,
55        #[debug("reply")]
56        reply: oneshot::Sender<Result<()>>,
57    },
58    #[display("NewReplica")]
59    ImportNamespace {
60        capability: Capability,
61        #[debug("reply")]
62        reply: oneshot::Sender<Result<NamespaceId>>,
63    },
64    #[display("ListAuthors")]
65    ListAuthors {
66        #[debug("reply")]
67        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
68    },
69    #[display("ListReplicas")]
70    ListReplicas {
71        #[debug("reply")]
72        reply: mpsc::Sender<RpcResult<ListResponse>>,
73    },
74    #[display("ContentHashes")]
75    ContentHashes {
76        #[debug("reply")]
77        reply: oneshot::Sender<Result<ContentHashesIterator>>,
78    },
79    #[display("FlushStore")]
80    FlushStore {
81        #[debug("reply")]
82        reply: oneshot::Sender<Result<()>>,
83    },
84    #[display("Replica({}, {})", _0.fmt_short(), _1)]
85    Replica(NamespaceId, ReplicaAction),
86    #[display("Shutdown")]
87    Shutdown {
88        #[debug("reply")]
89        reply: Option<oneshot::Sender<Store>>,
90    },
91}
92
93#[derive(derive_more::Debug, strum::Display)]
94enum ReplicaAction {
95    Open {
96        #[debug("reply")]
97        reply: oneshot::Sender<Result<()>>,
98        opts: OpenOpts,
99    },
100    Close {
101        #[debug("reply")]
102        reply: oneshot::Sender<Result<bool>>,
103    },
104    GetState {
105        #[debug("reply")]
106        reply: oneshot::Sender<Result<OpenState>>,
107    },
108    SetSync {
109        sync: bool,
110        #[debug("reply")]
111        reply: oneshot::Sender<Result<()>>,
112    },
113    Subscribe {
114        sender: async_channel::Sender<Event>,
115        #[debug("reply")]
116        reply: oneshot::Sender<Result<()>>,
117    },
118    Unsubscribe {
119        sender: async_channel::Sender<Event>,
120        #[debug("reply")]
121        reply: oneshot::Sender<Result<()>>,
122    },
123    InsertLocal {
124        author: AuthorId,
125        key: Bytes,
126        hash: Hash,
127        len: u64,
128        #[debug("reply")]
129        reply: oneshot::Sender<Result<()>>,
130    },
131    DeletePrefix {
132        author: AuthorId,
133        key: Bytes,
134        #[debug("reply")]
135        reply: oneshot::Sender<Result<usize>>,
136    },
137    InsertRemote {
138        entry: SignedEntry,
139        from: PeerIdBytes,
140        content_status: ContentStatus,
141        #[debug("reply")]
142        reply: oneshot::Sender<Result<()>>,
143    },
144    SyncInitialMessage {
145        #[debug("reply")]
146        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
147    },
148    SyncProcessMessage {
149        message: Message<SignedEntry>,
150        from: PeerIdBytes,
151        state: SyncOutcome,
152        #[debug("reply")]
153        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
154    },
155    GetSyncPeers {
156        #[debug("reply")]
157        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
158    },
159    RegisterUsefulPeer {
160        peer: PeerIdBytes,
161        #[debug("reply")]
162        reply: oneshot::Sender<Result<()>>,
163    },
164    GetExact {
165        author: AuthorId,
166        key: Bytes,
167        include_empty: bool,
168        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
169    },
170    GetMany {
171        query: Query,
172        reply: mpsc::Sender<RpcResult<SignedEntry>>,
173    },
174    DropReplica {
175        reply: oneshot::Sender<Result<()>>,
176    },
177    ExportSecretKey {
178        reply: oneshot::Sender<Result<NamespaceSecret>>,
179    },
180    HasNewsForUs {
181        heads: AuthorHeads,
182        #[debug("reply")]
183        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
184    },
185    SetDownloadPolicy {
186        policy: DownloadPolicy,
187        #[debug("reply")]
188        reply: oneshot::Sender<Result<()>>,
189    },
190    GetDownloadPolicy {
191        #[debug("reply")]
192        reply: oneshot::Sender<Result<DownloadPolicy>>,
193    },
194}
195
196/// The state for an open replica.
197#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
198pub struct OpenState {
199    /// Whether to accept sync requests for this replica.
200    pub sync: bool,
201    /// How many event subscriptions are open
202    pub subscribers: usize,
203    /// By how many handles the replica is currently held open
204    pub handles: usize,
205}
206
207#[derive(Debug)]
208struct OpenReplica {
209    info: ReplicaInfo,
210    sync: bool,
211    handles: usize,
212}
213
214/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
215///
216/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
217/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
218/// which processes incoming messages sequentially.
219///
220/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
221/// The thread will finish processing all messages in the channel queue, and then exit.
222/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
223/// and await its result before dropping the last [`SyncHandle`]. This ensures that
224/// waiting for the actor to finish happens in an async context, and therefore that the final
225/// [`SyncHandle::drop`] will not block.
226#[derive(Debug, Clone)]
227pub struct SyncHandle {
228    tx: async_channel::Sender<Action>,
229    join_handle: Arc<Option<JoinHandle<()>>>,
230    metrics: Arc<Metrics>,
231}
232
233/// Options when opening a replica.
234#[derive(Debug, Default)]
235pub struct OpenOpts {
236    /// Set to true to set sync state to true.
237    pub sync: bool,
238    /// Optionally subscribe to replica events.
239    pub subscribe: Option<async_channel::Sender<Event>>,
240}
241
242impl OpenOpts {
243    /// Set sync state to true.
244    pub fn sync(mut self) -> Self {
245        self.sync = true;
246        self
247    }
248    /// Subscribe to replica events.
249    pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
250        self.subscribe = Some(subscribe);
251        self
252    }
253}
254
255#[allow(missing_docs)]
256impl SyncHandle {
257    /// Spawn a sync actor and return a handle.
258    pub fn spawn(
259        store: Store,
260        content_status_callback: Option<ContentStatusCallback>,
261        me: String,
262    ) -> SyncHandle {
263        let metrics = Arc::new(Metrics::default());
264        let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
265        let actor = Actor {
266            store,
267            states: Default::default(),
268            action_rx,
269            content_status_callback,
270            tasks: Default::default(),
271            metrics: metrics.clone(),
272        };
273        let join_handle = std::thread::Builder::new()
274            .name("sync-actor".to_string())
275            .spawn(move || {
276                let span = error_span!("sync", %me);
277                let _enter = span.enter();
278
279                if let Err(err) = actor.run() {
280                    error!("Sync actor closed with error: {err:?}");
281                }
282            })
283            .expect("failed to spawn thread");
284        let join_handle = Arc::new(Some(join_handle));
285        SyncHandle {
286            tx: action_tx,
287            join_handle,
288            metrics,
289        }
290    }
291
292    /// Returns the metrics collected in this sync actor.
293    pub fn metrics(&self) -> &Arc<Metrics> {
294        &self.metrics
295    }
296
297    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
298        tracing::debug!("SyncHandle::open called");
299        let (reply, rx) = oneshot::channel();
300        let action = ReplicaAction::Open { reply, opts };
301        self.send_replica(namespace, action).await?;
302        rx.await?
303    }
304
305    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
306        let (reply, rx) = oneshot::channel();
307        self.send_replica(namespace, ReplicaAction::Close { reply })
308            .await?;
309        rx.await?
310    }
311
312    pub async fn subscribe(
313        &self,
314        namespace: NamespaceId,
315        sender: async_channel::Sender<Event>,
316    ) -> Result<()> {
317        let (reply, rx) = oneshot::channel();
318        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
319            .await?;
320        rx.await?
321    }
322
323    pub async fn unsubscribe(
324        &self,
325        namespace: NamespaceId,
326        sender: async_channel::Sender<Event>,
327    ) -> Result<()> {
328        let (reply, rx) = oneshot::channel();
329        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
330            .await?;
331        rx.await?
332    }
333
334    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
335        let (reply, rx) = oneshot::channel();
336        let action = ReplicaAction::SetSync { sync, reply };
337        self.send_replica(namespace, action).await?;
338        rx.await?
339    }
340
341    pub async fn insert_local(
342        &self,
343        namespace: NamespaceId,
344        author: AuthorId,
345        key: Bytes,
346        hash: Hash,
347        len: u64,
348    ) -> Result<()> {
349        let (reply, rx) = oneshot::channel();
350        let action = ReplicaAction::InsertLocal {
351            author,
352            key,
353            hash,
354            len,
355            reply,
356        };
357        self.send_replica(namespace, action).await?;
358        rx.await?
359    }
360
361    pub async fn delete_prefix(
362        &self,
363        namespace: NamespaceId,
364        author: AuthorId,
365        key: Bytes,
366    ) -> Result<usize> {
367        let (reply, rx) = oneshot::channel();
368        let action = ReplicaAction::DeletePrefix { author, key, reply };
369        self.send_replica(namespace, action).await?;
370        rx.await?
371    }
372
373    pub async fn insert_remote(
374        &self,
375        namespace: NamespaceId,
376        entry: SignedEntry,
377        from: PeerIdBytes,
378        content_status: ContentStatus,
379    ) -> Result<()> {
380        let (reply, rx) = oneshot::channel();
381        let action = ReplicaAction::InsertRemote {
382            entry,
383            from,
384            content_status,
385            reply,
386        };
387        self.send_replica(namespace, action).await?;
388        rx.await?
389    }
390
391    pub async fn sync_initial_message(
392        &self,
393        namespace: NamespaceId,
394    ) -> Result<Message<SignedEntry>> {
395        let (reply, rx) = oneshot::channel();
396        let action = ReplicaAction::SyncInitialMessage { reply };
397        self.send_replica(namespace, action).await?;
398        rx.await?
399    }
400
401    pub async fn sync_process_message(
402        &self,
403        namespace: NamespaceId,
404        message: Message<SignedEntry>,
405        from: PeerIdBytes,
406        state: SyncOutcome,
407    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
408        let (reply, rx) = oneshot::channel();
409        let action = ReplicaAction::SyncProcessMessage {
410            reply,
411            message,
412            from,
413            state,
414        };
415        self.send_replica(namespace, action).await?;
416        rx.await?
417    }
418
419    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
420        let (reply, rx) = oneshot::channel();
421        let action = ReplicaAction::GetSyncPeers { reply };
422        self.send_replica(namespace, action).await?;
423        rx.await?
424    }
425
426    pub async fn register_useful_peer(
427        &self,
428        namespace: NamespaceId,
429        peer: PeerIdBytes,
430    ) -> Result<()> {
431        let (reply, rx) = oneshot::channel();
432        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
433        self.send_replica(namespace, action).await?;
434        rx.await?
435    }
436
437    pub async fn has_news_for_us(
438        &self,
439        namespace: NamespaceId,
440        heads: AuthorHeads,
441    ) -> Result<Option<NonZeroU64>> {
442        let (reply, rx) = oneshot::channel();
443        let action = ReplicaAction::HasNewsForUs { reply, heads };
444        self.send_replica(namespace, action).await?;
445        rx.await?
446    }
447
448    pub async fn get_many(
449        &self,
450        namespace: NamespaceId,
451        query: Query,
452        reply: mpsc::Sender<RpcResult<SignedEntry>>,
453    ) -> Result<()> {
454        let action = ReplicaAction::GetMany { query, reply };
455        self.send_replica(namespace, action).await?;
456        Ok(())
457    }
458
459    pub async fn get_exact(
460        &self,
461        namespace: NamespaceId,
462        author: AuthorId,
463        key: Bytes,
464        include_empty: bool,
465    ) -> Result<Option<SignedEntry>> {
466        let (reply, rx) = oneshot::channel();
467        let action = ReplicaAction::GetExact {
468            author,
469            key,
470            include_empty,
471            reply,
472        };
473        self.send_replica(namespace, action).await?;
474        rx.await?
475    }
476
477    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
478        let (reply, rx) = oneshot::channel();
479        let action = ReplicaAction::DropReplica { reply };
480        self.send_replica(namespace, action).await?;
481        rx.await?
482    }
483
484    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
485        let (reply, rx) = oneshot::channel();
486        let action = ReplicaAction::ExportSecretKey { reply };
487        self.send_replica(namespace, action).await?;
488        rx.await?
489    }
490
491    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
492        let (reply, rx) = oneshot::channel();
493        let action = ReplicaAction::GetState { reply };
494        self.send_replica(namespace, action).await?;
495        rx.await?
496    }
497
498    pub async fn shutdown(&self) -> Result<Store> {
499        let (reply, rx) = oneshot::channel();
500        let action = Action::Shutdown { reply: Some(reply) };
501        self.send(action).await?;
502        let store = rx.await?;
503        Ok(store)
504    }
505
506    pub async fn list_authors(
507        &self,
508        reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
509    ) -> Result<()> {
510        self.send(Action::ListAuthors { reply }).await
511    }
512
513    pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
514        self.send(Action::ListReplicas { reply }).await
515    }
516
517    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
518        let (reply, rx) = oneshot::channel();
519        self.send(Action::ImportAuthor { author, reply }).await?;
520        rx.await?
521    }
522
523    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
524        let (reply, rx) = oneshot::channel();
525        self.send(Action::ExportAuthor { author, reply }).await?;
526        rx.await?
527    }
528
529    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
530        let (reply, rx) = oneshot::channel();
531        self.send(Action::DeleteAuthor { author, reply }).await?;
532        rx.await?
533    }
534
535    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
536        let (reply, rx) = oneshot::channel();
537        self.send(Action::ImportNamespace { capability, reply })
538            .await?;
539        rx.await?
540    }
541
542    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
543        let (reply, rx) = oneshot::channel();
544        let action = ReplicaAction::GetDownloadPolicy { reply };
545        self.send_replica(namespace, action).await?;
546        rx.await?
547    }
548
549    pub async fn set_download_policy(
550        &self,
551        namespace: NamespaceId,
552        policy: DownloadPolicy,
553    ) -> Result<()> {
554        let (reply, rx) = oneshot::channel();
555        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
556        self.send_replica(namespace, action).await?;
557        rx.await?
558    }
559
560    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
561        let (reply, rx) = oneshot::channel();
562        self.send(Action::ContentHashes { reply }).await?;
563        rx.await?
564    }
565
566    /// Makes sure that all pending database operations are persisted to disk.
567    ///
568    /// Otherwise, database operations are batched into bigger transactions for speed.
569    /// Use this if you need to make sure something is written to the database
570    /// before another operation, e.g. to make sure sudden process exits don't corrupt
571    /// your application state.
572    ///
573    /// It's not necessary to call this function before shutdown, as `shutdown` will
574    /// trigger a flush on its own.
575    pub async fn flush_store(&self) -> Result<()> {
576        let (reply, rx) = oneshot::channel();
577        self.send(Action::FlushStore { reply }).await?;
578        rx.await?
579    }
580
581    async fn send(&self, action: Action) -> Result<()> {
582        self.tx
583            .send(action)
584            .await
585            .context("sending to iroh_docs actor failed")?;
586        Ok(())
587    }
588    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
589        self.send(Action::Replica(namespace, action)).await?;
590        Ok(())
591    }
592}
593
594impl Drop for SyncHandle {
595    fn drop(&mut self) {
596        // this means we're dropping the last reference
597        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
598            // this call is the reason tx can not be a tokio mpsc channel.
599            // we have no control about where drop is called, yet tokio send_blocking panics
600            // when called from inside a tokio runtime.
601            self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
602            let handle = handle.take().expect("this can only run once");
603            if let Err(err) = handle.join() {
604                warn!(?err, "Failed to join sync actor");
605            }
606        }
607    }
608}
609
610struct Actor {
611    store: Store,
612    states: OpenReplicas,
613    action_rx: async_channel::Receiver<Action>,
614    content_status_callback: Option<ContentStatusCallback>,
615    tasks: JoinSet<()>,
616    metrics: Arc<Metrics>,
617}
618
619impl Actor {
620    fn run(self) -> Result<()> {
621        let rt = tokio::runtime::Builder::new_current_thread()
622            .enable_time()
623            .build()?;
624        let local_set = tokio::task::LocalSet::new();
625        local_set.block_on(&rt, async move { self.run_async().await });
626        Ok(())
627    }
628
629    async fn run_async(mut self) {
630        let reply = loop {
631            let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
632            tokio::pin!(timeout);
633            let action = tokio::select! {
634                _ = &mut timeout => {
635                    if let Err(cause) = self.store.flush() {
636                        error!(?cause, "failed to flush store");
637                    }
638                    continue;
639                }
640                action = self.action_rx.recv() => {
641                    match action {
642                        Ok(action) => action,
643                        Err(async_channel::RecvError) => {
644                            debug!("action channel disconnected");
645                            break None;
646                        }
647
648                    }
649                }
650            };
651            trace!(%action, "tick");
652            self.metrics.actor_tick_main.inc();
653            match action {
654                Action::Shutdown { reply } => {
655                    break reply;
656                }
657                action => {
658                    if self.on_action(action).await.is_err() {
659                        warn!("failed to send reply: receiver dropped");
660                    }
661                }
662            }
663        };
664
665        if let Err(cause) = self.store.flush() {
666            warn!(?cause, "failed to flush store");
667        }
668        self.close_all();
669        self.tasks.abort_all();
670        debug!("docs actor shutdown");
671        if let Some(reply) = reply {
672            reply.send(self.store).ok();
673        }
674    }
675
676    async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
677        match action {
678            Action::Shutdown { .. } => {
679                unreachable!("Shutdown is handled in run()")
680            }
681            Action::ImportAuthor { author, reply } => {
682                let id = author.id();
683                send_reply(reply, self.store.import_author(author).map(|_| id))
684            }
685            Action::ExportAuthor { author, reply } => {
686                send_reply(reply, self.store.get_author(&author))
687            }
688            Action::DeleteAuthor { author, reply } => {
689                send_reply(reply, self.store.delete_author(author))
690            }
691            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
692                let id = capability.id();
693                let outcome = this.store.import_namespace(capability.clone())?;
694                if let ImportNamespaceOutcome::Upgraded = outcome {
695                    if let Ok(state) = this.states.get_mut(&id) {
696                        state.info.merge_capability(capability)?;
697                    }
698                }
699                Ok(id)
700            }),
701            Action::ListAuthors { reply } => {
702                let iter = self
703                    .store
704                    .list_authors()
705                    .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
706                self.tasks
707                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
708                Ok(())
709            }
710            Action::ListReplicas { reply } => {
711                let iter = self.store.list_namespaces();
712                let iter = iter.map(|inner| {
713                    inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
714                });
715                self.tasks
716                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
717                Ok(())
718            }
719            Action::ContentHashes { reply } => {
720                send_reply_with(reply, self, |this| this.store.content_hashes())
721            }
722            Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
723            Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
724        }
725    }
726
727    async fn on_replica_action(
728        &mut self,
729        namespace: NamespaceId,
730        action: ReplicaAction,
731    ) -> Result<(), SendReplyError> {
732        match action {
733            ReplicaAction::Open { reply, opts } => {
734                tracing::trace!("open in");
735                let res = self.open(namespace, opts);
736                tracing::trace!("open out");
737                send_reply(reply, res)
738            }
739            ReplicaAction::Close { reply } => {
740                let res = self.close(namespace);
741                // ignore errors when no receiver is present for close
742                reply.send(Ok(res)).ok();
743                Ok(())
744            }
745            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
746                let state = this.states.get_mut(&namespace)?;
747                state.info.subscribe(sender);
748                Ok(())
749            }),
750            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
751                let state = this.states.get_mut(&namespace)?;
752                state.info.unsubscribe(&sender);
753                drop(sender);
754                Ok(())
755            }),
756            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
757                let state = this.states.get_mut(&namespace)?;
758                state.sync = sync;
759                Ok(())
760            }),
761            ReplicaAction::InsertLocal {
762                author,
763                key,
764                hash,
765                len,
766                reply,
767            } => send_reply_with(reply, self, move |this| {
768                let author = get_author(&mut this.store, &author)?;
769                let mut replica = this.states.replica(namespace, &mut this.store)?;
770                replica.insert(&key, &author, hash, len)?;
771                this.metrics.new_entries_local.inc();
772                this.metrics.new_entries_local_size.inc_by(len);
773                Ok(())
774            }),
775            ReplicaAction::DeletePrefix { author, key, reply } => {
776                send_reply_with(reply, self, |this| {
777                    let author = get_author(&mut this.store, &author)?;
778                    let mut replica = this.states.replica(namespace, &mut this.store)?;
779                    let res = replica.delete_prefix(&key, &author)?;
780                    Ok(res)
781                })
782            }
783            ReplicaAction::InsertRemote {
784                entry,
785                from,
786                content_status,
787                reply,
788            } => send_reply_with(reply, self, move |this| {
789                let mut replica = this
790                    .states
791                    .replica_if_syncing(&namespace, &mut this.store)?;
792                let len = entry.content_len();
793                replica.insert_remote_entry(entry, from, content_status)?;
794                this.metrics.new_entries_remote.inc();
795                this.metrics.new_entries_remote_size.inc_by(len);
796                Ok(())
797            }),
798
799            ReplicaAction::SyncInitialMessage { reply } => {
800                send_reply_with(reply, self, move |this| {
801                    let mut replica = this
802                        .states
803                        .replica_if_syncing(&namespace, &mut this.store)?;
804                    let res = replica.sync_initial_message()?;
805                    Ok(res)
806                })
807            }
808            ReplicaAction::SyncProcessMessage {
809                message,
810                from,
811                mut state,
812                reply,
813            } => {
814                let res = async {
815                    let mut replica = self
816                        .states
817                        .replica_if_syncing(&namespace, &mut self.store)?;
818                    let res = replica
819                        .sync_process_message(message, from, &mut state)
820                        .await?;
821                    Ok((res, state))
822                }
823                .await;
824                reply.send(res).map_err(send_reply_error)
825            }
826            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
827                this.states.ensure_open(&namespace)?;
828                let peers = this.store.get_sync_peers(&namespace)?;
829                Ok(peers.map(|iter| iter.collect()))
830            }),
831            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
832                let res = self.store.register_useful_peer(namespace, peer);
833                send_reply(reply, res)
834            }
835            ReplicaAction::GetExact {
836                author,
837                key,
838                include_empty,
839                reply,
840            } => send_reply_with(reply, self, move |this| {
841                this.states.ensure_open(&namespace)?;
842                this.store.get_exact(namespace, author, key, include_empty)
843            }),
844            ReplicaAction::GetMany { query, reply } => {
845                let iter = self
846                    .states
847                    .ensure_open(&namespace)
848                    .and_then(|_| self.store.get_many(namespace, query));
849                self.tasks
850                    .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
851                Ok(())
852            }
853            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
854                this.close(namespace);
855                this.store.remove_replica(&namespace)
856            }),
857            ReplicaAction::ExportSecretKey { reply } => {
858                let res = self
859                    .states
860                    .get_mut(&namespace)
861                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
862                send_reply(reply, res)
863            }
864            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
865                let state = this.states.get_mut(&namespace)?;
866                let handles = state.handles;
867                let sync = state.sync;
868                let subscribers = state.info.subscribers_count();
869                Ok(OpenState {
870                    handles,
871                    sync,
872                    subscribers,
873                })
874            }),
875            ReplicaAction::HasNewsForUs { heads, reply } => {
876                let res = self.store.has_news_for_us(namespace, &heads);
877                send_reply(reply, res)
878            }
879            ReplicaAction::SetDownloadPolicy { policy, reply } => {
880                send_reply(reply, self.store.set_download_policy(&namespace, policy))
881            }
882            ReplicaAction::GetDownloadPolicy { reply } => {
883                send_reply(reply, self.store.get_download_policy(&namespace))
884            }
885        }
886    }
887
888    fn close(&mut self, namespace: NamespaceId) -> bool {
889        let res = self.states.close(namespace);
890        if res {
891            self.store.close_replica(namespace);
892        }
893        res
894    }
895
896    fn close_all(&mut self) {
897        for id in self.states.close_all() {
898            self.store.close_replica(id);
899        }
900    }
901
902    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
903        let open_cb = || {
904            let mut info = self.store.load_replica_info(&namespace)?;
905            if let Some(cb) = &self.content_status_callback {
906                info.set_content_status_callback(Arc::clone(cb));
907            }
908            Ok(info)
909        };
910        self.states.open_with(namespace, opts, open_cb)
911    }
912}
913
914#[derive(Default)]
915struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
916
917impl OpenReplicas {
918    fn replica<'a, 'b>(
919        &'a mut self,
920        namespace: NamespaceId,
921        store: &'b mut Store,
922    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
923        let state = self.get_mut(&namespace)?;
924        Ok(Replica::new(
925            StoreInstance::new(state.info.capability.id(), store),
926            &mut state.info,
927        ))
928    }
929
930    fn replica_if_syncing<'a, 'b>(
931        &'a mut self,
932        namespace: &NamespaceId,
933        store: &'b mut Store,
934    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
935        let state = self.get_mut(namespace)?;
936        anyhow::ensure!(state.sync, "sync is not enabled for replica");
937        Ok(Replica::new(
938            StoreInstance::new(state.info.capability.id(), store),
939            &mut state.info,
940        ))
941    }
942
943    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
944        self.0.get_mut(namespace).context("replica not open")
945    }
946
947    fn is_open(&self, namespace: &NamespaceId) -> bool {
948        self.0.contains_key(namespace)
949    }
950
951    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
952        match self.is_open(namespace) {
953            true => Ok(()),
954            false => Err(anyhow!("replica not open")),
955        }
956    }
957    fn open_with(
958        &mut self,
959        namespace: NamespaceId,
960        opts: OpenOpts,
961        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
962    ) -> Result<()> {
963        match self.0.entry(namespace) {
964            hash_map::Entry::Vacant(e) => {
965                let mut info = open_cb()?;
966                if let Some(sender) = opts.subscribe {
967                    info.subscribe(sender);
968                }
969                debug!(namespace = %namespace.fmt_short(), "open");
970                let state = OpenReplica {
971                    info,
972                    sync: opts.sync,
973                    handles: 1,
974                };
975                e.insert(state);
976            }
977            hash_map::Entry::Occupied(mut e) => {
978                let state = e.get_mut();
979                state.handles += 1;
980                state.sync = state.sync || opts.sync;
981                if let Some(sender) = opts.subscribe {
982                    state.info.subscribe(sender);
983                }
984            }
985        }
986        Ok(())
987    }
988    fn close(&mut self, namespace: NamespaceId) -> bool {
989        match self.0.entry(namespace) {
990            hash_map::Entry::Vacant(_e) => {
991                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
992                true
993            }
994            hash_map::Entry::Occupied(mut e) => {
995                let state = e.get_mut();
996                tracing::debug!("STATE {state:?}");
997                state.handles = state.handles.wrapping_sub(1);
998                if state.handles == 0 {
999                    let _ = e.remove_entry();
1000                    debug!(namespace = %namespace.fmt_short(), "close");
1001                    true
1002                } else {
1003                    false
1004                }
1005            }
1006        }
1007    }
1008
1009    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1010        self.0.drain().map(|(n, _s)| n)
1011    }
1012}
1013
1014async fn iter_to_irpc<T: irpc::RpcMessage>(
1015    channel: mpsc::Sender<RpcResult<T>>,
1016    iter: Result<impl Iterator<Item = Result<T>>>,
1017) -> Result<(), SendReplyError> {
1018    match iter {
1019        Err(err) => channel
1020            .send(Err(RpcError::new(&*err)))
1021            .await
1022            .map_err(send_reply_error)?,
1023        Ok(iter) => {
1024            for item in iter {
1025                let item = item.map_err(|err| RpcError::new(&*err));
1026                channel.send(item).await.map_err(send_reply_error)?;
1027            }
1028        }
1029    }
1030    Ok(())
1031}
1032
1033fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1034    store.get_author(id)?.context("author not found")
1035}
1036
1037#[derive(Debug)]
1038struct SendReplyError;
1039
1040fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1041    sender.send(value).map_err(send_reply_error)
1042}
1043
1044fn send_reply_with<T>(
1045    sender: oneshot::Sender<Result<T>>,
1046    this: &mut Actor,
1047    f: impl FnOnce(&mut Actor) -> Result<T>,
1048) -> Result<(), SendReplyError> {
1049    sender.send(f(this)).map_err(send_reply_error)
1050}
1051
1052fn send_reply_error<T>(_err: T) -> SendReplyError {
1053    SendReplyError
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059    use crate::store;
1060    #[tokio::test]
1061    async fn open_close() -> anyhow::Result<()> {
1062        let store = store::Store::memory();
1063        let sync = SyncHandle::spawn(store, None, "foo".into());
1064        let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
1065        let id = namespace.id();
1066        sync.import_namespace(namespace.into()).await?;
1067        sync.open(id, Default::default()).await?;
1068        let (tx, rx) = async_channel::bounded(10);
1069        sync.subscribe(id, tx).await?;
1070        sync.close(id).await?;
1071        assert!(rx.recv().await.is_err());
1072        Ok(())
1073    }
1074}