1use automerge::{transaction::Transaction, AutomergeError, ObjId, Patch};
2use dashmap::DashMap;
3use futures::{
4 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
5 StreamExt,
6};
7use hypercore_protocol::{hypercore::SigningKey, CommandTx};
8#[cfg(not(target_arch = "wasm32"))]
9use random_access_disk::RandomAccessDisk;
10use random_access_memory::RandomAccessMemory;
11use random_access_storage::RandomAccess;
12#[cfg(not(target_arch = "wasm32"))]
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::{collections::HashMap, fmt::Debug};
17use tracing::{debug, instrument};
18
19#[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))]
20use async_std::task;
21#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
22use tokio::task;
23#[cfg(target_arch = "wasm32")]
24use wasm_bindgen_futures::spawn_local;
25
26#[cfg(not(target_arch = "wasm32"))]
27use crate::{
28 common::cipher::{encode_document_id, DocumentSecret},
29 feeds::FeedDiskPersistence,
30 options::{
31 AttachDocumentDiskOptions, CreateNewDocumentDiskOptions, OpenDiskOptions,
32 PeermergeDiskOptions,
33 },
34};
35use crate::{
36 common::{
37 cipher::{
38 decode_doc_url, decode_document_secret, decode_reattach_secret, encode_document_secret,
39 encode_reattach_secret,
40 },
41 keys::{signing_key_from_bytes, signing_key_to_bytes},
42 state::DocumentIdWithParents,
43 storage::PeermergeStateWrapper,
44 utils::Mutex,
45 FeedEventContent,
46 },
47 document::{
48 get_document_by_discovery_key, DocumentParent, DocumentSettings, NewDocumentResult,
49 },
50 feeds::{FeedMemoryPersistence, FeedPersistence, FeedProtocol, SHUTDOWN_SIGNAL_NAME},
51 options::PeermergeMemoryOptions,
52 AttachDocumentMemoryOptions, AutomergeDoc, CreateNewDocumentMemoryOptions, DocumentSharingInfo,
53 PeerId, PeermergeError, StateEventContent,
54};
55use crate::{
56 common::{DocumentInfo, FeedEvent},
57 document::{get_document, get_document_ids},
58 feeds::on_protocol,
59 DocumentId, NameDescription, IO,
60};
61use crate::{document::Document, StateEvent};
62
63#[derive(derivative::Derivative)]
65#[derivative(Clone(bound = ""))]
66#[derive(Debug)]
67pub struct Peermerge<T, U>
68where
69 T: RandomAccess + Debug + Send,
70 U: FeedPersistence,
71{
72 peer_id: PeerId,
74 default_peer_header: NameDescription,
78 document_settings: DocumentSettings,
80 prefix: PathBuf,
82 peermerge_state: Arc<Mutex<PeermergeStateWrapper<T>>>,
84 documents: Arc<DashMap<DocumentId, Document<T, U>>>,
86 protocols: Arc<DashMap<Vec<u8>, CommandTx>>,
88 state_event_sender: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
90 reattach_secrets: Option<HashMap<DocumentId, SigningKey>>,
94}
95
96impl<T, U> Peermerge<T, U>
97where
98 T: RandomAccess + Debug + Send + 'static,
99 U: FeedPersistence,
100{
101 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
103 pub fn peer_id(&self) -> PeerId {
104 self.peer_id
105 }
106
107 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
109 pub async fn peer_ids(&self, document_id: &DocumentId) -> Result<Vec<PeerId>, PeermergeError> {
110 let document = self.get_document(document_id).await?;
111 Ok(document.peer_ids().await)
112 }
113
114 pub fn default_peer_header(&self) -> NameDescription {
116 self.default_peer_header.clone()
117 }
118
119 pub async fn peer_header(
121 &self,
122 document_id: &DocumentId,
123 peer_id: &PeerId,
124 ) -> Result<Option<NameDescription>, PeermergeError> {
125 let document = self.get_document(document_id).await?;
126 Ok(document.peer_header(peer_id).await)
127 }
128
129 pub async fn set_state_event_sender(
132 &mut self,
133 state_event_sender: Option<UnboundedSender<StateEvent>>,
134 ) -> Result<(), PeermergeError> {
135 let not_empty = state_event_sender.is_some();
136
137 {
138 *self.state_event_sender.lock().await = state_event_sender;
139 }
140 if not_empty {
141 let mut state_event_sender = self.state_event_sender.lock().await;
145 if let Some(sender) = state_event_sender.as_mut() {
146 if sender.is_closed() {
147 *state_event_sender = None;
148 } else {
149 let mut document_patches: Vec<(DocumentId, Vec<Patch>)> = vec![];
150 for document_id in get_document_ids(&self.documents).await {
151 let mut document = self.get_document(&document_id).await?;
152 let new_patches = document.take_patches().await;
153 if !new_patches.is_empty() {
154 document_patches.push((document_id, new_patches))
155 }
156 }
157 for (document_id, patches) in document_patches {
158 sender
159 .unbounded_send(StateEvent::new(
160 document_id,
161 StateEventContent::DocumentChanged {
162 change_id: None,
163 patches,
164 },
165 ))
166 .unwrap();
167 }
168 }
169 }
170 }
171 Ok(())
172 }
173
174 #[instrument(skip(self, cb), fields(peer_name = self.default_peer_header.name))]
176 pub async fn transact<F, O>(&self, document_id: &DocumentId, cb: F) -> Result<O, PeermergeError>
177 where
178 F: FnOnce(&AutomergeDoc) -> Result<O, AutomergeError>,
179 {
180 let result = {
181 let document = self.get_document(document_id).await?;
182 document.transact(cb).await?
183 };
184 Ok(result)
185 }
186
187 #[instrument(skip(self, cb), fields(peer_name = self.default_peer_header.name))]
190 pub async fn transact_mut<F, O>(
191 &mut self,
192 document_id: &DocumentId,
193 cb: F,
194 change_id: Option<Vec<u8>>,
195 ) -> Result<O, PeermergeError>
196 where
197 F: FnOnce(&mut AutomergeDoc) -> Result<O, AutomergeError>,
198 {
199 let (result, state_events) = {
200 let mut document = self.get_document(document_id).await?;
201 document.transact_mut(cb, change_id).await?
202 };
203 if !state_events.is_empty() {
204 if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
205 send_state_events(state_event_sender, state_events, &self.peermerge_state).await;
206 }
207 }
208 Ok(result)
209 }
210
211 #[instrument(skip(self))]
215 pub async fn watch(
216 &mut self,
217 document_id: &DocumentId,
218 ids: Option<Vec<ObjId>>,
219 ) -> Result<(), PeermergeError> {
220 let mut document = self.get_document(document_id).await?;
221 document.watch(ids).await;
222 Ok(())
223 }
224
225 #[instrument(skip(self, obj), fields(obj = obj.as_ref().to_string(), peer_name = self.default_peer_header.name))]
230 pub async fn reserve_object<O: AsRef<ObjId>>(
231 &mut self,
232 document_id: &DocumentId,
233 obj: O,
234 ) -> Result<(), PeermergeError> {
235 let mut document = self.get_document(document_id).await?;
236 document.reserve_object(obj.as_ref().clone()).await
237 }
238
239 #[instrument(skip(self, obj), fields(obj = obj.as_ref().to_string(), peer_name = self.default_peer_header.name))]
241 pub async fn unreserve_object<O: AsRef<ObjId>>(
242 &mut self,
243 document_id: &DocumentId,
244 obj: O,
245 ) -> Result<(), PeermergeError> {
246 let mut document = self.get_document(document_id).await?;
247 let state_events = document.unreserve_object(obj).await?;
248 if !state_events.is_empty() {
249 if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
250 send_state_events(state_event_sender, state_events, &self.peermerge_state).await;
251 }
252 }
253 Ok(())
254 }
255
256 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
258 pub async fn sharing_info(
259 &self,
260 document_id: &DocumentId,
261 ) -> Result<DocumentSharingInfo, PeermergeError> {
262 let document = self.get_document(document_id).await?;
263 document.sharing_info().await
264 }
265
266 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
268 pub async fn document_secret(
269 &self,
270 document_id: &DocumentId,
271 ) -> Result<Option<String>, PeermergeError> {
272 let document = self.get_document(document_id).await?;
273 let document_secret = document.document_secret();
274 Ok(document_secret.as_ref().map(encode_document_secret))
275 }
276
277 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
279 pub async fn reattach_secret(
280 &self,
281 document_id: &DocumentId,
282 ) -> Result<String, PeermergeError> {
283 let document = self.get_document(document_id).await?;
284 let write_feed_signing_key = document.write_feed_signing_key().await;
285 Ok(encode_reattach_secret(
286 &self.peer_id,
287 &signing_key_to_bytes(&write_feed_signing_key),
288 ))
289 }
290
291 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
293 pub async fn protocol_ids(&self) -> Result<Vec<Vec<u8>>, PeermergeError> {
294 Ok(self
295 .protocols
296 .iter()
297 .map(|multi| multi.key().clone())
298 .collect())
299 }
300
301 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
303 pub async fn disconnect_protocol(
304 &mut self,
305 protocol_id: &[u8],
306 ) -> Result<bool, PeermergeError> {
307 if let Some(mut protocol_sender) = self.protocols.get_mut(protocol_id) {
308 protocol_sender
309 .signal_local(SHUTDOWN_SIGNAL_NAME, vec![])
310 .await?;
311 Ok(true)
312 } else {
313 Ok(false)
314 }
315 }
316
317 #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
319 pub async fn disconnect_all_protocols(&mut self) -> Result<usize, PeermergeError> {
320 let protocol_ids: Vec<Vec<u8>> = {
322 self.protocols
323 .iter()
324 .map(|multi| multi.key().clone())
325 .collect()
326 };
327
328 for document_id in get_document_ids(&self.documents).await {
330 let mut document = self.get_document(&document_id).await?;
331 document.close().await?;
332 }
333
334 let mut count: usize = 0;
336 for protocol_id in protocol_ids {
337 if let Some(mut protocol_sender) = self.protocols.get_mut(&protocol_id) {
338 protocol_sender
339 .signal_local(SHUTDOWN_SIGNAL_NAME, vec![])
340 .await?;
341 count += 1;
342 }
343 }
344 Ok(count)
345 }
346
347 async fn get_document(
354 &self,
355 document_id: &DocumentId,
356 ) -> Result<Document<T, U>, PeermergeError> {
357 get_document(&self.documents, document_id)
358 .await
359 .ok_or_else(|| PeermergeError::BadArgument {
360 context: format!("No document found with given document id: {document_id:02X?}"),
361 })
362 }
363
364 async fn add_document(
365 &mut self,
366 document: Document<T, U>,
367 parent_id: Option<DocumentId>,
368 ) -> DocumentInfo {
369 let mut state = self.peermerge_state.lock().await;
370 let info = document.info().await;
371 self.documents.insert(info.id(), document);
372 state.add_document_id_to_state(info.id(), parent_id).await;
373 info
374 }
375
376 async fn parent_document_info(
377 &self,
378 parent_id: Option<DocumentId>,
379 parent_header: Option<NameDescription>,
380 ) -> Result<
381 (
382 Option<Document<T, U>>,
383 Option<(DocumentId, SigningKey, NameDescription)>,
384 ),
385 PeermergeError,
386 > {
387 if let Some(parent_id) = parent_id {
388 let document = self.get_document(&parent_id).await?;
389 let signing_key = document.doc_signature_signing_key().ok_or_else(|| {
390 PeermergeError::BadArgument {
391 context: "Can not create a child to parent without write access".to_string(),
392 }
393 })?;
394 let parent_header: NameDescription = if let Some(parent_header) = parent_header {
395 parent_header
396 } else {
397 document
398 .document_header()
399 .await
400 .ok_or_else(|| PeermergeError::BadArgument {
401 context: "Without parent_header the parent document needs \
402 a header to create a child document"
403 .to_string(),
404 })?
405 };
406 Ok((
407 Some(document),
408 Some((parent_id, signing_key, parent_header)),
409 ))
410 } else {
411 Ok((None, None))
412 }
413 }
414
415 async fn process_new_document_result(
416 &self,
417 result: NewDocumentResult<T, U>,
418 mut parent_document: Option<Document<T, U>>,
419 ) -> Result<(Document<T, U>, Option<DocumentId>), PeermergeError> {
420 if !result.state_events.is_empty() {
421 if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
422 send_state_events(
423 state_event_sender,
424 result.state_events,
425 &self.peermerge_state,
426 )
427 .await;
428 }
429 }
430 let parent_id: Option<DocumentId> =
431 if let Some(child_document_info) = result.child_document_info {
432 let mut parent_document = parent_document.take().unwrap();
433 let parent_id = parent_document.id();
434 let document_secret = result.document.document_secret().unwrap();
435 let document_url = &result
436 .document
437 .sharing_info()
438 .await
439 .unwrap()
440 .read_write_document_url;
441 parent_document
442 .add_created_child_document(child_document_info, document_url, document_secret)
443 .await?;
444 Some(parent_id)
445 } else {
446 None
447 };
448 Ok((result.document, parent_id))
449 }
450}
451
452impl Peermerge<RandomAccessMemory, FeedMemoryPersistence> {
457 pub async fn new_memory(options: PeermergeMemoryOptions) -> Result<Self, PeermergeError> {
459 let document_settings = DocumentSettings {
460 max_entry_data_size_bytes: options.max_entry_data_size_bytes,
461 max_write_feed_length: options.max_write_feed_length,
462 };
463 let (reattach_secrets, peer_id) = if let Some(reattach_secrets) = options.reattach_secrets {
464 let mut secrets: HashMap<DocumentId, SigningKey> = HashMap::new();
465 let mut new_peer_id: Option<PeerId> = None;
466 for (document_id, reattach_secret) in reattach_secrets {
467 let (peer_id, write_feed_key_pair_bytes) =
468 decode_reattach_secret(&reattach_secret)?;
469 if let Some(id) = new_peer_id {
470 if peer_id != id {
471 return Err(PeermergeError::BadArgument {
472 context: "Invalid reattach secrets, peer id is not the same"
473 .to_string(),
474 });
475 }
476 } else {
477 new_peer_id = Some(peer_id);
478 }
479 let write_feed_signing_key = signing_key_from_bytes(&write_feed_key_pair_bytes);
480 secrets.insert(document_id, write_feed_signing_key);
481 }
482 (Some(secrets), new_peer_id)
483 } else {
484 (None, None)
485 };
486 let wrapper = PeermergeStateWrapper::new_memory(
487 &options.default_peer_header,
488 document_settings.clone(),
489 peer_id,
490 )
491 .await;
492 Ok(Self {
493 peer_id: wrapper.state.peer_id,
494 default_peer_header: options.default_peer_header,
495 prefix: PathBuf::new(),
496 peermerge_state: Arc::new(Mutex::new(wrapper)),
497 documents: Arc::new(DashMap::new()),
498 protocols: Arc::new(DashMap::new()),
499 state_event_sender: Arc::new(Mutex::new(options.state_event_sender)),
500 document_settings,
501 reattach_secrets,
502 })
503 }
504
505 pub async fn create_new_document_memory<F, O>(
507 &mut self,
508 options: CreateNewDocumentMemoryOptions,
509 init_cb: F,
510 change_id: Option<Vec<u8>>,
511 ) -> Result<(DocumentInfo, O), PeermergeError>
512 where
513 F: FnOnce(&mut Transaction) -> Result<O, AutomergeError>,
514 {
515 let (parent_document, parent_id_signing_key_and_header) = self
516 .parent_document_info(options.parent_id, options.parent_header)
517 .await?;
518 let (create_result, init_result) = Document::create_new_memory(
519 self.peer_id,
520 &self.default_peer_header,
521 &options.document_type,
522 options.document_header,
523 options.encrypted,
524 parent_id_signing_key_and_header,
525 self.document_settings.clone(),
526 init_cb,
527 change_id,
528 )
529 .await?;
530 let (document, parent_id) = self
531 .process_new_document_result(create_result, parent_document)
532 .await?;
533 Ok((self.add_document(document, parent_id).await, init_result))
534 }
535
536 pub async fn attach_document_memory(
538 &mut self,
539 options: AttachDocumentMemoryOptions,
540 ) -> Result<DocumentInfo, PeermergeError> {
541 let (parent_document, parent_id_signing_key_and_header) = self
542 .parent_document_info(options.parent_id, options.parent_header)
543 .await?;
544 let document_secret = options
545 .document_secret
546 .map(|secret| decode_document_secret(&secret))
547 .transpose()?;
548 let decoded_document_url = decode_doc_url(&options.document_url, &document_secret)?;
549
550 if self.reattach_secrets.is_some() {
552 if decoded_document_url.static_info.child {
553 return Err(PeermergeError::BadArgument {
554 context: "Can not reattach a child document".to_string(),
555 });
556 }
557 if !self.documents.is_empty() {
558 return Err(PeermergeError::BadArgument {
559 context: "Can only reattach to an empty peermerge".to_string(),
560 });
561 }
562 }
563
564 let attach_result = Document::attach_memory(
565 self.peer_id,
566 &self.default_peer_header,
567 decoded_document_url,
568 self.reattach_secrets.as_mut(),
569 parent_id_signing_key_and_header.map(|value| DocumentParent::New {
570 parent_id: value.0,
571 signing_key: value.1,
572 parent_header: value.2,
573 }),
574 self.document_settings.clone(),
575 )
576 .await?;
577 let (document, parent_id) = self
578 .process_new_document_result(attach_result, parent_document)
579 .await?;
580 Ok(self.add_document(document, parent_id).await)
581 }
582
583 #[instrument(skip_all, fields(peer_name = self.default_peer_header.name))]
585 pub async fn connect_protocol_memory<T>(
586 &mut self,
587 protocol_id: &[u8],
588 protocol: &mut FeedProtocol<T>,
589 ) -> Result<(), PeermergeError>
590 where
591 T: IO,
592 {
593 let (mut feed_event_sender, feed_event_receiver): (
594 UnboundedSender<FeedEvent>,
595 UnboundedReceiver<FeedEvent>,
596 ) = unbounded();
597 if self.state_event_sender.lock().await.is_none() {
598 return Err(PeermergeError::BadArgument {
599 context: "State event sender must be set before connecting protocol".to_string(),
600 });
601 };
602 if self.protocols.contains_key(protocol_id) {
603 return Err(PeermergeError::BadArgument {
604 context: format!("Protocol with id {protocol_id:?} already connected"),
605 });
606 }
607 let state_event_sender_for_task = self.state_event_sender.clone();
608 let documents_for_task = self.documents.clone();
609 let peermerge_state_for_task = self.peermerge_state.clone();
610 let task_span = tracing::debug_span!("call_on_feed_event_memory").or_current();
611 let peer_id = self.peer_id;
612 let default_peer_header = self.default_peer_header.clone();
613 let document_settings = self.document_settings.clone();
614 let reattach_secrets = self.reattach_secrets.clone();
615
616 #[cfg(not(target_arch = "wasm32"))]
617 task::spawn(async move {
618 let _entered = task_span.enter();
619 on_feed_event_memory(
620 peer_id,
621 default_peer_header,
622 document_settings,
623 feed_event_receiver,
624 state_event_sender_for_task,
625 documents_for_task,
626 peermerge_state_for_task,
627 reattach_secrets,
628 )
629 .await;
630 });
631 #[cfg(target_arch = "wasm32")]
632 spawn_local(async move {
633 let _entered = task_span.enter();
634 on_feed_event_memory(
635 peer_id,
636 default_peer_header,
637 document_settings,
638 feed_event_receiver,
639 state_event_sender_for_task,
640 documents_for_task,
641 peermerge_state_for_task,
642 reattach_secrets,
643 )
644 .await;
645 });
646
647 self.protocols
649 .insert(protocol_id.to_vec(), protocol.commands());
650
651 let result = on_protocol(
653 self.peer_id,
654 protocol,
655 self.documents.clone(),
656 &mut feed_event_sender,
657 )
658 .await;
659
660 if self.protocols.contains_key(protocol_id) {
662 self.protocols.remove(protocol_id);
663 }
664
665 result
666 }
667}
668
669#[instrument(level = "debug", skip_all)]
670#[allow(clippy::too_many_arguments)]
671async fn on_feed_event_memory(
672 peer_id: PeerId,
673 default_peer_header: NameDescription,
674 document_settings: DocumentSettings,
675 mut feed_event_receiver: UnboundedReceiver<FeedEvent>,
676 state_event_sender_mutex: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
677 mut documents: Arc<DashMap<DocumentId, Document<RandomAccessMemory, FeedMemoryPersistence>>>,
678 peermerge_state: Arc<Mutex<PeermergeStateWrapper<RandomAccessMemory>>>,
679 mut reattach_secrets: Option<HashMap<DocumentId, SigningKey>>,
680) {
681 let mut state_event_sender: UnboundedSender<StateEvent> = {
682 state_event_sender_mutex
683 .lock()
684 .await
685 .clone()
686 .expect("Should always be present")
687 };
688 while let Some(event) = feed_event_receiver.next().await {
689 debug!("Received event {:?}", event);
690 if state_event_sender.is_closed() {
692 if let Some(sender) = state_event_sender_mutex.lock().await.clone() {
693 state_event_sender = sender;
694 }
695 }
696 match event.content {
697 FeedEventContent::NewFeedsBroadcasted { new_feeds } => {
698 let mut document =
699 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
700 .await
701 .unwrap();
702 let state_events = document
703 .process_new_feeds_broadcasted_memory(new_feeds)
704 .await;
705
706 if !state_events.is_empty() {
707 send_state_events(&mut state_event_sender, state_events, &peermerge_state)
708 .await;
709 }
710 }
711 FeedEventContent::NewChildDocumentsBroadcasted {
712 new_child_documents,
713 } => {
714 let mut parent_document =
715 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
716 .await
717 .unwrap();
718 let parent_id = parent_document.id();
719 for mut new_child_document in new_child_documents {
720 if let Some(decoded_document_url) = parent_document
721 .merge_remote_child_document(&mut new_child_document)
722 .await
723 .unwrap()
724 {
725 let document_id = decoded_document_url.static_info.document_id;
726
727 if !documents.contains_key(&decoded_document_url.static_info.document_id) {
730 let attach_result = Document::attach_memory(
731 peer_id,
732 &default_peer_header,
733 decoded_document_url,
734 reattach_secrets.as_mut(),
735 Some(DocumentParent::Registered {
736 child_document_info: new_child_document.clone(),
737 parent_id,
738 }),
739 document_settings.clone(),
740 )
741 .await
742 .unwrap();
743 if !attach_result.state_events.is_empty() {
744 send_state_events(
745 &mut state_event_sender,
746 attach_result.state_events,
747 &peermerge_state,
748 )
749 .await;
750 }
751 documents.insert(document_id, attach_result.document);
752 }
753 {
754 let mut state = peermerge_state.lock().await;
755 state
756 .add_document_id_to_state(document_id, Some(parent_id))
757 .await;
758 }
759 parent_document
761 .set_child_document_created(&new_child_document)
762 .await
763 .unwrap();
764 }
765 }
766 }
767 FeedEventContent::FeedMaxLengthReached { discovery_key } => {
768 let mut document =
769 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
770 .await
771 .unwrap();
772 let state_events = document
773 .replace_write_feed_memory(&discovery_key)
774 .await
775 .unwrap();
776 send_state_events(&mut state_event_sender, state_events, &peermerge_state).await;
777 }
778 _ => {
779 process_feed_event(
780 event,
781 &mut state_event_sender,
782 &mut documents,
783 &peermerge_state,
784 )
785 .await
786 }
787 }
788 }
789 debug!("Exiting");
790}
791
792#[cfg(not(target_arch = "wasm32"))]
797impl Peermerge<RandomAccessDisk, FeedDiskPersistence> {
798 pub async fn new_disk(options: PeermergeDiskOptions) -> Result<Self, PeermergeError> {
800 let document_settings = DocumentSettings {
801 max_entry_data_size_bytes: options.max_entry_data_size_bytes,
802 max_write_feed_length: options.max_write_feed_length,
803 };
804 let wrapper = PeermergeStateWrapper::new_disk(
805 &options.default_peer_header,
806 &options.data_root_dir,
807 document_settings.clone(),
808 )
809 .await?;
810 Ok(Self {
811 peer_id: wrapper.state.peer_id,
812 default_peer_header: options.default_peer_header,
813 prefix: options.data_root_dir.clone(),
814 peermerge_state: Arc::new(Mutex::new(wrapper)),
815 documents: Arc::new(DashMap::new()),
816 protocols: Arc::new(DashMap::new()),
817 state_event_sender: Arc::new(Mutex::new(options.state_event_sender)),
818 document_settings,
819 reattach_secrets: None,
820 })
821 }
822
823 pub async fn document_infos_disk(
825 data_root_dir: &Path,
826 ) -> Result<Option<Vec<DocumentInfo>>, PeermergeError> {
827 if let Some(state_wrapper) = PeermergeStateWrapper::open_disk(data_root_dir).await? {
828 let mut document_infos: Vec<DocumentInfo> = vec![];
829 for document_id_with_parents in &state_wrapper.state.document_ids {
830 let postfix = encode_document_id(&document_id_with_parents.document_id);
831 let document_data_root_dir = data_root_dir.join(postfix);
832 document_infos.push(Document::info_disk(&document_data_root_dir).await?);
833 }
834 Ok(Some(document_infos))
835 } else {
836 Ok(None)
837 }
838 }
839
840 pub async fn open_disk(options: OpenDiskOptions) -> Result<Self, PeermergeError> {
842 let mut document_secrets: HashMap<DocumentId, DocumentSecret> = HashMap::new();
843 for (document_id, document_secret) in options.document_secrets.unwrap_or_default() {
844 let document_secret = decode_document_secret(&document_secret)?;
845 document_secrets.insert(document_id, document_secret);
846 }
847 let state_wrapper = PeermergeStateWrapper::open_disk(&options.data_root_dir)
848 .await?
849 .expect("Not a valid peermerge directory");
850 let state = state_wrapper.state();
851 let peer_id = state.peer_id;
852 let default_peer_header = state.default_peer_header.clone();
853 let document_settings = state.document_settings.clone();
854 let documents: DashMap<DocumentId, Document<RandomAccessDisk, FeedDiskPersistence>> =
855 DashMap::new();
856 let mut state_events: Vec<StateEvent> = vec![];
857
858 let mut document_ids_with_parents: Vec<DocumentIdWithParents> = state_wrapper
861 .state
862 .document_ids
863 .iter()
864 .filter(|document_id| document_id.parent_document_ids.is_empty())
865 .cloned()
866 .collect();
867 document_ids_with_parents.extend(
868 state_wrapper
869 .state
870 .document_ids
871 .iter()
872 .filter(|document_id| !document_id.parent_document_ids.is_empty())
873 .cloned(),
874 );
875 for document_id_with_parents in &state_wrapper.state.document_ids {
876 let document_id = &document_id_with_parents.document_id;
877 let postfix = encode_document_id(document_id);
878 let document_data_root_dir = options.data_root_dir.join(postfix);
879 let (document, document_state_events) = Document::open_disk(
880 peer_id,
881 &mut document_secrets,
882 &document_data_root_dir,
883 document_settings.clone(),
884 )
885 .await?;
886 state_events.extend(document_state_events);
887 documents.insert(*document_id, document);
888 }
889 let documents = Arc::new(documents);
890 let peermerge_state = Arc::new(Mutex::new(state_wrapper));
891 let mut state_event_sender = options.state_event_sender;
892 if let Some(state_event_sender) = state_event_sender.as_mut() {
893 send_state_events(state_event_sender, state_events, &peermerge_state).await;
894 }
895
896 Ok(Self {
897 peer_id,
898 default_peer_header,
899 prefix: options.data_root_dir,
900 peermerge_state,
901 documents,
902 protocols: Arc::new(DashMap::new()),
903 state_event_sender: Arc::new(Mutex::new(state_event_sender)),
904 document_settings,
905 reattach_secrets: None,
906 })
907 }
908
909 pub async fn create_new_document_disk<F, O>(
911 &mut self,
912 options: CreateNewDocumentDiskOptions,
913 init_cb: F,
914 change_id: Option<Vec<u8>>,
915 ) -> Result<(DocumentInfo, O), PeermergeError>
916 where
917 F: FnOnce(&mut Transaction) -> Result<O, AutomergeError>,
918 {
919 let (parent_document, parent_id_signing_key_and_header) = self
920 .parent_document_info(options.parent_id, options.parent_header)
921 .await?;
922 let (create_result, init_result) = Document::create_new_disk(
923 self.peer_id,
924 &self.default_peer_header,
925 &options.document_type,
926 options.document_header,
927 options.encrypted,
928 parent_id_signing_key_and_header,
929 self.document_settings.clone(),
930 init_cb,
931 change_id,
932 &self.prefix,
933 )
934 .await?;
935 let (document, parent_id) = self
936 .process_new_document_result(create_result, parent_document)
937 .await?;
938 Ok((self.add_document(document, parent_id).await, init_result))
939 }
940
941 pub async fn attach_document_disk(
943 &mut self,
944 options: AttachDocumentDiskOptions,
945 ) -> Result<DocumentInfo, PeermergeError> {
946 let (parent_document, parent_id_signing_key_and_header) = self
947 .parent_document_info(options.parent_id, options.parent_header)
948 .await?;
949 let document_secret = options
950 .document_secret
951 .map(|secret| decode_document_secret(&secret))
952 .transpose()?;
953 let decoded_document_url = decode_doc_url(&options.document_url, &document_secret)?;
954 let attach_result = Document::attach_disk(
955 self.peer_id,
956 &self.default_peer_header,
957 decoded_document_url,
958 parent_id_signing_key_and_header.map(|value| DocumentParent::New {
959 parent_id: value.0,
960 signing_key: value.1,
961 parent_header: value.2,
962 }),
963 &self.prefix,
964 self.document_settings.clone(),
965 )
966 .await?;
967 let (document, parent_id) = self
968 .process_new_document_result(attach_result, parent_document)
969 .await?;
970 Ok(self.add_document(document, parent_id).await)
971 }
972
973 #[instrument(skip_all, fields(name = self.default_peer_header.name))]
975 pub async fn connect_protocol_disk<T>(
976 &mut self,
977 protocol_id: &[u8],
978 protocol: &mut FeedProtocol<T>,
979 ) -> Result<(), PeermergeError>
980 where
981 T: IO,
982 {
983 let (mut feed_event_sender, feed_event_receiver): (
984 UnboundedSender<FeedEvent>,
985 UnboundedReceiver<FeedEvent>,
986 ) = unbounded();
987 if self.state_event_sender.lock().await.is_none() {
988 return Err(PeermergeError::BadArgument {
989 context: "State event sender must be set before connecting protocol".to_string(),
990 });
991 };
992 if self.protocols.contains_key(protocol_id) {
993 return Err(PeermergeError::BadArgument {
994 context: format!("Protocol with id {protocol_id:?} already connected"),
995 });
996 }
997 let state_event_sender_for_task = self.state_event_sender.clone();
998 let documents_for_task = self.documents.clone();
999 let peemerge_state_for_task = self.peermerge_state.clone();
1000 let peer_id = self.peer_id;
1001 let default_peer_header = self.default_peer_header.clone();
1002 let document_settings = self.document_settings.clone();
1003 let prefix = self.prefix.clone();
1004 let task_span = tracing::debug_span!("call_on_feed_event_disk").or_current();
1005 task::spawn(async move {
1006 let _entered = task_span.enter();
1007 on_feed_event_disk(
1008 peer_id,
1009 default_peer_header,
1010 document_settings,
1011 &prefix,
1012 feed_event_receiver,
1013 state_event_sender_for_task,
1014 documents_for_task,
1015 peemerge_state_for_task,
1016 )
1017 .await;
1018 });
1019
1020 self.protocols
1022 .insert(protocol_id.to_vec(), protocol.commands());
1023
1024 let result = on_protocol(
1026 self.peer_id,
1027 protocol,
1028 self.documents.clone(),
1029 &mut feed_event_sender,
1030 )
1031 .await;
1032
1033 if self.protocols.contains_key(protocol_id) {
1035 self.protocols.remove(protocol_id);
1036 }
1037
1038 result
1039 }
1040}
1041
1042#[cfg(not(target_arch = "wasm32"))]
1043#[instrument(level = "debug", skip_all)]
1044#[allow(clippy::too_many_arguments)]
1045async fn on_feed_event_disk(
1046 peer_id: PeerId,
1047 default_peer_header: NameDescription,
1048 document_settings: DocumentSettings,
1049 prefix: &Path,
1050 mut feed_event_receiver: UnboundedReceiver<FeedEvent>,
1051 state_event_sender_mutex: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
1052 mut documents: Arc<DashMap<DocumentId, Document<RandomAccessDisk, FeedDiskPersistence>>>,
1053 peermerge_state: Arc<Mutex<PeermergeStateWrapper<RandomAccessDisk>>>,
1054) {
1055 let mut state_event_sender: UnboundedSender<StateEvent> = {
1056 state_event_sender_mutex
1057 .lock()
1058 .await
1059 .clone()
1060 .expect("Should always be present")
1061 };
1062 while let Some(event) = feed_event_receiver.next().await {
1063 if state_event_sender.is_closed() {
1065 if let Some(sender) = state_event_sender_mutex.lock().await.clone() {
1066 state_event_sender = sender;
1067 }
1068 }
1069 debug!("Received event {:?}", event);
1070 match event.content {
1071 FeedEventContent::NewFeedsBroadcasted { new_feeds } => {
1072 let mut document =
1073 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1074 .await
1075 .unwrap();
1076 let state_events = document
1077 .process_new_feeds_broadcasted_disk(new_feeds)
1078 .await
1079 .unwrap();
1080 if !state_events.is_empty() {
1081 send_state_events(&mut state_event_sender, state_events, &peermerge_state)
1082 .await;
1083 }
1084 }
1085 FeedEventContent::NewChildDocumentsBroadcasted {
1086 new_child_documents,
1087 } => {
1088 let mut parent_document =
1089 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1090 .await
1091 .unwrap();
1092 let parent_id = parent_document.id();
1093 for mut new_child_document in new_child_documents {
1094 if let Some(decoded_document_url) = parent_document
1095 .merge_remote_child_document(&mut new_child_document)
1096 .await
1097 .unwrap()
1098 {
1099 let document_id = decoded_document_url.static_info.document_id;
1100 if !documents.contains_key(&document_id) {
1103 let attach_result = Document::attach_disk(
1104 peer_id,
1105 &default_peer_header,
1106 decoded_document_url,
1107 Some(DocumentParent::Registered {
1108 child_document_info: new_child_document.clone(),
1109 parent_id,
1110 }),
1111 prefix,
1112 document_settings.clone(),
1113 )
1114 .await
1115 .unwrap();
1116
1117 if !attach_result.state_events.is_empty() {
1118 send_state_events(
1119 &mut state_event_sender,
1120 attach_result.state_events,
1121 &peermerge_state,
1122 )
1123 .await;
1124 }
1125 documents.insert(document_id, attach_result.document);
1126 }
1127 {
1128 let mut state = peermerge_state.lock().await;
1129 state
1130 .add_document_id_to_state(document_id, Some(parent_id))
1131 .await;
1132 }
1133
1134 parent_document
1136 .set_child_document_created(&new_child_document)
1137 .await
1138 .unwrap();
1139 }
1140 }
1141 }
1142 FeedEventContent::FeedMaxLengthReached { discovery_key } => {
1143 let mut document =
1144 get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1145 .await
1146 .unwrap();
1147 let state_events = document
1148 .replace_write_feed_disk(&discovery_key)
1149 .await
1150 .unwrap();
1151 send_state_events(&mut state_event_sender, state_events, &peermerge_state).await;
1152 }
1153 _ => {
1154 process_feed_event(
1155 event,
1156 &mut state_event_sender,
1157 &mut documents,
1158 &peermerge_state,
1159 )
1160 .await
1161 }
1162 }
1163 }
1164 debug!("Exiting");
1165}
1166
1167async fn send_state_events<T>(
1173 state_event_sender: &mut UnboundedSender<StateEvent>,
1174 state_events: Vec<StateEvent>,
1175 peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1176) where
1177 T: RandomAccess + Debug + Send + 'static,
1178{
1179 if !state_event_sender.is_closed() {
1180 for mut state_event in state_events {
1181 post_process_state_event(&mut state_event, peermerge_state).await;
1182 state_event_sender.unbounded_send(state_event).unwrap();
1183 }
1184 }
1185}
1186
1187#[instrument(level = "debug", skip_all)]
1188async fn process_feed_event<T, U>(
1189 event: FeedEvent,
1190 state_event_sender: &mut UnboundedSender<StateEvent>,
1191 documents: &mut Arc<DashMap<DocumentId, Document<T, U>>>,
1192 peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1193) where
1194 T: RandomAccess + Debug + Send + 'static,
1195 U: FeedPersistence,
1196{
1197 match event.content {
1198 FeedEventContent::NewFeedsBroadcasted { .. } => {
1199 unreachable!("Implemented by concrete type")
1200 }
1201 FeedEventContent::NewChildDocumentsBroadcasted { .. } => {
1202 unreachable!("Implemented by concrete type")
1203 }
1204 FeedEventContent::FeedMaxLengthReached { .. } => {
1205 unreachable!("Implemented by concrete type")
1206 }
1207 FeedEventContent::FeedDisconnected { .. } => {
1208 }
1210 FeedEventContent::FeedVerified {
1211 peer_id,
1212 discovery_key,
1213 verified,
1214 } => {
1215 let document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1216 .await
1217 .unwrap();
1218 if verified {
1219 document.set_feed_verified(&discovery_key, &peer_id).await;
1220 } else {
1221 unimplemented!("TODO: Invalid feed deletion");
1222 }
1223 }
1224 FeedEventContent::RemoteFeedSynced {
1225 peer_id,
1226 discovery_key,
1227 contiguous_length,
1228 } => {
1229 let document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1230 .await
1231 .unwrap();
1232 let state_events = document
1233 .process_remote_feed_synced(peer_id, discovery_key, contiguous_length)
1234 .await;
1235 send_state_events(state_event_sender, state_events, peermerge_state).await;
1236 }
1237 FeedEventContent::FeedSynced {
1238 peer_id,
1239 discovery_key,
1240 contiguous_length,
1241 } => {
1242 let mut document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1243 .await
1244 .unwrap();
1245 let state_events = document
1246 .process_feed_synced(peer_id, discovery_key, contiguous_length)
1247 .await;
1248 send_state_events(state_event_sender, state_events, peermerge_state).await;
1249 }
1250 }
1251}
1252
1253async fn post_process_state_event<T>(
1254 state_event: &mut StateEvent,
1255 peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1256) where
1257 T: RandomAccess + Debug + Send + 'static,
1258{
1259 if let StateEventContent::DocumentInitialized {
1260 child,
1261 ref mut parent_document_ids,
1262 ..
1263 } = state_event.content
1264 {
1265 if child && parent_document_ids.is_empty() {
1266 let peermerge_state = peermerge_state.lock().await;
1268 parent_document_ids.extend(
1269 peermerge_state
1270 .state
1271 .document_ids
1272 .iter()
1273 .find(|id_with_parents| state_event.document_id == id_with_parents.document_id)
1274 .unwrap()
1275 .parent_document_ids
1276 .clone(),
1277 );
1278 }
1279 }
1280}