iroh_sync/
actor.rs

1//! This contains an actor spawned on a separate thread to process replica and store operations.
2
3use std::{
4    collections::{hash_map, HashMap},
5    num::NonZeroU64,
6    sync::Arc,
7    thread::JoinHandle,
8    time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use iroh_base::hash::Hash;
14use serde::{Deserialize, Serialize};
15use tokio::sync::oneshot;
16use tracing::{debug, error, error_span, trace, warn};
17
18use crate::{
19    ranger::Message,
20    store::{
21        fs::{ContentHashesIterator, StoreInstance},
22        DownloadPolicy, ImportNamespaceOutcome, Query, Store,
23    },
24    Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
25    ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo,
26    SignedEntry, SyncOutcome,
27};
28
29const ACTION_CAP: usize = 1024;
30const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
31
32#[derive(derive_more::Debug, derive_more::Display)]
33enum Action {
34    #[display("NewAuthor")]
35    ImportAuthor {
36        author: Author,
37        #[debug("reply")]
38        reply: oneshot::Sender<Result<AuthorId>>,
39    },
40    #[display("ExportAuthor")]
41    ExportAuthor {
42        author: AuthorId,
43        #[debug("reply")]
44        reply: oneshot::Sender<Result<Option<Author>>>,
45    },
46    #[display("DeleteAuthor")]
47    DeleteAuthor {
48        author: AuthorId,
49        #[debug("reply")]
50        reply: oneshot::Sender<Result<()>>,
51    },
52    #[display("NewReplica")]
53    ImportNamespace {
54        capability: Capability,
55        #[debug("reply")]
56        reply: oneshot::Sender<Result<NamespaceId>>,
57    },
58    #[display("ListAuthors")]
59    ListAuthors {
60        #[debug("reply")]
61        reply: flume::Sender<Result<AuthorId>>,
62    },
63    #[display("ListReplicas")]
64    ListReplicas {
65        #[debug("reply")]
66        reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
67    },
68    #[display("ContentHashes")]
69    ContentHashes {
70        #[debug("reply")]
71        reply: oneshot::Sender<Result<ContentHashesIterator>>,
72    },
73    #[display("Replica({}, {})", _0.fmt_short(), _1)]
74    Replica(NamespaceId, ReplicaAction),
75    #[display("Shutdown")]
76    Shutdown {
77        #[debug("reply")]
78        reply: Option<oneshot::Sender<Store>>,
79    },
80}
81
82#[derive(derive_more::Debug, strum::Display)]
83enum ReplicaAction {
84    Open {
85        #[debug("reply")]
86        reply: oneshot::Sender<Result<()>>,
87        opts: OpenOpts,
88    },
89    Close {
90        #[debug("reply")]
91        reply: oneshot::Sender<Result<bool>>,
92    },
93    GetState {
94        #[debug("reply")]
95        reply: oneshot::Sender<Result<OpenState>>,
96    },
97    SetSync {
98        sync: bool,
99        #[debug("reply")]
100        reply: oneshot::Sender<Result<()>>,
101    },
102    Subscribe {
103        sender: flume::Sender<Event>,
104        #[debug("reply")]
105        reply: oneshot::Sender<Result<()>>,
106    },
107    Unsubscribe {
108        sender: flume::Sender<Event>,
109        #[debug("reply")]
110        reply: oneshot::Sender<Result<()>>,
111    },
112    InsertLocal {
113        author: AuthorId,
114        key: Bytes,
115        hash: Hash,
116        len: u64,
117        #[debug("reply")]
118        reply: oneshot::Sender<Result<()>>,
119    },
120    DeletePrefix {
121        author: AuthorId,
122        key: Bytes,
123        #[debug("reply")]
124        reply: oneshot::Sender<Result<usize>>,
125    },
126    InsertRemote {
127        entry: SignedEntry,
128        from: PeerIdBytes,
129        content_status: ContentStatus,
130        #[debug("reply")]
131        reply: oneshot::Sender<Result<()>>,
132    },
133    SyncInitialMessage {
134        #[debug("reply")]
135        reply: oneshot::Sender<Result<Message<SignedEntry>>>,
136    },
137    SyncProcessMessage {
138        message: Message<SignedEntry>,
139        from: PeerIdBytes,
140        state: SyncOutcome,
141        #[debug("reply")]
142        reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
143    },
144    GetSyncPeers {
145        #[debug("reply")]
146        reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
147    },
148    RegisterUsefulPeer {
149        peer: PeerIdBytes,
150        #[debug("reply")]
151        reply: oneshot::Sender<Result<()>>,
152    },
153    GetExact {
154        author: AuthorId,
155        key: Bytes,
156        include_empty: bool,
157        reply: oneshot::Sender<Result<Option<SignedEntry>>>,
158    },
159    GetMany {
160        query: Query,
161        reply: flume::Sender<Result<SignedEntry>>,
162    },
163    DropReplica {
164        reply: oneshot::Sender<Result<()>>,
165    },
166    ExportSecretKey {
167        reply: oneshot::Sender<Result<NamespaceSecret>>,
168    },
169    HasNewsForUs {
170        heads: AuthorHeads,
171        #[debug("reply")]
172        reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
173    },
174    SetDownloadPolicy {
175        policy: DownloadPolicy,
176        #[debug("reply")]
177        reply: oneshot::Sender<Result<()>>,
178    },
179    GetDownloadPolicy {
180        #[debug("reply")]
181        reply: oneshot::Sender<Result<DownloadPolicy>>,
182    },
183}
184
185/// The state for an open replica.
186#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
187pub struct OpenState {
188    /// Whether to accept sync requests for this replica.
189    pub sync: bool,
190    /// How many event subscriptions are open
191    pub subscribers: usize,
192    /// By how many handles the replica is currently held open
193    pub handles: usize,
194}
195
196#[derive(Debug)]
197struct OpenReplica {
198    info: ReplicaInfo,
199    sync: bool,
200    handles: usize,
201}
202
203/// The [`SyncHandle`] controls an actor thread which executes replica and store operations.
204///
205/// The [`SyncHandle`] exposes async methods which all send messages into the actor thread, usually
206/// returning something via a return channel. The actor thread itself is a regular [`std::thread`]
207/// which processes incoming messages sequentially.
208///
209/// The handle is cheaply cloneable. Once the last clone is dropped, the actor thread is joined.
210/// The thread will finish processing all messages in the channel queue, and then exit.
211/// To prevent this last drop from blocking the calling thread, you can call [`SyncHandle::shutdown`]
212/// and await its result before dropping the last [`SyncHandle`]. This ensures that
213/// waiting for the actor to finish happens in an async context, and therefore that the final
214/// [`SyncHandle::drop`] will not block.
215#[derive(Debug, Clone)]
216pub struct SyncHandle {
217    tx: flume::Sender<Action>,
218    join_handle: Arc<Option<JoinHandle<()>>>,
219}
220
221/// Options when opening a replica.
222#[derive(Debug, Default)]
223pub struct OpenOpts {
224    /// Set to true to set sync state to true.
225    pub sync: bool,
226    /// Optionally subscribe to replica events.
227    pub subscribe: Option<flume::Sender<Event>>,
228}
229impl OpenOpts {
230    /// Set sync state to true.
231    pub fn sync(mut self) -> Self {
232        self.sync = true;
233        self
234    }
235    /// Subscribe to replica events.
236    pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
237        self.subscribe = Some(subscribe);
238        self
239    }
240}
241
242#[allow(missing_docs)]
243impl SyncHandle {
244    /// Spawn a sync actor and return a handle.
245    pub fn spawn(
246        store: Store,
247        content_status_callback: Option<ContentStatusCallback>,
248        me: String,
249    ) -> SyncHandle {
250        let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
251        let actor = Actor {
252            store,
253            states: Default::default(),
254            action_rx,
255            content_status_callback,
256        };
257        let join_handle = std::thread::Builder::new()
258            .name("sync-actor".to_string())
259            .spawn(move || {
260                let span = error_span!("sync", %me);
261                let _enter = span.enter();
262
263                if let Err(err) = actor.run() {
264                    error!("Sync actor closed with error: {err:?}");
265                }
266            })
267            .expect("failed to spawn thread");
268        let join_handle = Arc::new(Some(join_handle));
269        SyncHandle {
270            tx: action_tx,
271            join_handle,
272        }
273    }
274
275    pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
276        let (reply, rx) = oneshot::channel();
277        let action = ReplicaAction::Open { reply, opts };
278        self.send_replica(namespace, action).await?;
279        rx.await?
280    }
281
282    pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
283        let (reply, rx) = oneshot::channel();
284        self.send_replica(namespace, ReplicaAction::Close { reply })
285            .await?;
286        rx.await?
287    }
288
289    pub async fn subscribe(
290        &self,
291        namespace: NamespaceId,
292        sender: flume::Sender<Event>,
293    ) -> Result<()> {
294        let (reply, rx) = oneshot::channel();
295        self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
296            .await?;
297        rx.await?
298    }
299
300    pub async fn unsubscribe(
301        &self,
302        namespace: NamespaceId,
303        sender: flume::Sender<Event>,
304    ) -> Result<()> {
305        let (reply, rx) = oneshot::channel();
306        self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
307            .await?;
308        rx.await?
309    }
310
311    pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
312        let (reply, rx) = oneshot::channel();
313        let action = ReplicaAction::SetSync { sync, reply };
314        self.send_replica(namespace, action).await?;
315        rx.await?
316    }
317
318    pub async fn insert_local(
319        &self,
320        namespace: NamespaceId,
321        author: AuthorId,
322        key: Bytes,
323        hash: Hash,
324        len: u64,
325    ) -> Result<()> {
326        let (reply, rx) = oneshot::channel();
327        let action = ReplicaAction::InsertLocal {
328            author,
329            key,
330            hash,
331            len,
332            reply,
333        };
334        self.send_replica(namespace, action).await?;
335        rx.await?
336    }
337
338    pub async fn delete_prefix(
339        &self,
340        namespace: NamespaceId,
341        author: AuthorId,
342        key: Bytes,
343    ) -> Result<usize> {
344        let (reply, rx) = oneshot::channel();
345        let action = ReplicaAction::DeletePrefix { author, key, reply };
346        self.send_replica(namespace, action).await?;
347        rx.await?
348    }
349
350    pub async fn insert_remote(
351        &self,
352        namespace: NamespaceId,
353        entry: SignedEntry,
354        from: PeerIdBytes,
355        content_status: ContentStatus,
356    ) -> Result<()> {
357        let (reply, rx) = oneshot::channel();
358        let action = ReplicaAction::InsertRemote {
359            entry,
360            from,
361            content_status,
362            reply,
363        };
364        self.send_replica(namespace, action).await?;
365        rx.await?
366    }
367
368    pub async fn sync_initial_message(
369        &self,
370        namespace: NamespaceId,
371    ) -> Result<Message<SignedEntry>> {
372        let (reply, rx) = oneshot::channel();
373        let action = ReplicaAction::SyncInitialMessage { reply };
374        self.send_replica(namespace, action).await?;
375        rx.await?
376    }
377
378    pub async fn sync_process_message(
379        &self,
380        namespace: NamespaceId,
381        message: Message<SignedEntry>,
382        from: PeerIdBytes,
383        state: SyncOutcome,
384    ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
385        let (reply, rx) = oneshot::channel();
386        let action = ReplicaAction::SyncProcessMessage {
387            reply,
388            message,
389            from,
390            state,
391        };
392        self.send_replica(namespace, action).await?;
393        rx.await?
394    }
395
396    pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
397        let (reply, rx) = oneshot::channel();
398        let action = ReplicaAction::GetSyncPeers { reply };
399        self.send_replica(namespace, action).await?;
400        rx.await?
401    }
402
403    pub async fn register_useful_peer(
404        &self,
405        namespace: NamespaceId,
406        peer: PeerIdBytes,
407    ) -> Result<()> {
408        let (reply, rx) = oneshot::channel();
409        let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
410        self.send_replica(namespace, action).await?;
411        rx.await?
412    }
413
414    pub async fn has_news_for_us(
415        &self,
416        namespace: NamespaceId,
417        heads: AuthorHeads,
418    ) -> Result<Option<NonZeroU64>> {
419        let (reply, rx) = oneshot::channel();
420        let action = ReplicaAction::HasNewsForUs { reply, heads };
421        self.send_replica(namespace, action).await?;
422        rx.await?
423    }
424
425    pub async fn get_many(
426        &self,
427        namespace: NamespaceId,
428        query: Query,
429        reply: flume::Sender<Result<SignedEntry>>,
430    ) -> Result<()> {
431        let action = ReplicaAction::GetMany { query, reply };
432        self.send_replica(namespace, action).await?;
433        Ok(())
434    }
435
436    pub async fn get_exact(
437        &self,
438        namespace: NamespaceId,
439        author: AuthorId,
440        key: Bytes,
441        include_empty: bool,
442    ) -> Result<Option<SignedEntry>> {
443        let (reply, rx) = oneshot::channel();
444        let action = ReplicaAction::GetExact {
445            author,
446            key,
447            include_empty,
448            reply,
449        };
450        self.send_replica(namespace, action).await?;
451        rx.await?
452    }
453
454    pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
455        let (reply, rx) = oneshot::channel();
456        let action = ReplicaAction::DropReplica { reply };
457        self.send_replica(namespace, action).await?;
458        rx.await?
459    }
460
461    pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
462        let (reply, rx) = oneshot::channel();
463        let action = ReplicaAction::ExportSecretKey { reply };
464        self.send_replica(namespace, action).await?;
465        rx.await?
466    }
467
468    pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
469        let (reply, rx) = oneshot::channel();
470        let action = ReplicaAction::GetState { reply };
471        self.send_replica(namespace, action).await?;
472        rx.await?
473    }
474
475    pub async fn shutdown(&self) -> Result<Store> {
476        let (reply, rx) = oneshot::channel();
477        let action = Action::Shutdown { reply: Some(reply) };
478        self.send(action).await.ok();
479        Ok(rx.await?)
480    }
481
482    pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
483        self.send(Action::ListAuthors { reply }).await
484    }
485
486    pub async fn list_replicas(
487        &self,
488        reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
489    ) -> Result<()> {
490        self.send(Action::ListReplicas { reply }).await
491    }
492
493    pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
494        let (reply, rx) = oneshot::channel();
495        self.send(Action::ImportAuthor { author, reply }).await?;
496        rx.await?
497    }
498
499    pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
500        let (reply, rx) = oneshot::channel();
501        self.send(Action::ExportAuthor { author, reply }).await?;
502        rx.await?
503    }
504
505    pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
506        let (reply, rx) = oneshot::channel();
507        self.send(Action::DeleteAuthor { author, reply }).await?;
508        rx.await?
509    }
510
511    pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
512        let (reply, rx) = oneshot::channel();
513        self.send(Action::ImportNamespace { capability, reply })
514            .await?;
515        rx.await?
516    }
517
518    pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
519        let (reply, rx) = oneshot::channel();
520        let action = ReplicaAction::GetDownloadPolicy { reply };
521        self.send_replica(namespace, action).await?;
522        rx.await?
523    }
524
525    pub async fn set_download_policy(
526        &self,
527        namespace: NamespaceId,
528        policy: DownloadPolicy,
529    ) -> Result<()> {
530        let (reply, rx) = oneshot::channel();
531        let action = ReplicaAction::SetDownloadPolicy { reply, policy };
532        self.send_replica(namespace, action).await?;
533        rx.await?
534    }
535
536    pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
537        let (reply, rx) = oneshot::channel();
538        self.send(Action::ContentHashes { reply }).await?;
539        rx.await?
540    }
541
542    async fn send(&self, action: Action) -> Result<()> {
543        self.tx
544            .send_async(action)
545            .await
546            .context("sending to iroh_sync actor failed")?;
547        Ok(())
548    }
549    async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
550        self.send(Action::Replica(namespace, action)).await?;
551        Ok(())
552    }
553}
554
555impl Drop for SyncHandle {
556    fn drop(&mut self) {
557        // this means we're dropping the last reference
558        if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
559            self.tx.send(Action::Shutdown { reply: None }).ok();
560            let handle = handle.take().expect("this can only run once");
561            if let Err(err) = handle.join() {
562                warn!(?err, "Failed to join sync actor");
563            }
564        }
565    }
566}
567
568struct Actor {
569    store: Store,
570    states: OpenReplicas,
571    action_rx: flume::Receiver<Action>,
572    content_status_callback: Option<ContentStatusCallback>,
573}
574
575impl Actor {
576    fn run(mut self) -> Result<()> {
577        loop {
578            let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
579                Ok(action) => action,
580                Err(flume::RecvTimeoutError::Timeout) => {
581                    if let Err(cause) = self.store.flush() {
582                        error!(?cause, "failed to flush store");
583                    }
584                    continue;
585                }
586                Err(flume::RecvTimeoutError::Disconnected) => {
587                    debug!("action channel disconnected");
588                    break;
589                }
590            };
591            trace!(%action, "tick");
592            match action {
593                Action::Shutdown { reply } => {
594                    if let Err(cause) = self.store.flush() {
595                        warn!(?cause, "failed to flush store");
596                    }
597                    self.close_all();
598                    if let Some(reply) = reply {
599                        send_reply(reply, self.store).ok();
600                    }
601                    break;
602                }
603                action => {
604                    if self.on_action(action).is_err() {
605                        warn!("failed to send reply: receiver dropped");
606                    }
607                }
608            }
609        }
610        debug!("shutdown");
611        Ok(())
612    }
613
614    fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
615        match action {
616            Action::Shutdown { .. } => {
617                unreachable!("Shutdown action should be handled in run()")
618            }
619            Action::ImportAuthor { author, reply } => {
620                let id = author.id();
621                send_reply(reply, self.store.import_author(author).map(|_| id))
622            }
623            Action::ExportAuthor { author, reply } => {
624                send_reply(reply, self.store.get_author(&author))
625            }
626            Action::DeleteAuthor { author, reply } => {
627                send_reply(reply, self.store.delete_author(author))
628            }
629            Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
630                let id = capability.id();
631                let outcome = this.store.import_namespace(capability.clone())?;
632                if let ImportNamespaceOutcome::Upgraded = outcome {
633                    if let Ok(state) = this.states.get_mut(&id) {
634                        state.info.merge_capability(capability)?;
635                    }
636                }
637                Ok(id)
638            }),
639            Action::ListAuthors { reply } => iter_to_channel(
640                reply,
641                self.store
642                    .list_authors()
643                    .map(|a| a.map(|a| a.map(|a| a.id()))),
644            ),
645            Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
646            Action::ContentHashes { reply } => {
647                send_reply_with(reply, self, |this| this.store.content_hashes())
648            }
649            Action::Replica(namespace, action) => self.on_replica_action(namespace, action),
650        }
651    }
652
653    fn on_replica_action(
654        &mut self,
655        namespace: NamespaceId,
656        action: ReplicaAction,
657    ) -> Result<(), SendReplyError> {
658        match action {
659            ReplicaAction::Open { reply, opts } => {
660                let res = self.open(namespace, opts);
661                send_reply(reply, res)
662            }
663            ReplicaAction::Close { reply } => {
664                let res = self.close(namespace);
665                // ignore errors when no receiver is present for close
666                reply.send(Ok(res)).ok();
667                Ok(())
668            }
669            ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
670                let state = this.states.get_mut(&namespace)?;
671                state.info.subscribe(sender);
672                Ok(())
673            }),
674            ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
675                let state = this.states.get_mut(&namespace)?;
676                state.info.unsubscribe(&sender);
677                drop(sender);
678                Ok(())
679            }),
680            ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
681                let state = this.states.get_mut(&namespace)?;
682                state.sync = sync;
683                Ok(())
684            }),
685            ReplicaAction::InsertLocal {
686                author,
687                key,
688                hash,
689                len,
690                reply,
691            } => send_reply_with(reply, self, move |this| {
692                let author = get_author(&mut this.store, &author)?;
693                let mut replica = this.states.replica(namespace, &mut this.store)?;
694                replica.insert(&key, &author, hash, len)?;
695                Ok(())
696            }),
697            ReplicaAction::DeletePrefix { author, key, reply } => {
698                send_reply_with(reply, self, |this| {
699                    let author = get_author(&mut this.store, &author)?;
700                    let mut replica = this.states.replica(namespace, &mut this.store)?;
701                    let res = replica.delete_prefix(&key, &author)?;
702                    Ok(res)
703                })
704            }
705            ReplicaAction::InsertRemote {
706                entry,
707                from,
708                content_status,
709                reply,
710            } => send_reply_with(reply, self, move |this| {
711                let mut replica = this
712                    .states
713                    .replica_if_syncing(&namespace, &mut this.store)?;
714                replica.insert_remote_entry(entry, from, content_status)?;
715                Ok(())
716            }),
717
718            ReplicaAction::SyncInitialMessage { reply } => {
719                send_reply_with(reply, self, move |this| {
720                    let mut replica = this
721                        .states
722                        .replica_if_syncing(&namespace, &mut this.store)?;
723                    let res = replica.sync_initial_message()?;
724                    Ok(res)
725                })
726            }
727            ReplicaAction::SyncProcessMessage {
728                message,
729                from,
730                mut state,
731                reply,
732            } => send_reply_with(reply, self, move |this| {
733                let mut replica = this
734                    .states
735                    .replica_if_syncing(&namespace, &mut this.store)?;
736                let res = replica.sync_process_message(message, from, &mut state)?;
737                Ok((res, state))
738            }),
739            ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
740                this.states.ensure_open(&namespace)?;
741                let peers = this.store.get_sync_peers(&namespace)?;
742                Ok(peers.map(|iter| iter.collect()))
743            }),
744            ReplicaAction::RegisterUsefulPeer { peer, reply } => {
745                let res = self.store.register_useful_peer(namespace, peer);
746                send_reply(reply, res)
747            }
748            ReplicaAction::GetExact {
749                author,
750                key,
751                include_empty,
752                reply,
753            } => send_reply_with(reply, self, move |this| {
754                this.states.ensure_open(&namespace)?;
755                this.store.get_exact(namespace, author, key, include_empty)
756            }),
757            ReplicaAction::GetMany { query, reply } => {
758                let iter = self
759                    .states
760                    .ensure_open(&namespace)
761                    .and_then(|_| self.store.get_many(namespace, query));
762                iter_to_channel(reply, iter)
763            }
764            ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
765                this.close(namespace);
766                this.store.remove_replica(&namespace)
767            }),
768            ReplicaAction::ExportSecretKey { reply } => {
769                let res = self
770                    .states
771                    .get_mut(&namespace)
772                    .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
773                send_reply(reply, res)
774            }
775            ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
776                let state = this.states.get_mut(&namespace)?;
777                let handles = state.handles;
778                let sync = state.sync;
779                let subscribers = state.info.subscribers_count();
780                Ok(OpenState {
781                    handles,
782                    sync,
783                    subscribers,
784                })
785            }),
786            ReplicaAction::HasNewsForUs { heads, reply } => {
787                let res = self.store.has_news_for_us(namespace, &heads);
788                send_reply(reply, res)
789            }
790            ReplicaAction::SetDownloadPolicy { policy, reply } => {
791                send_reply(reply, self.store.set_download_policy(&namespace, policy))
792            }
793            ReplicaAction::GetDownloadPolicy { reply } => {
794                send_reply(reply, self.store.get_download_policy(&namespace))
795            }
796        }
797    }
798
799    fn close(&mut self, namespace: NamespaceId) -> bool {
800        let res = self.states.close(namespace);
801        if res {
802            self.store.close_replica(namespace);
803        }
804        res
805    }
806
807    fn close_all(&mut self) {
808        for id in self.states.close_all() {
809            self.store.close_replica(id);
810        }
811    }
812
813    fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
814        let open_cb = || {
815            let mut info = self.store.load_replica_info(&namespace)?;
816            if let Some(cb) = &self.content_status_callback {
817                info.set_content_status_callback(Arc::clone(cb));
818            }
819            Ok(info)
820        };
821        self.states.open_with(namespace, opts, open_cb)
822    }
823}
824
825#[derive(Default)]
826struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
827
828impl OpenReplicas {
829    fn replica<'a, 'b>(
830        &'a mut self,
831        namespace: NamespaceId,
832        store: &'b mut Store,
833    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
834        let state = self.get_mut(&namespace)?;
835        Ok(Replica::new(
836            StoreInstance::new(state.info.capability.id(), store),
837            &mut state.info,
838        ))
839    }
840
841    fn replica_if_syncing<'a, 'b>(
842        &'a mut self,
843        namespace: &NamespaceId,
844        store: &'b mut Store,
845    ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
846        let state = self.get_mut(namespace)?;
847        anyhow::ensure!(state.sync, "sync is not enabled for replica");
848        Ok(Replica::new(
849            StoreInstance::new(state.info.capability.id(), store),
850            &mut state.info,
851        ))
852    }
853
854    fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
855        self.0.get_mut(namespace).context("replica not open")
856    }
857
858    fn is_open(&self, namespace: &NamespaceId) -> bool {
859        self.0.contains_key(namespace)
860    }
861
862    fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
863        match self.is_open(namespace) {
864            true => Ok(()),
865            false => Err(anyhow!("replica not open")),
866        }
867    }
868    fn open_with(
869        &mut self,
870        namespace: NamespaceId,
871        opts: OpenOpts,
872        mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
873    ) -> Result<()> {
874        match self.0.entry(namespace) {
875            hash_map::Entry::Vacant(e) => {
876                let mut info = open_cb()?;
877                if let Some(sender) = opts.subscribe {
878                    info.subscribe(sender);
879                }
880                debug!(namespace = %namespace.fmt_short(), "open");
881                let state = OpenReplica {
882                    info,
883                    sync: opts.sync,
884                    handles: 1,
885                };
886                e.insert(state);
887            }
888            hash_map::Entry::Occupied(mut e) => {
889                let state = e.get_mut();
890                state.handles += 1;
891                state.sync = state.sync || opts.sync;
892                if let Some(sender) = opts.subscribe {
893                    state.info.subscribe(sender);
894                }
895            }
896        }
897        Ok(())
898    }
899    fn close(&mut self, namespace: NamespaceId) -> bool {
900        match self.0.entry(namespace) {
901            hash_map::Entry::Vacant(_e) => {
902                warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
903                true
904            }
905            hash_map::Entry::Occupied(mut e) => {
906                let state = e.get_mut();
907                state.handles = state.handles.wrapping_sub(1);
908                if state.handles == 0 {
909                    let _ = e.remove_entry();
910                    debug!(namespace = %namespace.fmt_short(), "close");
911                    true
912                } else {
913                    false
914                }
915            }
916        }
917    }
918
919    fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
920        self.0.drain().map(|(n, _s)| n)
921    }
922}
923
924fn iter_to_channel<T: Send + 'static>(
925    channel: flume::Sender<Result<T>>,
926    iter: Result<impl Iterator<Item = Result<T>>>,
927) -> Result<(), SendReplyError> {
928    match iter {
929        Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
930        Ok(iter) => {
931            for item in iter {
932                channel.send(item).map_err(send_reply_error)?;
933            }
934        }
935    }
936    Ok(())
937}
938
939fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
940    store.get_author(id)?.context("author not found")
941}
942
943#[derive(Debug)]
944struct SendReplyError;
945
946fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
947    sender.send(value).map_err(send_reply_error)
948}
949
950fn send_reply_with<T>(
951    sender: oneshot::Sender<Result<T>>,
952    this: &mut Actor,
953    f: impl FnOnce(&mut Actor) -> Result<T>,
954) -> Result<(), SendReplyError> {
955    sender.send(f(this)).map_err(send_reply_error)
956}
957
958fn send_reply_error<T>(_err: T) -> SendReplyError {
959    SendReplyError
960}
961
962#[cfg(test)]
963mod tests {
964    use crate::store;
965
966    use super::*;
967    #[tokio::test]
968    async fn open_close() -> anyhow::Result<()> {
969        let store = store::Store::memory();
970        let sync = SyncHandle::spawn(store, None, "foo".into());
971        let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
972        let id = namespace.id();
973        sync.import_namespace(namespace.into()).await?;
974        sync.open(id, Default::default()).await?;
975        let (tx, rx) = flume::bounded(10);
976        sync.subscribe(id, tx).await?;
977        sync.close(id).await?;
978        assert!(rx.recv_async().await.is_err());
979        Ok(())
980    }
981}