1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7 thread::JoinHandle,
8 time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use futures_util::FutureExt;
14use iroh_blobs::Hash;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::oneshot, task::JoinSet};
17use tracing::{debug, error, error_span, trace, warn};
18
19use crate::{
20 metrics::Metrics,
21 ranger::Message,
22 store::{
23 fs::{ContentHashesIterator, StoreInstance},
24 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
25 },
26 Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
27 ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo,
28 SignedEntry, SyncOutcome,
29};
30
31const ACTION_CAP: usize = 1024;
32pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
33
34#[derive(derive_more::Debug, derive_more::Display)]
35enum Action {
36 #[display("NewAuthor")]
37 ImportAuthor {
38 author: Author,
39 #[debug("reply")]
40 reply: oneshot::Sender<Result<AuthorId>>,
41 },
42 #[display("ExportAuthor")]
43 ExportAuthor {
44 author: AuthorId,
45 #[debug("reply")]
46 reply: oneshot::Sender<Result<Option<Author>>>,
47 },
48 #[display("DeleteAuthor")]
49 DeleteAuthor {
50 author: AuthorId,
51 #[debug("reply")]
52 reply: oneshot::Sender<Result<()>>,
53 },
54 #[display("NewReplica")]
55 ImportNamespace {
56 capability: Capability,
57 #[debug("reply")]
58 reply: oneshot::Sender<Result<NamespaceId>>,
59 },
60 #[display("ListAuthors")]
61 ListAuthors {
62 #[debug("reply")]
63 reply: async_channel::Sender<Result<AuthorId>>,
64 },
65 #[display("ListReplicas")]
66 ListReplicas {
67 #[debug("reply")]
68 reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
69 },
70 #[display("ContentHashes")]
71 ContentHashes {
72 #[debug("reply")]
73 reply: oneshot::Sender<Result<ContentHashesIterator>>,
74 },
75 #[display("FlushStore")]
76 FlushStore {
77 #[debug("reply")]
78 reply: oneshot::Sender<Result<()>>,
79 },
80 #[display("Replica({}, {})", _0.fmt_short(), _1)]
81 Replica(NamespaceId, ReplicaAction),
82 #[display("Shutdown")]
83 Shutdown {
84 #[debug("reply")]
85 reply: Option<oneshot::Sender<Store>>,
86 },
87}
88
89#[derive(derive_more::Debug, strum::Display)]
90enum ReplicaAction {
91 Open {
92 #[debug("reply")]
93 reply: oneshot::Sender<Result<()>>,
94 opts: OpenOpts,
95 },
96 Close {
97 #[debug("reply")]
98 reply: oneshot::Sender<Result<bool>>,
99 },
100 GetState {
101 #[debug("reply")]
102 reply: oneshot::Sender<Result<OpenState>>,
103 },
104 SetSync {
105 sync: bool,
106 #[debug("reply")]
107 reply: oneshot::Sender<Result<()>>,
108 },
109 Subscribe {
110 sender: async_channel::Sender<Event>,
111 #[debug("reply")]
112 reply: oneshot::Sender<Result<()>>,
113 },
114 Unsubscribe {
115 sender: async_channel::Sender<Event>,
116 #[debug("reply")]
117 reply: oneshot::Sender<Result<()>>,
118 },
119 InsertLocal {
120 author: AuthorId,
121 key: Bytes,
122 hash: Hash,
123 len: u64,
124 #[debug("reply")]
125 reply: oneshot::Sender<Result<()>>,
126 },
127 DeletePrefix {
128 author: AuthorId,
129 key: Bytes,
130 #[debug("reply")]
131 reply: oneshot::Sender<Result<usize>>,
132 },
133 InsertRemote {
134 entry: SignedEntry,
135 from: PeerIdBytes,
136 content_status: ContentStatus,
137 #[debug("reply")]
138 reply: oneshot::Sender<Result<()>>,
139 },
140 SyncInitialMessage {
141 #[debug("reply")]
142 reply: oneshot::Sender<Result<Message<SignedEntry>>>,
143 },
144 SyncProcessMessage {
145 message: Message<SignedEntry>,
146 from: PeerIdBytes,
147 state: SyncOutcome,
148 #[debug("reply")]
149 reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
150 },
151 GetSyncPeers {
152 #[debug("reply")]
153 reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
154 },
155 RegisterUsefulPeer {
156 peer: PeerIdBytes,
157 #[debug("reply")]
158 reply: oneshot::Sender<Result<()>>,
159 },
160 GetExact {
161 author: AuthorId,
162 key: Bytes,
163 include_empty: bool,
164 reply: oneshot::Sender<Result<Option<SignedEntry>>>,
165 },
166 GetMany {
167 query: Query,
168 reply: async_channel::Sender<Result<SignedEntry>>,
169 },
170 DropReplica {
171 reply: oneshot::Sender<Result<()>>,
172 },
173 ExportSecretKey {
174 reply: oneshot::Sender<Result<NamespaceSecret>>,
175 },
176 HasNewsForUs {
177 heads: AuthorHeads,
178 #[debug("reply")]
179 reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
180 },
181 SetDownloadPolicy {
182 policy: DownloadPolicy,
183 #[debug("reply")]
184 reply: oneshot::Sender<Result<()>>,
185 },
186 GetDownloadPolicy {
187 #[debug("reply")]
188 reply: oneshot::Sender<Result<DownloadPolicy>>,
189 },
190}
191
192#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
194pub struct OpenState {
195 pub sync: bool,
197 pub subscribers: usize,
199 pub handles: usize,
201}
202
203#[derive(Debug)]
204struct OpenReplica {
205 info: ReplicaInfo,
206 sync: bool,
207 handles: usize,
208}
209
210#[derive(Debug, Clone)]
223pub struct SyncHandle {
224 tx: async_channel::Sender<Action>,
225 join_handle: Arc<Option<JoinHandle<()>>>,
226 metrics: Arc<Metrics>,
227}
228
229#[derive(Debug, Default)]
231pub struct OpenOpts {
232 pub sync: bool,
234 pub subscribe: Option<async_channel::Sender<Event>>,
236}
237impl OpenOpts {
238 pub fn sync(mut self) -> Self {
240 self.sync = true;
241 self
242 }
243 pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
245 self.subscribe = Some(subscribe);
246 self
247 }
248}
249
250#[allow(missing_docs)]
251impl SyncHandle {
252 pub fn spawn(
254 store: Store,
255 content_status_callback: Option<ContentStatusCallback>,
256 me: String,
257 ) -> SyncHandle {
258 let metrics = Arc::new(Metrics::default());
259 let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
260 let actor = Actor {
261 store,
262 states: Default::default(),
263 action_rx,
264 content_status_callback,
265 tasks: Default::default(),
266 metrics: metrics.clone(),
267 };
268 let join_handle = std::thread::Builder::new()
269 .name("sync-actor".to_string())
270 .spawn(move || {
271 let span = error_span!("sync", %me);
272 let _enter = span.enter();
273
274 if let Err(err) = actor.run() {
275 error!("Sync actor closed with error: {err:?}");
276 }
277 })
278 .expect("failed to spawn thread");
279 let join_handle = Arc::new(Some(join_handle));
280 SyncHandle {
281 tx: action_tx,
282 join_handle,
283 metrics,
284 }
285 }
286
287 pub fn metrics(&self) -> &Arc<Metrics> {
289 &self.metrics
290 }
291
292 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
293 let (reply, rx) = oneshot::channel();
294 let action = ReplicaAction::Open { reply, opts };
295 self.send_replica(namespace, action).await?;
296 rx.await?
297 }
298
299 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
300 let (reply, rx) = oneshot::channel();
301 self.send_replica(namespace, ReplicaAction::Close { reply })
302 .await?;
303 rx.await?
304 }
305
306 pub async fn subscribe(
307 &self,
308 namespace: NamespaceId,
309 sender: async_channel::Sender<Event>,
310 ) -> Result<()> {
311 let (reply, rx) = oneshot::channel();
312 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
313 .await?;
314 rx.await?
315 }
316
317 pub async fn unsubscribe(
318 &self,
319 namespace: NamespaceId,
320 sender: async_channel::Sender<Event>,
321 ) -> Result<()> {
322 let (reply, rx) = oneshot::channel();
323 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
324 .await?;
325 rx.await?
326 }
327
328 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
329 let (reply, rx) = oneshot::channel();
330 let action = ReplicaAction::SetSync { sync, reply };
331 self.send_replica(namespace, action).await?;
332 rx.await?
333 }
334
335 pub async fn insert_local(
336 &self,
337 namespace: NamespaceId,
338 author: AuthorId,
339 key: Bytes,
340 hash: Hash,
341 len: u64,
342 ) -> Result<()> {
343 let (reply, rx) = oneshot::channel();
344 let action = ReplicaAction::InsertLocal {
345 author,
346 key,
347 hash,
348 len,
349 reply,
350 };
351 self.send_replica(namespace, action).await?;
352 rx.await?
353 }
354
355 pub async fn delete_prefix(
356 &self,
357 namespace: NamespaceId,
358 author: AuthorId,
359 key: Bytes,
360 ) -> Result<usize> {
361 let (reply, rx) = oneshot::channel();
362 let action = ReplicaAction::DeletePrefix { author, key, reply };
363 self.send_replica(namespace, action).await?;
364 rx.await?
365 }
366
367 pub async fn insert_remote(
368 &self,
369 namespace: NamespaceId,
370 entry: SignedEntry,
371 from: PeerIdBytes,
372 content_status: ContentStatus,
373 ) -> Result<()> {
374 let (reply, rx) = oneshot::channel();
375 let action = ReplicaAction::InsertRemote {
376 entry,
377 from,
378 content_status,
379 reply,
380 };
381 self.send_replica(namespace, action).await?;
382 rx.await?
383 }
384
385 pub async fn sync_initial_message(
386 &self,
387 namespace: NamespaceId,
388 ) -> Result<Message<SignedEntry>> {
389 let (reply, rx) = oneshot::channel();
390 let action = ReplicaAction::SyncInitialMessage { reply };
391 self.send_replica(namespace, action).await?;
392 rx.await?
393 }
394
395 pub async fn sync_process_message(
396 &self,
397 namespace: NamespaceId,
398 message: Message<SignedEntry>,
399 from: PeerIdBytes,
400 state: SyncOutcome,
401 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
402 let (reply, rx) = oneshot::channel();
403 let action = ReplicaAction::SyncProcessMessage {
404 reply,
405 message,
406 from,
407 state,
408 };
409 self.send_replica(namespace, action).await?;
410 rx.await?
411 }
412
413 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
414 let (reply, rx) = oneshot::channel();
415 let action = ReplicaAction::GetSyncPeers { reply };
416 self.send_replica(namespace, action).await?;
417 rx.await?
418 }
419
420 pub async fn register_useful_peer(
421 &self,
422 namespace: NamespaceId,
423 peer: PeerIdBytes,
424 ) -> Result<()> {
425 let (reply, rx) = oneshot::channel();
426 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
427 self.send_replica(namespace, action).await?;
428 rx.await?
429 }
430
431 pub async fn has_news_for_us(
432 &self,
433 namespace: NamespaceId,
434 heads: AuthorHeads,
435 ) -> Result<Option<NonZeroU64>> {
436 let (reply, rx) = oneshot::channel();
437 let action = ReplicaAction::HasNewsForUs { reply, heads };
438 self.send_replica(namespace, action).await?;
439 rx.await?
440 }
441
442 pub async fn get_many(
443 &self,
444 namespace: NamespaceId,
445 query: Query,
446 reply: async_channel::Sender<Result<SignedEntry>>,
447 ) -> Result<()> {
448 let action = ReplicaAction::GetMany { query, reply };
449 self.send_replica(namespace, action).await?;
450 Ok(())
451 }
452
453 pub async fn get_exact(
454 &self,
455 namespace: NamespaceId,
456 author: AuthorId,
457 key: Bytes,
458 include_empty: bool,
459 ) -> Result<Option<SignedEntry>> {
460 let (reply, rx) = oneshot::channel();
461 let action = ReplicaAction::GetExact {
462 author,
463 key,
464 include_empty,
465 reply,
466 };
467 self.send_replica(namespace, action).await?;
468 rx.await?
469 }
470
471 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
472 let (reply, rx) = oneshot::channel();
473 let action = ReplicaAction::DropReplica { reply };
474 self.send_replica(namespace, action).await?;
475 rx.await?
476 }
477
478 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
479 let (reply, rx) = oneshot::channel();
480 let action = ReplicaAction::ExportSecretKey { reply };
481 self.send_replica(namespace, action).await?;
482 rx.await?
483 }
484
485 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
486 let (reply, rx) = oneshot::channel();
487 let action = ReplicaAction::GetState { reply };
488 self.send_replica(namespace, action).await?;
489 rx.await?
490 }
491
492 pub async fn shutdown(&self) -> Result<Store> {
493 let (reply, rx) = oneshot::channel();
494 let action = Action::Shutdown { reply: Some(reply) };
495 self.send(action).await?;
496 let store = rx.await?;
497 Ok(store)
498 }
499
500 pub async fn list_authors(&self, reply: async_channel::Sender<Result<AuthorId>>) -> Result<()> {
501 self.send(Action::ListAuthors { reply }).await
502 }
503
504 pub async fn list_replicas(
505 &self,
506 reply: async_channel::Sender<Result<(NamespaceId, CapabilityKind)>>,
507 ) -> Result<()> {
508 self.send(Action::ListReplicas { reply }).await
509 }
510
511 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
512 let (reply, rx) = oneshot::channel();
513 self.send(Action::ImportAuthor { author, reply }).await?;
514 rx.await?
515 }
516
517 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
518 let (reply, rx) = oneshot::channel();
519 self.send(Action::ExportAuthor { author, reply }).await?;
520 rx.await?
521 }
522
523 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
524 let (reply, rx) = oneshot::channel();
525 self.send(Action::DeleteAuthor { author, reply }).await?;
526 rx.await?
527 }
528
529 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
530 let (reply, rx) = oneshot::channel();
531 self.send(Action::ImportNamespace { capability, reply })
532 .await?;
533 rx.await?
534 }
535
536 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
537 let (reply, rx) = oneshot::channel();
538 let action = ReplicaAction::GetDownloadPolicy { reply };
539 self.send_replica(namespace, action).await?;
540 rx.await?
541 }
542
543 pub async fn set_download_policy(
544 &self,
545 namespace: NamespaceId,
546 policy: DownloadPolicy,
547 ) -> Result<()> {
548 let (reply, rx) = oneshot::channel();
549 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
550 self.send_replica(namespace, action).await?;
551 rx.await?
552 }
553
554 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
555 let (reply, rx) = oneshot::channel();
556 self.send(Action::ContentHashes { reply }).await?;
557 rx.await?
558 }
559
560 pub async fn flush_store(&self) -> Result<()> {
570 let (reply, rx) = oneshot::channel();
571 self.send(Action::FlushStore { reply }).await?;
572 rx.await?
573 }
574
575 async fn send(&self, action: Action) -> Result<()> {
576 self.tx
577 .send(action)
578 .await
579 .context("sending to iroh_docs actor failed")?;
580 Ok(())
581 }
582 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
583 self.send(Action::Replica(namespace, action)).await?;
584 Ok(())
585 }
586}
587
588impl Drop for SyncHandle {
589 fn drop(&mut self) {
590 if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
592 self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
596 let handle = handle.take().expect("this can only run once");
597 if let Err(err) = handle.join() {
598 warn!(?err, "Failed to join sync actor");
599 }
600 }
601 }
602}
603
604struct Actor {
605 store: Store,
606 states: OpenReplicas,
607 action_rx: async_channel::Receiver<Action>,
608 content_status_callback: Option<ContentStatusCallback>,
609 tasks: JoinSet<()>,
610 metrics: Arc<Metrics>,
611}
612
613impl Actor {
614 fn run(self) -> Result<()> {
615 let rt = tokio::runtime::Builder::new_current_thread()
616 .enable_time()
617 .build()?;
618 let local_set = tokio::task::LocalSet::new();
619 local_set.block_on(&rt, async move { self.run_async().await });
620 Ok(())
621 }
622
623 async fn run_async(mut self) {
624 let reply = loop {
625 let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
626 tokio::pin!(timeout);
627 let action = tokio::select! {
628 _ = &mut timeout => {
629 if let Err(cause) = self.store.flush() {
630 error!(?cause, "failed to flush store");
631 }
632 continue;
633 }
634 action = self.action_rx.recv() => {
635 match action {
636 Ok(action) => action,
637 Err(async_channel::RecvError) => {
638 debug!("action channel disconnected");
639 break None;
640 }
641
642 }
643 }
644 };
645 trace!(%action, "tick");
646 self.metrics.actor_tick_main.inc();
647 match action {
648 Action::Shutdown { reply } => {
649 break reply;
650 }
651 action => {
652 if self.on_action(action).is_err() {
653 warn!("failed to send reply: receiver dropped");
654 }
655 }
656 }
657 };
658
659 if let Err(cause) = self.store.flush() {
660 warn!(?cause, "failed to flush store");
661 }
662 self.close_all();
663 self.tasks.abort_all();
664 debug!("docs actor shutdown");
665 if let Some(reply) = reply {
666 reply.send(self.store).ok();
667 }
668 }
669
670 fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
671 match action {
672 Action::Shutdown { .. } => {
673 unreachable!("Shutdown is handled in run()")
674 }
675 Action::ImportAuthor { author, reply } => {
676 let id = author.id();
677 send_reply(reply, self.store.import_author(author).map(|_| id))
678 }
679 Action::ExportAuthor { author, reply } => {
680 send_reply(reply, self.store.get_author(&author))
681 }
682 Action::DeleteAuthor { author, reply } => {
683 send_reply(reply, self.store.delete_author(author))
684 }
685 Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
686 let id = capability.id();
687 let outcome = this.store.import_namespace(capability.clone())?;
688 if let ImportNamespaceOutcome::Upgraded = outcome {
689 if let Ok(state) = this.states.get_mut(&id) {
690 state.info.merge_capability(capability)?;
691 }
692 }
693 Ok(id)
694 }),
695 Action::ListAuthors { reply } => {
696 let iter = self
697 .store
698 .list_authors()
699 .map(|a| a.map(|a| a.map(|a| a.id())));
700 self.tasks
701 .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
702 Ok(())
703 }
704 Action::ListReplicas { reply } => {
705 let iter = self.store.list_namespaces();
706 self.tasks
707 .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
708 Ok(())
709 }
710 Action::ContentHashes { reply } => {
711 send_reply_with(reply, self, |this| this.store.content_hashes())
712 }
713 Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
714 Action::Replica(namespace, action) => self.on_replica_action(namespace, action),
715 }
716 }
717
718 fn on_replica_action(
719 &mut self,
720 namespace: NamespaceId,
721 action: ReplicaAction,
722 ) -> Result<(), SendReplyError> {
723 match action {
724 ReplicaAction::Open { reply, opts } => {
725 tracing::trace!("open in");
726 let res = self.open(namespace, opts);
727 tracing::trace!("open out");
728 send_reply(reply, res)
729 }
730 ReplicaAction::Close { reply } => {
731 let res = self.close(namespace);
732 reply.send(Ok(res)).ok();
734 Ok(())
735 }
736 ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
737 let state = this.states.get_mut(&namespace)?;
738 state.info.subscribe(sender);
739 Ok(())
740 }),
741 ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
742 let state = this.states.get_mut(&namespace)?;
743 state.info.unsubscribe(&sender);
744 drop(sender);
745 Ok(())
746 }),
747 ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
748 let state = this.states.get_mut(&namespace)?;
749 state.sync = sync;
750 Ok(())
751 }),
752 ReplicaAction::InsertLocal {
753 author,
754 key,
755 hash,
756 len,
757 reply,
758 } => send_reply_with(reply, self, move |this| {
759 let author = get_author(&mut this.store, &author)?;
760 let mut replica = this.states.replica(namespace, &mut this.store)?;
761 replica.insert(&key, &author, hash, len)?;
762 this.metrics.new_entries_local.inc();
763 this.metrics.new_entries_local_size.inc_by(len);
764 Ok(())
765 }),
766 ReplicaAction::DeletePrefix { author, key, reply } => {
767 send_reply_with(reply, self, |this| {
768 let author = get_author(&mut this.store, &author)?;
769 let mut replica = this.states.replica(namespace, &mut this.store)?;
770 let res = replica.delete_prefix(&key, &author)?;
771 Ok(res)
772 })
773 }
774 ReplicaAction::InsertRemote {
775 entry,
776 from,
777 content_status,
778 reply,
779 } => send_reply_with(reply, self, move |this| {
780 let mut replica = this
781 .states
782 .replica_if_syncing(&namespace, &mut this.store)?;
783 let len = entry.content_len();
784 replica.insert_remote_entry(entry, from, content_status)?;
785 this.metrics.new_entries_remote.inc();
786 this.metrics.new_entries_remote_size.inc_by(len);
787 Ok(())
788 }),
789
790 ReplicaAction::SyncInitialMessage { reply } => {
791 send_reply_with(reply, self, move |this| {
792 let mut replica = this
793 .states
794 .replica_if_syncing(&namespace, &mut this.store)?;
795 let res = replica.sync_initial_message()?;
796 Ok(res)
797 })
798 }
799 ReplicaAction::SyncProcessMessage {
800 message,
801 from,
802 mut state,
803 reply,
804 } => send_reply_with(reply, self, move |this| {
805 let mut replica = this
806 .states
807 .replica_if_syncing(&namespace, &mut this.store)?;
808 let res = replica.sync_process_message(message, from, &mut state)?;
809 Ok((res, state))
810 }),
811 ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
812 this.states.ensure_open(&namespace)?;
813 let peers = this.store.get_sync_peers(&namespace)?;
814 Ok(peers.map(|iter| iter.collect()))
815 }),
816 ReplicaAction::RegisterUsefulPeer { peer, reply } => {
817 let res = self.store.register_useful_peer(namespace, peer);
818 send_reply(reply, res)
819 }
820 ReplicaAction::GetExact {
821 author,
822 key,
823 include_empty,
824 reply,
825 } => send_reply_with(reply, self, move |this| {
826 this.states.ensure_open(&namespace)?;
827 this.store.get_exact(namespace, author, key, include_empty)
828 }),
829 ReplicaAction::GetMany { query, reply } => {
830 let iter = self
831 .states
832 .ensure_open(&namespace)
833 .and_then(|_| self.store.get_many(namespace, query));
834 self.tasks
835 .spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
836 Ok(())
837 }
838 ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
839 this.close(namespace);
840 this.store.remove_replica(&namespace)
841 }),
842 ReplicaAction::ExportSecretKey { reply } => {
843 let res = self
844 .states
845 .get_mut(&namespace)
846 .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
847 send_reply(reply, res)
848 }
849 ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
850 let state = this.states.get_mut(&namespace)?;
851 let handles = state.handles;
852 let sync = state.sync;
853 let subscribers = state.info.subscribers_count();
854 Ok(OpenState {
855 handles,
856 sync,
857 subscribers,
858 })
859 }),
860 ReplicaAction::HasNewsForUs { heads, reply } => {
861 let res = self.store.has_news_for_us(namespace, &heads);
862 send_reply(reply, res)
863 }
864 ReplicaAction::SetDownloadPolicy { policy, reply } => {
865 send_reply(reply, self.store.set_download_policy(&namespace, policy))
866 }
867 ReplicaAction::GetDownloadPolicy { reply } => {
868 send_reply(reply, self.store.get_download_policy(&namespace))
869 }
870 }
871 }
872
873 fn close(&mut self, namespace: NamespaceId) -> bool {
874 let res = self.states.close(namespace);
875 if res {
876 self.store.close_replica(namespace);
877 }
878 res
879 }
880
881 fn close_all(&mut self) {
882 for id in self.states.close_all() {
883 self.store.close_replica(id);
884 }
885 }
886
887 fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
888 let open_cb = || {
889 let mut info = self.store.load_replica_info(&namespace)?;
890 if let Some(cb) = &self.content_status_callback {
891 info.set_content_status_callback(Arc::clone(cb));
892 }
893 Ok(info)
894 };
895 self.states.open_with(namespace, opts, open_cb)
896 }
897}
898
899#[derive(Default)]
900struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
901
902impl OpenReplicas {
903 fn replica<'a, 'b>(
904 &'a mut self,
905 namespace: NamespaceId,
906 store: &'b mut Store,
907 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
908 let state = self.get_mut(&namespace)?;
909 Ok(Replica::new(
910 StoreInstance::new(state.info.capability.id(), store),
911 &mut state.info,
912 ))
913 }
914
915 fn replica_if_syncing<'a, 'b>(
916 &'a mut self,
917 namespace: &NamespaceId,
918 store: &'b mut Store,
919 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
920 let state = self.get_mut(namespace)?;
921 anyhow::ensure!(state.sync, "sync is not enabled for replica");
922 Ok(Replica::new(
923 StoreInstance::new(state.info.capability.id(), store),
924 &mut state.info,
925 ))
926 }
927
928 fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
929 self.0.get_mut(namespace).context("replica not open")
930 }
931
932 fn is_open(&self, namespace: &NamespaceId) -> bool {
933 self.0.contains_key(namespace)
934 }
935
936 fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
937 match self.is_open(namespace) {
938 true => Ok(()),
939 false => Err(anyhow!("replica not open")),
940 }
941 }
942 fn open_with(
943 &mut self,
944 namespace: NamespaceId,
945 opts: OpenOpts,
946 mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
947 ) -> Result<()> {
948 match self.0.entry(namespace) {
949 hash_map::Entry::Vacant(e) => {
950 let mut info = open_cb()?;
951 if let Some(sender) = opts.subscribe {
952 info.subscribe(sender);
953 }
954 debug!(namespace = %namespace.fmt_short(), "open");
955 let state = OpenReplica {
956 info,
957 sync: opts.sync,
958 handles: 1,
959 };
960 e.insert(state);
961 }
962 hash_map::Entry::Occupied(mut e) => {
963 let state = e.get_mut();
964 state.handles += 1;
965 state.sync = state.sync || opts.sync;
966 if let Some(sender) = opts.subscribe {
967 state.info.subscribe(sender);
968 }
969 }
970 }
971 Ok(())
972 }
973 fn close(&mut self, namespace: NamespaceId) -> bool {
974 match self.0.entry(namespace) {
975 hash_map::Entry::Vacant(_e) => {
976 warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
977 true
978 }
979 hash_map::Entry::Occupied(mut e) => {
980 let state = e.get_mut();
981 state.handles = state.handles.wrapping_sub(1);
982 if state.handles == 0 {
983 let _ = e.remove_entry();
984 debug!(namespace = %namespace.fmt_short(), "close");
985 true
986 } else {
987 false
988 }
989 }
990 }
991 }
992
993 fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
994 self.0.drain().map(|(n, _s)| n)
995 }
996}
997
998async fn iter_to_channel_async<T: Send + 'static>(
999 channel: async_channel::Sender<Result<T>>,
1000 iter: Result<impl Iterator<Item = Result<T>>>,
1001) -> Result<(), SendReplyError> {
1002 match iter {
1003 Err(err) => channel.send(Err(err)).await.map_err(send_reply_error)?,
1004 Ok(iter) => {
1005 for item in iter {
1006 channel.send(item).await.map_err(send_reply_error)?;
1007 }
1008 }
1009 }
1010 Ok(())
1011}
1012
1013fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1014 store.get_author(id)?.context("author not found")
1015}
1016
1017#[derive(Debug)]
1018struct SendReplyError;
1019
1020fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1021 sender.send(value).map_err(send_reply_error)
1022}
1023
1024fn send_reply_with<T>(
1025 sender: oneshot::Sender<Result<T>>,
1026 this: &mut Actor,
1027 f: impl FnOnce(&mut Actor) -> Result<T>,
1028) -> Result<(), SendReplyError> {
1029 sender.send(f(this)).map_err(send_reply_error)
1030}
1031
1032fn send_reply_error<T>(_err: T) -> SendReplyError {
1033 SendReplyError
1034}
1035
1036#[cfg(test)]
1037mod tests {
1038 use super::*;
1039 use crate::store;
1040 #[tokio::test]
1041 async fn open_close() -> anyhow::Result<()> {
1042 let store = store::Store::memory();
1043 let sync = SyncHandle::spawn(store, None, "foo".into());
1044 let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
1045 let id = namespace.id();
1046 sync.import_namespace(namespace.into()).await?;
1047 sync.open(id, Default::default()).await?;
1048 let (tx, rx) = async_channel::bounded(10);
1049 sync.subscribe(id, tx).await?;
1050 sync.close(id).await?;
1051 assert!(rx.recv().await.is_err());
1052 Ok(())
1053 }
1054}