peermerge/
core.rs

1use automerge::{transaction::Transaction, AutomergeError, ObjId, Patch};
2use dashmap::DashMap;
3use futures::{
4    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
5    StreamExt,
6};
7use hypercore_protocol::{hypercore::SigningKey, CommandTx};
8#[cfg(not(target_arch = "wasm32"))]
9use random_access_disk::RandomAccessDisk;
10use random_access_memory::RandomAccessMemory;
11use random_access_storage::RandomAccess;
12#[cfg(not(target_arch = "wasm32"))]
13use std::path::Path;
14use std::path::PathBuf;
15use std::sync::Arc;
16use std::{collections::HashMap, fmt::Debug};
17use tracing::{debug, instrument};
18
19#[cfg(all(not(target_arch = "wasm32"), feature = "async-std"))]
20use async_std::task;
21#[cfg(all(not(target_arch = "wasm32"), feature = "tokio"))]
22use tokio::task;
23#[cfg(target_arch = "wasm32")]
24use wasm_bindgen_futures::spawn_local;
25
26#[cfg(not(target_arch = "wasm32"))]
27use crate::{
28    common::cipher::{encode_document_id, DocumentSecret},
29    feeds::FeedDiskPersistence,
30    options::{
31        AttachDocumentDiskOptions, CreateNewDocumentDiskOptions, OpenDiskOptions,
32        PeermergeDiskOptions,
33    },
34};
35use crate::{
36    common::{
37        cipher::{
38            decode_doc_url, decode_document_secret, decode_reattach_secret, encode_document_secret,
39            encode_reattach_secret,
40        },
41        keys::{signing_key_from_bytes, signing_key_to_bytes},
42        state::DocumentIdWithParents,
43        storage::PeermergeStateWrapper,
44        utils::Mutex,
45        FeedEventContent,
46    },
47    document::{
48        get_document_by_discovery_key, DocumentParent, DocumentSettings, NewDocumentResult,
49    },
50    feeds::{FeedMemoryPersistence, FeedPersistence, FeedProtocol, SHUTDOWN_SIGNAL_NAME},
51    options::PeermergeMemoryOptions,
52    AttachDocumentMemoryOptions, AutomergeDoc, CreateNewDocumentMemoryOptions, DocumentSharingInfo,
53    PeerId, PeermergeError, StateEventContent,
54};
55use crate::{
56    common::{DocumentInfo, FeedEvent},
57    document::{get_document, get_document_ids},
58    feeds::on_protocol,
59    DocumentId, NameDescription, IO,
60};
61use crate::{document::Document, StateEvent};
62
/// Peermerge is the main abstraction and a store for multiple documents.
///
/// The type is `Clone`. Fields wrapped in `Arc` (state, documents, protocols and
/// the state event sender) are shared between clones, while `reattach_secrets`
/// is a plain `Option<HashMap<..>>` and is therefore copied into each clone.
#[derive(derivative::Derivative)]
#[derivative(Clone(bound = ""))]
#[derive(Debug)]
pub struct Peermerge<T, U>
where
    T: RandomAccess + Debug + Send,
    U: FeedPersistence,
{
    /// Id of this peer
    peer_id: PeerId,
    /// Name and description of this peer that's given by default
    /// to new documents. Can be something different within individual
    /// documents as they change over time.
    default_peer_header: NameDescription,
    /// General settings for all documents
    document_settings: DocumentSettings,
    /// Prefix. Left empty (`PathBuf::new()`) by `new_memory`.
    // NOTE(review): presumably the storage root directory for disk-backed
    // peermerges; confirm against the disk constructor.
    prefix: PathBuf,
    /// Current storable state
    peermerge_state: Arc<Mutex<PeermergeStateWrapper<T>>>,
    /// Created documents
    documents: Arc<DashMap<DocumentId, Document<T, U>>>,
    /// Attached protocols, keyed by the caller-supplied protocol id
    protocols: Arc<DashMap<Vec<u8>, CommandTx>>,
    /// Sender for events
    state_event_sender: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
    /// Transient save of reattach secrets for child documents, used only for memory
    /// peermerges. Need to be stored because child documents' write feeds can't be
    /// created immediately.
    reattach_secrets: Option<HashMap<DocumentId, SigningKey>>,
}
95
96impl<T, U> Peermerge<T, U>
97where
98    T: RandomAccess + Debug + Send + 'static,
99    U: FeedPersistence,
100{
    /// Get my peer id.
    ///
    /// Returns a copy of the `peer_id` stored on this instance.
    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }
106
107    /// Get all known peer ids in a document
108    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
109    pub async fn peer_ids(&self, document_id: &DocumentId) -> Result<Vec<PeerId>, PeermergeError> {
110        let document = self.get_document(document_id).await?;
111        Ok(document.peer_ids().await)
112    }
113
    /// Get my default peer header given to a new document.
    ///
    /// Returns a clone, so modifying the returned value does not change the
    /// header stored on this instance.
    pub fn default_peer_header(&self) -> NameDescription {
        self.default_peer_header.clone()
    }
118
119    /// Get the current peer header value in a new document
120    pub async fn peer_header(
121        &self,
122        document_id: &DocumentId,
123        peer_id: &PeerId,
124    ) -> Result<Option<NameDescription>, PeermergeError> {
125        let document = self.get_document(document_id).await?;
126        Ok(document.peer_header(peer_id).await)
127    }
128
129    /// Add or change the state event sender. The sender can already be given when
130    /// peermerge is created but sometimes it's more convinient to set it later.
131    pub async fn set_state_event_sender(
132        &mut self,
133        state_event_sender: Option<UnboundedSender<StateEvent>>,
134    ) -> Result<(), PeermergeError> {
135        let not_empty = state_event_sender.is_some();
136
137        {
138            *self.state_event_sender.lock().await = state_event_sender;
139        }
140        if not_empty {
141            // Let's drain any patches that are not yet sent out, and push them out. These can
142            // be created by values inserted with peermerge.create_new_document_memory/disk()
143            // or other mutating calls executed before this call without a state_event_sender.
144            let mut state_event_sender = self.state_event_sender.lock().await;
145            if let Some(sender) = state_event_sender.as_mut() {
146                if sender.is_closed() {
147                    *state_event_sender = None;
148                } else {
149                    let mut document_patches: Vec<(DocumentId, Vec<Patch>)> = vec![];
150                    for document_id in get_document_ids(&self.documents).await {
151                        let mut document = self.get_document(&document_id).await?;
152                        let new_patches = document.take_patches().await;
153                        if !new_patches.is_empty() {
154                            document_patches.push((document_id, new_patches))
155                        }
156                    }
157                    for (document_id, patches) in document_patches {
158                        sender
159                            .unbounded_send(StateEvent::new(
160                                document_id,
161                                StateEventContent::DocumentChanged {
162                                    change_id: None,
163                                    patches,
164                                },
165                            ))
166                            .unwrap();
167                    }
168                }
169            }
170        }
171        Ok(())
172    }
173
174    /// Read from a document in a single transaction
175    #[instrument(skip(self, cb), fields(peer_name = self.default_peer_header.name))]
176    pub async fn transact<F, O>(&self, document_id: &DocumentId, cb: F) -> Result<O, PeermergeError>
177    where
178        F: FnOnce(&AutomergeDoc) -> Result<O, AutomergeError>,
179    {
180        let result = {
181            let document = self.get_document(document_id).await?;
182            document.transact(cb).await?
183        };
184        Ok(result)
185    }
186
187    /// Read and write to a document in a single transaction. `change_id` is an optional
188    /// parameter that will be returned in the corresponding DocumentChanged StateEvent.
189    #[instrument(skip(self, cb), fields(peer_name = self.default_peer_header.name))]
190    pub async fn transact_mut<F, O>(
191        &mut self,
192        document_id: &DocumentId,
193        cb: F,
194        change_id: Option<Vec<u8>>,
195    ) -> Result<O, PeermergeError>
196    where
197        F: FnOnce(&mut AutomergeDoc) -> Result<O, AutomergeError>,
198    {
199        let (result, state_events) = {
200            let mut document = self.get_document(document_id).await?;
201            document.transact_mut(cb, change_id).await?
202        };
203        if !state_events.is_empty() {
204            if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
205                send_state_events(state_event_sender, state_events, &self.peermerge_state).await;
206            }
207        }
208        Ok(result)
209    }
210
211    /// Set watching to list of Automerge object ids so that DocumentChanged StateEvents
212    /// are returned only from changes to those objects. None means sending events of all
213    /// changes which is the default.
214    #[instrument(skip(self))]
215    pub async fn watch(
216        &mut self,
217        document_id: &DocumentId,
218        ids: Option<Vec<ObjId>>,
219    ) -> Result<(), PeermergeError> {
220        let mut document = self.get_document(document_id).await?;
221        document.watch(ids).await;
222        Ok(())
223    }
224
225    /// Reserve a given object for only local changes, preventing any peers from making changes
226    /// to it at the same time before `unreserve_object` has been called. Useful especially when
227    /// editing a text to avoid having to update remote changes to the field while typing.
228    /// Reserve is not persisted to storage.
229    #[instrument(skip(self, obj), fields(obj = obj.as_ref().to_string(), peer_name = self.default_peer_header.name))]
230    pub async fn reserve_object<O: AsRef<ObjId>>(
231        &mut self,
232        document_id: &DocumentId,
233        obj: O,
234    ) -> Result<(), PeermergeError> {
235        let mut document = self.get_document(document_id).await?;
236        document.reserve_object(obj.as_ref().clone()).await
237    }
238
239    /// Un-reserve a given object previously reserved with `reserve_object`.
240    #[instrument(skip(self, obj), fields(obj = obj.as_ref().to_string(), peer_name = self.default_peer_header.name))]
241    pub async fn unreserve_object<O: AsRef<ObjId>>(
242        &mut self,
243        document_id: &DocumentId,
244        obj: O,
245    ) -> Result<(), PeermergeError> {
246        let mut document = self.get_document(document_id).await?;
247        let state_events = document.unreserve_object(obj).await?;
248        if !state_events.is_empty() {
249            if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
250                send_state_events(state_event_sender, state_events, &self.peermerge_state).await;
251            }
252        }
253        Ok(())
254    }
255
256    /// Get sharing information about a document.
257    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
258    pub async fn sharing_info(
259        &self,
260        document_id: &DocumentId,
261    ) -> Result<DocumentSharingInfo, PeermergeError> {
262        let document = self.get_document(document_id).await?;
263        document.sharing_info().await
264    }
265
266    /// Get the document secret from a given document_id.
267    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
268    pub async fn document_secret(
269        &self,
270        document_id: &DocumentId,
271    ) -> Result<Option<String>, PeermergeError> {
272        let document = self.get_document(document_id).await?;
273        let document_secret = document.document_secret();
274        Ok(document_secret.as_ref().map(encode_document_secret))
275    }
276
277    /// Get the reattach secret of a given document_id. Useful only for in-memory storage.
278    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
279    pub async fn reattach_secret(
280        &self,
281        document_id: &DocumentId,
282    ) -> Result<String, PeermergeError> {
283        let document = self.get_document(document_id).await?;
284        let write_feed_signing_key = document.write_feed_signing_key().await;
285        Ok(encode_reattach_secret(
286            &self.peer_id,
287            &signing_key_to_bytes(&write_feed_signing_key),
288        ))
289    }
290
291    /// Get all connected protocol ids
292    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
293    pub async fn protocol_ids(&self) -> Result<Vec<Vec<u8>>, PeermergeError> {
294        Ok(self
295            .protocols
296            .iter()
297            .map(|multi| multi.key().clone())
298            .collect())
299    }
300
301    /// Disconnect protocol by id, returns if protocol with id was found
302    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
303    pub async fn disconnect_protocol(
304        &mut self,
305        protocol_id: &[u8],
306    ) -> Result<bool, PeermergeError> {
307        if let Some(mut protocol_sender) = self.protocols.get_mut(protocol_id) {
308            protocol_sender
309                .signal_local(SHUTDOWN_SIGNAL_NAME, vec![])
310                .await?;
311            Ok(true)
312        } else {
313            Ok(false)
314        }
315    }
316
317    /// Disconnect all protocols, returns total number of protocols disconnected
318    #[instrument(skip(self), fields(peer_name = self.default_peer_header.name))]
319    pub async fn disconnect_all_protocols(&mut self) -> Result<usize, PeermergeError> {
320        // Get protocol ids first to avoid deadlocs
321        let protocol_ids: Vec<Vec<u8>> = {
322            self.protocols
323                .iter()
324                .map(|multi| multi.key().clone())
325                .collect()
326        };
327
328        // Send close signals to notify remote parties of close as well
329        for document_id in get_document_ids(&self.documents).await {
330            let mut document = self.get_document(&document_id).await?;
331            document.close().await?;
332        }
333
334        // Only after that start closing all protocols (that can still be found)
335        let mut count: usize = 0;
336        for protocol_id in protocol_ids {
337            if let Some(mut protocol_sender) = self.protocols.get_mut(&protocol_id) {
338                protocol_sender
339                    .signal_local(SHUTDOWN_SIGNAL_NAME, vec![])
340                    .await?;
341                count += 1;
342            }
343        }
344        Ok(count)
345    }
346
347    // ///////////////////////
348    //
349    // Private
350    //
351
352    /// Get a document based on document_id
353    async fn get_document(
354        &self,
355        document_id: &DocumentId,
356    ) -> Result<Document<T, U>, PeermergeError> {
357        get_document(&self.documents, document_id)
358            .await
359            .ok_or_else(|| PeermergeError::BadArgument {
360                context: format!("No document found with given document id: {document_id:02X?}"),
361            })
362    }
363
364    async fn add_document(
365        &mut self,
366        document: Document<T, U>,
367        parent_id: Option<DocumentId>,
368    ) -> DocumentInfo {
369        let mut state = self.peermerge_state.lock().await;
370        let info = document.info().await;
371        self.documents.insert(info.id(), document);
372        state.add_document_id_to_state(info.id(), parent_id).await;
373        info
374    }
375
376    async fn parent_document_info(
377        &self,
378        parent_id: Option<DocumentId>,
379        parent_header: Option<NameDescription>,
380    ) -> Result<
381        (
382            Option<Document<T, U>>,
383            Option<(DocumentId, SigningKey, NameDescription)>,
384        ),
385        PeermergeError,
386    > {
387        if let Some(parent_id) = parent_id {
388            let document = self.get_document(&parent_id).await?;
389            let signing_key = document.doc_signature_signing_key().ok_or_else(|| {
390                PeermergeError::BadArgument {
391                    context: "Can not create a child to parent without write access".to_string(),
392                }
393            })?;
394            let parent_header: NameDescription = if let Some(parent_header) = parent_header {
395                parent_header
396            } else {
397                document
398                    .document_header()
399                    .await
400                    .ok_or_else(|| PeermergeError::BadArgument {
401                        context: "Without parent_header the parent document needs \
402                                  a header to create a child document"
403                            .to_string(),
404                    })?
405            };
406            Ok((
407                Some(document),
408                Some((parent_id, signing_key, parent_header)),
409            ))
410        } else {
411            Ok((None, None))
412        }
413    }
414
415    async fn process_new_document_result(
416        &self,
417        result: NewDocumentResult<T, U>,
418        mut parent_document: Option<Document<T, U>>,
419    ) -> Result<(Document<T, U>, Option<DocumentId>), PeermergeError> {
420        if !result.state_events.is_empty() {
421            if let Some(state_event_sender) = self.state_event_sender.lock().await.as_mut() {
422                send_state_events(
423                    state_event_sender,
424                    result.state_events,
425                    &self.peermerge_state,
426                )
427                .await;
428            }
429        }
430        let parent_id: Option<DocumentId> =
431            if let Some(child_document_info) = result.child_document_info {
432                let mut parent_document = parent_document.take().unwrap();
433                let parent_id = parent_document.id();
434                let document_secret = result.document.document_secret().unwrap();
435                let document_url = &result
436                    .document
437                    .sharing_info()
438                    .await
439                    .unwrap()
440                    .read_write_document_url;
441                parent_document
442                    .add_created_child_document(child_document_info, document_url, document_secret)
443                    .await?;
444                Some(parent_id)
445            } else {
446                None
447            };
448        Ok((result.document, parent_id))
449    }
450}
451
452//////////////////////////////////////////////////////
453//
454// Memory
455
456impl Peermerge<RandomAccessMemory, FeedMemoryPersistence> {
457    /// Create a new memory Peermerge
458    pub async fn new_memory(options: PeermergeMemoryOptions) -> Result<Self, PeermergeError> {
459        let document_settings = DocumentSettings {
460            max_entry_data_size_bytes: options.max_entry_data_size_bytes,
461            max_write_feed_length: options.max_write_feed_length,
462        };
463        let (reattach_secrets, peer_id) = if let Some(reattach_secrets) = options.reattach_secrets {
464            let mut secrets: HashMap<DocumentId, SigningKey> = HashMap::new();
465            let mut new_peer_id: Option<PeerId> = None;
466            for (document_id, reattach_secret) in reattach_secrets {
467                let (peer_id, write_feed_key_pair_bytes) =
468                    decode_reattach_secret(&reattach_secret)?;
469                if let Some(id) = new_peer_id {
470                    if peer_id != id {
471                        return Err(PeermergeError::BadArgument {
472                            context: "Invalid reattach secrets, peer id is not the same"
473                                .to_string(),
474                        });
475                    }
476                } else {
477                    new_peer_id = Some(peer_id);
478                }
479                let write_feed_signing_key = signing_key_from_bytes(&write_feed_key_pair_bytes);
480                secrets.insert(document_id, write_feed_signing_key);
481            }
482            (Some(secrets), new_peer_id)
483        } else {
484            (None, None)
485        };
486        let wrapper = PeermergeStateWrapper::new_memory(
487            &options.default_peer_header,
488            document_settings.clone(),
489            peer_id,
490        )
491        .await;
492        Ok(Self {
493            peer_id: wrapper.state.peer_id,
494            default_peer_header: options.default_peer_header,
495            prefix: PathBuf::new(),
496            peermerge_state: Arc::new(Mutex::new(wrapper)),
497            documents: Arc::new(DashMap::new()),
498            protocols: Arc::new(DashMap::new()),
499            state_event_sender: Arc::new(Mutex::new(options.state_event_sender)),
500            document_settings,
501            reattach_secrets,
502        })
503    }
504
505    /// Create a new document in-memory
506    pub async fn create_new_document_memory<F, O>(
507        &mut self,
508        options: CreateNewDocumentMemoryOptions,
509        init_cb: F,
510        change_id: Option<Vec<u8>>,
511    ) -> Result<(DocumentInfo, O), PeermergeError>
512    where
513        F: FnOnce(&mut Transaction) -> Result<O, AutomergeError>,
514    {
515        let (parent_document, parent_id_signing_key_and_header) = self
516            .parent_document_info(options.parent_id, options.parent_header)
517            .await?;
518        let (create_result, init_result) = Document::create_new_memory(
519            self.peer_id,
520            &self.default_peer_header,
521            &options.document_type,
522            options.document_header,
523            options.encrypted,
524            parent_id_signing_key_and_header,
525            self.document_settings.clone(),
526            init_cb,
527            change_id,
528        )
529        .await?;
530        let (document, parent_id) = self
531            .process_new_document_result(create_result, parent_document)
532            .await?;
533        Ok((self.add_document(document, parent_id).await, init_result))
534    }
535
536    /// Attach an existing document in-memory
537    pub async fn attach_document_memory(
538        &mut self,
539        options: AttachDocumentMemoryOptions,
540    ) -> Result<DocumentInfo, PeermergeError> {
541        let (parent_document, parent_id_signing_key_and_header) = self
542            .parent_document_info(options.parent_id, options.parent_header)
543            .await?;
544        let document_secret = options
545            .document_secret
546            .map(|secret| decode_document_secret(&secret))
547            .transpose()?;
548        let decoded_document_url = decode_doc_url(&options.document_url, &document_secret)?;
549
550        // If reattach secrets have been given, there are conditions to attaching
551        if self.reattach_secrets.is_some() {
552            if decoded_document_url.static_info.child {
553                return Err(PeermergeError::BadArgument {
554                    context: "Can not reattach a child document".to_string(),
555                });
556            }
557            if !self.documents.is_empty() {
558                return Err(PeermergeError::BadArgument {
559                    context: "Can only reattach to an empty peermerge".to_string(),
560                });
561            }
562        }
563
564        let attach_result = Document::attach_memory(
565            self.peer_id,
566            &self.default_peer_header,
567            decoded_document_url,
568            self.reattach_secrets.as_mut(),
569            parent_id_signing_key_and_header.map(|value| DocumentParent::New {
570                parent_id: value.0,
571                signing_key: value.1,
572                parent_header: value.2,
573            }),
574            self.document_settings.clone(),
575        )
576        .await?;
577        let (document, parent_id) = self
578            .process_new_document_result(attach_result, parent_document)
579            .await?;
580        Ok(self.add_document(document, parent_id).await)
581    }
582
583    /// Connect peer protocol in-memory
584    #[instrument(skip_all, fields(peer_name = self.default_peer_header.name))]
585    pub async fn connect_protocol_memory<T>(
586        &mut self,
587        protocol_id: &[u8],
588        protocol: &mut FeedProtocol<T>,
589    ) -> Result<(), PeermergeError>
590    where
591        T: IO,
592    {
593        let (mut feed_event_sender, feed_event_receiver): (
594            UnboundedSender<FeedEvent>,
595            UnboundedReceiver<FeedEvent>,
596        ) = unbounded();
597        if self.state_event_sender.lock().await.is_none() {
598            return Err(PeermergeError::BadArgument {
599                context: "State event sender must be set before connecting protocol".to_string(),
600            });
601        };
602        if self.protocols.contains_key(protocol_id) {
603            return Err(PeermergeError::BadArgument {
604                context: format!("Protocol with id {protocol_id:?} already connected"),
605            });
606        }
607        let state_event_sender_for_task = self.state_event_sender.clone();
608        let documents_for_task = self.documents.clone();
609        let peermerge_state_for_task = self.peermerge_state.clone();
610        let task_span = tracing::debug_span!("call_on_feed_event_memory").or_current();
611        let peer_id = self.peer_id;
612        let default_peer_header = self.default_peer_header.clone();
613        let document_settings = self.document_settings.clone();
614        let reattach_secrets = self.reattach_secrets.clone();
615
616        #[cfg(not(target_arch = "wasm32"))]
617        task::spawn(async move {
618            let _entered = task_span.enter();
619            on_feed_event_memory(
620                peer_id,
621                default_peer_header,
622                document_settings,
623                feed_event_receiver,
624                state_event_sender_for_task,
625                documents_for_task,
626                peermerge_state_for_task,
627                reattach_secrets,
628            )
629            .await;
630        });
631        #[cfg(target_arch = "wasm32")]
632        spawn_local(async move {
633            let _entered = task_span.enter();
634            on_feed_event_memory(
635                peer_id,
636                default_peer_header,
637                document_settings,
638                feed_event_receiver,
639                state_event_sender_for_task,
640                documents_for_task,
641                peermerge_state_for_task,
642                reattach_secrets,
643            )
644            .await;
645        });
646
647        // Add protocol to map
648        self.protocols
649            .insert(protocol_id.to_vec(), protocol.commands());
650
651        // Start listening to protocol
652        let result = on_protocol(
653            self.peer_id,
654            protocol,
655            self.documents.clone(),
656            &mut feed_event_sender,
657        )
658        .await;
659
660        // Remove protocol from map
661        if self.protocols.contains_key(protocol_id) {
662            self.protocols.remove(protocol_id);
663        }
664
665        result
666    }
667}
668
/// Event loop that processes the feed events of one in-memory protocol connection.
///
/// Runs until `feed_event_receiver` yields `None`. New feed broadcasts, new child
/// document broadcasts and write feed replacement are handled here; every other
/// event is passed on to `process_feed_event`. Resulting state events are sent
/// with `send_state_events`.
///
/// `reattach_secrets` is an owned map, so changes made to it here are local to
/// this task.
///
/// # Panics
/// Panics if `state_event_sender_mutex` holds `None` when the loop starts, if no
/// document matches the discovery key of a handled event, or if any of the
/// unwrapped document operations below fails.
// NOTE(review): the unwraps below run on data that appears to originate from
// remote peers; a failure aborts this task. Confirm that this is intended.
#[instrument(level = "debug", skip_all)]
#[allow(clippy::too_many_arguments)]
async fn on_feed_event_memory(
    peer_id: PeerId,
    default_peer_header: NameDescription,
    document_settings: DocumentSettings,
    mut feed_event_receiver: UnboundedReceiver<FeedEvent>,
    state_event_sender_mutex: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
    mut documents: Arc<DashMap<DocumentId, Document<RandomAccessMemory, FeedMemoryPersistence>>>,
    peermerge_state: Arc<Mutex<PeermergeStateWrapper<RandomAccessMemory>>>,
    mut reattach_secrets: Option<HashMap<DocumentId, SigningKey>>,
) {
    // Work on a clone of the shared sender; the lock is released right after cloning.
    let mut state_event_sender: UnboundedSender<StateEvent> = {
        state_event_sender_mutex
            .lock()
            .await
            .clone()
            .expect("Should always be present")
    };
    while let Some(event) = feed_event_receiver.next().await {
        debug!("Received event {:?}", event);
        // The state event sender might change so that the other side closes
        // If the shared sender is `None` at this point, the closed clone is kept.
        if state_event_sender.is_closed() {
            if let Some(sender) = state_event_sender_mutex.lock().await.clone() {
                state_event_sender = sender;
            }
        }
        match event.content {
            FeedEventContent::NewFeedsBroadcasted { new_feeds } => {
                let mut document =
                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
                        .await
                        .unwrap();
                let state_events = document
                    .process_new_feeds_broadcasted_memory(new_feeds)
                    .await;

                if !state_events.is_empty() {
                    send_state_events(&mut state_event_sender, state_events, &peermerge_state)
                        .await;
                }
            }
            FeedEventContent::NewChildDocumentsBroadcasted {
                new_child_documents,
            } => {
                let mut parent_document =
                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
                        .await
                        .unwrap();
                let parent_id = parent_document.id();
                for mut new_child_document in new_child_documents {
                    // Children for which the merge returns `None` are skipped.
                    if let Some(decoded_document_url) = parent_document
                        .merge_remote_child_document(&mut new_child_document)
                        .await
                        .unwrap()
                    {
                        let document_id = decoded_document_url.static_info.document_id;

                        // It is possible that this child document has multiple parents, and is already
                        // attached by another parent.
                        if !documents.contains_key(&decoded_document_url.static_info.document_id) {
                            let attach_result = Document::attach_memory(
                                peer_id,
                                &default_peer_header,
                                decoded_document_url,
                                reattach_secrets.as_mut(),
                                Some(DocumentParent::Registered {
                                    child_document_info: new_child_document.clone(),
                                    parent_id,
                                }),
                                document_settings.clone(),
                            )
                            .await
                            .unwrap();
                            if !attach_result.state_events.is_empty() {
                                send_state_events(
                                    &mut state_event_sender,
                                    attach_result.state_events,
                                    &peermerge_state,
                                )
                                .await;
                            }
                            documents.insert(document_id, attach_result.document);
                        }
                        // The id is registered under this parent whether or not the
                        // document was attached just now.
                        {
                            let mut state = peermerge_state.lock().await;
                            state
                                .add_document_id_to_state(document_id, Some(parent_id))
                                .await;
                        }
                        // Finally, set child document to created to parent
                        parent_document
                            .set_child_document_created(&new_child_document)
                            .await
                            .unwrap();
                    }
                }
            }
            FeedEventContent::FeedMaxLengthReached { discovery_key } => {
                let mut document =
                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
                        .await
                        .unwrap();
                let state_events = document
                    .replace_write_feed_memory(&discovery_key)
                    .await
                    .unwrap();
                // NOTE(review): unlike the arms above, the events are sent here
                // without an `is_empty` check; confirm that this is intended.
                send_state_events(&mut state_event_sender, state_events, &peermerge_state).await;
            }
            _ => {
                process_feed_event(
                    event,
                    &mut state_event_sender,
                    &mut documents,
                    &peermerge_state,
                )
                .await
            }
        }
    }
    debug!("Exiting");
}
791
792//////////////////////////////////////////////////////
793//
794// Disk
795
796#[cfg(not(target_arch = "wasm32"))]
797impl Peermerge<RandomAccessDisk, FeedDiskPersistence> {
798    /// Create a new disk Peermerge
799    pub async fn new_disk(options: PeermergeDiskOptions) -> Result<Self, PeermergeError> {
800        let document_settings = DocumentSettings {
801            max_entry_data_size_bytes: options.max_entry_data_size_bytes,
802            max_write_feed_length: options.max_write_feed_length,
803        };
804        let wrapper = PeermergeStateWrapper::new_disk(
805            &options.default_peer_header,
806            &options.data_root_dir,
807            document_settings.clone(),
808        )
809        .await?;
810        Ok(Self {
811            peer_id: wrapper.state.peer_id,
812            default_peer_header: options.default_peer_header,
813            prefix: options.data_root_dir.clone(),
814            peermerge_state: Arc::new(Mutex::new(wrapper)),
815            documents: Arc::new(DashMap::new()),
816            protocols: Arc::new(DashMap::new()),
817            state_event_sender: Arc::new(Mutex::new(options.state_event_sender)),
818            document_settings,
819            reattach_secrets: None,
820        })
821    }
822
823    /// Get information about possible documents stored to a given root directory
824    pub async fn document_infos_disk(
825        data_root_dir: &Path,
826    ) -> Result<Option<Vec<DocumentInfo>>, PeermergeError> {
827        if let Some(state_wrapper) = PeermergeStateWrapper::open_disk(data_root_dir).await? {
828            let mut document_infos: Vec<DocumentInfo> = vec![];
829            for document_id_with_parents in &state_wrapper.state.document_ids {
830                let postfix = encode_document_id(&document_id_with_parents.document_id);
831                let document_data_root_dir = data_root_dir.join(postfix);
832                document_infos.push(Document::info_disk(&document_data_root_dir).await?);
833            }
834            Ok(Some(document_infos))
835        } else {
836            Ok(None)
837        }
838    }
839
840    /// Open peermerge stored to disk
841    pub async fn open_disk(options: OpenDiskOptions) -> Result<Self, PeermergeError> {
842        let mut document_secrets: HashMap<DocumentId, DocumentSecret> = HashMap::new();
843        for (document_id, document_secret) in options.document_secrets.unwrap_or_default() {
844            let document_secret = decode_document_secret(&document_secret)?;
845            document_secrets.insert(document_id, document_secret);
846        }
847        let state_wrapper = PeermergeStateWrapper::open_disk(&options.data_root_dir)
848            .await?
849            .expect("Not a valid peermerge directory");
850        let state = state_wrapper.state();
851        let peer_id = state.peer_id;
852        let default_peer_header = state.default_peer_header.clone();
853        let document_settings = state.document_settings.clone();
854        let documents: DashMap<DocumentId, Document<RandomAccessDisk, FeedDiskPersistence>> =
855            DashMap::new();
856        let mut state_events: Vec<StateEvent> = vec![];
857
858        // First order document ids so that parents come first so that enryption keys
859        // can be fetched from parents' meta docs
860        let mut document_ids_with_parents: Vec<DocumentIdWithParents> = state_wrapper
861            .state
862            .document_ids
863            .iter()
864            .filter(|document_id| document_id.parent_document_ids.is_empty())
865            .cloned()
866            .collect();
867        document_ids_with_parents.extend(
868            state_wrapper
869                .state
870                .document_ids
871                .iter()
872                .filter(|document_id| !document_id.parent_document_ids.is_empty())
873                .cloned(),
874        );
875        for document_id_with_parents in &state_wrapper.state.document_ids {
876            let document_id = &document_id_with_parents.document_id;
877            let postfix = encode_document_id(document_id);
878            let document_data_root_dir = options.data_root_dir.join(postfix);
879            let (document, document_state_events) = Document::open_disk(
880                peer_id,
881                &mut document_secrets,
882                &document_data_root_dir,
883                document_settings.clone(),
884            )
885            .await?;
886            state_events.extend(document_state_events);
887            documents.insert(*document_id, document);
888        }
889        let documents = Arc::new(documents);
890        let peermerge_state = Arc::new(Mutex::new(state_wrapper));
891        let mut state_event_sender = options.state_event_sender;
892        if let Some(state_event_sender) = state_event_sender.as_mut() {
893            send_state_events(state_event_sender, state_events, &peermerge_state).await;
894        }
895
896        Ok(Self {
897            peer_id,
898            default_peer_header,
899            prefix: options.data_root_dir,
900            peermerge_state,
901            documents,
902            protocols: Arc::new(DashMap::new()),
903            state_event_sender: Arc::new(Mutex::new(state_event_sender)),
904            document_settings,
905            reattach_secrets: None,
906        })
907    }
908
909    /// Create a new document on disk
910    pub async fn create_new_document_disk<F, O>(
911        &mut self,
912        options: CreateNewDocumentDiskOptions,
913        init_cb: F,
914        change_id: Option<Vec<u8>>,
915    ) -> Result<(DocumentInfo, O), PeermergeError>
916    where
917        F: FnOnce(&mut Transaction) -> Result<O, AutomergeError>,
918    {
919        let (parent_document, parent_id_signing_key_and_header) = self
920            .parent_document_info(options.parent_id, options.parent_header)
921            .await?;
922        let (create_result, init_result) = Document::create_new_disk(
923            self.peer_id,
924            &self.default_peer_header,
925            &options.document_type,
926            options.document_header,
927            options.encrypted,
928            parent_id_signing_key_and_header,
929            self.document_settings.clone(),
930            init_cb,
931            change_id,
932            &self.prefix,
933        )
934        .await?;
935        let (document, parent_id) = self
936            .process_new_document_result(create_result, parent_document)
937            .await?;
938        Ok((self.add_document(document, parent_id).await, init_result))
939    }
940
941    /// Attach existing document to disk
942    pub async fn attach_document_disk(
943        &mut self,
944        options: AttachDocumentDiskOptions,
945    ) -> Result<DocumentInfo, PeermergeError> {
946        let (parent_document, parent_id_signing_key_and_header) = self
947            .parent_document_info(options.parent_id, options.parent_header)
948            .await?;
949        let document_secret = options
950            .document_secret
951            .map(|secret| decode_document_secret(&secret))
952            .transpose()?;
953        let decoded_document_url = decode_doc_url(&options.document_url, &document_secret)?;
954        let attach_result = Document::attach_disk(
955            self.peer_id,
956            &self.default_peer_header,
957            decoded_document_url,
958            parent_id_signing_key_and_header.map(|value| DocumentParent::New {
959                parent_id: value.0,
960                signing_key: value.1,
961                parent_header: value.2,
962            }),
963            &self.prefix,
964            self.document_settings.clone(),
965        )
966        .await?;
967        let (document, parent_id) = self
968            .process_new_document_result(attach_result, parent_document)
969            .await?;
970        Ok(self.add_document(document, parent_id).await)
971    }
972
973    /// Connect peer protocol to disk
974    #[instrument(skip_all, fields(name = self.default_peer_header.name))]
975    pub async fn connect_protocol_disk<T>(
976        &mut self,
977        protocol_id: &[u8],
978        protocol: &mut FeedProtocol<T>,
979    ) -> Result<(), PeermergeError>
980    where
981        T: IO,
982    {
983        let (mut feed_event_sender, feed_event_receiver): (
984            UnboundedSender<FeedEvent>,
985            UnboundedReceiver<FeedEvent>,
986        ) = unbounded();
987        if self.state_event_sender.lock().await.is_none() {
988            return Err(PeermergeError::BadArgument {
989                context: "State event sender must be set before connecting protocol".to_string(),
990            });
991        };
992        if self.protocols.contains_key(protocol_id) {
993            return Err(PeermergeError::BadArgument {
994                context: format!("Protocol with id {protocol_id:?} already connected"),
995            });
996        }
997        let state_event_sender_for_task = self.state_event_sender.clone();
998        let documents_for_task = self.documents.clone();
999        let peemerge_state_for_task = self.peermerge_state.clone();
1000        let peer_id = self.peer_id;
1001        let default_peer_header = self.default_peer_header.clone();
1002        let document_settings = self.document_settings.clone();
1003        let prefix = self.prefix.clone();
1004        let task_span = tracing::debug_span!("call_on_feed_event_disk").or_current();
1005        task::spawn(async move {
1006            let _entered = task_span.enter();
1007            on_feed_event_disk(
1008                peer_id,
1009                default_peer_header,
1010                document_settings,
1011                &prefix,
1012                feed_event_receiver,
1013                state_event_sender_for_task,
1014                documents_for_task,
1015                peemerge_state_for_task,
1016            )
1017            .await;
1018        });
1019
1020        // Add protocol to map
1021        self.protocols
1022            .insert(protocol_id.to_vec(), protocol.commands());
1023
1024        // Start listening to protocol
1025        let result = on_protocol(
1026            self.peer_id,
1027            protocol,
1028            self.documents.clone(),
1029            &mut feed_event_sender,
1030        )
1031        .await;
1032
1033        // Remove protocol from map
1034        if self.protocols.contains_key(protocol_id) {
1035            self.protocols.remove(protocol_id);
1036        }
1037
1038        result
1039    }
1040}
1041
1042#[cfg(not(target_arch = "wasm32"))]
1043#[instrument(level = "debug", skip_all)]
1044#[allow(clippy::too_many_arguments)]
1045async fn on_feed_event_disk(
1046    peer_id: PeerId,
1047    default_peer_header: NameDescription,
1048    document_settings: DocumentSettings,
1049    prefix: &Path,
1050    mut feed_event_receiver: UnboundedReceiver<FeedEvent>,
1051    state_event_sender_mutex: Arc<Mutex<Option<UnboundedSender<StateEvent>>>>,
1052    mut documents: Arc<DashMap<DocumentId, Document<RandomAccessDisk, FeedDiskPersistence>>>,
1053    peermerge_state: Arc<Mutex<PeermergeStateWrapper<RandomAccessDisk>>>,
1054) {
1055    let mut state_event_sender: UnboundedSender<StateEvent> = {
1056        state_event_sender_mutex
1057            .lock()
1058            .await
1059            .clone()
1060            .expect("Should always be present")
1061    };
1062    while let Some(event) = feed_event_receiver.next().await {
1063        // The state event sender might change so that the other side closes
1064        if state_event_sender.is_closed() {
1065            if let Some(sender) = state_event_sender_mutex.lock().await.clone() {
1066                state_event_sender = sender;
1067            }
1068        }
1069        debug!("Received event {:?}", event);
1070        match event.content {
1071            FeedEventContent::NewFeedsBroadcasted { new_feeds } => {
1072                let mut document =
1073                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1074                        .await
1075                        .unwrap();
1076                let state_events = document
1077                    .process_new_feeds_broadcasted_disk(new_feeds)
1078                    .await
1079                    .unwrap();
1080                if !state_events.is_empty() {
1081                    send_state_events(&mut state_event_sender, state_events, &peermerge_state)
1082                        .await;
1083                }
1084            }
1085            FeedEventContent::NewChildDocumentsBroadcasted {
1086                new_child_documents,
1087            } => {
1088                let mut parent_document =
1089                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1090                        .await
1091                        .unwrap();
1092                let parent_id = parent_document.id();
1093                for mut new_child_document in new_child_documents {
1094                    if let Some(decoded_document_url) = parent_document
1095                        .merge_remote_child_document(&mut new_child_document)
1096                        .await
1097                        .unwrap()
1098                    {
1099                        let document_id = decoded_document_url.static_info.document_id;
1100                        // It is possible that this child document has multiple parents, and is already
1101                        // attached by another parent.
1102                        if !documents.contains_key(&document_id) {
1103                            let attach_result = Document::attach_disk(
1104                                peer_id,
1105                                &default_peer_header,
1106                                decoded_document_url,
1107                                Some(DocumentParent::Registered {
1108                                    child_document_info: new_child_document.clone(),
1109                                    parent_id,
1110                                }),
1111                                prefix,
1112                                document_settings.clone(),
1113                            )
1114                            .await
1115                            .unwrap();
1116
1117                            if !attach_result.state_events.is_empty() {
1118                                send_state_events(
1119                                    &mut state_event_sender,
1120                                    attach_result.state_events,
1121                                    &peermerge_state,
1122                                )
1123                                .await;
1124                            }
1125                            documents.insert(document_id, attach_result.document);
1126                        }
1127                        {
1128                            let mut state = peermerge_state.lock().await;
1129                            state
1130                                .add_document_id_to_state(document_id, Some(parent_id))
1131                                .await;
1132                        }
1133
1134                        // Finally, set child document to created to parent
1135                        parent_document
1136                            .set_child_document_created(&new_child_document)
1137                            .await
1138                            .unwrap();
1139                    }
1140                }
1141            }
1142            FeedEventContent::FeedMaxLengthReached { discovery_key } => {
1143                let mut document =
1144                    get_document_by_discovery_key(&documents, &event.doc_discovery_key)
1145                        .await
1146                        .unwrap();
1147                let state_events = document
1148                    .replace_write_feed_disk(&discovery_key)
1149                    .await
1150                    .unwrap();
1151                send_state_events(&mut state_event_sender, state_events, &peermerge_state).await;
1152            }
1153            _ => {
1154                process_feed_event(
1155                    event,
1156                    &mut state_event_sender,
1157                    &mut documents,
1158                    &peermerge_state,
1159                )
1160                .await
1161            }
1162        }
1163    }
1164    debug!("Exiting");
1165}
1166
1167//////////////////////////////////////////////////////
1168//
1169// Utilities
1170//
1171
1172async fn send_state_events<T>(
1173    state_event_sender: &mut UnboundedSender<StateEvent>,
1174    state_events: Vec<StateEvent>,
1175    peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1176) where
1177    T: RandomAccess + Debug + Send + 'static,
1178{
1179    if !state_event_sender.is_closed() {
1180        for mut state_event in state_events {
1181            post_process_state_event(&mut state_event, peermerge_state).await;
1182            state_event_sender.unbounded_send(state_event).unwrap();
1183        }
1184    }
1185}
1186
1187#[instrument(level = "debug", skip_all)]
1188async fn process_feed_event<T, U>(
1189    event: FeedEvent,
1190    state_event_sender: &mut UnboundedSender<StateEvent>,
1191    documents: &mut Arc<DashMap<DocumentId, Document<T, U>>>,
1192    peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1193) where
1194    T: RandomAccess + Debug + Send + 'static,
1195    U: FeedPersistence,
1196{
1197    match event.content {
1198        FeedEventContent::NewFeedsBroadcasted { .. } => {
1199            unreachable!("Implemented by concrete type")
1200        }
1201        FeedEventContent::NewChildDocumentsBroadcasted { .. } => {
1202            unreachable!("Implemented by concrete type")
1203        }
1204        FeedEventContent::FeedMaxLengthReached { .. } => {
1205            unreachable!("Implemented by concrete type")
1206        }
1207        FeedEventContent::FeedDisconnected { .. } => {
1208            // This is an FYI message, just continue for now
1209        }
1210        FeedEventContent::FeedVerified {
1211            peer_id,
1212            discovery_key,
1213            verified,
1214        } => {
1215            let document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1216                .await
1217                .unwrap();
1218            if verified {
1219                document.set_feed_verified(&discovery_key, &peer_id).await;
1220            } else {
1221                unimplemented!("TODO: Invalid feed deletion");
1222            }
1223        }
1224        FeedEventContent::RemoteFeedSynced {
1225            peer_id,
1226            discovery_key,
1227            contiguous_length,
1228        } => {
1229            let document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1230                .await
1231                .unwrap();
1232            let state_events = document
1233                .process_remote_feed_synced(peer_id, discovery_key, contiguous_length)
1234                .await;
1235            send_state_events(state_event_sender, state_events, peermerge_state).await;
1236        }
1237        FeedEventContent::FeedSynced {
1238            peer_id,
1239            discovery_key,
1240            contiguous_length,
1241        } => {
1242            let mut document = get_document_by_discovery_key(documents, &event.doc_discovery_key)
1243                .await
1244                .unwrap();
1245            let state_events = document
1246                .process_feed_synced(peer_id, discovery_key, contiguous_length)
1247                .await;
1248            send_state_events(state_event_sender, state_events, peermerge_state).await;
1249        }
1250    }
1251}
1252
1253async fn post_process_state_event<T>(
1254    state_event: &mut StateEvent,
1255    peermerge_state: &Arc<Mutex<PeermergeStateWrapper<T>>>,
1256) where
1257    T: RandomAccess + Debug + Send + 'static,
1258{
1259    if let StateEventContent::DocumentInitialized {
1260        child,
1261        ref mut parent_document_ids,
1262        ..
1263    } = state_event.content
1264    {
1265        if child && parent_document_ids.is_empty() {
1266            // Parents are saved in peermerge state
1267            let peermerge_state = peermerge_state.lock().await;
1268            parent_document_ids.extend(
1269                peermerge_state
1270                    .state
1271                    .document_ids
1272                    .iter()
1273                    .find(|id_with_parents| state_event.document_id == id_with_parents.document_id)
1274                    .unwrap()
1275                    .parent_document_ids
1276                    .clone(),
1277            );
1278        }
1279    }
1280}