radicle/cob/
store.rs

1//! Generic COB storage.
2#![allow(clippy::large_enum_variant)]
3#![allow(clippy::type_complexity)]
4use std::fmt::Debug;
5use std::marker::PhantomData;
6
7use nonempty::NonEmpty;
8use radicle_cob::CollaborativeObject;
9use serde::{Deserialize, Serialize};
10
11use crate::cob::op::Op;
12use crate::cob::{Create, Embed, EntryId, ObjectId, TypeName, Update, Updated, Uri, Version};
13use crate::git;
14use crate::node::device::Device;
15use crate::prelude::*;
16use crate::storage::git as storage;
17use crate::storage::SignRepository;
18use crate::{cob, identity};
19
20pub trait CobAction {
21    /// Parent objects this action depends on. For example, patch revisions
22    /// have the commit objects as their parent.
23    fn parents(&self) -> Vec<git::Oid> {
24        Vec::new()
25    }
26
27    /// The outcome of some actions are to be referred later.
28    /// For example, one action may create a comment, followed by another
29    /// action that may create a reply to the comment, referring to it.
30    /// Since actions are stored as part of [`crate::cob::op::Op`],
31    /// and operations are the smallest identifiable units,
32    /// this may lead to ambiguity.
33    /// It would not be possible to to, say, address one particular comment out
34    /// of two, if the corresponding actions of creations were part of the
35    /// same operation.
36    /// To help avoid this, implementations signal whether specific actions
37    /// require "their own" identifier.
38    /// This allows checking for multiple such actions before creating an
39    /// operation.
40    fn produces_identifier(&self) -> bool {
41        false
42    }
43}
44
45/// A collaborative object. Can be materialized from an operation history.
46pub trait Cob: Sized {
47    /// The underlying action composing each operation.
48    type Action: CobAction + for<'de> Deserialize<'de> + Serialize;
49    /// Error returned by `apply` function.
50    type Error: std::error::Error + Send + Sync + 'static;
51
52    /// Initialize a collaborative object from a root operation.
53    fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error>;
54
55    /// Apply an operation to the state.
56    fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
57        &mut self,
58        op: Op<Self::Action>,
59        concurrent: I,
60        repo: &R,
61    ) -> Result<(), <Self as Cob>::Error>;
62
63    #[cfg(test)]
64    /// Create an object from a history.
65    fn from_history<R: ReadRepository>(
66        history: &crate::cob::History,
67        repo: &R,
68    ) -> Result<Self, test::HistoryError<Self>>
69    where
70        Self: CobWithType,
71    {
72        test::from_history::<R, Self>(history, repo)
73    }
74
75    #[cfg(test)]
76    /// Create an object from individual operations.
77    /// Returns an error if any of the operations fails to apply.
78    fn from_ops<R: ReadRepository>(
79        ops: impl IntoIterator<Item = Op<Self::Action>>,
80        repo: &R,
81    ) -> Result<Self, Self::Error> {
82        let mut ops = ops.into_iter();
83        let Some(init) = ops.next() else {
84            panic!("FromHistory::from_ops: operations list is empty");
85        };
86        let mut state = Self::from_root(init, repo)?;
87        for op in ops {
88            state.op(op, [].into_iter(), repo)?;
89        }
90        Ok(state)
91    }
92}
93
/// Implementations are statically associated with a particular
/// type name of a collaborative object.
///
/// In most cases, this trait should be used in tandem with [`Cob`].
/// Within this module it is what lets [`Store::open`] and the `Default`
/// implementation of [`Transaction`] pick a type name without being given one.
pub trait CobWithType {
    /// The type name of the collaborative object type which backs this implementation.
    fn type_name() -> &'static TypeName;
}
102
/// Store error.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Creating an object failed.
    #[error("create error: {0}")]
    Create(#[from] cob::error::Create),
    /// Updating an object failed.
    #[error("update error: {0}")]
    Update(#[from] cob::error::Update),
    /// Retrieving (getting or listing) objects failed.
    #[error("retrieve error: {0}")]
    Retrieve(#[from] cob::error::Retrieve),
    /// Removing an object failed.
    #[error("remove error: {0}")]
    Remove(#[from] cob::error::Remove),
    /// An [`identity::doc::DocError`], reported as-is.
    #[error(transparent)]
    Identity(#[from] identity::doc::DocError),
    /// JSON serialization failed, e.g. while encoding an action.
    #[error(transparent)]
    Serialize(#[from] serde_json::Error),
    /// No object with the given type name and id exists.
    #[error("object `{1}` of type `{0}` was not found")]
    NotFound(TypeName, ObjectId),
    /// Signing the repository refs failed after a change was written.
    #[error("signed refs: {0}")]
    SignRefs(Box<storage::RepositoryError>),
    /// The URI of an embed could not be converted into a Git object id.
    #[error("invalid or unknown embed URI: {0}")]
    EmbedUri(Uri),
    /// A raw Git error, reported as-is.
    #[error(transparent)]
    Git(git::raw::Error),
    /// Looking up the reference `name` failed for a reason other than the
    /// reference not existing.
    #[error("failed to find reference '{name}': {err}")]
    RefLookup {
        name: git::RefString,
        #[source]
        err: git::raw::Error,
    },
    /// A second identifier-producing action was pushed onto a transaction.
    /// Carries the JSON of the action already present, followed by the JSON
    /// of the rejected action.
    #[error("transaction already contains action {0} which produces an identifier, denying to add action {1} which also produces an identifier")]
    ClashingIdentifiers(String, String),
}
135
/// Storage for collaborative objects of a specific type `T` in a single repository.
pub struct Store<'a, T, R> {
    /// Identity object id handed to `cob::create` and `cob::update`.
    /// `None` until one is attached with [`Store::identity`].
    identity: Option<git::Oid>,
    /// Repository the objects are read from and written to.
    repo: &'a R,
    /// Ties the store to the object type `T` without holding a value of it.
    witness: PhantomData<T>,
    /// Type name used when creating, listing, getting and removing objects.
    type_name: &'a TypeName,
}
143
144impl<T, R> AsRef<R> for Store<'_, T, R> {
145    fn as_ref(&self) -> &R {
146        self.repo
147    }
148}
149
150impl<'a, T, R> Store<'a, T, R>
151where
152    R: ReadRepository + cob::Store,
153{
154    /// Open a new generic store.
155    pub fn open_for(type_name: &'a TypeName, repo: &'a R) -> Result<Self, Error> {
156        Ok(Self {
157            repo,
158            identity: None,
159            witness: PhantomData,
160            type_name,
161        })
162    }
163
164    /// Return a new store with the attached identity.
165    pub fn identity(self, identity: git::Oid) -> Self {
166        Self {
167            repo: self.repo,
168            witness: self.witness,
169            identity: Some(identity),
170            type_name: self.type_name,
171        }
172    }
173}
174
175impl<'a, T, R> Store<'a, T, R>
176where
177    R: ReadRepository + cob::Store<Namespace = NodeId>,
178    T: CobWithType,
179{
180    /// Open a new generic store.
181    pub fn open(repo: &'a R) -> Result<Self, Error> {
182        Ok(Self {
183            repo,
184            identity: None,
185            witness: PhantomData,
186            type_name: T::type_name(),
187        })
188    }
189}
190
191impl<T, R> Store<'_, T, R>
192where
193    R: ReadRepository + cob::Store<Namespace = NodeId>,
194    T: Cob + cob::Evaluate<R>,
195{
196    pub fn transaction(
197        &self,
198        actions: Vec<T::Action>,
199        embeds: Vec<Embed<Uri>>,
200    ) -> Transaction<T, R> {
201        Transaction::new(self.type_name.clone(), actions, embeds)
202    }
203}
204
205impl<T, R> Store<'_, T, R>
206where
207    R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
208    T: Cob + cob::Evaluate<R>,
209    T::Action: Serialize,
210{
211    /// Update an object.
212    pub fn update<G>(
213        &self,
214        type_name: &TypeName,
215        object_id: ObjectId,
216        message: &str,
217        actions: impl Into<NonEmpty<T::Action>>,
218        embeds: Vec<Embed<Uri>>,
219        signer: &Device<G>,
220    ) -> Result<Updated<T>, Error>
221    where
222        G: crypto::signature::Signer<crypto::Signature>,
223    {
224        let actions = actions.into();
225        let related = actions.iter().flat_map(T::Action::parents).collect();
226        let changes = actions.try_map(encoding::encode)?;
227        let embeds = embeds
228            .into_iter()
229            .map(|e| {
230                Ok::<_, Error>(Embed {
231                    content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
232                    name: e.name.clone(),
233                })
234            })
235            .collect::<Result<_, _>>()?;
236        let updated = cob::update(
237            self.repo,
238            signer,
239            self.identity,
240            related,
241            signer.public_key(),
242            Update {
243                object_id,
244                type_name: type_name.clone(),
245                message: message.to_owned(),
246                embeds,
247                changes,
248            },
249        )?;
250        self.repo
251            .sign_refs(signer)
252            .map_err(|e| Error::SignRefs(Box::new(e)))?;
253
254        Ok(updated)
255    }
256
257    /// Create an object.
258    pub fn create<G>(
259        &self,
260        message: &str,
261        actions: impl Into<NonEmpty<T::Action>>,
262        embeds: Vec<Embed<Uri>>,
263        signer: &Device<G>,
264    ) -> Result<(ObjectId, T), Error>
265    where
266        G: crypto::signature::Signer<crypto::Signature>,
267    {
268        let actions = actions.into();
269        let parents = actions.iter().flat_map(T::Action::parents).collect();
270        let contents = actions.try_map(encoding::encode)?;
271        let embeds = embeds
272            .into_iter()
273            .map(|e| {
274                Ok::<_, Error>(Embed {
275                    content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
276                    name: e.name.clone(),
277                })
278            })
279            .collect::<Result<_, _>>()?;
280        let cob = cob::create::<T, _, _>(
281            self.repo,
282            signer,
283            self.identity,
284            parents,
285            signer.public_key(),
286            Create {
287                type_name: self.type_name.clone(),
288                version: Version::default(),
289                message: message.to_owned(),
290                embeds,
291                contents,
292            },
293        )?;
294        // Nb. We can't sign our refs before the identity refs exist, which are created after
295        // the identity COB is created. Therefore we manually sign refs when creating identity
296        // COBs.
297        if self.type_name != &*crate::cob::identity::TYPENAME {
298            self.repo
299                .sign_refs(signer)
300                .map_err(|e| Error::SignRefs(Box::new(e)))?;
301        }
302        Ok((*cob.id(), cob.object))
303    }
304
305    /// Remove an object.
306    pub fn remove<G>(&self, id: &ObjectId, signer: &Device<G>) -> Result<(), Error>
307    where
308        G: crypto::signature::Signer<crypto::Signature>,
309    {
310        let name = git::refs::storage::cob(signer.public_key(), self.type_name, id);
311        match self
312            .repo
313            .reference_oid(signer.public_key(), &name.strip_namespace())
314        {
315            Ok(_) => {
316                cob::remove(self.repo, signer.public_key(), self.type_name, id)?;
317                self.repo
318                    .sign_refs(signer)
319                    .map_err(|e| Error::SignRefs(Box::new(e)))?;
320                Ok(())
321            }
322            Err(err) if err.code() == git::raw::ErrorCode::NotFound => Ok(()),
323            Err(err) => Err(Error::RefLookup {
324                name: name.to_ref_string(),
325                err,
326            }),
327        }
328    }
329}
330
331impl<'a, T, R> Store<'a, T, R>
332where
333    R: ReadRepository + cob::Store,
334    T: Cob + cob::Evaluate<R> + 'a,
335    T::Action: Serialize,
336{
337    /// Get an object.
338    pub fn get(&self, id: &ObjectId) -> Result<Option<T>, Error> {
339        cob::get::<T, _>(self.repo, self.type_name, id)
340            .map(|r| r.map(|cob| cob.object))
341            .map_err(Error::from)
342    }
343
344    /// Return all objects.
345    pub fn all(
346        &self,
347    ) -> Result<impl ExactSizeIterator<Item = Result<(ObjectId, T), Error>> + 'a, Error> {
348        let raw = cob::list::<T, _>(self.repo, self.type_name)?;
349
350        Ok(raw.into_iter().map(|o| Ok((*o.id(), o.object))))
351    }
352
353    /// Return true if the list of issues is empty.
354    pub fn is_empty(&self) -> Result<bool, Error> {
355        Ok(self.count()? == 0)
356    }
357
358    /// Return objects count.
359    pub fn count(&self) -> Result<usize, Error> {
360        let raw = cob::list::<T, _>(self.repo, self.type_name)?;
361
362        Ok(raw.len())
363    }
364}
365
/// Allows operations to be batched atomically.
///
/// Actions and embeds are collected first and then written in one go, either
/// as a new object ([`Transaction::initial`]) or as an update to an existing
/// one ([`Transaction::commit`]).
#[derive(Debug)]
pub struct Transaction<T: Cob + cob::Evaluate<R>, R> {
    /// Actions collected so far, in the order they were added.
    actions: Vec<T::Action>,
    /// Embeds collected so far.
    embeds: Vec<Embed<Uri>>,

    // Internal state kept for validation of the transaction.
    // If an action that produces an identifier is added to
    // the transaction, then this will track its index,
    // so that adding a second action that produces an identifier
    // can fail with a useful error.
    produces_identifier: Option<usize>,

    /// Ties the transaction to the repository type `R` without holding a value of it.
    repo: PhantomData<R>,
    /// Type name handed to the store when the transaction is committed.
    type_name: TypeName,
}
382
383impl<T: Cob + CobWithType + cob::Evaluate<R>, R> Default for Transaction<T, R> {
384    fn default() -> Self {
385        Self {
386            actions: Vec::new(),
387            embeds: Vec::new(),
388            produces_identifier: None,
389            repo: PhantomData,
390            type_name: T::type_name().clone(),
391        }
392    }
393}
394
395impl<T, R> Transaction<T, R>
396where
397    T: Cob + cob::Evaluate<R>,
398{
399    pub fn new(type_name: TypeName, actions: Vec<T::Action>, embeds: Vec<Embed<Uri>>) -> Self {
400        Self {
401            actions,
402            embeds,
403            produces_identifier: None,
404            repo: PhantomData,
405            type_name,
406        }
407    }
408}
409
410impl<T, R> Transaction<T, R>
411where
412    T: Cob + CobWithType + cob::Evaluate<R>,
413{
414    /// Create a new transaction to be used as the initial set of operations for a COB.
415    pub fn initial<G, F, Tx>(
416        message: &str,
417        store: &mut Store<T, R>,
418        signer: &Device<G>,
419        operations: F,
420    ) -> Result<(ObjectId, T), Error>
421    where
422        Tx: From<Self>,
423        Self: From<Tx>,
424        G: crypto::signature::Signer<crypto::Signature>,
425        F: FnOnce(&mut Tx, &R) -> Result<(), Error>,
426        R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
427        T::Action: Serialize + Clone,
428    {
429        let mut tx = Tx::from(Transaction::default());
430        operations(&mut tx, store.as_ref())?;
431        let tx = Self::from(tx);
432
433        let actions = NonEmpty::from_vec(tx.actions)
434            .expect("Transaction::initial: transaction must contain at least one action");
435
436        store.create(message, actions, tx.embeds, signer)
437    }
438}
439
440impl<T, R> Transaction<T, R>
441where
442    T: Cob + cob::Evaluate<R>,
443{
444    /// Add an action to this transaction.
445    pub fn push(&mut self, action: T::Action) -> Result<(), Error> {
446        if action.produces_identifier() {
447            if let Some(index) = self.produces_identifier {
448                return Err(Error::ClashingIdentifiers(
449                    serde_json::to_string(&self.actions[index])?,
450                    serde_json::to_string(&action)?,
451                ));
452            } else {
453                self.produces_identifier = Some(self.actions.len())
454            }
455        }
456
457        self.actions.push(action);
458
459        Ok(())
460    }
461
462    /// Add actions to this transaction.
463    /// Note that we cannot implement [`std::iter::Extend`] because [`Self::push`]
464    /// validates the action being pushed, and therefore is falliable.
465    pub fn extend<I: IntoIterator<Item = T::Action>>(&mut self, actions: I) -> Result<(), Error> {
466        for action in actions {
467            self.push(action)?;
468        }
469        Ok(())
470    }
471
472    /// Embed media into the transaction.
473    pub fn embed(&mut self, embeds: impl IntoIterator<Item = Embed<Uri>>) -> Result<(), Error> {
474        self.embeds.extend(embeds);
475
476        Ok(())
477    }
478
479    /// Commit transaction.
480    ///
481    /// Returns an operation that can be applied onto an in-memory state.
482    pub fn commit<G>(
483        self,
484        msg: &str,
485        id: ObjectId,
486        store: &mut Store<T, R>,
487        signer: &Device<G>,
488    ) -> Result<(T, EntryId), Error>
489    where
490        R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
491        T::Action: Serialize + Clone,
492        G: crypto::signature::Signer<crypto::Signature>,
493    {
494        let actions = NonEmpty::from_vec(self.actions)
495            .expect("Transaction::commit: transaction must not be empty");
496        let Updated {
497            head,
498            object: CollaborativeObject { object, .. },
499            ..
500        } = store.update(&self.type_name, id, msg, actions, self.embeds, signer)?;
501
502        Ok((object, head))
503    }
504}
505
506/// Get an object's operations without decoding them.
507pub fn ops<R: cob::Store>(
508    id: &ObjectId,
509    type_name: &TypeName,
510    repo: &R,
511) -> Result<NonEmpty<Op<Vec<u8>>>, Error> {
512    let cob = cob::get::<NonEmpty<cob::Entry>, _>(repo, type_name, id)?;
513
514    if let Some(cob) = cob {
515        Ok(cob.object.map(Op::from))
516    } else {
517        Err(Error::NotFound(type_name.clone(), *id))
518    }
519}
520
521pub mod encoding {
522    use serde::Serialize;
523
524    use crate::canonical::formatter::CanonicalFormatter;
525
526    /// Serialize the change into a byte string.
527    pub fn encode<A: Serialize>(action: A) -> Result<Vec<u8>, serde_json::Error> {
528        let mut buf = Vec::new();
529        let mut serializer =
530            serde_json::Serializer::with_formatter(&mut buf, CanonicalFormatter::new());
531
532        action.serialize(&mut serializer)?;
533
534        Ok(buf)
535    }
536}
537
#[cfg(test)]
pub mod test {
    use super::*;

    /// Error returned when an object cannot be rebuilt from its history.
    #[derive(Debug, thiserror::Error)]
    pub enum HistoryError<T: Cob> {
        #[error("apply: {0}")]
        Apply(T::Error),
        #[error("operation decoding failed: {0}")]
        Op(#[from] cob::op::OpEncodingError),
    }

    /// Turn a history into a concrete type, by traversing the history and applying each
    /// operation to the state.
    ///
    /// A root entry that fails to decode or to initialize the object is returned as an error.
    /// For every later entry that fails to decode or to apply, a warning is logged and the
    /// traversal callback returns `ControlFlow::Break` with the state built so far.
    pub fn from_history<R: ReadRepository, T: Cob + CobWithType>(
        history: &crate::cob::History,
        repo: &R,
    ) -> Result<T, HistoryError<T>> {
        use std::ops::ControlFlow;

        let root = history.root();
        let children = history.children_of(root.id());
        let root_op = Op::try_from(root)?;
        let initial = T::from_root(root_op, repo).map_err(HistoryError::Apply)?;

        let object = history.traverse(initial, &children, |mut state, _, entry| {
            let decoded: Result<Op<T::Action>, _> = Op::try_from(entry);
            let op = match decoded {
                Ok(op) => op,
                Err(err) => {
                    log::warn!("Error decoding ops for `{}` state: {err}", T::type_name());
                    return ControlFlow::Break(state);
                }
            };
            if let Err(err) = state.op(op, [], repo) {
                log::warn!("Error applying op to `{}` state: {err}", T::type_name());
                return ControlFlow::Break(state);
            }

            ControlFlow::Continue(state)
        });

        Ok(object)
    }
}
580}