1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::slice;
21use std::sync::Arc;
22
23use futures::Stream;
24use futures::StreamExt as _;
25use futures::TryStreamExt as _;
26use futures::future::ready;
27use futures::future::try_join_all;
28use futures::stream;
29use itertools::Itertools as _;
30use pollster::FutureExt as _;
31use thiserror::Error;
32
33use crate::dag_walk_async;
34use crate::object_id::HexPrefix;
35use crate::object_id::PrefixResolution;
36use crate::op_heads_store;
37use crate::op_heads_store::OpHeadsStore;
38use crate::op_heads_store::OpHeadsStoreError;
39use crate::op_store::OpStore;
40use crate::op_store::OpStoreError;
41use crate::op_store::OpStoreResult;
42use crate::op_store::OperationId;
43use crate::operation::Operation;
44use crate::repo::ReadonlyRepo;
45use crate::repo::Repo as _;
46use crate::repo::RepoLoader;
47
/// Error returned by the operation-expression resolvers in this file.
///
/// Every variant is `transparent`, so the message shown is that of the wrapped
/// error.
#[derive(Debug, Error)]
pub enum OpsetEvaluationError {
    /// The expression could not be resolved to a single operation.
    #[error(transparent)]
    OpsetResolution(#[from] OpsetResolutionError),
    /// An error reported by the op heads store.
    #[error(transparent)]
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// An error reported by the operation store.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
61
/// Error describing why an operation expression did not resolve to exactly one
/// operation.
#[derive(Debug, Error)]
pub enum OpsetResolutionError {
    /// The expression (or a leading part of it) matched more than one
    /// operation.
    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
    MultipleOperations {
        /// Expression text reported in the message.
        expr: String,
        /// IDs of the operations the expression resolved to.
        candidates: Vec<OperationId>,
    },
    /// The expression matched no operation. Holds the expression text.
    #[error(r#"The "{0}" expression resolved to no operations"#)]
    EmptyOperations(String),
    /// The symbol is empty or could not be parsed as a hexadecimal prefix.
    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
    InvalidIdPrefix(String),
    /// The operation store has no operation whose ID starts with the prefix.
    #[error(r#"No operation ID matching "{0}""#)]
    NoSuchOperation(String),
    /// The operation store reported the prefix as ambiguous.
    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
    AmbiguousIdPrefix(String),
}
89
90pub async fn resolve_op_for_load(
92 repo_loader: &RepoLoader,
93 op_str: &str,
94) -> Result<Operation, OpsetEvaluationError> {
95 let op_store = repo_loader.op_store();
96 let op_heads_store = repo_loader.op_heads_store().as_ref();
97 let get_current_op = async || {
98 op_heads_store::resolve_op_heads(op_heads_store, op_store, async |op_heads| {
99 Err(OpsetResolutionError::MultipleOperations {
100 expr: "@".to_owned(),
101 candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
102 }
103 .into())
104 })
105 .await
106 };
107 let get_head_ops = async || get_current_head_ops(op_store, op_heads_store).await;
108 resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
109}
110
111pub async fn resolve_op_with_repo(
115 repo: &ReadonlyRepo,
116 op_str: &str,
117) -> Result<Operation, OpsetEvaluationError> {
118 resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str).await
119}
120
121pub async fn resolve_op_at(
123 op_store: &Arc<dyn OpStore>,
124 head_ops: &[Operation],
125 op_str: &str,
126) -> Result<Operation, OpsetEvaluationError> {
127 let get_current_op = async || match head_ops {
128 [head_op] => Ok(head_op.clone()),
129 [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
130 _ => Err(OpsetResolutionError::MultipleOperations {
131 expr: "@".to_owned(),
132 candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
133 }
134 .into()),
135 };
136 let get_head_ops = async || Ok(head_ops.to_vec());
137 resolve_single_op(op_store, get_current_op, get_head_ops, op_str).await
138}
139
140async fn resolve_single_op(
143 op_store: &Arc<dyn OpStore>,
144 get_current_op: impl AsyncFnOnce() -> Result<Operation, OpsetEvaluationError>,
145 get_head_ops: impl AsyncFnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
146 op_str: &str,
147) -> Result<Operation, OpsetEvaluationError> {
148 let op_symbol = op_str.trim_end_matches(['-', '+']);
149 let op_postfix = &op_str[op_symbol.len()..];
150 let head_ops = if op_postfix.contains('+') {
151 Some(get_head_ops().await?)
152 } else {
153 None
154 };
155 let mut operation = match op_symbol {
156 "@" => get_current_op().await,
157 s => resolve_single_op_from_store(op_store, s).await,
158 }?;
159 for (i, c) in op_postfix.chars().enumerate() {
160 let mut neighbor_ops = match c {
161 '-' => operation.parents().await?,
162 '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id()).await?,
163 _ => unreachable!(),
164 };
165 operation = match neighbor_ops.len() {
166 0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
175 1 => neighbor_ops.pop().unwrap(),
176 _ => Err(OpsetResolutionError::MultipleOperations {
179 expr: op_str[..=op_symbol.len() + i].to_owned(),
180 candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
181 })?,
182 };
183 }
184 Ok(operation)
185}
186
187async fn resolve_single_op_from_store(
188 op_store: &Arc<dyn OpStore>,
189 op_str: &str,
190) -> Result<Operation, OpsetEvaluationError> {
191 if op_str.is_empty() {
192 return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
193 }
194 let prefix = HexPrefix::try_from_hex(op_str)
195 .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
196 match op_store.resolve_operation_id_prefix(&prefix).await? {
197 PrefixResolution::NoMatch => {
198 Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
199 }
200 PrefixResolution::SingleMatch(op_id) => {
201 let data = op_store.read_operation(&op_id).await?;
202 Ok(Operation::new(op_store.clone(), op_id, data))
203 }
204 PrefixResolution::AmbiguousMatch => {
205 Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
206 }
207 }
208}
209
210pub async fn get_current_head_ops(
213 op_store: &Arc<dyn OpStore>,
214 op_heads_store: &dyn OpHeadsStore,
215) -> Result<Vec<Operation>, OpsetEvaluationError> {
216 let head_ops_futures = op_heads_store.get_op_heads().await?.into_iter().map(
217 async |id| -> OpStoreResult<Operation> {
218 let data = op_store.read_operation(&id).await?;
219 Ok(Operation::new(op_store.clone(), id, data))
220 },
221 );
222 let mut head_ops = try_join_all(head_ops_futures).await?;
223 head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
225 Ok(head_ops)
226}
227
228async fn find_child_ops(
233 head_ops: &[Operation],
234 root_op_id: &OperationId,
235) -> OpStoreResult<Vec<Operation>> {
236 walk_ancestors(head_ops)
237 .take_while(|res| ready(res.as_ref().map_or(true, |op| op.id() != root_op_id)))
238 .try_filter(|op| ready(op.parent_ids().iter().any(|id| id == root_op_id)))
239 .try_collect()
240 .await
241}
242
243#[derive(Clone, Debug, Eq, Hash, PartialEq)]
244struct OperationByEndTime(Operation);
245
246impl Ord for OperationByEndTime {
247 fn cmp(&self, other: &Self) -> Ordering {
248 let self_end_time = &self.0.metadata().time.end;
249 let other_end_time = &other.0.metadata().time.end;
250 self_end_time
251 .cmp(other_end_time)
252 .then_with(|| self.0.cmp(&other.0)) }
254}
255
256impl PartialOrd for OperationByEndTime {
257 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
258 Some(self.cmp(other))
259 }
260}
261
262pub fn walk_ancestors(
264 head_ops: &[Operation],
265) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
266 let head_ops = head_ops
267 .iter()
268 .cloned()
269 .map(OperationByEndTime)
270 .collect_vec();
271 dag_walk_async::topo_order_reverse_lazy(
274 head_ops.into_iter().map(Ok),
275 |OperationByEndTime(op)| op.id().clone(),
276 async |OperationByEndTime(op)| match op.parents().await {
277 Ok(parents) => parents
278 .into_iter()
279 .map(|parent| Ok(OperationByEndTime(parent)))
280 .collect_vec(),
281 Err(err) => vec![Err(err)],
282 },
283 |_| panic!("graph has cycle"),
284 )
285 .map_ok(|OperationByEndTime(op)| op)
286}
287
288pub fn walk_ancestors_range(
291 head_ops: &[Operation],
292 root_ops: &[Operation],
293) -> impl Stream<Item = OpStoreResult<Operation>> + use<> {
294 let mut start_ops = itertools::chain(head_ops, root_ops)
295 .cloned()
296 .map(OperationByEndTime)
297 .collect_vec();
298
299 let leading_items = if root_ops.is_empty() {
301 vec![]
302 } else {
303 let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
304 collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
305 };
306
307 let trailing_stream = dag_walk_async::topo_order_reverse_lazy(
310 start_ops.into_iter().map(Ok),
311 |OperationByEndTime(op)| op.id().clone(),
312 async |OperationByEndTime(op)| match op.parents().await {
313 Ok(parents) => parents
314 .into_iter()
315 .map(|op| Ok(OperationByEndTime(op)))
316 .collect_vec(),
317 Err(err) => vec![Err(err)],
318 },
319 |_| panic!("graph has cycle"),
320 )
321 .map_ok(|OperationByEndTime(op)| op);
322 stream::iter(leading_items).chain(trailing_stream)
323}
324
325fn collect_ancestors_until_roots(
326 start_ops: &mut Vec<OperationByEndTime>,
327 mut unwanted_ids: HashSet<OperationId>,
328) -> Vec<OpStoreResult<Operation>> {
329 let sorted_ops = match dag_walk_async::topo_order_reverse_chunked(
330 start_ops,
331 |OperationByEndTime(op)| op.id().clone(),
332 async |OperationByEndTime(op)| match op.parents().await {
333 Ok(parents) => parents
334 .into_iter()
335 .map(|op| Ok(OperationByEndTime(op)))
336 .collect_vec(),
337 Err(err) => vec![Err(err)],
338 },
339 |_| panic!("graph has cycle"),
340 )
341 .block_on()
342 {
343 Ok(sorted_ops) => sorted_ops,
344 Err(err) => return vec![Err(err)],
345 };
346 let mut items = Vec::new();
347 for OperationByEndTime(op) in sorted_ops {
348 if unwanted_ids.contains(op.id()) {
349 unwanted_ids.extend(op.parent_ids().iter().cloned());
350 } else {
351 items.push(Ok(op));
352 }
353 }
354 start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
356 items
357}
358
359pub async fn closest_common_ancestors(
365 set1: impl IntoIterator<Item = Operation>,
366 set2: impl IntoIterator<Item = Operation>,
367) -> OpStoreResult<Vec<Operation>> {
368 let ancestor_ops = dag_walk_async::closest_common_nodes(
369 set1.into_iter().map(OperationByEndTime),
370 set2.into_iter().map(OperationByEndTime),
371 |op: &OperationByEndTime| op.0.id().clone(),
372 async |op: &OperationByEndTime| {
373 op.0.parents()
374 .await
375 .map(|parents| parents.into_iter().map(OperationByEndTime))
376 },
377 )
378 .await?;
379 Ok(ancestor_ops.into_iter().map(|op| op.0).collect())
380}
381
/// Summary returned by [`reparent_range()`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReparentStats {
    /// IDs of the heads after reparenting: the rewritten ID of each head that
    /// was rewritten, plus the destination ID (at most once) standing in for
    /// heads that were not.
    pub new_head_ids: Vec<OperationId>,
    /// Number of operations that were rewritten.
    pub rewritten_count: usize,
    /// Number of operations reached from the roots when the destination and
    /// its ancestors are excluded; presumably these become unreachable from
    /// the new heads.
    pub unreachable_count: usize,
}
393
394pub async fn reparent_range(
404 op_store: &dyn OpStore,
405 root_ops: &[Operation],
406 head_ops: &[Operation],
407 dest_op: &Operation,
408) -> OpStoreResult<ReparentStats> {
409 let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops)
410 .try_collect()
411 .await?;
412 let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
413 .try_fold(0, |acc, _| async move { Ok(acc + 1) })
414 .await?;
415
416 assert!(
417 ops_to_reparent
418 .last()
419 .is_none_or(|op| op.id() != op_store.root_operation_id()),
420 "root operation cannot be rewritten"
421 );
422 let mut rewritten_ids = HashMap::new();
423 for old_op in ops_to_reparent.into_iter().rev() {
424 let mut data = old_op.store_operation().clone();
425 let mut dest_once = Some(dest_op.id());
426 data.parents = data
427 .parents
428 .iter()
429 .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
430 .cloned()
431 .collect();
432 let new_id = op_store.write_operation(&data).await?;
433 rewritten_ids.insert(old_op.id().clone(), new_id);
434 }
435
436 let mut dest_once = Some(dest_op.id());
437 let new_head_ids = head_ops
438 .iter()
439 .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
440 .cloned()
441 .collect();
442 Ok(ReparentStats {
443 new_head_ids,
444 rewritten_count: rewritten_ids.len(),
445 unreachable_count,
446 })
447}