1#![allow(clippy::large_enum_variant)]
3#![allow(clippy::type_complexity)]
4use std::fmt::Debug;
5use std::marker::PhantomData;
6
7use nonempty::NonEmpty;
8use radicle_cob::CollaborativeObject;
9use serde::{Deserialize, Serialize};
10
11use crate::cob::op::Op;
12use crate::cob::{Create, Embed, EntryId, ObjectId, TypeName, Update, Updated, Uri, Version};
13use crate::git;
14use crate::node::device::Device;
15use crate::prelude::*;
16use crate::storage::git as storage;
17use crate::storage::SignRepository;
18use crate::{cob, identity};
19
20pub trait CobAction {
21 fn parents(&self) -> Vec<git::Oid> {
24 Vec::new()
25 }
26
27 fn produces_identifier(&self) -> bool {
41 false
42 }
43}
44
45pub trait Cob: Sized {
47 type Action: CobAction + for<'de> Deserialize<'de> + Serialize;
49 type Error: std::error::Error + Send + Sync + 'static;
51
52 fn from_root<R: ReadRepository>(op: Op<Self::Action>, repo: &R) -> Result<Self, Self::Error>;
54
55 fn op<'a, R: ReadRepository, I: IntoIterator<Item = &'a cob::Entry>>(
57 &mut self,
58 op: Op<Self::Action>,
59 concurrent: I,
60 repo: &R,
61 ) -> Result<(), <Self as Cob>::Error>;
62
63 #[cfg(test)]
64 fn from_history<R: ReadRepository>(
66 history: &crate::cob::History,
67 repo: &R,
68 ) -> Result<Self, test::HistoryError<Self>>
69 where
70 Self: CobWithType,
71 {
72 test::from_history::<R, Self>(history, repo)
73 }
74
75 #[cfg(test)]
76 fn from_ops<R: ReadRepository>(
79 ops: impl IntoIterator<Item = Op<Self::Action>>,
80 repo: &R,
81 ) -> Result<Self, Self::Error> {
82 let mut ops = ops.into_iter();
83 let Some(init) = ops.next() else {
84 panic!("FromHistory::from_ops: operations list is empty");
85 };
86 let mut state = Self::from_root(init, repo)?;
87 for op in ops {
88 state.op(op, [].into_iter(), repo)?;
89 }
90 Ok(state)
91 }
92}
93
94pub trait CobWithType {
99 fn type_name() -> &'static TypeName;
101}
102
103#[derive(Debug, thiserror::Error)]
105pub enum Error {
106 #[error("create error: {0}")]
107 Create(#[from] cob::error::Create),
108 #[error("update error: {0}")]
109 Update(#[from] cob::error::Update),
110 #[error("retrieve error: {0}")]
111 Retrieve(#[from] cob::error::Retrieve),
112 #[error("remove error: {0}")]
113 Remove(#[from] cob::error::Remove),
114 #[error(transparent)]
115 Identity(#[from] identity::doc::DocError),
116 #[error(transparent)]
117 Serialize(#[from] serde_json::Error),
118 #[error("object `{1}` of type `{0}` was not found")]
119 NotFound(TypeName, ObjectId),
120 #[error("signed refs: {0}")]
121 SignRefs(Box<storage::RepositoryError>),
122 #[error("invalid or unknown embed URI: {0}")]
123 EmbedUri(Uri),
124 #[error(transparent)]
125 Git(git::raw::Error),
126 #[error("failed to find reference '{name}': {err}")]
127 RefLookup {
128 name: git::RefString,
129 #[source]
130 err: git::raw::Error,
131 },
132 #[error("transaction already contains action {0} which produces an identifier, denying to add action {1} which also produces an identifier")]
133 ClashingIdentifiers(String, String),
134}
135
136pub struct Store<'a, T, R> {
138 identity: Option<git::Oid>,
139 repo: &'a R,
140 witness: PhantomData<T>,
141 type_name: &'a TypeName,
142}
143
144impl<T, R> AsRef<R> for Store<'_, T, R> {
145 fn as_ref(&self) -> &R {
146 self.repo
147 }
148}
149
150impl<'a, T, R> Store<'a, T, R>
151where
152 R: ReadRepository + cob::Store,
153{
154 pub fn open_for(type_name: &'a TypeName, repo: &'a R) -> Result<Self, Error> {
156 Ok(Self {
157 repo,
158 identity: None,
159 witness: PhantomData,
160 type_name,
161 })
162 }
163
164 pub fn identity(self, identity: git::Oid) -> Self {
166 Self {
167 repo: self.repo,
168 witness: self.witness,
169 identity: Some(identity),
170 type_name: self.type_name,
171 }
172 }
173}
174
175impl<'a, T, R> Store<'a, T, R>
176where
177 R: ReadRepository + cob::Store<Namespace = NodeId>,
178 T: CobWithType,
179{
180 pub fn open(repo: &'a R) -> Result<Self, Error> {
182 Ok(Self {
183 repo,
184 identity: None,
185 witness: PhantomData,
186 type_name: T::type_name(),
187 })
188 }
189}
190
191impl<T, R> Store<'_, T, R>
192where
193 R: ReadRepository + cob::Store<Namespace = NodeId>,
194 T: Cob + cob::Evaluate<R>,
195{
196 pub fn transaction(
197 &self,
198 actions: Vec<T::Action>,
199 embeds: Vec<Embed<Uri>>,
200 ) -> Transaction<T, R> {
201 Transaction::new(self.type_name.clone(), actions, embeds)
202 }
203}
204
205impl<T, R> Store<'_, T, R>
206where
207 R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
208 T: Cob + cob::Evaluate<R>,
209 T::Action: Serialize,
210{
211 pub fn update<G>(
213 &self,
214 type_name: &TypeName,
215 object_id: ObjectId,
216 message: &str,
217 actions: impl Into<NonEmpty<T::Action>>,
218 embeds: Vec<Embed<Uri>>,
219 signer: &Device<G>,
220 ) -> Result<Updated<T>, Error>
221 where
222 G: crypto::signature::Signer<crypto::Signature>,
223 {
224 let actions = actions.into();
225 let related = actions.iter().flat_map(T::Action::parents).collect();
226 let changes = actions.try_map(encoding::encode)?;
227 let embeds = embeds
228 .into_iter()
229 .map(|e| {
230 Ok::<_, Error>(Embed {
231 content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
232 name: e.name.clone(),
233 })
234 })
235 .collect::<Result<_, _>>()?;
236 let updated = cob::update(
237 self.repo,
238 signer,
239 self.identity,
240 related,
241 signer.public_key(),
242 Update {
243 object_id,
244 type_name: type_name.clone(),
245 message: message.to_owned(),
246 embeds,
247 changes,
248 },
249 )?;
250 self.repo
251 .sign_refs(signer)
252 .map_err(|e| Error::SignRefs(Box::new(e)))?;
253
254 Ok(updated)
255 }
256
257 pub fn create<G>(
259 &self,
260 message: &str,
261 actions: impl Into<NonEmpty<T::Action>>,
262 embeds: Vec<Embed<Uri>>,
263 signer: &Device<G>,
264 ) -> Result<(ObjectId, T), Error>
265 where
266 G: crypto::signature::Signer<crypto::Signature>,
267 {
268 let actions = actions.into();
269 let parents = actions.iter().flat_map(T::Action::parents).collect();
270 let contents = actions.try_map(encoding::encode)?;
271 let embeds = embeds
272 .into_iter()
273 .map(|e| {
274 Ok::<_, Error>(Embed {
275 content: git::Oid::try_from(&e.content).map_err(Error::EmbedUri)?,
276 name: e.name.clone(),
277 })
278 })
279 .collect::<Result<_, _>>()?;
280 let cob = cob::create::<T, _, _>(
281 self.repo,
282 signer,
283 self.identity,
284 parents,
285 signer.public_key(),
286 Create {
287 type_name: self.type_name.clone(),
288 version: Version::default(),
289 message: message.to_owned(),
290 embeds,
291 contents,
292 },
293 )?;
294 if self.type_name != &*crate::cob::identity::TYPENAME {
298 self.repo
299 .sign_refs(signer)
300 .map_err(|e| Error::SignRefs(Box::new(e)))?;
301 }
302 Ok((*cob.id(), cob.object))
303 }
304
305 pub fn remove<G>(&self, id: &ObjectId, signer: &Device<G>) -> Result<(), Error>
307 where
308 G: crypto::signature::Signer<crypto::Signature>,
309 {
310 let name = git::refs::storage::cob(signer.public_key(), self.type_name, id);
311 match self
312 .repo
313 .reference_oid(signer.public_key(), &name.strip_namespace())
314 {
315 Ok(_) => {
316 cob::remove(self.repo, signer.public_key(), self.type_name, id)?;
317 self.repo
318 .sign_refs(signer)
319 .map_err(|e| Error::SignRefs(Box::new(e)))?;
320 Ok(())
321 }
322 Err(err) if err.code() == git::raw::ErrorCode::NotFound => Ok(()),
323 Err(err) => Err(Error::RefLookup {
324 name: name.to_ref_string(),
325 err,
326 }),
327 }
328 }
329}
330
331impl<'a, T, R> Store<'a, T, R>
332where
333 R: ReadRepository + cob::Store,
334 T: Cob + cob::Evaluate<R> + 'a,
335 T::Action: Serialize,
336{
337 pub fn get(&self, id: &ObjectId) -> Result<Option<T>, Error> {
339 cob::get::<T, _>(self.repo, self.type_name, id)
340 .map(|r| r.map(|cob| cob.object))
341 .map_err(Error::from)
342 }
343
344 pub fn all(
346 &self,
347 ) -> Result<impl ExactSizeIterator<Item = Result<(ObjectId, T), Error>> + 'a, Error> {
348 let raw = cob::list::<T, _>(self.repo, self.type_name)?;
349
350 Ok(raw.into_iter().map(|o| Ok((*o.id(), o.object))))
351 }
352
353 pub fn is_empty(&self) -> Result<bool, Error> {
355 Ok(self.count()? == 0)
356 }
357
358 pub fn count(&self) -> Result<usize, Error> {
360 let raw = cob::list::<T, _>(self.repo, self.type_name)?;
361
362 Ok(raw.len())
363 }
364}
365
366#[derive(Debug)]
368pub struct Transaction<T: Cob + cob::Evaluate<R>, R> {
369 actions: Vec<T::Action>,
370 embeds: Vec<Embed<Uri>>,
371
372 produces_identifier: Option<usize>,
378
379 repo: PhantomData<R>,
380 type_name: TypeName,
381}
382
383impl<T: Cob + CobWithType + cob::Evaluate<R>, R> Default for Transaction<T, R> {
384 fn default() -> Self {
385 Self {
386 actions: Vec::new(),
387 embeds: Vec::new(),
388 produces_identifier: None,
389 repo: PhantomData,
390 type_name: T::type_name().clone(),
391 }
392 }
393}
394
395impl<T, R> Transaction<T, R>
396where
397 T: Cob + cob::Evaluate<R>,
398{
399 pub fn new(type_name: TypeName, actions: Vec<T::Action>, embeds: Vec<Embed<Uri>>) -> Self {
400 Self {
401 actions,
402 embeds,
403 produces_identifier: None,
404 repo: PhantomData,
405 type_name,
406 }
407 }
408}
409
410impl<T, R> Transaction<T, R>
411where
412 T: Cob + CobWithType + cob::Evaluate<R>,
413{
414 pub fn initial<G, F, Tx>(
416 message: &str,
417 store: &mut Store<T, R>,
418 signer: &Device<G>,
419 operations: F,
420 ) -> Result<(ObjectId, T), Error>
421 where
422 Tx: From<Self>,
423 Self: From<Tx>,
424 G: crypto::signature::Signer<crypto::Signature>,
425 F: FnOnce(&mut Tx, &R) -> Result<(), Error>,
426 R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
427 T::Action: Serialize + Clone,
428 {
429 let mut tx = Tx::from(Transaction::default());
430 operations(&mut tx, store.as_ref())?;
431 let tx = Self::from(tx);
432
433 let actions = NonEmpty::from_vec(tx.actions)
434 .expect("Transaction::initial: transaction must contain at least one action");
435
436 store.create(message, actions, tx.embeds, signer)
437 }
438}
439
440impl<T, R> Transaction<T, R>
441where
442 T: Cob + cob::Evaluate<R>,
443{
444 pub fn push(&mut self, action: T::Action) -> Result<(), Error> {
446 if action.produces_identifier() {
447 if let Some(index) = self.produces_identifier {
448 return Err(Error::ClashingIdentifiers(
449 serde_json::to_string(&self.actions[index])?,
450 serde_json::to_string(&action)?,
451 ));
452 } else {
453 self.produces_identifier = Some(self.actions.len())
454 }
455 }
456
457 self.actions.push(action);
458
459 Ok(())
460 }
461
462 pub fn extend<I: IntoIterator<Item = T::Action>>(&mut self, actions: I) -> Result<(), Error> {
466 for action in actions {
467 self.push(action)?;
468 }
469 Ok(())
470 }
471
472 pub fn embed(&mut self, embeds: impl IntoIterator<Item = Embed<Uri>>) -> Result<(), Error> {
474 self.embeds.extend(embeds);
475
476 Ok(())
477 }
478
479 pub fn commit<G>(
483 self,
484 msg: &str,
485 id: ObjectId,
486 store: &mut Store<T, R>,
487 signer: &Device<G>,
488 ) -> Result<(T, EntryId), Error>
489 where
490 R: ReadRepository + SignRepository + cob::Store<Namespace = NodeId>,
491 T::Action: Serialize + Clone,
492 G: crypto::signature::Signer<crypto::Signature>,
493 {
494 let actions = NonEmpty::from_vec(self.actions)
495 .expect("Transaction::commit: transaction must not be empty");
496 let Updated {
497 head,
498 object: CollaborativeObject { object, .. },
499 ..
500 } = store.update(&self.type_name, id, msg, actions, self.embeds, signer)?;
501
502 Ok((object, head))
503 }
504}
505
506pub fn ops<R: cob::Store>(
508 id: &ObjectId,
509 type_name: &TypeName,
510 repo: &R,
511) -> Result<NonEmpty<Op<Vec<u8>>>, Error> {
512 let cob = cob::get::<NonEmpty<cob::Entry>, _>(repo, type_name, id)?;
513
514 if let Some(cob) = cob {
515 Ok(cob.object.map(Op::from))
516 } else {
517 Err(Error::NotFound(type_name.clone(), *id))
518 }
519}
520
521pub mod encoding {
522 use serde::Serialize;
523
524 use crate::canonical::formatter::CanonicalFormatter;
525
526 pub fn encode<A: Serialize>(action: A) -> Result<Vec<u8>, serde_json::Error> {
528 let mut buf = Vec::new();
529 let mut serializer =
530 serde_json::Serializer::with_formatter(&mut buf, CanonicalFormatter::new());
531
532 action.serialize(&mut serializer)?;
533
534 Ok(buf)
535 }
536}
537
538#[cfg(test)]
539pub mod test {
540 use super::*;
541
542 #[derive(Debug, thiserror::Error)]
543 pub enum HistoryError<T: Cob> {
544 #[error("apply: {0}")]
545 Apply(T::Error),
546 #[error("operation decoding failed: {0}")]
547 Op(#[from] cob::op::OpEncodingError),
548 }
549
550 pub fn from_history<R: ReadRepository, T: Cob + CobWithType>(
553 history: &crate::cob::History,
554 repo: &R,
555 ) -> Result<T, HistoryError<T>> {
556 use std::ops::ControlFlow;
557
558 let root = history.root();
559 let children = history.children_of(root.id());
560 let op = Op::try_from(root)?;
561 let initial = T::from_root(op, repo).map_err(HistoryError::Apply)?;
562 let obj = history.traverse(initial, &children, |mut acc, _, entry| {
563 match Op::try_from(entry) {
564 Ok(op) => {
565 if let Err(err) = acc.op(op, [], repo) {
566 log::warn!("Error applying op to `{}` state: {err}", T::type_name());
567 return ControlFlow::Break(acc);
568 }
569 }
570 Err(err) => {
571 log::warn!("Error decoding ops for `{}` state: {err}", T::type_name());
572 return ControlFlow::Break(acc);
573 }
574 }
575 ControlFlow::Continue(acc)
576 });
577
578 Ok(obj)
579 }
580}