//! Utilities for resolving operation set expressions and walking the
//! operation log.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::HashSet;
use std::slice;
use std::sync::Arc;

use futures::Stream;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use futures::future::ready;
use futures::future::try_join_all;
use futures::stream;
use itertools::Itertools as _;
use pollster::FutureExt as _;
use thiserror::Error;

use crate::dag_walk;
use crate::object_id::HexPrefix;
use crate::object_id::PrefixResolution;
use crate::op_heads_store;
use crate::op_heads_store::OpHeadResolutionError;
use crate::op_heads_store::OpHeadsStore;
use crate::op_heads_store::OpHeadsStoreError;
use crate::op_store::OpStore;
use crate::op_store::OpStoreError;
use crate::op_store::OpStoreResult;
use crate::op_store::OperationId;
use crate::operation::Operation;
use crate::repo::ReadonlyRepo;
use crate::repo::Repo as _;
use crate::repo::RepoLoader;

/// Error that may occur during evaluation of an operation set expression.
#[derive(Debug, Error)]
pub enum OpsetEvaluationError {
    /// Failed to resolve the expression.
    #[error(transparent)]
    OpsetResolution(#[from] OpsetResolutionError),
    /// Failed to access the operation heads store.
    #[error(transparent)]
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Failed to resolve the current operation heads.
    #[error(transparent)]
    OpHeadResolution(#[from] OpHeadResolutionError),
    /// Failed to access an operation object.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}

/// Error that may occur during parsing and resolution of an operation set
/// expression.
#[derive(Debug, Error)]
pub enum OpsetResolutionError {
    /// Expression resolved to multiple operations where one was expected.
    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
    MultipleOperations {
        /// Operation set expression to be resolved.
        expr: String,
        /// Matched operation ids.
        candidates: Vec<OperationId>,
    },
    /// Expression resolved to no operations.
    #[error(r#"The "{0}" expression resolved to no operations"#)]
    EmptyOperations(String),
    /// Operation ID is not in a valid hex format.
    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
    InvalidIdPrefix(String),
    /// Operation ID prefix matched no operations.
    #[error(r#"No operation ID matching "{0}""#)]
    NoSuchOperation(String),
    /// Operation ID prefix matched multiple operations.
    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
    AmbiguousIdPrefix(String),
}

/// Resolves an operation set expression without loading a repo.
pub async fn resolve_op_for_load(
    repo_loader: &RepoLoader,
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    let op_store = repo_loader.op_store();
    let op_heads_store = repo_loader.op_heads_store().as_ref();
    let get_current_op = async || {
        op_heads_store::resolve_op_heads(op_heads_store, op_store, async |op_heads| {
            // Multiple heads can't be merged implicitly here, so report them.
            Err(OpsetResolutionError::MultipleOperations {
                expr: "@".to_owned(),
                candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
            }
            .into())
        })
        .await
    };
    let get_head_ops = async || get_current_head_ops(op_store, op_heads_store).await;
    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
}

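// Illustrative sketch, not part of the original module: resolving the current
// operation ("@") while loading a repo. Assumes a `RepoLoader` obtained
// elsewhere and an async context.
#[allow(dead_code)]
async fn example_resolve_current_op(
    repo_loader: &RepoLoader,
) -> Result<Operation, OpsetEvaluationError> {
    resolve_op_for_load(repo_loader, "@").await
}
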
/// Resolves an operation set expression against the loaded repo.
///
/// The "@" symbol will be resolved to the operation the repo was loaded at.
pub async fn resolve_op_with_repo(
    repo: &ReadonlyRepo,
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str).await
}

/// Resolves an operation set expression at the given head operations.
pub async fn resolve_op_at(
    op_store: &Arc<dyn OpStore>,
    head_ops: &[Operation],
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    let get_current_op = async || match head_ops {
        [head_op] => Ok(head_op.clone()),
        [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
        _ => Err(OpsetResolutionError::MultipleOperations {
            expr: "@".to_owned(),
            candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
        }
        .into()),
    };
    let get_head_ops = async || Ok(head_ops.to_vec());
    resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
}

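// Illustrative sketch, not part of the original module: resolving "@-" (the
// parent of the single head) against an explicit set of head operations.
#[allow(dead_code)]
async fn example_resolve_parent_op(
    op_store: &Arc<dyn OpStore>,
    head_ops: &[Operation],
) -> Result<Operation, OpsetEvaluationError> {
    resolve_op_at(op_store, head_ops, "@-").await
}
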
/// Resolves an operation set expression with the given "@" symbol resolution
/// callbacks. An expression is a symbol ("@" or an ID prefix) followed by any
/// number of "-" (parent) and "+" (child) postfix operators.
async fn resolve_single_op(
    op_store: &Arc<dyn OpStore>,
    get_current_op: impl AsyncFnOnce() -> Result<Operation, OpsetEvaluationError>,
    get_head_ops: impl AsyncFnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    let op_symbol = op_str.trim_end_matches(['-', '+']);
    let op_postfix = &op_str[op_symbol.len()..];
    // The head operations are needed only if children will be looked up.
    let head_ops = if op_postfix.contains('+') {
        Some(get_head_ops().await?)
    } else {
        None
    };
    let mut operation = match op_symbol {
        "@" => get_current_op().await,
        s => resolve_single_op_from_store(op_store, s).await,
    }?;
    for (i, c) in op_postfix.chars().enumerate() {
        let mut neighbor_ops = match c {
            '-' => operation.parents().await?,
            // `head_ops` is Some(_) because the postfix contains '+'.
            '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id()).await?,
            _ => unreachable!(),
        };
        operation = match neighbor_ops.len() {
            0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
            1 => neighbor_ops.pop().unwrap(),
            // Report only the prefix of the expression up to the operator
            // that was ambiguous.
            _ => Err(OpsetResolutionError::MultipleOperations {
                expr: op_str[..=op_symbol.len() + i].to_owned(),
                candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
            })?,
        };
    }
    Ok(operation)
}

async fn resolve_single_op_from_store(
    op_store: &Arc<dyn OpStore>,
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    if op_str.is_empty() {
        return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
    }
    let prefix = HexPrefix::try_from_hex(op_str)
        .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
    match op_store.resolve_operation_id_prefix(&prefix).await? {
        PrefixResolution::NoMatch => {
            Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
        }
        PrefixResolution::SingleMatch(op_id) => {
            let data = op_store.read_operation(&op_id).await?;
            Ok(Operation::new(op_store.clone(), op_id, data))
        }
        PrefixResolution::AmbiguousMatch => {
            Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
        }
    }
}

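// Illustrative sketch, not part of the original module: resolving a
// hypothetical abbreviated operation ID "1b2c" and treating "no match" as
// `None` rather than an error.
#[allow(dead_code)]
async fn example_resolve_id_prefix(
    op_store: &Arc<dyn OpStore>,
    head_ops: &[Operation],
) -> Result<Option<Operation>, OpsetEvaluationError> {
    match resolve_op_at(op_store, head_ops, "1b2c").await {
        Ok(op) => Ok(Some(op)),
        Err(OpsetEvaluationError::OpsetResolution(OpsetResolutionError::NoSuchOperation(_))) => {
            Ok(None)
        }
        Err(err) => Err(err),
    }
}
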
/// Loads the current head operations. The returned operations are sorted by
/// their end time, oldest first.
pub async fn get_current_head_ops(
    op_store: &Arc<dyn OpStore>,
    op_heads_store: &dyn OpHeadsStore,
) -> Result<Vec<Operation>, OpsetEvaluationError> {
    let head_ops_futures = op_heads_store.get_op_heads().await?.into_iter().map(
        async |id| -> OpStoreResult<Operation> {
            let data = op_store.read_operation(&id).await?;
            Ok(Operation::new(op_store.clone(), id, data))
        },
    );
    let mut head_ops = try_join_all(head_ops_futures).await?;
    head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
    Ok(head_ops)
}

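// Illustrative sketch, not part of the original module: loading all current
// operation heads through a `RepoLoader`. There is usually exactly one head
// unless concurrent operations were written.
#[allow(dead_code)]
async fn example_list_head_ops(
    repo_loader: &RepoLoader,
) -> Result<Vec<Operation>, OpsetEvaluationError> {
    get_current_head_ops(repo_loader.op_store(), repo_loader.op_heads_store().as_ref()).await
}
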
/// Looks up children of the `root_op_id` by traversing from the `head_ops`.
///
/// This will be slow if the `root_op_id` is far away (or unreachable) from
/// the `head_ops`.
async fn find_child_ops(
    head_ops: &[Operation],
    root_op_id: &OperationId,
) -> OpStoreResult<Vec<Operation>> {
    walk_ancestors(head_ops)
        // Children always precede the root in reverse topological order, so
        // stop once the root itself is reached.
        .take_while(|res| ready(res.as_ref().map_or(true, |op| op.id() != root_op_id)))
        .try_filter(|op| ready(op.parent_ids().iter().any(|id| id == root_op_id)))
        .try_collect()
        .await
}

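// Illustrative sketch, not part of the original module: listing the direct
// children of `op` by scanning down from the given heads.
#[allow(dead_code)]
async fn example_list_children(
    head_ops: &[Operation],
    op: &Operation,
) -> OpStoreResult<Vec<Operation>> {
    find_child_ops(head_ops, op.id()).await
}
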
/// Wrapper that orders `Operation`s by their end time (then by the operation
/// itself), so that topological sorting breaks ties chronologically.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct OperationByEndTime(Operation);

impl Ord for OperationByEndTime {
    fn cmp(&self, other: &Self) -> Ordering {
        let self_end_time = &self.0.metadata().time.end;
        let other_end_time = &other.0.metadata().time.end;
        self_end_time
            .cmp(other_end_time)
            .then_with(|| self.0.cmp(&other.0))
    }
}

impl PartialOrd for OperationByEndTime {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Walks `head_ops` and their ancestors in reverse topological order, with
/// ties broken by end time (newest first).
pub fn walk_ancestors(
    head_ops: &[Operation],
) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
    let head_ops = head_ops
        .iter()
        .cloned()
        .map(OperationByEndTime)
        .collect_vec();
    // Lazily load ancestors as the stream is polled.
    stream::iter(dag_walk::topo_order_reverse_lazy_ok(
        head_ops.into_iter().map(Ok),
        |OperationByEndTime(op)| op.id().clone(),
        |OperationByEndTime(op)| match op.parents().block_on() {
            Ok(parents) => parents
                .into_iter()
                .map(|parent| Ok(OperationByEndTime(parent)))
                .collect_vec(),
            Err(err) => vec![Err(err)],
        },
        |_| panic!("graph has cycle"),
    ))
    .map_ok(|OperationByEndTime(op)| op)
}

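// Illustrative sketch, not part of the original module: collecting the full
// ancestry of the given heads via the `Stream` interface.
#[allow(dead_code)]
async fn example_collect_ancestors(head_ops: &[Operation]) -> OpStoreResult<Vec<Operation>> {
    walk_ancestors(head_ops).try_collect().await
}
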
/// Walks ancestors of `head_ops` in reverse topological order, excluding the
/// `root_ops` and their ancestors.
pub fn walk_ancestors_range(
    head_ops: &[Operation],
    root_ops: &[Operation],
) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
    let mut start_ops = itertools::chain(head_ops, root_ops)
        .cloned()
        .map(OperationByEndTime)
        .collect_vec();

    // Walk eagerly until the given roots are passed, dropping the unwanted
    // operations along the way.
    let leading_items = if root_ops.is_empty() {
        vec![]
    } else {
        let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
        collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
    };

    // Continue lazily from the remaining frontier.
    let trailing_iter = dag_walk::topo_order_reverse_lazy_ok(
        start_ops.into_iter().map(Ok),
        |OperationByEndTime(op)| op.id().clone(),
        |OperationByEndTime(op)| match op.parents().block_on() {
            Ok(parents) => parents
                .into_iter()
                .map(|op| Ok(OperationByEndTime(op)))
                .collect_vec(),
            Err(err) => vec![Err(err)],
        },
        |_| panic!("graph has cycle"),
    )
    .map_ok(|OperationByEndTime(op)| op);
    stream::iter(leading_items).chain(stream::iter(trailing_iter))
}

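// Illustrative sketch, not part of the original module: counting the
// operations in the range `root_ops..head_ops`, i.e. reachable from the heads
// but not from the roots.
#[allow(dead_code)]
async fn example_count_range(
    head_ops: &[Operation],
    root_ops: &[Operation],
) -> OpStoreResult<usize> {
    walk_ancestors_range(head_ops, root_ops)
        .try_fold(0, |count, _op| async move { Ok(count + 1) })
        .await
}
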
fn collect_ancestors_until_roots(
    start_ops: &mut Vec<OperationByEndTime>,
    mut unwanted_ids: HashSet<OperationId>,
) -> Vec<OpStoreResult<Operation>> {
    let sorted_ops = match dag_walk::topo_order_reverse_chunked(
        start_ops,
        |OperationByEndTime(op)| op.id().clone(),
        |OperationByEndTime(op)| match op.parents().block_on() {
            Ok(parents) => parents
                .into_iter()
                .map(|op| Ok(OperationByEndTime(op)))
                .collect_vec(),
            Err(err) => vec![Err(err)],
        },
        |_| panic!("graph has cycle"),
    ) {
        Ok(sorted_ops) => sorted_ops,
        Err(err) => return vec![Err(err)],
    };
    // Propagate the unwanted flag from each root to its ancestors, emitting
    // only the operations that aren't reachable from the roots.
    let mut items = Vec::new();
    for OperationByEndTime(op) in sorted_ops {
        if unwanted_ids.contains(op.id()) {
            unwanted_ids.extend(op.parent_ids().iter().cloned());
        } else {
            items.push(Ok(op));
        }
    }
    // Drop any remaining start operations that turned out to be unwanted.
    start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
    items
}

/// Stats returned by `reparent_range()`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReparentStats {
    /// New head operation ids.
    pub new_head_ids: Vec<OperationId>,
    /// Number of rewritten operations.
    pub rewritten_count: usize,
    /// Number of operations that become unreachable from the new heads.
    pub unreachable_count: usize,
}

/// Reparents the operation range `root_ops..head_ops` onto the `dest_op`,
/// rewriting each operation in the range to point to its rewritten parents.
///
/// If the source operation range is empty, the `dest_op` becomes the new
/// head. Returns the new head ids along with the number of rewritten and
/// unreachable operations.
pub async fn reparent_range(
    op_store: &dyn OpStore,
    root_ops: &[Operation],
    head_ops: &[Operation],
    dest_op: &Operation,
) -> OpStoreResult<ReparentStats> {
    let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops)
        .try_collect()
        .await?;
    // Operations in the range `dest_op..root_ops` become unreachable.
    let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
        .try_fold(0, |acc, _| async move { Ok(acc + 1) })
        .await?;

    assert!(
        ops_to_reparent
            .last()
            .is_none_or(|op| op.id() != op_store.root_operation_id()),
        "root operation cannot be rewritten"
    );
    let mut rewritten_ids = HashMap::new();
    // Rewrite from the oldest operation to the newest so that parents are
    // mapped before their children.
    for old_op in ops_to_reparent.into_iter().rev() {
        let mut data = old_op.store_operation().clone();
        // Map rewritten parents to their new ids. The first parent outside
        // the range is replaced by `dest_op`; any further out-of-range
        // parents are dropped.
        let mut dest_once = Some(dest_op.id());
        data.parents = data
            .parents
            .iter()
            .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
            .cloned()
            .collect();
        let new_id = op_store.write_operation(&data).await?;
        rewritten_ids.insert(old_op.id().clone(), new_id);
    }

    // Map the input heads the same way to compute the new heads.
    let mut dest_once = Some(dest_op.id());
    let new_head_ids = head_ops
        .iter()
        .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
        .cloned()
        .collect();
    Ok(ReparentStats {
        new_head_ids,
        rewritten_count: rewritten_ids.len(),
        unreachable_count,
    })
}
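
// Illustrative sketch, not part of the original module: reparenting all
// operations after `root_op` onto `dest_op` and reporting how many were
// rewritten. `head_ops` would typically come from `get_current_head_ops()`.
#[allow(dead_code)]
async fn example_reparent_onto(
    op_store: &dyn OpStore,
    root_op: &Operation,
    head_ops: &[Operation],
    dest_op: &Operation,
) -> OpStoreResult<usize> {
    let stats = reparent_range(op_store, slice::from_ref(root_op), head_ops, dest_op).await?;
    Ok(stats.rewritten_count)
}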