1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7 thread::JoinHandle,
8 time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use futures_util::FutureExt;
14use iroh_blobs::Hash;
15use irpc::channel::mpsc;
16use serde::{Deserialize, Serialize};
17use tokio::{sync::oneshot, task::JoinSet};
18use tracing::{debug, error, error_span, trace, warn};
19
20use crate::{
21 api::{
22 protocol::{AuthorListResponse, ListResponse},
23 RpcError, RpcResult,
24 },
25 metrics::Metrics,
26 ranger::Message,
27 store::{
28 fs::{ContentHashesIterator, StoreInstance},
29 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
30 },
31 Author, AuthorHeads, AuthorId, Capability, ContentStatus, ContentStatusCallback, Event,
32 NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo, SignedEntry, SyncOutcome,
33};
34
35const ACTION_CAP: usize = 1024;
36pub(crate) const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
37
38#[derive(derive_more::Debug, derive_more::Display)]
39enum Action {
40 #[display("NewAuthor")]
41 ImportAuthor {
42 author: Author,
43 #[debug("reply")]
44 reply: oneshot::Sender<Result<AuthorId>>,
45 },
46 #[display("ExportAuthor")]
47 ExportAuthor {
48 author: AuthorId,
49 #[debug("reply")]
50 reply: oneshot::Sender<Result<Option<Author>>>,
51 },
52 #[display("DeleteAuthor")]
53 DeleteAuthor {
54 author: AuthorId,
55 #[debug("reply")]
56 reply: oneshot::Sender<Result<()>>,
57 },
58 #[display("NewReplica")]
59 ImportNamespace {
60 capability: Capability,
61 #[debug("reply")]
62 reply: oneshot::Sender<Result<NamespaceId>>,
63 },
64 #[display("ListAuthors")]
65 ListAuthors {
66 #[debug("reply")]
67 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
68 },
69 #[display("ListReplicas")]
70 ListReplicas {
71 #[debug("reply")]
72 reply: mpsc::Sender<RpcResult<ListResponse>>,
73 },
74 #[display("ContentHashes")]
75 ContentHashes {
76 #[debug("reply")]
77 reply: oneshot::Sender<Result<ContentHashesIterator>>,
78 },
79 #[display("FlushStore")]
80 FlushStore {
81 #[debug("reply")]
82 reply: oneshot::Sender<Result<()>>,
83 },
84 #[display("Replica({}, {})", _0.fmt_short(), _1)]
85 Replica(NamespaceId, ReplicaAction),
86 #[display("Shutdown")]
87 Shutdown {
88 #[debug("reply")]
89 reply: Option<oneshot::Sender<Store>>,
90 },
91}
92
93#[derive(derive_more::Debug, strum::Display)]
94enum ReplicaAction {
95 Open {
96 #[debug("reply")]
97 reply: oneshot::Sender<Result<()>>,
98 opts: OpenOpts,
99 },
100 Close {
101 #[debug("reply")]
102 reply: oneshot::Sender<Result<bool>>,
103 },
104 GetState {
105 #[debug("reply")]
106 reply: oneshot::Sender<Result<OpenState>>,
107 },
108 SetSync {
109 sync: bool,
110 #[debug("reply")]
111 reply: oneshot::Sender<Result<()>>,
112 },
113 Subscribe {
114 sender: async_channel::Sender<Event>,
115 #[debug("reply")]
116 reply: oneshot::Sender<Result<()>>,
117 },
118 Unsubscribe {
119 sender: async_channel::Sender<Event>,
120 #[debug("reply")]
121 reply: oneshot::Sender<Result<()>>,
122 },
123 InsertLocal {
124 author: AuthorId,
125 key: Bytes,
126 hash: Hash,
127 len: u64,
128 #[debug("reply")]
129 reply: oneshot::Sender<Result<()>>,
130 },
131 DeletePrefix {
132 author: AuthorId,
133 key: Bytes,
134 #[debug("reply")]
135 reply: oneshot::Sender<Result<usize>>,
136 },
137 InsertRemote {
138 entry: SignedEntry,
139 from: PeerIdBytes,
140 content_status: ContentStatus,
141 #[debug("reply")]
142 reply: oneshot::Sender<Result<()>>,
143 },
144 SyncInitialMessage {
145 #[debug("reply")]
146 reply: oneshot::Sender<Result<Message<SignedEntry>>>,
147 },
148 SyncProcessMessage {
149 message: Message<SignedEntry>,
150 from: PeerIdBytes,
151 state: SyncOutcome,
152 #[debug("reply")]
153 reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
154 },
155 GetSyncPeers {
156 #[debug("reply")]
157 reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
158 },
159 RegisterUsefulPeer {
160 peer: PeerIdBytes,
161 #[debug("reply")]
162 reply: oneshot::Sender<Result<()>>,
163 },
164 GetExact {
165 author: AuthorId,
166 key: Bytes,
167 include_empty: bool,
168 reply: oneshot::Sender<Result<Option<SignedEntry>>>,
169 },
170 GetMany {
171 query: Query,
172 reply: mpsc::Sender<RpcResult<SignedEntry>>,
173 },
174 DropReplica {
175 reply: oneshot::Sender<Result<()>>,
176 },
177 ExportSecretKey {
178 reply: oneshot::Sender<Result<NamespaceSecret>>,
179 },
180 HasNewsForUs {
181 heads: AuthorHeads,
182 #[debug("reply")]
183 reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
184 },
185 SetDownloadPolicy {
186 policy: DownloadPolicy,
187 #[debug("reply")]
188 reply: oneshot::Sender<Result<()>>,
189 },
190 GetDownloadPolicy {
191 #[debug("reply")]
192 reply: oneshot::Sender<Result<DownloadPolicy>>,
193 },
194}
195
196#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
198pub struct OpenState {
199 pub sync: bool,
201 pub subscribers: usize,
203 pub handles: usize,
205}
206
207#[derive(Debug)]
208struct OpenReplica {
209 info: ReplicaInfo,
210 sync: bool,
211 handles: usize,
212}
213
214#[derive(Debug, Clone)]
227pub struct SyncHandle {
228 tx: async_channel::Sender<Action>,
229 join_handle: Arc<Option<JoinHandle<()>>>,
230 metrics: Arc<Metrics>,
231}
232
233#[derive(Debug, Default)]
235pub struct OpenOpts {
236 pub sync: bool,
238 pub subscribe: Option<async_channel::Sender<Event>>,
240}
241
242impl OpenOpts {
243 pub fn sync(mut self) -> Self {
245 self.sync = true;
246 self
247 }
248 pub fn subscribe(mut self, subscribe: async_channel::Sender<Event>) -> Self {
250 self.subscribe = Some(subscribe);
251 self
252 }
253}
254
255#[allow(missing_docs)]
256impl SyncHandle {
257 pub fn spawn(
259 store: Store,
260 content_status_callback: Option<ContentStatusCallback>,
261 me: String,
262 ) -> SyncHandle {
263 let metrics = Arc::new(Metrics::default());
264 let (action_tx, action_rx) = async_channel::bounded(ACTION_CAP);
265 let actor = Actor {
266 store,
267 states: Default::default(),
268 action_rx,
269 content_status_callback,
270 tasks: Default::default(),
271 metrics: metrics.clone(),
272 };
273 let join_handle = std::thread::Builder::new()
274 .name("sync-actor".to_string())
275 .spawn(move || {
276 let span = error_span!("sync", %me);
277 let _enter = span.enter();
278
279 if let Err(err) = actor.run() {
280 error!("Sync actor closed with error: {err:?}");
281 }
282 })
283 .expect("failed to spawn thread");
284 let join_handle = Arc::new(Some(join_handle));
285 SyncHandle {
286 tx: action_tx,
287 join_handle,
288 metrics,
289 }
290 }
291
292 pub fn metrics(&self) -> &Arc<Metrics> {
294 &self.metrics
295 }
296
297 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
298 tracing::debug!("SyncHandle::open called");
299 let (reply, rx) = oneshot::channel();
300 let action = ReplicaAction::Open { reply, opts };
301 self.send_replica(namespace, action).await?;
302 rx.await?
303 }
304
305 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
306 let (reply, rx) = oneshot::channel();
307 self.send_replica(namespace, ReplicaAction::Close { reply })
308 .await?;
309 rx.await?
310 }
311
312 pub async fn subscribe(
313 &self,
314 namespace: NamespaceId,
315 sender: async_channel::Sender<Event>,
316 ) -> Result<()> {
317 let (reply, rx) = oneshot::channel();
318 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
319 .await?;
320 rx.await?
321 }
322
323 pub async fn unsubscribe(
324 &self,
325 namespace: NamespaceId,
326 sender: async_channel::Sender<Event>,
327 ) -> Result<()> {
328 let (reply, rx) = oneshot::channel();
329 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
330 .await?;
331 rx.await?
332 }
333
334 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
335 let (reply, rx) = oneshot::channel();
336 let action = ReplicaAction::SetSync { sync, reply };
337 self.send_replica(namespace, action).await?;
338 rx.await?
339 }
340
341 pub async fn insert_local(
342 &self,
343 namespace: NamespaceId,
344 author: AuthorId,
345 key: Bytes,
346 hash: Hash,
347 len: u64,
348 ) -> Result<()> {
349 let (reply, rx) = oneshot::channel();
350 let action = ReplicaAction::InsertLocal {
351 author,
352 key,
353 hash,
354 len,
355 reply,
356 };
357 self.send_replica(namespace, action).await?;
358 rx.await?
359 }
360
361 pub async fn delete_prefix(
362 &self,
363 namespace: NamespaceId,
364 author: AuthorId,
365 key: Bytes,
366 ) -> Result<usize> {
367 let (reply, rx) = oneshot::channel();
368 let action = ReplicaAction::DeletePrefix { author, key, reply };
369 self.send_replica(namespace, action).await?;
370 rx.await?
371 }
372
373 pub async fn insert_remote(
374 &self,
375 namespace: NamespaceId,
376 entry: SignedEntry,
377 from: PeerIdBytes,
378 content_status: ContentStatus,
379 ) -> Result<()> {
380 let (reply, rx) = oneshot::channel();
381 let action = ReplicaAction::InsertRemote {
382 entry,
383 from,
384 content_status,
385 reply,
386 };
387 self.send_replica(namespace, action).await?;
388 rx.await?
389 }
390
391 pub async fn sync_initial_message(
392 &self,
393 namespace: NamespaceId,
394 ) -> Result<Message<SignedEntry>> {
395 let (reply, rx) = oneshot::channel();
396 let action = ReplicaAction::SyncInitialMessage { reply };
397 self.send_replica(namespace, action).await?;
398 rx.await?
399 }
400
401 pub async fn sync_process_message(
402 &self,
403 namespace: NamespaceId,
404 message: Message<SignedEntry>,
405 from: PeerIdBytes,
406 state: SyncOutcome,
407 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
408 let (reply, rx) = oneshot::channel();
409 let action = ReplicaAction::SyncProcessMessage {
410 reply,
411 message,
412 from,
413 state,
414 };
415 self.send_replica(namespace, action).await?;
416 rx.await?
417 }
418
419 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
420 let (reply, rx) = oneshot::channel();
421 let action = ReplicaAction::GetSyncPeers { reply };
422 self.send_replica(namespace, action).await?;
423 rx.await?
424 }
425
426 pub async fn register_useful_peer(
427 &self,
428 namespace: NamespaceId,
429 peer: PeerIdBytes,
430 ) -> Result<()> {
431 let (reply, rx) = oneshot::channel();
432 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
433 self.send_replica(namespace, action).await?;
434 rx.await?
435 }
436
437 pub async fn has_news_for_us(
438 &self,
439 namespace: NamespaceId,
440 heads: AuthorHeads,
441 ) -> Result<Option<NonZeroU64>> {
442 let (reply, rx) = oneshot::channel();
443 let action = ReplicaAction::HasNewsForUs { reply, heads };
444 self.send_replica(namespace, action).await?;
445 rx.await?
446 }
447
448 pub async fn get_many(
449 &self,
450 namespace: NamespaceId,
451 query: Query,
452 reply: mpsc::Sender<RpcResult<SignedEntry>>,
453 ) -> Result<()> {
454 let action = ReplicaAction::GetMany { query, reply };
455 self.send_replica(namespace, action).await?;
456 Ok(())
457 }
458
459 pub async fn get_exact(
460 &self,
461 namespace: NamespaceId,
462 author: AuthorId,
463 key: Bytes,
464 include_empty: bool,
465 ) -> Result<Option<SignedEntry>> {
466 let (reply, rx) = oneshot::channel();
467 let action = ReplicaAction::GetExact {
468 author,
469 key,
470 include_empty,
471 reply,
472 };
473 self.send_replica(namespace, action).await?;
474 rx.await?
475 }
476
477 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
478 let (reply, rx) = oneshot::channel();
479 let action = ReplicaAction::DropReplica { reply };
480 self.send_replica(namespace, action).await?;
481 rx.await?
482 }
483
484 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
485 let (reply, rx) = oneshot::channel();
486 let action = ReplicaAction::ExportSecretKey { reply };
487 self.send_replica(namespace, action).await?;
488 rx.await?
489 }
490
491 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
492 let (reply, rx) = oneshot::channel();
493 let action = ReplicaAction::GetState { reply };
494 self.send_replica(namespace, action).await?;
495 rx.await?
496 }
497
498 pub async fn shutdown(&self) -> Result<Store> {
499 let (reply, rx) = oneshot::channel();
500 let action = Action::Shutdown { reply: Some(reply) };
501 self.send(action).await?;
502 let store = rx.await?;
503 Ok(store)
504 }
505
506 pub async fn list_authors(
507 &self,
508 reply: mpsc::Sender<RpcResult<AuthorListResponse>>,
509 ) -> Result<()> {
510 self.send(Action::ListAuthors { reply }).await
511 }
512
513 pub async fn list_replicas(&self, reply: mpsc::Sender<RpcResult<ListResponse>>) -> Result<()> {
514 self.send(Action::ListReplicas { reply }).await
515 }
516
517 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
518 let (reply, rx) = oneshot::channel();
519 self.send(Action::ImportAuthor { author, reply }).await?;
520 rx.await?
521 }
522
523 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
524 let (reply, rx) = oneshot::channel();
525 self.send(Action::ExportAuthor { author, reply }).await?;
526 rx.await?
527 }
528
529 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
530 let (reply, rx) = oneshot::channel();
531 self.send(Action::DeleteAuthor { author, reply }).await?;
532 rx.await?
533 }
534
535 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
536 let (reply, rx) = oneshot::channel();
537 self.send(Action::ImportNamespace { capability, reply })
538 .await?;
539 rx.await?
540 }
541
542 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
543 let (reply, rx) = oneshot::channel();
544 let action = ReplicaAction::GetDownloadPolicy { reply };
545 self.send_replica(namespace, action).await?;
546 rx.await?
547 }
548
549 pub async fn set_download_policy(
550 &self,
551 namespace: NamespaceId,
552 policy: DownloadPolicy,
553 ) -> Result<()> {
554 let (reply, rx) = oneshot::channel();
555 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
556 self.send_replica(namespace, action).await?;
557 rx.await?
558 }
559
560 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
561 let (reply, rx) = oneshot::channel();
562 self.send(Action::ContentHashes { reply }).await?;
563 rx.await?
564 }
565
566 pub async fn flush_store(&self) -> Result<()> {
576 let (reply, rx) = oneshot::channel();
577 self.send(Action::FlushStore { reply }).await?;
578 rx.await?
579 }
580
581 async fn send(&self, action: Action) -> Result<()> {
582 self.tx
583 .send(action)
584 .await
585 .context("sending to iroh_docs actor failed")?;
586 Ok(())
587 }
588 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
589 self.send(Action::Replica(namespace, action)).await?;
590 Ok(())
591 }
592}
593
594impl Drop for SyncHandle {
595 fn drop(&mut self) {
596 if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
598 self.tx.send_blocking(Action::Shutdown { reply: None }).ok();
602 let handle = handle.take().expect("this can only run once");
603 if let Err(err) = handle.join() {
604 warn!(?err, "Failed to join sync actor");
605 }
606 }
607 }
608}
609
610struct Actor {
611 store: Store,
612 states: OpenReplicas,
613 action_rx: async_channel::Receiver<Action>,
614 content_status_callback: Option<ContentStatusCallback>,
615 tasks: JoinSet<()>,
616 metrics: Arc<Metrics>,
617}
618
619impl Actor {
620 fn run(self) -> Result<()> {
621 let rt = tokio::runtime::Builder::new_current_thread()
622 .enable_time()
623 .build()?;
624 let local_set = tokio::task::LocalSet::new();
625 local_set.block_on(&rt, async move { self.run_async().await });
626 Ok(())
627 }
628
629 async fn run_async(mut self) {
630 let reply = loop {
631 let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
632 tokio::pin!(timeout);
633 let action = tokio::select! {
634 _ = &mut timeout => {
635 if let Err(cause) = self.store.flush() {
636 error!(?cause, "failed to flush store");
637 }
638 continue;
639 }
640 action = self.action_rx.recv() => {
641 match action {
642 Ok(action) => action,
643 Err(async_channel::RecvError) => {
644 debug!("action channel disconnected");
645 break None;
646 }
647
648 }
649 }
650 };
651 trace!(%action, "tick");
652 self.metrics.actor_tick_main.inc();
653 match action {
654 Action::Shutdown { reply } => {
655 break reply;
656 }
657 action => {
658 if self.on_action(action).await.is_err() {
659 warn!("failed to send reply: receiver dropped");
660 }
661 }
662 }
663 };
664
665 if let Err(cause) = self.store.flush() {
666 warn!(?cause, "failed to flush store");
667 }
668 self.close_all();
669 self.tasks.abort_all();
670 debug!("docs actor shutdown");
671 if let Some(reply) = reply {
672 reply.send(self.store).ok();
673 }
674 }
675
676 async fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
677 match action {
678 Action::Shutdown { .. } => {
679 unreachable!("Shutdown is handled in run()")
680 }
681 Action::ImportAuthor { author, reply } => {
682 let id = author.id();
683 send_reply(reply, self.store.import_author(author).map(|_| id))
684 }
685 Action::ExportAuthor { author, reply } => {
686 send_reply(reply, self.store.get_author(&author))
687 }
688 Action::DeleteAuthor { author, reply } => {
689 send_reply(reply, self.store.delete_author(author))
690 }
691 Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
692 let id = capability.id();
693 let outcome = this.store.import_namespace(capability.clone())?;
694 if let ImportNamespaceOutcome::Upgraded = outcome {
695 if let Ok(state) = this.states.get_mut(&id) {
696 state.info.merge_capability(capability)?;
697 }
698 }
699 Ok(id)
700 }),
701 Action::ListAuthors { reply } => {
702 let iter = self
703 .store
704 .list_authors()
705 .map(|a| a.map(|a| a.map(|a| AuthorListResponse { author_id: a.id() })));
706 self.tasks
707 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
708 Ok(())
709 }
710 Action::ListReplicas { reply } => {
711 let iter = self.store.list_namespaces();
712 let iter = iter.map(|inner| {
713 inner.map(|res| res.map(|(id, capability)| ListResponse { id, capability }))
714 });
715 self.tasks
716 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
717 Ok(())
718 }
719 Action::ContentHashes { reply } => {
720 send_reply_with(reply, self, |this| this.store.content_hashes())
721 }
722 Action::FlushStore { reply } => send_reply(reply, self.store.flush()),
723 Action::Replica(namespace, action) => self.on_replica_action(namespace, action).await,
724 }
725 }
726
727 async fn on_replica_action(
728 &mut self,
729 namespace: NamespaceId,
730 action: ReplicaAction,
731 ) -> Result<(), SendReplyError> {
732 match action {
733 ReplicaAction::Open { reply, opts } => {
734 tracing::trace!("open in");
735 let res = self.open(namespace, opts);
736 tracing::trace!("open out");
737 send_reply(reply, res)
738 }
739 ReplicaAction::Close { reply } => {
740 let res = self.close(namespace);
741 reply.send(Ok(res)).ok();
743 Ok(())
744 }
745 ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
746 let state = this.states.get_mut(&namespace)?;
747 state.info.subscribe(sender);
748 Ok(())
749 }),
750 ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
751 let state = this.states.get_mut(&namespace)?;
752 state.info.unsubscribe(&sender);
753 drop(sender);
754 Ok(())
755 }),
756 ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
757 let state = this.states.get_mut(&namespace)?;
758 state.sync = sync;
759 Ok(())
760 }),
761 ReplicaAction::InsertLocal {
762 author,
763 key,
764 hash,
765 len,
766 reply,
767 } => send_reply_with(reply, self, move |this| {
768 let author = get_author(&mut this.store, &author)?;
769 let mut replica = this.states.replica(namespace, &mut this.store)?;
770 replica.insert(&key, &author, hash, len)?;
771 this.metrics.new_entries_local.inc();
772 this.metrics.new_entries_local_size.inc_by(len);
773 Ok(())
774 }),
775 ReplicaAction::DeletePrefix { author, key, reply } => {
776 send_reply_with(reply, self, |this| {
777 let author = get_author(&mut this.store, &author)?;
778 let mut replica = this.states.replica(namespace, &mut this.store)?;
779 let res = replica.delete_prefix(&key, &author)?;
780 Ok(res)
781 })
782 }
783 ReplicaAction::InsertRemote {
784 entry,
785 from,
786 content_status,
787 reply,
788 } => send_reply_with(reply, self, move |this| {
789 let mut replica = this
790 .states
791 .replica_if_syncing(&namespace, &mut this.store)?;
792 let len = entry.content_len();
793 replica.insert_remote_entry(entry, from, content_status)?;
794 this.metrics.new_entries_remote.inc();
795 this.metrics.new_entries_remote_size.inc_by(len);
796 Ok(())
797 }),
798
799 ReplicaAction::SyncInitialMessage { reply } => {
800 send_reply_with(reply, self, move |this| {
801 let mut replica = this
802 .states
803 .replica_if_syncing(&namespace, &mut this.store)?;
804 let res = replica.sync_initial_message()?;
805 Ok(res)
806 })
807 }
808 ReplicaAction::SyncProcessMessage {
809 message,
810 from,
811 mut state,
812 reply,
813 } => {
814 let res = async {
815 let mut replica = self
816 .states
817 .replica_if_syncing(&namespace, &mut self.store)?;
818 let res = replica
819 .sync_process_message(message, from, &mut state)
820 .await?;
821 Ok((res, state))
822 }
823 .await;
824 reply.send(res).map_err(send_reply_error)
825 }
826 ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
827 this.states.ensure_open(&namespace)?;
828 let peers = this.store.get_sync_peers(&namespace)?;
829 Ok(peers.map(|iter| iter.collect()))
830 }),
831 ReplicaAction::RegisterUsefulPeer { peer, reply } => {
832 let res = self.store.register_useful_peer(namespace, peer);
833 send_reply(reply, res)
834 }
835 ReplicaAction::GetExact {
836 author,
837 key,
838 include_empty,
839 reply,
840 } => send_reply_with(reply, self, move |this| {
841 this.states.ensure_open(&namespace)?;
842 this.store.get_exact(namespace, author, key, include_empty)
843 }),
844 ReplicaAction::GetMany { query, reply } => {
845 let iter = self
846 .states
847 .ensure_open(&namespace)
848 .and_then(|_| self.store.get_many(namespace, query));
849 self.tasks
850 .spawn_local(iter_to_irpc(reply, iter).map(|_| ()));
851 Ok(())
852 }
853 ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
854 this.close(namespace);
855 this.store.remove_replica(&namespace)
856 }),
857 ReplicaAction::ExportSecretKey { reply } => {
858 let res = self
859 .states
860 .get_mut(&namespace)
861 .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
862 send_reply(reply, res)
863 }
864 ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
865 let state = this.states.get_mut(&namespace)?;
866 let handles = state.handles;
867 let sync = state.sync;
868 let subscribers = state.info.subscribers_count();
869 Ok(OpenState {
870 handles,
871 sync,
872 subscribers,
873 })
874 }),
875 ReplicaAction::HasNewsForUs { heads, reply } => {
876 let res = self.store.has_news_for_us(namespace, &heads);
877 send_reply(reply, res)
878 }
879 ReplicaAction::SetDownloadPolicy { policy, reply } => {
880 send_reply(reply, self.store.set_download_policy(&namespace, policy))
881 }
882 ReplicaAction::GetDownloadPolicy { reply } => {
883 send_reply(reply, self.store.get_download_policy(&namespace))
884 }
885 }
886 }
887
888 fn close(&mut self, namespace: NamespaceId) -> bool {
889 let res = self.states.close(namespace);
890 if res {
891 self.store.close_replica(namespace);
892 }
893 res
894 }
895
896 fn close_all(&mut self) {
897 for id in self.states.close_all() {
898 self.store.close_replica(id);
899 }
900 }
901
902 fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
903 let open_cb = || {
904 let mut info = self.store.load_replica_info(&namespace)?;
905 if let Some(cb) = &self.content_status_callback {
906 info.set_content_status_callback(Arc::clone(cb));
907 }
908 Ok(info)
909 };
910 self.states.open_with(namespace, opts, open_cb)
911 }
912}
913
914#[derive(Default)]
915struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
916
917impl OpenReplicas {
918 fn replica<'a, 'b>(
919 &'a mut self,
920 namespace: NamespaceId,
921 store: &'b mut Store,
922 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
923 let state = self.get_mut(&namespace)?;
924 Ok(Replica::new(
925 StoreInstance::new(state.info.capability.id(), store),
926 &mut state.info,
927 ))
928 }
929
930 fn replica_if_syncing<'a, 'b>(
931 &'a mut self,
932 namespace: &NamespaceId,
933 store: &'b mut Store,
934 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
935 let state = self.get_mut(namespace)?;
936 anyhow::ensure!(state.sync, "sync is not enabled for replica");
937 Ok(Replica::new(
938 StoreInstance::new(state.info.capability.id(), store),
939 &mut state.info,
940 ))
941 }
942
943 fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
944 self.0.get_mut(namespace).context("replica not open")
945 }
946
947 fn is_open(&self, namespace: &NamespaceId) -> bool {
948 self.0.contains_key(namespace)
949 }
950
951 fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
952 match self.is_open(namespace) {
953 true => Ok(()),
954 false => Err(anyhow!("replica not open")),
955 }
956 }
957 fn open_with(
958 &mut self,
959 namespace: NamespaceId,
960 opts: OpenOpts,
961 mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
962 ) -> Result<()> {
963 match self.0.entry(namespace) {
964 hash_map::Entry::Vacant(e) => {
965 let mut info = open_cb()?;
966 if let Some(sender) = opts.subscribe {
967 info.subscribe(sender);
968 }
969 debug!(namespace = %namespace.fmt_short(), "open");
970 let state = OpenReplica {
971 info,
972 sync: opts.sync,
973 handles: 1,
974 };
975 e.insert(state);
976 }
977 hash_map::Entry::Occupied(mut e) => {
978 let state = e.get_mut();
979 state.handles += 1;
980 state.sync = state.sync || opts.sync;
981 if let Some(sender) = opts.subscribe {
982 state.info.subscribe(sender);
983 }
984 }
985 }
986 Ok(())
987 }
988 fn close(&mut self, namespace: NamespaceId) -> bool {
989 match self.0.entry(namespace) {
990 hash_map::Entry::Vacant(_e) => {
991 warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
992 true
993 }
994 hash_map::Entry::Occupied(mut e) => {
995 let state = e.get_mut();
996 tracing::debug!("STATE {state:?}");
997 state.handles = state.handles.wrapping_sub(1);
998 if state.handles == 0 {
999 let _ = e.remove_entry();
1000 debug!(namespace = %namespace.fmt_short(), "close");
1001 true
1002 } else {
1003 false
1004 }
1005 }
1006 }
1007 }
1008
1009 fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
1010 self.0.drain().map(|(n, _s)| n)
1011 }
1012}
1013
1014async fn iter_to_irpc<T: irpc::RpcMessage>(
1015 channel: mpsc::Sender<RpcResult<T>>,
1016 iter: Result<impl Iterator<Item = Result<T>>>,
1017) -> Result<(), SendReplyError> {
1018 match iter {
1019 Err(err) => channel
1020 .send(Err(RpcError::new(&*err)))
1021 .await
1022 .map_err(send_reply_error)?,
1023 Ok(iter) => {
1024 for item in iter {
1025 let item = item.map_err(|err| RpcError::new(&*err));
1026 channel.send(item).await.map_err(send_reply_error)?;
1027 }
1028 }
1029 }
1030 Ok(())
1031}
1032
1033fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
1034 store.get_author(id)?.context("author not found")
1035}
1036
1037#[derive(Debug)]
1038struct SendReplyError;
1039
1040fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
1041 sender.send(value).map_err(send_reply_error)
1042}
1043
1044fn send_reply_with<T>(
1045 sender: oneshot::Sender<Result<T>>,
1046 this: &mut Actor,
1047 f: impl FnOnce(&mut Actor) -> Result<T>,
1048) -> Result<(), SendReplyError> {
1049 sender.send(f(this)).map_err(send_reply_error)
1050}
1051
1052fn send_reply_error<T>(_err: T) -> SendReplyError {
1053 SendReplyError
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059 use crate::store;
1060 #[tokio::test]
1061 async fn open_close() -> anyhow::Result<()> {
1062 let store = store::Store::memory();
1063 let sync = SyncHandle::spawn(store, None, "foo".into());
1064 let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
1065 let id = namespace.id();
1066 sync.import_namespace(namespace.into()).await?;
1067 sync.open(id, Default::default()).await?;
1068 let (tx, rx) = async_channel::bounded(10);
1069 sync.subscribe(id, tx).await?;
1070 sync.close(id).await?;
1071 assert!(rx.recv().await.is_err());
1072 Ok(())
1073 }
1074}