1#![allow(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use itertools::Itertools as _;
23use thiserror::Error;
24
25use crate::dag_walk;
26use crate::op_store::OpStore;
27use crate::op_store::OpStoreError;
28use crate::op_store::OperationId;
29use crate::operation::Operation;
30
/// Error from the backing storage when reading, recording, or locking the
/// set of operation heads.
#[derive(Debug, Error)]
pub enum OpHeadsStoreError {
    /// Reading the current heads from the store failed.
    #[error("Failed to read operation heads")]
    Read(#[source] Box<dyn std::error::Error + Send + Sync>),
    /// Recording `new_op_id` as a head failed.
    #[error("Failed to record operation head {new_op_id}")]
    Write {
        // The head that we were trying to record when the failure occurred.
        new_op_id: OperationId,
        source: Box<dyn std::error::Error + Send + Sync>,
    },
    /// Acquiring the store's lock failed.
    #[error("Failed to lock operation heads store")]
    Lock(#[source] Box<dyn std::error::Error + Send + Sync>),
}
43
/// Error resolving the operation heads to a single operation.
#[derive(Debug, Error)]
pub enum OpHeadResolutionError {
    /// The store reported no heads at all, so there is nothing to resolve.
    #[error("Operation log has no heads")]
    NoHeads,
}
49
/// Marker trait for a held lock on an `OpHeadsStore`. Implementations are
/// expected to release the lock when the boxed guard is dropped (see how
/// `resolve_op_heads` holds it for a scope) — confirm with each backend.
pub trait OpHeadsStoreLock {}
51
/// Storage backend for the current heads of the operation log.
pub trait OpHeadsStore: Send + Sync + Debug {
    /// Returns `self` as `&dyn Any`, allowing callers to downcast to a
    /// concrete implementation.
    fn as_any(&self) -> &dyn Any;

    /// Name identifying this implementation.
    fn name(&self) -> &str;

    /// Records `new_id` as a head and removes `old_ids` from the recorded
    /// heads. NOTE(review): callers such as `resolve_op_heads` rely on this
    /// not losing concurrent updates — confirm each backend's guarantees.
    fn update_op_heads(
        &self,
        old_ids: &[OperationId],
        new_id: &OperationId,
    ) -> Result<(), OpHeadsStoreError>;

    /// Returns the currently recorded operation head IDs.
    fn get_op_heads(&self) -> Result<Vec<OperationId>, OpHeadsStoreError>;

    /// Takes a lock on the store; presumably held until the returned guard
    /// is dropped. Used to serialize multi-head resolution against
    /// concurrent writers.
    fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError>;
}
75
76pub fn resolve_op_heads<E>(
81 op_heads_store: &dyn OpHeadsStore,
82 op_store: &Arc<dyn OpStore>,
83 resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
84) -> Result<Operation, E>
85where
86 E: From<OpHeadResolutionError> + From<OpHeadsStoreError> + From<OpStoreError>,
87{
88 let mut op_heads = op_heads_store.get_op_heads()?;
92
93 if op_heads.len() == 1 {
94 let operation_id = op_heads.pop().unwrap();
95 let operation = op_store.read_operation(&operation_id)?;
96 return Ok(Operation::new(op_store.clone(), operation_id, operation));
97 }
98
99 let _lock = op_heads_store.lock()?;
108 let op_head_ids = op_heads_store.get_op_heads()?;
109
110 if op_head_ids.is_empty() {
111 return Err(OpHeadResolutionError::NoHeads.into());
112 }
113
114 if op_head_ids.len() == 1 {
115 let op_head_id = op_head_ids[0].clone();
116 let op_head = op_store.read_operation(&op_head_id)?;
117 return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
118 }
119
120 let op_heads: Vec<_> = op_head_ids
121 .iter()
122 .map(|op_id: &OperationId| -> Result<Operation, OpStoreError> {
123 let data = op_store.read_operation(op_id)?;
124 Ok(Operation::new(op_store.clone(), op_id.clone(), data))
125 })
126 .try_collect()?;
127 let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
130 let filtered_op_heads = dag_walk::heads_ok(
131 op_heads.into_iter().map(Ok),
132 |op: &Operation| op.id().clone(),
133 |op: &Operation| op.parents().collect_vec(),
134 )?;
135 let op_head_ids_after: HashSet<_> =
136 filtered_op_heads.iter().map(|op| op.id().clone()).collect();
137 let ancestor_op_heads = op_head_ids_before
138 .difference(&op_head_ids_after)
139 .cloned()
140 .collect_vec();
141 let mut op_heads = filtered_op_heads.into_iter().collect_vec();
142
143 if let [op_head] = &*op_heads {
145 op_heads_store.update_op_heads(&ancestor_op_heads, op_head.id())?;
146 return Ok(op_head.clone());
147 }
148
149 op_heads.sort_by_key(|op| op.metadata().end_time.timestamp);
150 let new_op = resolver(op_heads)?;
151 let mut old_op_heads = ancestor_op_heads;
152 old_op_heads.extend_from_slice(new_op.parent_ids());
153 op_heads_store.update_op_heads(&old_op_heads, new_op.id())?;
154 Ok(new_op)
155}