1use std::{
4 collections::{hash_map, HashMap},
5 num::NonZeroU64,
6 sync::Arc,
7 thread::JoinHandle,
8 time::Duration,
9};
10
11use anyhow::{anyhow, Context, Result};
12use bytes::Bytes;
13use iroh_base::hash::Hash;
14use serde::{Deserialize, Serialize};
15use tokio::sync::oneshot;
16use tracing::{debug, error, error_span, trace, warn};
17
18use crate::{
19 ranger::Message,
20 store::{
21 fs::{ContentHashesIterator, StoreInstance},
22 DownloadPolicy, ImportNamespaceOutcome, Query, Store,
23 },
24 Author, AuthorHeads, AuthorId, Capability, CapabilityKind, ContentStatus,
25 ContentStatusCallback, Event, NamespaceId, NamespaceSecret, PeerIdBytes, Replica, ReplicaInfo,
26 SignedEntry, SyncOutcome,
27};
28
29const ACTION_CAP: usize = 1024;
30const MAX_COMMIT_DELAY: Duration = Duration::from_millis(500);
31
32#[derive(derive_more::Debug, derive_more::Display)]
33enum Action {
34 #[display("NewAuthor")]
35 ImportAuthor {
36 author: Author,
37 #[debug("reply")]
38 reply: oneshot::Sender<Result<AuthorId>>,
39 },
40 #[display("ExportAuthor")]
41 ExportAuthor {
42 author: AuthorId,
43 #[debug("reply")]
44 reply: oneshot::Sender<Result<Option<Author>>>,
45 },
46 #[display("DeleteAuthor")]
47 DeleteAuthor {
48 author: AuthorId,
49 #[debug("reply")]
50 reply: oneshot::Sender<Result<()>>,
51 },
52 #[display("NewReplica")]
53 ImportNamespace {
54 capability: Capability,
55 #[debug("reply")]
56 reply: oneshot::Sender<Result<NamespaceId>>,
57 },
58 #[display("ListAuthors")]
59 ListAuthors {
60 #[debug("reply")]
61 reply: flume::Sender<Result<AuthorId>>,
62 },
63 #[display("ListReplicas")]
64 ListReplicas {
65 #[debug("reply")]
66 reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
67 },
68 #[display("ContentHashes")]
69 ContentHashes {
70 #[debug("reply")]
71 reply: oneshot::Sender<Result<ContentHashesIterator>>,
72 },
73 #[display("Replica({}, {})", _0.fmt_short(), _1)]
74 Replica(NamespaceId, ReplicaAction),
75 #[display("Shutdown")]
76 Shutdown {
77 #[debug("reply")]
78 reply: Option<oneshot::Sender<Store>>,
79 },
80}
81
82#[derive(derive_more::Debug, strum::Display)]
83enum ReplicaAction {
84 Open {
85 #[debug("reply")]
86 reply: oneshot::Sender<Result<()>>,
87 opts: OpenOpts,
88 },
89 Close {
90 #[debug("reply")]
91 reply: oneshot::Sender<Result<bool>>,
92 },
93 GetState {
94 #[debug("reply")]
95 reply: oneshot::Sender<Result<OpenState>>,
96 },
97 SetSync {
98 sync: bool,
99 #[debug("reply")]
100 reply: oneshot::Sender<Result<()>>,
101 },
102 Subscribe {
103 sender: flume::Sender<Event>,
104 #[debug("reply")]
105 reply: oneshot::Sender<Result<()>>,
106 },
107 Unsubscribe {
108 sender: flume::Sender<Event>,
109 #[debug("reply")]
110 reply: oneshot::Sender<Result<()>>,
111 },
112 InsertLocal {
113 author: AuthorId,
114 key: Bytes,
115 hash: Hash,
116 len: u64,
117 #[debug("reply")]
118 reply: oneshot::Sender<Result<()>>,
119 },
120 DeletePrefix {
121 author: AuthorId,
122 key: Bytes,
123 #[debug("reply")]
124 reply: oneshot::Sender<Result<usize>>,
125 },
126 InsertRemote {
127 entry: SignedEntry,
128 from: PeerIdBytes,
129 content_status: ContentStatus,
130 #[debug("reply")]
131 reply: oneshot::Sender<Result<()>>,
132 },
133 SyncInitialMessage {
134 #[debug("reply")]
135 reply: oneshot::Sender<Result<Message<SignedEntry>>>,
136 },
137 SyncProcessMessage {
138 message: Message<SignedEntry>,
139 from: PeerIdBytes,
140 state: SyncOutcome,
141 #[debug("reply")]
142 reply: oneshot::Sender<Result<(Option<Message<SignedEntry>>, SyncOutcome)>>,
143 },
144 GetSyncPeers {
145 #[debug("reply")]
146 reply: oneshot::Sender<Result<Option<Vec<PeerIdBytes>>>>,
147 },
148 RegisterUsefulPeer {
149 peer: PeerIdBytes,
150 #[debug("reply")]
151 reply: oneshot::Sender<Result<()>>,
152 },
153 GetExact {
154 author: AuthorId,
155 key: Bytes,
156 include_empty: bool,
157 reply: oneshot::Sender<Result<Option<SignedEntry>>>,
158 },
159 GetMany {
160 query: Query,
161 reply: flume::Sender<Result<SignedEntry>>,
162 },
163 DropReplica {
164 reply: oneshot::Sender<Result<()>>,
165 },
166 ExportSecretKey {
167 reply: oneshot::Sender<Result<NamespaceSecret>>,
168 },
169 HasNewsForUs {
170 heads: AuthorHeads,
171 #[debug("reply")]
172 reply: oneshot::Sender<Result<Option<NonZeroU64>>>,
173 },
174 SetDownloadPolicy {
175 policy: DownloadPolicy,
176 #[debug("reply")]
177 reply: oneshot::Sender<Result<()>>,
178 },
179 GetDownloadPolicy {
180 #[debug("reply")]
181 reply: oneshot::Sender<Result<DownloadPolicy>>,
182 },
183}
184
185#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
187pub struct OpenState {
188 pub sync: bool,
190 pub subscribers: usize,
192 pub handles: usize,
194}
195
196#[derive(Debug)]
197struct OpenReplica {
198 info: ReplicaInfo,
199 sync: bool,
200 handles: usize,
201}
202
203#[derive(Debug, Clone)]
216pub struct SyncHandle {
217 tx: flume::Sender<Action>,
218 join_handle: Arc<Option<JoinHandle<()>>>,
219}
220
221#[derive(Debug, Default)]
223pub struct OpenOpts {
224 pub sync: bool,
226 pub subscribe: Option<flume::Sender<Event>>,
228}
229impl OpenOpts {
230 pub fn sync(mut self) -> Self {
232 self.sync = true;
233 self
234 }
235 pub fn subscribe(mut self, subscribe: flume::Sender<Event>) -> Self {
237 self.subscribe = Some(subscribe);
238 self
239 }
240}
241
242#[allow(missing_docs)]
243impl SyncHandle {
244 pub fn spawn(
246 store: Store,
247 content_status_callback: Option<ContentStatusCallback>,
248 me: String,
249 ) -> SyncHandle {
250 let (action_tx, action_rx) = flume::bounded(ACTION_CAP);
251 let actor = Actor {
252 store,
253 states: Default::default(),
254 action_rx,
255 content_status_callback,
256 };
257 let join_handle = std::thread::Builder::new()
258 .name("sync-actor".to_string())
259 .spawn(move || {
260 let span = error_span!("sync", %me);
261 let _enter = span.enter();
262
263 if let Err(err) = actor.run() {
264 error!("Sync actor closed with error: {err:?}");
265 }
266 })
267 .expect("failed to spawn thread");
268 let join_handle = Arc::new(Some(join_handle));
269 SyncHandle {
270 tx: action_tx,
271 join_handle,
272 }
273 }
274
275 pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
276 let (reply, rx) = oneshot::channel();
277 let action = ReplicaAction::Open { reply, opts };
278 self.send_replica(namespace, action).await?;
279 rx.await?
280 }
281
282 pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
283 let (reply, rx) = oneshot::channel();
284 self.send_replica(namespace, ReplicaAction::Close { reply })
285 .await?;
286 rx.await?
287 }
288
289 pub async fn subscribe(
290 &self,
291 namespace: NamespaceId,
292 sender: flume::Sender<Event>,
293 ) -> Result<()> {
294 let (reply, rx) = oneshot::channel();
295 self.send_replica(namespace, ReplicaAction::Subscribe { sender, reply })
296 .await?;
297 rx.await?
298 }
299
300 pub async fn unsubscribe(
301 &self,
302 namespace: NamespaceId,
303 sender: flume::Sender<Event>,
304 ) -> Result<()> {
305 let (reply, rx) = oneshot::channel();
306 self.send_replica(namespace, ReplicaAction::Unsubscribe { sender, reply })
307 .await?;
308 rx.await?
309 }
310
311 pub async fn set_sync(&self, namespace: NamespaceId, sync: bool) -> Result<()> {
312 let (reply, rx) = oneshot::channel();
313 let action = ReplicaAction::SetSync { sync, reply };
314 self.send_replica(namespace, action).await?;
315 rx.await?
316 }
317
318 pub async fn insert_local(
319 &self,
320 namespace: NamespaceId,
321 author: AuthorId,
322 key: Bytes,
323 hash: Hash,
324 len: u64,
325 ) -> Result<()> {
326 let (reply, rx) = oneshot::channel();
327 let action = ReplicaAction::InsertLocal {
328 author,
329 key,
330 hash,
331 len,
332 reply,
333 };
334 self.send_replica(namespace, action).await?;
335 rx.await?
336 }
337
338 pub async fn delete_prefix(
339 &self,
340 namespace: NamespaceId,
341 author: AuthorId,
342 key: Bytes,
343 ) -> Result<usize> {
344 let (reply, rx) = oneshot::channel();
345 let action = ReplicaAction::DeletePrefix { author, key, reply };
346 self.send_replica(namespace, action).await?;
347 rx.await?
348 }
349
350 pub async fn insert_remote(
351 &self,
352 namespace: NamespaceId,
353 entry: SignedEntry,
354 from: PeerIdBytes,
355 content_status: ContentStatus,
356 ) -> Result<()> {
357 let (reply, rx) = oneshot::channel();
358 let action = ReplicaAction::InsertRemote {
359 entry,
360 from,
361 content_status,
362 reply,
363 };
364 self.send_replica(namespace, action).await?;
365 rx.await?
366 }
367
368 pub async fn sync_initial_message(
369 &self,
370 namespace: NamespaceId,
371 ) -> Result<Message<SignedEntry>> {
372 let (reply, rx) = oneshot::channel();
373 let action = ReplicaAction::SyncInitialMessage { reply };
374 self.send_replica(namespace, action).await?;
375 rx.await?
376 }
377
378 pub async fn sync_process_message(
379 &self,
380 namespace: NamespaceId,
381 message: Message<SignedEntry>,
382 from: PeerIdBytes,
383 state: SyncOutcome,
384 ) -> Result<(Option<Message<SignedEntry>>, SyncOutcome)> {
385 let (reply, rx) = oneshot::channel();
386 let action = ReplicaAction::SyncProcessMessage {
387 reply,
388 message,
389 from,
390 state,
391 };
392 self.send_replica(namespace, action).await?;
393 rx.await?
394 }
395
396 pub async fn get_sync_peers(&self, namespace: NamespaceId) -> Result<Option<Vec<PeerIdBytes>>> {
397 let (reply, rx) = oneshot::channel();
398 let action = ReplicaAction::GetSyncPeers { reply };
399 self.send_replica(namespace, action).await?;
400 rx.await?
401 }
402
403 pub async fn register_useful_peer(
404 &self,
405 namespace: NamespaceId,
406 peer: PeerIdBytes,
407 ) -> Result<()> {
408 let (reply, rx) = oneshot::channel();
409 let action = ReplicaAction::RegisterUsefulPeer { reply, peer };
410 self.send_replica(namespace, action).await?;
411 rx.await?
412 }
413
414 pub async fn has_news_for_us(
415 &self,
416 namespace: NamespaceId,
417 heads: AuthorHeads,
418 ) -> Result<Option<NonZeroU64>> {
419 let (reply, rx) = oneshot::channel();
420 let action = ReplicaAction::HasNewsForUs { reply, heads };
421 self.send_replica(namespace, action).await?;
422 rx.await?
423 }
424
425 pub async fn get_many(
426 &self,
427 namespace: NamespaceId,
428 query: Query,
429 reply: flume::Sender<Result<SignedEntry>>,
430 ) -> Result<()> {
431 let action = ReplicaAction::GetMany { query, reply };
432 self.send_replica(namespace, action).await?;
433 Ok(())
434 }
435
436 pub async fn get_exact(
437 &self,
438 namespace: NamespaceId,
439 author: AuthorId,
440 key: Bytes,
441 include_empty: bool,
442 ) -> Result<Option<SignedEntry>> {
443 let (reply, rx) = oneshot::channel();
444 let action = ReplicaAction::GetExact {
445 author,
446 key,
447 include_empty,
448 reply,
449 };
450 self.send_replica(namespace, action).await?;
451 rx.await?
452 }
453
454 pub async fn drop_replica(&self, namespace: NamespaceId) -> Result<()> {
455 let (reply, rx) = oneshot::channel();
456 let action = ReplicaAction::DropReplica { reply };
457 self.send_replica(namespace, action).await?;
458 rx.await?
459 }
460
461 pub async fn export_secret_key(&self, namespace: NamespaceId) -> Result<NamespaceSecret> {
462 let (reply, rx) = oneshot::channel();
463 let action = ReplicaAction::ExportSecretKey { reply };
464 self.send_replica(namespace, action).await?;
465 rx.await?
466 }
467
468 pub async fn get_state(&self, namespace: NamespaceId) -> Result<OpenState> {
469 let (reply, rx) = oneshot::channel();
470 let action = ReplicaAction::GetState { reply };
471 self.send_replica(namespace, action).await?;
472 rx.await?
473 }
474
475 pub async fn shutdown(&self) -> Result<Store> {
476 let (reply, rx) = oneshot::channel();
477 let action = Action::Shutdown { reply: Some(reply) };
478 self.send(action).await.ok();
479 Ok(rx.await?)
480 }
481
482 pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
483 self.send(Action::ListAuthors { reply }).await
484 }
485
486 pub async fn list_replicas(
487 &self,
488 reply: flume::Sender<Result<(NamespaceId, CapabilityKind)>>,
489 ) -> Result<()> {
490 self.send(Action::ListReplicas { reply }).await
491 }
492
493 pub async fn import_author(&self, author: Author) -> Result<AuthorId> {
494 let (reply, rx) = oneshot::channel();
495 self.send(Action::ImportAuthor { author, reply }).await?;
496 rx.await?
497 }
498
499 pub async fn export_author(&self, author: AuthorId) -> Result<Option<Author>> {
500 let (reply, rx) = oneshot::channel();
501 self.send(Action::ExportAuthor { author, reply }).await?;
502 rx.await?
503 }
504
505 pub async fn delete_author(&self, author: AuthorId) -> Result<()> {
506 let (reply, rx) = oneshot::channel();
507 self.send(Action::DeleteAuthor { author, reply }).await?;
508 rx.await?
509 }
510
511 pub async fn import_namespace(&self, capability: Capability) -> Result<NamespaceId> {
512 let (reply, rx) = oneshot::channel();
513 self.send(Action::ImportNamespace { capability, reply })
514 .await?;
515 rx.await?
516 }
517
518 pub async fn get_download_policy(&self, namespace: NamespaceId) -> Result<DownloadPolicy> {
519 let (reply, rx) = oneshot::channel();
520 let action = ReplicaAction::GetDownloadPolicy { reply };
521 self.send_replica(namespace, action).await?;
522 rx.await?
523 }
524
525 pub async fn set_download_policy(
526 &self,
527 namespace: NamespaceId,
528 policy: DownloadPolicy,
529 ) -> Result<()> {
530 let (reply, rx) = oneshot::channel();
531 let action = ReplicaAction::SetDownloadPolicy { reply, policy };
532 self.send_replica(namespace, action).await?;
533 rx.await?
534 }
535
536 pub async fn content_hashes(&self) -> Result<ContentHashesIterator> {
537 let (reply, rx) = oneshot::channel();
538 self.send(Action::ContentHashes { reply }).await?;
539 rx.await?
540 }
541
542 async fn send(&self, action: Action) -> Result<()> {
543 self.tx
544 .send_async(action)
545 .await
546 .context("sending to iroh_sync actor failed")?;
547 Ok(())
548 }
549 async fn send_replica(&self, namespace: NamespaceId, action: ReplicaAction) -> Result<()> {
550 self.send(Action::Replica(namespace, action)).await?;
551 Ok(())
552 }
553}
554
555impl Drop for SyncHandle {
556 fn drop(&mut self) {
557 if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
559 self.tx.send(Action::Shutdown { reply: None }).ok();
560 let handle = handle.take().expect("this can only run once");
561 if let Err(err) = handle.join() {
562 warn!(?err, "Failed to join sync actor");
563 }
564 }
565 }
566}
567
568struct Actor {
569 store: Store,
570 states: OpenReplicas,
571 action_rx: flume::Receiver<Action>,
572 content_status_callback: Option<ContentStatusCallback>,
573}
574
575impl Actor {
576 fn run(mut self) -> Result<()> {
577 loop {
578 let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
579 Ok(action) => action,
580 Err(flume::RecvTimeoutError::Timeout) => {
581 if let Err(cause) = self.store.flush() {
582 error!(?cause, "failed to flush store");
583 }
584 continue;
585 }
586 Err(flume::RecvTimeoutError::Disconnected) => {
587 debug!("action channel disconnected");
588 break;
589 }
590 };
591 trace!(%action, "tick");
592 match action {
593 Action::Shutdown { reply } => {
594 if let Err(cause) = self.store.flush() {
595 warn!(?cause, "failed to flush store");
596 }
597 self.close_all();
598 if let Some(reply) = reply {
599 send_reply(reply, self.store).ok();
600 }
601 break;
602 }
603 action => {
604 if self.on_action(action).is_err() {
605 warn!("failed to send reply: receiver dropped");
606 }
607 }
608 }
609 }
610 debug!("shutdown");
611 Ok(())
612 }
613
614 fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
615 match action {
616 Action::Shutdown { .. } => {
617 unreachable!("Shutdown action should be handled in run()")
618 }
619 Action::ImportAuthor { author, reply } => {
620 let id = author.id();
621 send_reply(reply, self.store.import_author(author).map(|_| id))
622 }
623 Action::ExportAuthor { author, reply } => {
624 send_reply(reply, self.store.get_author(&author))
625 }
626 Action::DeleteAuthor { author, reply } => {
627 send_reply(reply, self.store.delete_author(author))
628 }
629 Action::ImportNamespace { capability, reply } => send_reply_with(reply, self, |this| {
630 let id = capability.id();
631 let outcome = this.store.import_namespace(capability.clone())?;
632 if let ImportNamespaceOutcome::Upgraded = outcome {
633 if let Ok(state) = this.states.get_mut(&id) {
634 state.info.merge_capability(capability)?;
635 }
636 }
637 Ok(id)
638 }),
639 Action::ListAuthors { reply } => iter_to_channel(
640 reply,
641 self.store
642 .list_authors()
643 .map(|a| a.map(|a| a.map(|a| a.id()))),
644 ),
645 Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
646 Action::ContentHashes { reply } => {
647 send_reply_with(reply, self, |this| this.store.content_hashes())
648 }
649 Action::Replica(namespace, action) => self.on_replica_action(namespace, action),
650 }
651 }
652
653 fn on_replica_action(
654 &mut self,
655 namespace: NamespaceId,
656 action: ReplicaAction,
657 ) -> Result<(), SendReplyError> {
658 match action {
659 ReplicaAction::Open { reply, opts } => {
660 let res = self.open(namespace, opts);
661 send_reply(reply, res)
662 }
663 ReplicaAction::Close { reply } => {
664 let res = self.close(namespace);
665 reply.send(Ok(res)).ok();
667 Ok(())
668 }
669 ReplicaAction::Subscribe { sender, reply } => send_reply_with(reply, self, |this| {
670 let state = this.states.get_mut(&namespace)?;
671 state.info.subscribe(sender);
672 Ok(())
673 }),
674 ReplicaAction::Unsubscribe { sender, reply } => send_reply_with(reply, self, |this| {
675 let state = this.states.get_mut(&namespace)?;
676 state.info.unsubscribe(&sender);
677 drop(sender);
678 Ok(())
679 }),
680 ReplicaAction::SetSync { sync, reply } => send_reply_with(reply, self, |this| {
681 let state = this.states.get_mut(&namespace)?;
682 state.sync = sync;
683 Ok(())
684 }),
685 ReplicaAction::InsertLocal {
686 author,
687 key,
688 hash,
689 len,
690 reply,
691 } => send_reply_with(reply, self, move |this| {
692 let author = get_author(&mut this.store, &author)?;
693 let mut replica = this.states.replica(namespace, &mut this.store)?;
694 replica.insert(&key, &author, hash, len)?;
695 Ok(())
696 }),
697 ReplicaAction::DeletePrefix { author, key, reply } => {
698 send_reply_with(reply, self, |this| {
699 let author = get_author(&mut this.store, &author)?;
700 let mut replica = this.states.replica(namespace, &mut this.store)?;
701 let res = replica.delete_prefix(&key, &author)?;
702 Ok(res)
703 })
704 }
705 ReplicaAction::InsertRemote {
706 entry,
707 from,
708 content_status,
709 reply,
710 } => send_reply_with(reply, self, move |this| {
711 let mut replica = this
712 .states
713 .replica_if_syncing(&namespace, &mut this.store)?;
714 replica.insert_remote_entry(entry, from, content_status)?;
715 Ok(())
716 }),
717
718 ReplicaAction::SyncInitialMessage { reply } => {
719 send_reply_with(reply, self, move |this| {
720 let mut replica = this
721 .states
722 .replica_if_syncing(&namespace, &mut this.store)?;
723 let res = replica.sync_initial_message()?;
724 Ok(res)
725 })
726 }
727 ReplicaAction::SyncProcessMessage {
728 message,
729 from,
730 mut state,
731 reply,
732 } => send_reply_with(reply, self, move |this| {
733 let mut replica = this
734 .states
735 .replica_if_syncing(&namespace, &mut this.store)?;
736 let res = replica.sync_process_message(message, from, &mut state)?;
737 Ok((res, state))
738 }),
739 ReplicaAction::GetSyncPeers { reply } => send_reply_with(reply, self, move |this| {
740 this.states.ensure_open(&namespace)?;
741 let peers = this.store.get_sync_peers(&namespace)?;
742 Ok(peers.map(|iter| iter.collect()))
743 }),
744 ReplicaAction::RegisterUsefulPeer { peer, reply } => {
745 let res = self.store.register_useful_peer(namespace, peer);
746 send_reply(reply, res)
747 }
748 ReplicaAction::GetExact {
749 author,
750 key,
751 include_empty,
752 reply,
753 } => send_reply_with(reply, self, move |this| {
754 this.states.ensure_open(&namespace)?;
755 this.store.get_exact(namespace, author, key, include_empty)
756 }),
757 ReplicaAction::GetMany { query, reply } => {
758 let iter = self
759 .states
760 .ensure_open(&namespace)
761 .and_then(|_| self.store.get_many(namespace, query));
762 iter_to_channel(reply, iter)
763 }
764 ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
765 this.close(namespace);
766 this.store.remove_replica(&namespace)
767 }),
768 ReplicaAction::ExportSecretKey { reply } => {
769 let res = self
770 .states
771 .get_mut(&namespace)
772 .and_then(|state| Ok(state.info.capability.secret_key()?.clone()));
773 send_reply(reply, res)
774 }
775 ReplicaAction::GetState { reply } => send_reply_with(reply, self, move |this| {
776 let state = this.states.get_mut(&namespace)?;
777 let handles = state.handles;
778 let sync = state.sync;
779 let subscribers = state.info.subscribers_count();
780 Ok(OpenState {
781 handles,
782 sync,
783 subscribers,
784 })
785 }),
786 ReplicaAction::HasNewsForUs { heads, reply } => {
787 let res = self.store.has_news_for_us(namespace, &heads);
788 send_reply(reply, res)
789 }
790 ReplicaAction::SetDownloadPolicy { policy, reply } => {
791 send_reply(reply, self.store.set_download_policy(&namespace, policy))
792 }
793 ReplicaAction::GetDownloadPolicy { reply } => {
794 send_reply(reply, self.store.get_download_policy(&namespace))
795 }
796 }
797 }
798
799 fn close(&mut self, namespace: NamespaceId) -> bool {
800 let res = self.states.close(namespace);
801 if res {
802 self.store.close_replica(namespace);
803 }
804 res
805 }
806
807 fn close_all(&mut self) {
808 for id in self.states.close_all() {
809 self.store.close_replica(id);
810 }
811 }
812
813 fn open(&mut self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
814 let open_cb = || {
815 let mut info = self.store.load_replica_info(&namespace)?;
816 if let Some(cb) = &self.content_status_callback {
817 info.set_content_status_callback(Arc::clone(cb));
818 }
819 Ok(info)
820 };
821 self.states.open_with(namespace, opts, open_cb)
822 }
823}
824
825#[derive(Default)]
826struct OpenReplicas(HashMap<NamespaceId, OpenReplica>);
827
828impl OpenReplicas {
829 fn replica<'a, 'b>(
830 &'a mut self,
831 namespace: NamespaceId,
832 store: &'b mut Store,
833 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
834 let state = self.get_mut(&namespace)?;
835 Ok(Replica::new(
836 StoreInstance::new(state.info.capability.id(), store),
837 &mut state.info,
838 ))
839 }
840
841 fn replica_if_syncing<'a, 'b>(
842 &'a mut self,
843 namespace: &NamespaceId,
844 store: &'b mut Store,
845 ) -> Result<Replica<'b, &'a mut ReplicaInfo>> {
846 let state = self.get_mut(namespace)?;
847 anyhow::ensure!(state.sync, "sync is not enabled for replica");
848 Ok(Replica::new(
849 StoreInstance::new(state.info.capability.id(), store),
850 &mut state.info,
851 ))
852 }
853
854 fn get_mut(&mut self, namespace: &NamespaceId) -> Result<&mut OpenReplica> {
855 self.0.get_mut(namespace).context("replica not open")
856 }
857
858 fn is_open(&self, namespace: &NamespaceId) -> bool {
859 self.0.contains_key(namespace)
860 }
861
862 fn ensure_open(&self, namespace: &NamespaceId) -> Result<()> {
863 match self.is_open(namespace) {
864 true => Ok(()),
865 false => Err(anyhow!("replica not open")),
866 }
867 }
868 fn open_with(
869 &mut self,
870 namespace: NamespaceId,
871 opts: OpenOpts,
872 mut open_cb: impl FnMut() -> Result<ReplicaInfo>,
873 ) -> Result<()> {
874 match self.0.entry(namespace) {
875 hash_map::Entry::Vacant(e) => {
876 let mut info = open_cb()?;
877 if let Some(sender) = opts.subscribe {
878 info.subscribe(sender);
879 }
880 debug!(namespace = %namespace.fmt_short(), "open");
881 let state = OpenReplica {
882 info,
883 sync: opts.sync,
884 handles: 1,
885 };
886 e.insert(state);
887 }
888 hash_map::Entry::Occupied(mut e) => {
889 let state = e.get_mut();
890 state.handles += 1;
891 state.sync = state.sync || opts.sync;
892 if let Some(sender) = opts.subscribe {
893 state.info.subscribe(sender);
894 }
895 }
896 }
897 Ok(())
898 }
899 fn close(&mut self, namespace: NamespaceId) -> bool {
900 match self.0.entry(namespace) {
901 hash_map::Entry::Vacant(_e) => {
902 warn!(namespace = %namespace.fmt_short(), "received close request for closed replica");
903 true
904 }
905 hash_map::Entry::Occupied(mut e) => {
906 let state = e.get_mut();
907 state.handles = state.handles.wrapping_sub(1);
908 if state.handles == 0 {
909 let _ = e.remove_entry();
910 debug!(namespace = %namespace.fmt_short(), "close");
911 true
912 } else {
913 false
914 }
915 }
916 }
917 }
918
919 fn close_all(&mut self) -> impl Iterator<Item = NamespaceId> + '_ {
920 self.0.drain().map(|(n, _s)| n)
921 }
922}
923
924fn iter_to_channel<T: Send + 'static>(
925 channel: flume::Sender<Result<T>>,
926 iter: Result<impl Iterator<Item = Result<T>>>,
927) -> Result<(), SendReplyError> {
928 match iter {
929 Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
930 Ok(iter) => {
931 for item in iter {
932 channel.send(item).map_err(send_reply_error)?;
933 }
934 }
935 }
936 Ok(())
937}
938
939fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
940 store.get_author(id)?.context("author not found")
941}
942
943#[derive(Debug)]
944struct SendReplyError;
945
946fn send_reply<T>(sender: oneshot::Sender<T>, value: T) -> Result<(), SendReplyError> {
947 sender.send(value).map_err(send_reply_error)
948}
949
950fn send_reply_with<T>(
951 sender: oneshot::Sender<Result<T>>,
952 this: &mut Actor,
953 f: impl FnOnce(&mut Actor) -> Result<T>,
954) -> Result<(), SendReplyError> {
955 sender.send(f(this)).map_err(send_reply_error)
956}
957
958fn send_reply_error<T>(_err: T) -> SendReplyError {
959 SendReplyError
960}
961
962#[cfg(test)]
963mod tests {
964 use crate::store;
965
966 use super::*;
967 #[tokio::test]
968 async fn open_close() -> anyhow::Result<()> {
969 let store = store::Store::memory();
970 let sync = SyncHandle::spawn(store, None, "foo".into());
971 let namespace = NamespaceSecret::new(&mut rand::rngs::OsRng {});
972 let id = namespace.id();
973 sync.import_namespace(namespace.into()).await?;
974 sync.open(id, Default::default()).await?;
975 let (tx, rx) = flume::bounded(10);
976 sync.subscribe(id, tx).await?;
977 sync.close(id).await?;
978 assert!(rx.recv_async().await.is_err());
979 Ok(())
980 }
981}