1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::collections::HashSet;
20use std::slice;
21use std::sync::Arc;
22
23use itertools::Itertools as _;
24use pollster::FutureExt as _;
25use thiserror::Error;
26
27use crate::dag_walk;
28use crate::object_id::HexPrefix;
29use crate::object_id::PrefixResolution;
30use crate::op_heads_store;
31use crate::op_heads_store::OpHeadResolutionError;
32use crate::op_heads_store::OpHeadsStore;
33use crate::op_heads_store::OpHeadsStoreError;
34use crate::op_store::OpStore;
35use crate::op_store::OpStoreError;
36use crate::op_store::OpStoreResult;
37use crate::op_store::OperationId;
38use crate::operation::Operation;
39use crate::repo::ReadonlyRepo;
40use crate::repo::Repo as _;
41use crate::repo::RepoLoader;
42
/// Error that may occur during evaluation of an operation set expression.
#[derive(Debug, Error)]
pub enum OpsetEvaluationError {
    /// Failed to resolve the operation set expression.
    #[error(transparent)]
    OpsetResolution(#[from] OpsetResolutionError),
    /// Failed to access the operation heads store.
    #[error(transparent)]
    OpHeadsStore(#[from] OpHeadsStoreError),
    /// Failed to resolve the current operation heads.
    #[error(transparent)]
    OpHeadResolution(#[from] OpHeadResolutionError),
    /// Failed to access the underlying operation store.
    #[error(transparent)]
    OpStore(#[from] OpStoreError),
}
59
/// Error that may occur while resolving an operation set expression string
/// (e.g. `"@"`, `"@-"`, or an operation ID prefix).
#[derive(Debug, Error)]
pub enum OpsetResolutionError {
    /// The expression matched more than one operation.
    #[error(r#"The "{expr}" expression resolved to more than one operation"#)]
    MultipleOperations {
        /// Operation set expression that matched multiple operations.
        expr: String,
        /// IDs of the matched operations.
        candidates: Vec<OperationId>,
    },
    /// The expression matched no operations.
    #[error(r#"The "{0}" expression resolved to no operations"#)]
    EmptyOperations(String),
    /// The operation ID expression is not a valid hexadecimal string.
    #[error(r#"Operation ID "{0}" is not a valid hexadecimal prefix"#)]
    InvalidIdPrefix(String),
    /// No operation found for the given ID prefix.
    #[error(r#"No operation ID matching "{0}""#)]
    NoSuchOperation(String),
    /// More than one operation matched the given ID prefix.
    #[error(r#"Operation ID prefix "{0}" is ambiguous"#)]
    AmbiguousIdPrefix(String),
}
87
88pub fn resolve_op_for_load(
90 repo_loader: &RepoLoader,
91 op_str: &str,
92) -> Result<Operation, OpsetEvaluationError> {
93 let op_store = repo_loader.op_store();
94 let op_heads_store = repo_loader.op_heads_store().as_ref();
95 let get_current_op = || {
96 op_heads_store::resolve_op_heads(op_heads_store, op_store, async |op_heads| {
97 Err(OpsetResolutionError::MultipleOperations {
98 expr: "@".to_owned(),
99 candidates: op_heads.iter().map(|op| op.id().clone()).collect(),
100 }
101 .into())
102 })
103 .block_on()
104 };
105 let get_head_ops = || get_current_head_ops(op_store, op_heads_store);
106 resolve_single_op(op_store, get_current_op, get_head_ops, op_str)
107}
108
109pub fn resolve_op_with_repo(
113 repo: &ReadonlyRepo,
114 op_str: &str,
115) -> Result<Operation, OpsetEvaluationError> {
116 resolve_op_at(repo.op_store(), slice::from_ref(repo.operation()), op_str)
117}
118
119pub fn resolve_op_at(
121 op_store: &Arc<dyn OpStore>,
122 head_ops: &[Operation],
123 op_str: &str,
124) -> Result<Operation, OpsetEvaluationError> {
125 let get_current_op = || match head_ops {
126 [head_op] => Ok(head_op.clone()),
127 [] => Err(OpsetResolutionError::EmptyOperations("@".to_owned()).into()),
128 _ => Err(OpsetResolutionError::MultipleOperations {
129 expr: "@".to_owned(),
130 candidates: head_ops.iter().map(|op| op.id().clone()).collect(),
131 }
132 .into()),
133 };
134 let get_head_ops = || Ok(head_ops.to_vec());
135 resolve_single_op(op_store, get_current_op, get_head_ops, op_str)
136}
137
/// Resolves an operation set expression of the form `<symbol><postfix>`,
/// where the symbol is either `"@"` (current operation) or an operation ID
/// prefix, and the postfix is a run of `'-'` (go to parent) and `'+'`
/// (go to child) modifiers. Each step must resolve to exactly one operation.
fn resolve_single_op(
    op_store: &Arc<dyn OpStore>,
    get_current_op: impl FnOnce() -> Result<Operation, OpsetEvaluationError>,
    get_head_ops: impl FnOnce() -> Result<Vec<Operation>, OpsetEvaluationError>,
    op_str: &str,
) -> Result<Operation, OpsetEvaluationError> {
    // Split off the trailing run of '-'/'+' characters as the postfix.
    let op_symbol = op_str.trim_end_matches(['-', '+']);
    let op_postfix = &op_str[op_symbol.len()..];
    // Head operations are only needed to find children; fetch them lazily
    // and at most once, before walking.
    let head_ops = op_postfix.contains('+').then(get_head_ops).transpose()?;
    let mut operation = match op_symbol {
        "@" => get_current_op(),
        s => resolve_single_op_from_store(op_store, s),
    }?;
    for (i, c) in op_postfix.chars().enumerate() {
        let mut neighbor_ops = match c {
            '-' => operation.parents().try_collect()?,
            // unwrap() is safe: head_ops was populated above whenever the
            // postfix contains a '+'.
            '+' => find_child_ops(head_ops.as_ref().unwrap(), operation.id())?,
            _ => unreachable!(),
        };
        operation = match neighbor_ops.len() {
            0 => Err(OpsetResolutionError::EmptyOperations(op_str.to_owned()))?,
            1 => neighbor_ops.pop().unwrap(),
            // Report only the portion of the expression evaluated so far.
            // Byte indexing is safe: '-'/'+' are single-byte ASCII, so
            // op_symbol.len() + i is always a char boundary.
            _ => Err(OpsetResolutionError::MultipleOperations {
                expr: op_str[..=op_symbol.len() + i].to_owned(),
                candidates: neighbor_ops.iter().map(|op| op.id().clone()).collect(),
            })?,
        };
    }
    Ok(operation)
}
180
181fn resolve_single_op_from_store(
182 op_store: &Arc<dyn OpStore>,
183 op_str: &str,
184) -> Result<Operation, OpsetEvaluationError> {
185 if op_str.is_empty() {
186 return Err(OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()).into());
187 }
188 let prefix = HexPrefix::try_from_hex(op_str)
189 .ok_or_else(|| OpsetResolutionError::InvalidIdPrefix(op_str.to_owned()))?;
190 match op_store.resolve_operation_id_prefix(&prefix).block_on()? {
191 PrefixResolution::NoMatch => {
192 Err(OpsetResolutionError::NoSuchOperation(op_str.to_owned()).into())
193 }
194 PrefixResolution::SingleMatch(op_id) => {
195 let data = op_store.read_operation(&op_id).block_on()?;
196 Ok(Operation::new(op_store.clone(), op_id, data))
197 }
198 PrefixResolution::AmbiguousMatch => {
199 Err(OpsetResolutionError::AmbiguousIdPrefix(op_str.to_owned()).into())
200 }
201 }
202}
203
204pub fn get_current_head_ops(
207 op_store: &Arc<dyn OpStore>,
208 op_heads_store: &dyn OpHeadsStore,
209) -> Result<Vec<Operation>, OpsetEvaluationError> {
210 let mut head_ops: Vec<_> = op_heads_store
211 .get_op_heads()
212 .block_on()?
213 .into_iter()
214 .map(|id| -> OpStoreResult<Operation> {
215 let data = op_store.read_operation(&id).block_on()?;
216 Ok(Operation::new(op_store.clone(), id, data))
217 })
218 .try_collect()?;
219 head_ops.sort_by_key(|op| op.metadata().time.end.timestamp);
221 Ok(head_ops)
222}
223
224fn find_child_ops(
229 head_ops: &[Operation],
230 root_op_id: &OperationId,
231) -> OpStoreResult<Vec<Operation>> {
232 walk_ancestors(head_ops)
233 .take_while(|res| res.as_ref().map_or(true, |op| op.id() != root_op_id))
234 .filter_ok(|op| op.parent_ids().iter().any(|id| id == root_op_id))
235 .try_collect()
236}
237
/// Wrapper that orders `Operation`s by the end time recorded in their
/// metadata (see the `Ord` impl), used so DAG walks can order concurrent
/// operations by timestamp.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct OperationByEndTime(Operation);
240
241impl Ord for OperationByEndTime {
242 fn cmp(&self, other: &Self) -> Ordering {
243 let self_end_time = &self.0.metadata().time.end;
244 let other_end_time = &other.0.metadata().time.end;
245 self_end_time
246 .cmp(other_end_time)
247 .then_with(|| self.0.cmp(&other.0)) }
249}
250
impl PartialOrd for OperationByEndTime {
    // Delegate to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
256
257pub fn walk_ancestors(
259 head_ops: &[Operation],
260) -> impl Iterator<Item = OpStoreResult<Operation>> + use<> {
261 let head_ops = head_ops
262 .iter()
263 .cloned()
264 .map(OperationByEndTime)
265 .collect_vec();
266 dag_walk::topo_order_reverse_lazy_ok(
269 head_ops.into_iter().map(Ok),
270 |OperationByEndTime(op)| op.id().clone(),
271 |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
272 |_| panic!("graph has cycle"),
273 )
274 .map_ok(|OperationByEndTime(op)| op)
275}
276
/// Walks ancestors of `head_ops` in reverse topological order, excluding
/// `root_ops` and their ancestors (i.e. the `root_ops..head_ops` range).
pub fn walk_ancestors_range(
    head_ops: &[Operation],
    root_ops: &[Operation],
) -> impl Iterator<Item = OpStoreResult<Operation>> + use<> {
    // Seed the walk with both heads and roots so the sort can see the
    // unwanted (root-side) operations and filter them out.
    let mut start_ops = itertools::chain(head_ops, root_ops)
        .cloned()
        .map(OperationByEndTime)
        .collect_vec();

    // Eagerly walk until the unwanted operations have been discarded;
    // `start_ops` is pruned in place to the frontier still left to walk.
    // With no roots there is nothing to exclude, so skip the eager phase.
    let leading_items = if root_ops.is_empty() {
        vec![]
    } else {
        let unwanted_ids = root_ops.iter().map(|op| op.id().clone()).collect();
        collect_ancestors_until_roots(&mut start_ops, unwanted_ids)
    };

    // The remaining ancestors (now known to be in range) can be emitted
    // lazily from the pruned frontier.
    let trailing_iter = dag_walk::topo_order_reverse_lazy_ok(
        start_ops.into_iter().map(Ok),
        |OperationByEndTime(op)| op.id().clone(),
        |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
        |_| panic!("graph has cycle"),
    )
    .map_ok(|OperationByEndTime(op)| op);
    itertools::chain(leading_items, trailing_iter)
}
307
/// Eagerly collects ancestors of `start_ops`, dropping `unwanted_ids` and
/// their ancestors, and prunes `start_ops` in place to the remaining
/// frontier for the caller to continue walking lazily.
fn collect_ancestors_until_roots(
    start_ops: &mut Vec<OperationByEndTime>,
    mut unwanted_ids: HashSet<OperationId>,
) -> Vec<OpStoreResult<Operation>> {
    // NOTE(review): topo_order_reverse_chunked appears to sort a chunk of the
    // graph and leave the unprocessed frontier in `start_ops` — confirm
    // against dag_walk's documentation.
    let sorted_ops = match dag_walk::topo_order_reverse_chunked(
        start_ops,
        |OperationByEndTime(op)| op.id().clone(),
        |OperationByEndTime(op)| op.parents().map_ok(OperationByEndTime).collect_vec(),
        |_| panic!("graph has cycle"),
    ) {
        Ok(sorted_ops) => sorted_ops,
        // Surface the error as the sole item so the caller's chained
        // iterator yields it.
        Err(err) => return vec![Err(err)],
    };
    let mut items = Vec::new();
    for OperationByEndTime(op) in sorted_ops {
        if unwanted_ids.contains(op.id()) {
            // Mark the parents unwanted too: everything reachable from an
            // unwanted operation is outside the requested range.
            unwanted_ids.extend(op.parent_ids().iter().cloned());
        } else {
            items.push(Ok(op));
        }
    }
    // Prune the frontier so the lazy continuation never revisits excluded
    // operations.
    start_ops.retain(|OperationByEndTime(op)| !unwanted_ids.contains(op.id()));
    items
}
333
/// Statistics returned by `reparent_range()`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReparentStats {
    /// IDs of the rewritten head operations.
    pub new_head_ids: Vec<OperationId>,
    /// Number of operations that were rewritten onto the new parent.
    pub rewritten_count: usize,
    /// Number of operations (ancestors of the old roots, excluding ancestors
    /// of the destination) that are no longer reachable after reparenting.
    pub unreachable_count: usize,
}
345
/// Rewrites the operation range `root_ops..head_ops` so that it descends
/// from `dest_op` instead, writing the rewritten operations to `op_store`.
/// Returns the new head IDs plus counts of rewritten and newly-unreachable
/// operations.
pub fn reparent_range(
    op_store: &dyn OpStore,
    root_ops: &[Operation],
    head_ops: &[Operation],
    dest_op: &Operation,
) -> OpStoreResult<ReparentStats> {
    // Operations to rewrite, in reverse topological order (heads first).
    let ops_to_reparent: Vec<_> = walk_ancestors_range(head_ops, root_ops).try_collect()?;
    // Operations reachable from the old roots but not from the destination:
    // these become unreachable once the range is moved.
    let unreachable_count = walk_ancestors_range(root_ops, slice::from_ref(dest_op))
        .process_results(|iter| iter.count())?;

    // The last element of a reverse-topological order is the earliest
    // operation; the store's root operation must not be rewritten.
    assert!(
        ops_to_reparent
            .last()
            .is_none_or(|op| op.id() != op_store.root_operation_id()),
        "root operation cannot be rewritten"
    );
    let mut rewritten_ids = HashMap::new();
    // Process oldest-first so each operation's parents are rewritten (and
    // present in `rewritten_ids`) before the operation itself.
    for old_op in ops_to_reparent.into_iter().rev() {
        let mut data = old_op.store_operation().clone();
        // Parents inside the range map to their rewritten IDs; parents
        // outside the range collapse to `dest_op`. `dest_once` ensures
        // `dest_op` is inserted at most once even with multiple such parents.
        let mut dest_once = Some(dest_op.id());
        data.parents = data
            .parents
            .iter()
            .filter_map(|id| rewritten_ids.get(id).or_else(|| dest_once.take()))
            .cloned()
            .collect();
        let new_id = op_store.write_operation(&data).block_on()?;
        rewritten_ids.insert(old_op.id().clone(), new_id);
    }

    // Map old heads to their rewritten IDs; any head that wasn't rewritten
    // collapses to `dest_op` itself, again at most once.
    let mut dest_once = Some(dest_op.id());
    let new_head_ids = head_ops
        .iter()
        .filter_map(|op| rewritten_ids.get(op.id()).or_else(|| dest_once.take()))
        .cloned()
        .collect();
    Ok(ReparentStats {
        new_head_ids,
        rewritten_count: rewritten_ids.len(),
        unreachable_count,
    })
}