1#![expect(missing_docs)]
16
17use std::any::Any;
18use std::collections::HashSet;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use async_trait::async_trait;
23use itertools::Itertools as _;
24use pollster::FutureExt as _;
25use thiserror::Error;
26
27use crate::dag_walk;
28use crate::op_store::OpStore;
29use crate::op_store::OpStoreError;
30use crate::op_store::OperationId;
31use crate::operation::Operation;
32
/// Errors that can occur while reading, recording, or locking the set of
/// operation-log heads.
#[derive(Debug, Error)]
pub enum OpHeadsStoreError {
    /// Reading the current operation heads from the backing store failed.
    #[error("Failed to read operation heads")]
    Read(#[source] Box<dyn std::error::Error + Send + Sync>),
    /// Recording `new_op_id` as a new operation head failed.
    #[error("Failed to record operation head {new_op_id}")]
    Write {
        // The head id we attempted to record (interpolated into the message).
        new_op_id: OperationId,
        // Underlying store error; thiserror treats the `source` field as the
        // error source automatically.
        source: Box<dyn std::error::Error + Send + Sync>,
    },
    /// Acquiring the op-heads store lock failed.
    #[error("Failed to lock operation heads store")]
    Lock(#[source] Box<dyn std::error::Error + Send + Sync>),
}
45
/// Errors from resolving the current operation heads to a single operation.
#[derive(Debug, Error)]
pub enum OpHeadResolutionError {
    /// The store reported no heads at all, so there is no operation to
    /// resolve to.
    #[error("Operation log has no heads")]
    NoHeads,
}
51
52pub trait OpHeadsStoreLock {}
53
/// Storage for the set of current heads of the operation log.
///
/// `Any` is a supertrait so trait objects can be downcast to a concrete
/// implementation (see the inherent `downcast_ref` on `dyn OpHeadsStore`).
#[async_trait]
pub trait OpHeadsStore: Any + Send + Sync + Debug {
    /// A short name identifying this implementation.
    fn name(&self) -> &str;

    /// Records `new_id` as an operation head, replacing the heads listed in
    /// `old_ids`.
    async fn update_op_heads(
        &self,
        old_ids: &[OperationId],
        new_id: &OperationId,
    ) -> Result<(), OpHeadsStoreError>;

    /// Returns the currently recorded operation heads. The result may include
    /// stale entries that are ancestors of other entries; callers are expected
    /// to filter (see `resolve_op_heads` in this module).
    async fn get_op_heads(&self) -> Result<Vec<OperationId>, OpHeadsStoreError>;

    /// Acquires a lock on the store; the returned object represents the held
    /// lock for as long as it is kept alive.
    async fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError>;
}
76
77impl dyn OpHeadsStore {
78 pub fn downcast_ref<T: OpHeadsStore>(&self) -> Option<&T> {
80 (self as &dyn Any).downcast_ref()
81 }
82}
83
/// Resolves the current heads of the operation log to a single [`Operation`].
///
/// If there is exactly one head, it is returned directly. Otherwise the store
/// is locked, the heads are re-read and reduced to the true heads of the
/// operation DAG, and if more than one remains, `resolver` is called to merge
/// them into one new operation, which is then recorded as the sole head.
///
/// Errors are surfaced through the caller-chosen error type `E`, which must be
/// convertible from resolution, heads-store, and op-store errors.
pub fn resolve_op_heads<E>(
    op_heads_store: &dyn OpHeadsStore,
    op_store: &Arc<dyn OpStore>,
    resolver: impl FnOnce(Vec<Operation>) -> Result<Operation, E>,
) -> Result<Operation, E>
where
    E: From<OpHeadResolutionError> + From<OpHeadsStoreError> + From<OpStoreError>,
{
    // Fast path: read the heads without taking the lock. If there is exactly
    // one, no merging is needed and we can return it immediately.
    let mut op_heads = op_heads_store.get_op_heads().block_on()?;

    if op_heads.len() == 1 {
        let operation_id = op_heads.pop().unwrap();
        let operation = op_store.read_operation(&operation_id).block_on()?;
        return Ok(Operation::new(op_store.clone(), operation_id, operation));
    }

    // Slow path: take the lock and re-read the heads so we operate on a
    // consistent snapshot while merging. The lock is held (via `_lock`) until
    // this function returns; note `_lock` (not `_`), so the guard is not
    // dropped immediately.
    let _lock = op_heads_store.lock().block_on()?;
    let op_head_ids = op_heads_store.get_op_heads().block_on()?;

    if op_head_ids.is_empty() {
        return Err(OpHeadResolutionError::NoHeads.into());
    }

    if op_head_ids.len() == 1 {
        // The heads may have been merged down to one (e.g. by a concurrent
        // process) while we were waiting for the lock.
        let op_head_id = op_head_ids[0].clone();
        let op_head = op_store.read_operation(&op_head_id).block_on()?;
        return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
    }

    // Load the full operation data for every recorded head, failing on the
    // first read error.
    let op_heads: Vec<_> = op_head_ids
        .iter()
        .map(|op_id: &OperationId| -> Result<Operation, OpStoreError> {
            let data = op_store.read_operation(op_id).block_on()?;
            Ok(Operation::new(op_store.clone(), op_id.clone(), data))
        })
        .try_collect()?;
    // Some recorded "heads" may actually be ancestors of other heads (stale
    // entries). Reduce the set to the true DAG heads, and compute the set
    // difference (before minus after) to learn which stale entries should be
    // removed when we record the resolved head.
    let op_head_ids_before: HashSet<_> = op_heads.iter().map(|op| op.id().clone()).collect();
    let filtered_op_heads = dag_walk::heads_ok(
        op_heads.into_iter().map(Ok),
        |op: &Operation| op.id().clone(),
        |op: &Operation| op.parents().collect_vec(),
    )?;
    let op_head_ids_after: HashSet<_> =
        filtered_op_heads.iter().map(|op| op.id().clone()).collect();
    let ancestor_op_heads = op_head_ids_before
        .difference(&op_head_ids_after)
        .cloned()
        .collect_vec();
    let mut op_heads = filtered_op_heads.into_iter().collect_vec();

    if let [op_head] = &*op_heads {
        // Filtering left a single true head: record it (removing the stale
        // ancestor entries) and return it without invoking the resolver.
        op_heads_store
            .update_op_heads(&ancestor_op_heads, op_head.id())
            .block_on()?;
        return Ok(op_head.clone());
    }

    // Multiple true heads remain: sort them by operation end timestamp for a
    // deterministic order, then let the caller-provided resolver merge them
    // into a single new operation.
    op_heads.sort_by_key(|op| op.metadata().time.end.timestamp);
    let new_op = resolver(op_heads)?;
    // Record the merged operation as the new head, removing both the stale
    // ancestor entries and the merged operation's parents.
    let mut old_op_heads = ancestor_op_heads;
    old_op_heads.extend_from_slice(new_op.parent_ids());
    op_heads_store
        .update_op_heads(&old_op_heads, new_op.id())
        .block_on()?;
    Ok(new_op)
}