1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::slice;
21use std::sync::Arc;
22
23use itertools::Itertools as _;
24use pollster::FutureExt as _;
25use thiserror::Error;
26
27use crate::dag_walk;
28use crate::object_id::HexPrefix;
29use crate::object_id::PrefixResolution;
30use crate::op_heads_store;
31use crate::op_heads_store::OpHeadResolutionError;
32use crate::op_heads_store::OpHeadsStore;
33use crate::op_heads_store::OpHeadsStoreError;
34use crate::op_store::OpStore;
35use crate::op_store::OpStoreError;
36use crate::op_store::OpStoreResult;
37use crate::op_store::OperationId;
38use crate::operation::Operation;
39use crate::repo::ReadonlyRepo;
40use crate::repo::Repo as _;
41use crate::repo::RepoLoader;
42
/// Error that can occur while evaluating an operation-set expression.
///
/// Wraps every lower-level failure mode (`#[error(transparent)]` forwards the
/// inner error's message) so callers can use `?` on any of the stores.
#[derive(Debug, Error)]
pub enum OpsetEvaluationError {
    /// The expression itself could not be resolved to an operation.
    #[error(transparent)]
    OpsetResolution(#[from] OpsetResolutionError),
    /// Error while accessing the operation-heads store.
    #[error(transparent)]
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Error while resolving the current operation heads.
    #[error(transparent)]
    OpHeadResolution(#[from] OpHeadResolutionError),
    /// Error while reading from or writing to the operation store.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
59
/// Error that can occur while resolving an operation-set expression to a
/// single operation.
#[derive(Debug, Error)]
pub enum OpsetResolutionError {
    /// The expression matched more than one operation (e.g. "@" with
    /// divergent op heads, or a "+" postfix on an op with multiple children).
    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
    MultipleOperations {
        /// The (sub-)expression that was ambiguous.
        expr: String,
        /// All operations the expression matched.
        candidates: Vec<OperationId>,
    },
    /// The expression matched no operation at all.
    #[error(r#"The "{0}" expression resolved to no operations"#)]
    EmptyOperations(String),
    /// The given string is not valid hexadecimal.
    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
    InvalidIdPrefix(String),
    /// No operation ID starts with the given prefix.
    #[error(r#"No operation ID matching "{0}""#)]
    NoSuchOperation(String),
    /// More than one operation ID starts with the given prefix.
    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
    AmbiguousIdPrefix(String),
}
87
88pub fn resolve_op_for_load(
90 repo_loader: &RepoLoader,
91 op_str: &str,
92) -> Result<Operation, OpsetEvaluationError> {
93 let op_store = repo_loader.op_store();
94 let op_heads_store = repo_loader.op_heads_store().as_ref();
95 let get_current_op = || {
96 op_heads_store::resolve_op_heads(op_heads_store, op_store, |op_heads| {
97 Err(OpsetResolutionError::MultipleOperations {
98 expr: "@".to_owned(),
99 candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
100 }
101 .into())
102 })
103 };
104 let get_head_ops = || get_current_head_ops(op_store, op_heads_store);
105 resolve_single_op(op_store, get_current_op, get_head_ops, op_str)
106}
107
108pub fn resolve_op_with_repo(
112 repo: &ReadonlyRepo,
113 op_str: &str,
114) -> Result<Operation, OpsetEvaluationError> {
115 resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str)
116}
117
118pub fn resolve_op_at(
120 op_store: &Arc<dyn OpStore>,
121 head_ops: &[Operation],
122 op_str: &str,
123) -> Result<Operation, OpsetEvaluationError> {
124 let get_current_op = || match head_ops {
125 [head_op] => Ok(head_op.clone()),
126 [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
127 _ => Err(OpsetResolutionError::MultipleOperations {
128 expr: "@".to_owned(),
129 candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
130 }
131 .into()),
132 };
133 let get_head_ops = || Ok(head_ops.to_vec());
134 resolve_single_op(op_store, get_current_op, get_head_ops, op_str)
135}
136
137fn resolve_single_op(
140 op_store: &Arc<dyn OpStore>,
141 get_current_op: impl FnOnce() -> Result<Operation, OpsetEvaluationError>,
142 get_head_ops: impl FnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
143 op_str: &str,
144) -> Result<Operation, OpsetEvaluationError> {
145 let op_symbol = op_str.trim_end_matches(['-', '+']);
146 let op_postfix = &op_str[op_symbol.len()..];
147 let head_ops = op_postfix.contains('+').then(get_head_ops).transpose()?;
148 let mut operation = match op_symbol {
149 "@" => get_current_op(),
150 s => resolve_single_op_from_store(op_store, s),
151 }?;
152 for (i, c) in op_postfix.chars().enumerate() {
153 let mut neighbor_ops = match c {
154 '-' => operation.parents().try_collect()?,
155 '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id())?,
156 _ => unreachable!(),
157 };
158 operation = match neighbor_ops.len() {
159 0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
168 1 => neighbor_ops.pop().unwrap(),
169 _ => Err(OpsetResolutionError::MultipleOperations {
172 expr: op_str[..=op_symbol.len() + i].to_owned(),
173 candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
174 })?,
175 };
176 }
177 Ok(operation)
178}
179
180fn resolve_single_op_from_store(
181 op_store: &Arc<dyn OpStore>,
182 op_str: &str,
183) -> Result<Operation, OpsetEvaluationError> {
184 if op_str.is_empty() {
185 return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
186 }
187 let prefix = HexPrefix::try_from_hex(op_str)
188 .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
189 match op_store.resolve_operation_id_prefix(&prefix).block_on()? {
190 PrefixResolution::NoMatch => {
191 Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
192 }
193 PrefixResolution::SingleMatch(op_id) => {
194 let data = op_store.read_operation(&op_id).block_on()?;
195 Ok(Operation::new(op_store.clone(), op_id, data))
196 }
197 PrefixResolution::AmbiguousMatch => {
198 Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
199 }
200 }
201}
202
203pub fn get_current_head_ops(
206 op_store: &Arc<dyn OpStore>,
207 op_heads_store: &dyn OpHeadsStore,
208) -> Result<Vec<Operation>, OpsetEvaluationError> {
209 let mut head_ops: Vec<_> = op_heads_store
210 .get_op_heads()
211 .block_on()?
212 .into_iter()
213 .map(|id| -> OpStoreResult<Operation> {
214 let data = op_store.read_operation(&id).block_on()?;
215 Ok(Operation::new(op_store.clone(), id, data))
216 })
217 .try_collect()?;
218 head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
220 Ok(head_ops)
221}
222
223fn find_child_ops(
228 head_ops: &[Operation],
229 root_op_id: &OperationId,
230) -> OpStoreResult<Vec<Operation>> {
231 walk_ancestors(head_ops)
232 .take_while(|res| res.as_ref().map_or(true, |op| op.id() != root_op_id))
233 .filter_ok(|op| op.parent_ids().iter().any(|id| id == root_op_id))
234 .try_collect()
235}
236
// `Operation` wrapper whose ordering puts the end timestamp first (see the
// `Ord` impl), used to bias the topological walks toward newer operations.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct OperationByEndTime(Operation);
239
240impl Ord for OperationByEndTime {
241 fn cmp(&self, other: &Self) -> Ordering {
242 let self_end_time = &self.0.metadata().time.end;
243 let other_end_time = &other.0.metadata().time.end;
244 self_end_time
245 .cmp(other_end_time)
246 .then_with(|| self.0.cmp(&other.0)) }
248}
249
impl PartialOrd for OperationByEndTime {
    // Standard delegation to `Ord`, keeping the two orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
255
256pub fn walk_ancestors(
258 head_ops: &[Operation],
259) -> impl Iterator<Item = OpStoreResult<Operation>> + use<> {
260 let head_ops = head_ops
261 .iter()
262 .cloned()
263 .map(OperationByEndTime)
264 .collect_vec();
265 dag_walk::topo_order_reverse_lazy_ok(
268 head_ops.into_iter().map(Ok),
269 |OperationByEndTime(op)| op.id().clone(),
270 |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
271 |_| panic!("graph has cycle"),
272 )
273 .map_ok(|OperationByEndTime(op)| op)
274}
275
276pub fn walk_ancestors_range(
279 head_ops: &[Operation],
280 root_ops: &[Operation],
281) -> impl Iterator<Item = OpStoreResult<Operation>> + use<> {
282 let mut start_ops = itertools::chain(head_ops, root_ops)
283 .cloned()
284 .map(OperationByEndTime)
285 .collect_vec();
286
287 let leading_items = if root_ops.is_empty() {
289 vec![]
290 } else {
291 let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
292 collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
293 };
294
295 let trailing_iter = dag_walk::topo_order_reverse_lazy_ok(
298 start_ops.into_iter().map(Ok),
299 |OperationByEndTime(op)| op.id().clone(),
300 |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
301 |_| panic!("graph has cycle"),
302 )
303 .map_ok(|OperationByEndTime(op)| op);
304 itertools::chain(leading_items, trailing_iter)
305}
306
307fn collect_ancestors_until_roots(
308 start_ops: &mut Vec<OperationByEndTime>,
309 mut unwanted_ids: HashSet<OperationId>,
310) -> Vec<OpStoreResult<Operation>> {
311 let sorted_ops = match dag_walk::topo_order_reverse_chunked(
312 start_ops,
313 |OperationByEndTime(op)| op.id().clone(),
314 |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
315 |_| panic!("graph has cycle"),
316 ) {
317 Ok(sorted_ops) => sorted_ops,
318 Err(err) => return vec![Err(err)],
319 };
320 let mut items = Vec::new();
321 for OperationByEndTime(op) in sorted_ops {
322 if unwanted_ids.contains(op.id()) {
323 unwanted_ids.extend(op.parent_ids().iter().cloned());
324 } else {
325 items.push(Ok(op));
326 }
327 }
328 start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
330 items
331}
332
/// Statistics returned by `reparent_range()`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReparentStats {
    /// The new head operation IDs corresponding to the requested heads.
    pub new_head_ids: Vec<OperationId>,
    /// Number of operations that were rewritten.
    pub rewritten_count: usize,
    /// Number of operations (ancestors of the roots, excluding ancestors of
    /// the destination) that are no longer reachable from the new heads.
    pub unreachable_count: usize,
}
344
/// Rewrites the operations in the range `root_ops..head_ops` so they descend
/// from `dest_op` instead of from the roots, returning the new head IDs and
/// counts of rewritten/unreachable operations.
///
/// The root operation itself must not be in the range (asserted below).
pub fn reparent_range(
    op_store: &dyn OpStore,
    root_ops: &[Operation],
    head_ops: &[Operation],
    dest_op: &Operation,
) -> OpStoreResult<ReparentStats> {
    // Ops to rewrite, in reverse topological order (descendants first).
    let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops).try_collect()?;
    // Ancestors of the roots that are not ancestors of `dest_op` become
    // unreachable from the new heads; count them for reporting.
    let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
        .process_results(|iter| iter.count())?;

    // The last element of a reverse topological order would be the earliest
    // op in the range; it must not be the store's root operation.
    assert!(
        ops_to_reparent
            .last()
            .is_none_or(|op| op.id() != op_store.root_operation_id()),
        "root operation cannot be rewritten"
    );
    let mut rewritten_ids = HashMap::new();
    // Iterate in forward (parents-first) order so each op's rewritten parents
    // are already in `rewritten_ids` when we reach it.
    for old_op in ops_to_reparent.into_iter().rev() {
        let mut data = old_op.store_operation().clone();
        // Parents inside the range map to their rewritten IDs. The first
        // parent outside the range is replaced by `dest_op`; `take()` ensures
        // dest is inserted at most once, and any further out-of-range parents
        // are dropped.
        let mut dest_once = Some(dest_op.id());
        data.parents = data
            .parents
            .iter()
            .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
            .cloned()
            .collect();
        let new_id = op_store.write_operation(&data).block_on()?;
        rewritten_ids.insert(old_op.id().clone(), new_id);
    }

    // Map the requested heads the same way: a head either was rewritten, or
    // it collapses onto `dest_op` (inserted at most once).
    let mut dest_once = Some(dest_op.id());
    let new_head_ids = head_ops
        .iter()
        .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
        .cloned()
        .collect();
    Ok(ReparentStats {
        new_head_ids,
        rewritten_count: rewritten_ids.len(),
        unreachable_count,
    })
}