// omnigraph/db/commit_graph.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6    Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, relative to the graph root, of the Lance dataset holding graph commits.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory name, relative to the graph root, of the Lance dataset holding commit actors.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// One node of the commit graph.
///
/// A commit has at most two parents: `parent_commit_id` and, for merge
/// commits, `merged_parent_commit_id`.
#[derive(Clone, Debug)]
pub struct GraphCommit {
    /// Unique identifier of the commit (commits created by this module use a ULID string).
    pub graph_commit_id: String,
    /// Manifest branch recorded with the commit, if any.
    pub manifest_branch: Option<String>,
    /// Manifest version recorded with the commit.
    pub manifest_version: u64,
    /// Identifier of the first parent; the genesis commit has none.
    pub parent_commit_id: Option<String>,
    /// Identifier of the second parent; only set by merge commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor attributed to the commit, if any.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the Unix epoch.
    pub created_at: i64,
}
29
30pub struct CommitGraph {
31    root_uri: String,
32    dataset: Dataset,
33    actor_dataset: Option<Dataset>,
34    active_branch: Option<String>,
35    actor_by_commit_id: HashMap<String, String>,
36    commit_by_id: HashMap<String, GraphCommit>,
37    head_commit: Option<GraphCommit>,
38}
39
40impl CommitGraph {
41    pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42        let root = root_uri.trim_end_matches('/');
43        let uri = graph_commits_uri(root);
44        let genesis = GraphCommit {
45            graph_commit_id: ulid::Ulid::new().to_string(),
46            manifest_branch: None,
47            manifest_version,
48            parent_commit_id: None,
49            merged_parent_commit_id: None,
50            actor_id: None,
51            created_at: now_micros()?,
52        };
53
54        let batch = commits_to_batch(&[genesis.clone()])?;
55        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56        let params = WriteParams {
57            mode: WriteMode::Create,
58            enable_stable_row_ids: true,
59            data_storage_version: Some(LanceFileVersion::V2_2),
60            auto_cleanup: None,
61            skip_auto_cleanup: true,
62            ..Default::default()
63        };
64        let dataset = Dataset::write(reader, &uri as &str, Some(params))
65            .await
66            .map_err(|e| OmniError::Lance(e.to_string()))?;
67        let actor_dataset = create_commit_actor_dataset(root).await?;
68
69        Ok(Self {
70            root_uri: root.to_string(),
71            dataset,
72            actor_dataset: Some(actor_dataset),
73            active_branch: None,
74            actor_by_commit_id: HashMap::new(),
75            commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
76            head_commit: Some(genesis),
77        })
78    }
79
80    pub async fn open(root_uri: &str) -> Result<Self> {
81        let root = root_uri.trim_end_matches('/');
82        let wrapper = crate::instrumentation::commit_graph_wrapper();
83        let dataset =
84            crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
85                .await?;
86        let actor_dataset =
87            crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
88                .await
89                .ok();
90        let actor_by_commit_id = match &actor_dataset {
91            Some(dataset) => load_commit_actor_cache(dataset).await?,
92            None => HashMap::new(),
93        };
94        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
95        Ok(Self {
96            root_uri: root.to_string(),
97            dataset,
98            actor_dataset,
99            active_branch: None,
100            actor_by_commit_id,
101            commit_by_id,
102            head_commit,
103        })
104    }
105
106    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
107        let root = root_uri.trim_end_matches('/');
108        let wrapper = crate::instrumentation::commit_graph_wrapper();
109        let dataset =
110            crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
111                .await?;
112        let dataset = dataset
113            .checkout_branch(branch)
114            .await
115            .map_err(|e| OmniError::Lance(e.to_string()))?;
116        let actor_dataset =
117            crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
118                .await
119                .ok();
120        let actor_by_commit_id = match &actor_dataset {
121            Some(dataset) => load_commit_actor_cache(dataset).await?,
122            None => HashMap::new(),
123        };
124        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
125        Ok(Self {
126            root_uri: root.to_string(),
127            dataset,
128            actor_dataset,
129            active_branch: Some(branch.to_string()),
130            actor_by_commit_id,
131            commit_by_id,
132            head_commit,
133        })
134    }
135
136    pub async fn refresh(&mut self) -> Result<()> {
137        let root = self.root_uri.clone();
138        let wrapper = crate::instrumentation::commit_graph_wrapper();
139        self.dataset = crate::instrumentation::open_dataset_tracked(
140            &graph_commits_uri(&root),
141            wrapper.clone(),
142        )
143        .await?;
144        if let Some(branch) = &self.active_branch {
145            self.dataset = self
146                .dataset
147                .checkout_branch(branch)
148                .await
149                .map_err(|e| OmniError::Lance(e.to_string()))?;
150        }
151        self.actor_dataset =
152            crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(&root), wrapper)
153                .await
154                .ok();
155        self.actor_by_commit_id = match &self.actor_dataset {
156            Some(dataset) => load_commit_actor_cache(dataset).await?,
157            None => HashMap::new(),
158        };
159        let (commit_by_id, head_commit) =
160            load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
161        self.commit_by_id = commit_by_id;
162        self.head_commit = head_commit;
163        Ok(())
164    }
165
166    pub fn version(&self) -> u64 {
167        self.dataset.version().version
168    }
169
170    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
171        let mut ds = self.dataset.clone();
172        ds.create_branch(name, self.version(), None)
173            .await
174            .map_err(|e| OmniError::Lance(e.to_string()))?;
175        Ok(())
176    }
177
178    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
179        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
180            .await
181            .map_err(|e| OmniError::Lance(e.to_string()))?;
182        ds.delete_branch(name)
183            .await
184            .map_err(|e| OmniError::Lance(e.to_string()))?;
185        self.refresh().await
186    }
187
188    /// Idempotently drop the commit-graph branch `name`, tolerating an
189    /// already-absent branch (see [`TableStore::force_delete_branch`] for the
190    /// same semantics). Used by the best-effort reclaim in `branch_delete` and
191    /// the `cleanup` orphan reconciler. `RefConflict` (referencing descendants)
192    /// is still surfaced.
193    pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
194        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
195            .await
196            .map_err(|e| OmniError::Lance(e.to_string()))?;
197        match ds.force_delete_branch(name).await {
198            Ok(()) => {}
199            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
200            Err(e) => return Err(OmniError::Lance(e.to_string())),
201        }
202        self.refresh().await
203    }
204
205    /// List the named branches present on the commit-graph dataset. The
206    /// `cleanup` reconciler diffs this against the manifest branch set to find
207    /// orphaned commit-graph branches to reclaim.
208    pub async fn list_branches(&self) -> Result<Vec<String>> {
209        let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
210            .await
211            .map_err(|e| OmniError::Lance(e.to_string()))?;
212        let branches = ds
213            .list_branches()
214            .await
215            .map_err(|e| OmniError::Lance(e.to_string()))?;
216        Ok(branches.into_keys().collect())
217    }
218
219    pub async fn append_commit(
220        &mut self,
221        manifest_branch: Option<&str>,
222        manifest_version: u64,
223        actor_id: Option<&str>,
224    ) -> Result<String> {
225        let parent_commit_id = self.head_commit_id().await?;
226        self.append_commit_with_parents(
227            manifest_branch,
228            manifest_version,
229            parent_commit_id.as_deref(),
230            None,
231            actor_id,
232        )
233        .await
234    }
235
236    pub async fn append_merge_commit(
237        &mut self,
238        manifest_branch: Option<&str>,
239        manifest_version: u64,
240        parent_commit_id: &str,
241        merged_parent_commit_id: &str,
242        actor_id: Option<&str>,
243    ) -> Result<String> {
244        self.append_commit_with_parents(
245            manifest_branch,
246            manifest_version,
247            Some(parent_commit_id),
248            Some(merged_parent_commit_id),
249            actor_id,
250        )
251        .await
252    }
253
254    async fn append_commit_with_parents(
255        &mut self,
256        manifest_branch: Option<&str>,
257        manifest_version: u64,
258        parent_commit_id: Option<&str>,
259        merged_parent_commit_id: Option<&str>,
260        actor_id: Option<&str>,
261    ) -> Result<String> {
262        let graph_commit_id = ulid::Ulid::new().to_string();
263        let commit = GraphCommit {
264            graph_commit_id: graph_commit_id.clone(),
265            manifest_branch: manifest_branch.map(|s| s.to_string()),
266            manifest_version,
267            parent_commit_id: parent_commit_id.map(|s| s.to_string()),
268            merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
269            actor_id: actor_id.map(str::to_string),
270            created_at: now_micros()?,
271        };
272
273        let batch = commits_to_batch(&[commit.clone()])?;
274        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
275        let mut ds = self.dataset.clone();
276        ds.append(reader, None)
277            .await
278            .map_err(|e| OmniError::Lance(e.to_string()))?;
279        self.dataset = ds;
280        if let Some(actor_id) = actor_id {
281            self.append_actor(&graph_commit_id, actor_id).await?;
282        }
283        self.commit_by_id
284            .insert(graph_commit_id.clone(), commit.clone());
285        if should_replace_head(self.head_commit.as_ref(), &commit) {
286            self.head_commit = Some(commit);
287        }
288
289        Ok(graph_commit_id)
290    }
291
292    async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
293        if self
294            .actor_by_commit_id
295            .get(graph_commit_id)
296            .is_some_and(|existing| existing == actor_id)
297        {
298            return Ok(());
299        }
300
301        let record = CommitActorRecord {
302            graph_commit_id: graph_commit_id.to_string(),
303            actor_id: actor_id.to_string(),
304            created_at: now_micros()?,
305        };
306        let batch = commit_actors_to_batch(&[record])?;
307        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
308        let mut dataset = match self.actor_dataset.take() {
309            Some(dataset) => dataset,
310            None => create_commit_actor_dataset(&self.root_uri).await?,
311        };
312        dataset
313            .append(reader, None)
314            .await
315            .map_err(|e| OmniError::Lance(e.to_string()))?;
316        self.actor_by_commit_id
317            .insert(graph_commit_id.to_string(), actor_id.to_string());
318        self.actor_dataset = Some(dataset);
319        Ok(())
320    }
321
322    pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
323        Ok(self.head_commit.clone())
324    }
325
326    pub async fn head_commit_id(&self) -> Result<Option<String>> {
327        Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
328    }
329
330    pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
331        let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
332        commits.sort_by(|a, b| {
333            a.manifest_version
334                .cmp(&b.manifest_version)
335                .then_with(|| a.created_at.cmp(&b.created_at))
336                .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
337        });
338        Ok(commits)
339    }
340
341    pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
342        self.commit_by_id.get(commit_id).cloned()
343    }
344
345    pub async fn merge_base(
346        root_uri: &str,
347        source_branch: Option<&str>,
348        target_branch: Option<&str>,
349    ) -> Result<Option<GraphCommit>> {
350        let source = open_for_branch(root_uri, source_branch).await?;
351        let target = open_for_branch(root_uri, target_branch).await?;
352
353        let source_head = match source.head_commit().await? {
354            Some(commit) => commit,
355            None => return Ok(None),
356        };
357        let target_head = match target.head_commit().await? {
358            Some(commit) => commit,
359            None => return Ok(None),
360        };
361
362        let mut commits = HashMap::new();
363        for commit in source.load_commits().await? {
364            commits.insert(commit.graph_commit_id.clone(), commit);
365        }
366        for commit in target.load_commits().await? {
367            commits.insert(commit.graph_commit_id.clone(), commit);
368        }
369
370        let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
371        let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
372
373        let best = source_distances
374            .iter()
375            .filter_map(|(id, source_distance)| {
376                target_distances.get(id).and_then(|target_distance| {
377                    commits.get(id).map(|commit| {
378                        (
379                            (
380                                *source_distance + *target_distance,
381                                u64::MAX - commit.manifest_version,
382                            ),
383                            commit.clone(),
384                        )
385                    })
386                })
387            })
388            .min_by_key(|(score, _)| *score)
389            .map(|(_, commit)| commit);
390
391        Ok(best)
392    }
393}
394
395pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
396    format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
397}
398
399fn graph_commit_actors_uri(root_uri: &str) -> String {
400    format!(
401        "{}/{}",
402        root_uri.trim_end_matches('/'),
403        GRAPH_COMMIT_ACTORS_DIR
404    )
405}
406
407fn commit_graph_schema() -> SchemaRef {
408    Arc::new(Schema::new(vec![
409        Field::new("graph_commit_id", DataType::Utf8, false),
410        Field::new("manifest_branch", DataType::Utf8, true),
411        Field::new("manifest_version", DataType::UInt64, false),
412        Field::new("parent_commit_id", DataType::Utf8, true),
413        Field::new("merged_parent_commit_id", DataType::Utf8, true),
414        Field::new(
415            "created_at",
416            DataType::Timestamp(TimeUnit::Microsecond, None),
417            false,
418        ),
419    ]))
420}
421
422fn commit_actor_schema() -> SchemaRef {
423    Arc::new(Schema::new(vec![
424        Field::new("graph_commit_id", DataType::Utf8, false),
425        Field::new("actor_id", DataType::Utf8, false),
426        Field::new(
427            "created_at",
428            DataType::Timestamp(TimeUnit::Microsecond, None),
429            false,
430        ),
431    ]))
432}
433
/// One row of the commit-actor dataset.
#[derive(Clone, Debug)]
struct CommitActorRecord {
    /// Identifier of the commit the actor is attributed to.
    graph_commit_id: String,
    /// Identifier of the actor.
    actor_id: String,
    /// Time the record was created, in microseconds since the Unix epoch.
    created_at: i64,
}
440
441async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
442    let uri = graph_commit_actors_uri(root_uri);
443    let batch = RecordBatch::new_empty(commit_actor_schema());
444    let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
445    let params = WriteParams {
446        mode: WriteMode::Create,
447        enable_stable_row_ids: true,
448        data_storage_version: Some(LanceFileVersion::V2_2),
449        auto_cleanup: None,
450        skip_auto_cleanup: true,
451        ..Default::default()
452    };
453    match Dataset::write(reader, &uri as &str, Some(params)).await {
454        Ok(dataset) => Ok(dataset),
455        Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
456            .await
457            .map_err(|open_err| OmniError::Lance(open_err.to_string())),
458        Err(err) => Err(OmniError::Lance(err.to_string())),
459    }
460}
461
462fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
463    let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
464    let branches: Vec<Option<&str>> = commits
465        .iter()
466        .map(|c| c.manifest_branch.as_deref())
467        .collect();
468    let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
469    let parents: Vec<Option<&str>> = commits
470        .iter()
471        .map(|c| c.parent_commit_id.as_deref())
472        .collect();
473    let merged_parents: Vec<Option<&str>> = commits
474        .iter()
475        .map(|c| c.merged_parent_commit_id.as_deref())
476        .collect();
477    let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
478
479    RecordBatch::try_new(
480        commit_graph_schema(),
481        vec![
482            Arc::new(StringArray::from(ids)),
483            Arc::new(StringArray::from(branches)),
484            Arc::new(UInt64Array::from(versions)),
485            Arc::new(StringArray::from(parents)),
486            Arc::new(StringArray::from(merged_parents)),
487            Arc::new(TimestampMicrosecondArray::from(created_at)),
488        ],
489    )
490    .map_err(|e| OmniError::Lance(e.to_string()))
491}
492
493async fn load_commit_cache(
494    dataset: &Dataset,
495    actor_by_commit_id: &HashMap<String, String>,
496) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
497    let batches: Vec<RecordBatch> = dataset
498        .scan()
499        .try_into_stream()
500        .await
501        .map_err(|e| OmniError::Lance(e.to_string()))?
502        .try_collect()
503        .await
504        .map_err(|e| OmniError::Lance(e.to_string()))?;
505
506    let mut commits = load_commits_from_batches(&batches)?;
507    for commit in &mut commits {
508        commit.actor_id = actor_by_commit_id
509            .get(commit.graph_commit_id.as_str())
510            .cloned();
511    }
512    let mut commit_by_id = HashMap::with_capacity(commits.len());
513    let mut head_commit = None;
514    for commit in commits {
515        if should_replace_head(head_commit.as_ref(), &commit) {
516            head_commit = Some(commit.clone());
517        }
518        commit_by_id.insert(commit.graph_commit_id.clone(), commit);
519    }
520    Ok((commit_by_id, head_commit))
521}
522
523async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
524    let batches: Vec<RecordBatch> = dataset
525        .scan()
526        .try_into_stream()
527        .await
528        .map_err(|e| OmniError::Lance(e.to_string()))?
529        .try_collect()
530        .await
531        .map_err(|e| OmniError::Lance(e.to_string()))?;
532
533    let mut actors = HashMap::new();
534    for batch in batches {
535        let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
536        let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
537        for row in 0..batch.num_rows() {
538            actors.insert(
539                commit_ids.value(row).to_string(),
540                actor_ids.value(row).to_string(),
541            );
542        }
543    }
544    Ok(actors)
545}
546
547fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
548    let mut commits = Vec::new();
549    for batch in batches {
550        let ids = string_column(batch, "graph_commit_id", "commit graph")?;
551        let branches = string_column(batch, "manifest_branch", "commit graph")?;
552        let versions = u64_column(batch, "manifest_version", "commit graph")?;
553        let parents = string_column(batch, "parent_commit_id", "commit graph")?;
554        let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
555        let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
556
557        for row in 0..batch.num_rows() {
558            commits.push(GraphCommit {
559                graph_commit_id: ids.value(row).to_string(),
560                manifest_branch: if branches.is_null(row) {
561                    None
562                } else {
563                    Some(branches.value(row).to_string())
564                },
565                manifest_version: versions.value(row),
566                parent_commit_id: if parents.is_null(row) {
567                    None
568                } else {
569                    Some(parents.value(row).to_string())
570                },
571                merged_parent_commit_id: if merged_parents.is_null(row) {
572                    None
573                } else {
574                    Some(merged_parents.value(row).to_string())
575                },
576                actor_id: None,
577                created_at: created.value(row),
578            });
579        }
580    }
581    Ok(commits)
582}
583
584fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
585    let commit_ids: Vec<&str> = records
586        .iter()
587        .map(|record| record.graph_commit_id.as_str())
588        .collect();
589    let actor_ids: Vec<&str> = records
590        .iter()
591        .map(|record| record.actor_id.as_str())
592        .collect();
593    let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
594
595    RecordBatch::try_new(
596        commit_actor_schema(),
597        vec![
598            Arc::new(StringArray::from(commit_ids)),
599            Arc::new(StringArray::from(actor_ids)),
600            Arc::new(TimestampMicrosecondArray::from(created_at)),
601        ],
602    )
603    .map_err(|e| OmniError::Lance(e.to_string()))
604}
605
606fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
607    current.is_none_or(|existing| {
608        candidate
609            .manifest_version
610            .cmp(&existing.manifest_version)
611            .then_with(|| candidate.created_at.cmp(&existing.created_at))
612            .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
613            .is_gt()
614    })
615}
616
617fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
618    batch
619        .column_by_name(name)
620        .ok_or_else(|| {
621            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
622        })?
623        .as_any()
624        .downcast_ref::<StringArray>()
625        .ok_or_else(|| {
626            OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
627        })
628}
629
630fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
631    batch
632        .column_by_name(name)
633        .ok_or_else(|| {
634            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
635        })?
636        .as_any()
637        .downcast_ref::<UInt64Array>()
638        .ok_or_else(|| {
639            OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
640        })
641}
642
643fn timestamp_micros_column<'a>(
644    batch: &'a RecordBatch,
645    name: &str,
646    context: &str,
647) -> Result<&'a TimestampMicrosecondArray> {
648    batch
649        .column_by_name(name)
650        .ok_or_else(|| {
651            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
652        })?
653        .as_any()
654        .downcast_ref::<TimestampMicrosecondArray>()
655        .ok_or_else(|| {
656            OmniError::manifest_internal(format!(
657                "{context} column '{name}' is not Timestamp(Microsecond)"
658            ))
659        })
660}
661
662fn ancestor_distances(
663    start_id: &str,
664    commits: &HashMap<String, GraphCommit>,
665) -> HashMap<String, u64> {
666    let mut distances = HashMap::new();
667    let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
668
669    while let Some((id, distance)) = queue.pop_front() {
670        if let Some(existing) = distances.get(&id) {
671            if *existing <= distance {
672                continue;
673            }
674        }
675        distances.insert(id.clone(), distance);
676
677        if let Some(commit) = commits.get(&id) {
678            if let Some(parent) = &commit.parent_commit_id {
679                queue.push_back((parent.clone(), distance + 1));
680            }
681            if let Some(parent) = &commit.merged_parent_commit_id {
682                queue.push_back((parent.clone(), distance + 1));
683            }
684        }
685    }
686
687    distances
688}
689
690async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
691    match branch {
692        Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
693        _ => CommitGraph::open(root_uri).await,
694    }
695}
696
697fn now_micros() -> Result<i64> {
698    let duration = SystemTime::now()
699        .duration_since(UNIX_EPOCH)
700        .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
701    Ok(duration.as_micros() as i64)
702}
703
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is `UInt64` rather than `Utf8`
    /// must be rejected with an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let nullable_utf8 = |name: &str| Field::new(name, DataType::Utf8, true);
        let bad_schema = Schema::new(vec![
            // Deliberately the wrong type for this column.
            Field::new("graph_commit_id", DataType::UInt64, false),
            nullable_utf8("manifest_branch"),
            Field::new("manifest_version", DataType::UInt64, false),
            nullable_utf8("parent_commit_id"),
            nullable_utf8("merged_parent_commit_id"),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
        ]);
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(Arc::new(bad_schema), columns).unwrap();

        let err = load_commits_from_batches(&[batch]).unwrap_err();
        assert!(err.to_string().contains("graph_commit_id"));
    }
}