Skip to main content

omnigraph/db/
commit_graph.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6    Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory (relative to the repository root) holding the commit-graph Lance dataset.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory (relative to the repository root) holding the commit -> actor side table.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// One node of the commit graph.
///
/// Equality compares every field, including the actor id that is joined in
/// from the actor side table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphCommit {
    /// Unique id of this commit (a ULID string when created by this module).
    pub graph_commit_id: String,
    /// Manifest branch this commit was recorded on; `None` for the genesis
    /// commit (and presumably for the default branch — confirm with callers).
    pub manifest_branch: Option<String>,
    /// Manifest version this commit points at.
    pub manifest_version: u64,
    /// First parent; `None` for a root commit.
    pub parent_commit_id: Option<String>,
    /// Second parent, set only for merge commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor that created the commit. It is not stored in the commit row
    /// itself; it is filled in from the actor side table.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the UNIX epoch.
    pub created_at: i64,
}
29
30pub struct CommitGraph {
31    root_uri: String,
32    dataset: Dataset,
33    actor_dataset: Option<Dataset>,
34    active_branch: Option<String>,
35    actor_by_commit_id: HashMap<String, String>,
36    commit_by_id: HashMap<String, GraphCommit>,
37    head_commit: Option<GraphCommit>,
38}
39
40impl CommitGraph {
41    pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42        let root = root_uri.trim_end_matches('/');
43        let uri = graph_commits_uri(root);
44        let genesis = GraphCommit {
45            graph_commit_id: ulid::Ulid::new().to_string(),
46            manifest_branch: None,
47            manifest_version,
48            parent_commit_id: None,
49            merged_parent_commit_id: None,
50            actor_id: None,
51            created_at: now_micros()?,
52        };
53
54        let batch = commits_to_batch(&[genesis.clone()])?;
55        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56        let params = WriteParams {
57            mode: WriteMode::Create,
58            enable_stable_row_ids: true,
59            data_storage_version: Some(LanceFileVersion::V2_2),
60            auto_cleanup: None,
61            skip_auto_cleanup: true,
62            ..Default::default()
63        };
64        let dataset = Dataset::write(reader, &uri as &str, Some(params))
65            .await
66            .map_err(|e| OmniError::Lance(e.to_string()))?;
67        let actor_dataset = create_commit_actor_dataset(root).await?;
68
69        Ok(Self {
70            root_uri: root.to_string(),
71            dataset,
72            actor_dataset: Some(actor_dataset),
73            active_branch: None,
74            actor_by_commit_id: HashMap::new(),
75            commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
76            head_commit: Some(genesis),
77        })
78    }
79
80    pub async fn open(root_uri: &str) -> Result<Self> {
81        let root = root_uri.trim_end_matches('/');
82        let dataset = Dataset::open(&graph_commits_uri(root))
83            .await
84            .map_err(|e| OmniError::Lance(e.to_string()))?;
85        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
86        let actor_by_commit_id = match &actor_dataset {
87            Some(dataset) => load_commit_actor_cache(dataset).await?,
88            None => HashMap::new(),
89        };
90        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
91        Ok(Self {
92            root_uri: root.to_string(),
93            dataset,
94            actor_dataset,
95            active_branch: None,
96            actor_by_commit_id,
97            commit_by_id,
98            head_commit,
99        })
100    }
101
102    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
103        let root = root_uri.trim_end_matches('/');
104        let dataset = Dataset::open(&graph_commits_uri(root))
105            .await
106            .map_err(|e| OmniError::Lance(e.to_string()))?;
107        let dataset = dataset
108            .checkout_branch(branch)
109            .await
110            .map_err(|e| OmniError::Lance(e.to_string()))?;
111        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
112        let actor_by_commit_id = match &actor_dataset {
113            Some(dataset) => load_commit_actor_cache(dataset).await?,
114            None => HashMap::new(),
115        };
116        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
117        Ok(Self {
118            root_uri: root.to_string(),
119            dataset,
120            actor_dataset,
121            active_branch: Some(branch.to_string()),
122            actor_by_commit_id,
123            commit_by_id,
124            head_commit,
125        })
126    }
127
128    pub async fn refresh(&mut self) -> Result<()> {
129        let root = self.root_uri.clone();
130        self.dataset = Dataset::open(&graph_commits_uri(&root))
131            .await
132            .map_err(|e| OmniError::Lance(e.to_string()))?;
133        if let Some(branch) = &self.active_branch {
134            self.dataset = self
135                .dataset
136                .checkout_branch(branch)
137                .await
138                .map_err(|e| OmniError::Lance(e.to_string()))?;
139        }
140        self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
141        self.actor_by_commit_id = match &self.actor_dataset {
142            Some(dataset) => load_commit_actor_cache(dataset).await?,
143            None => HashMap::new(),
144        };
145        let (commit_by_id, head_commit) =
146            load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
147        self.commit_by_id = commit_by_id;
148        self.head_commit = head_commit;
149        Ok(())
150    }
151
152    pub fn version(&self) -> u64 {
153        self.dataset.version().version
154    }
155
156    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
157        let mut ds = self.dataset.clone();
158        ds.create_branch(name, self.version(), None)
159            .await
160            .map_err(|e| OmniError::Lance(e.to_string()))?;
161        Ok(())
162    }
163
164    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
165        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
166            .await
167            .map_err(|e| OmniError::Lance(e.to_string()))?;
168        ds.delete_branch(name)
169            .await
170            .map_err(|e| OmniError::Lance(e.to_string()))?;
171        self.refresh().await
172    }
173
174    /// Idempotently drop the commit-graph branch `name`, tolerating an
175    /// already-absent branch (see [`TableStore::force_delete_branch`] for the
176    /// same semantics). Used by the best-effort reclaim in `branch_delete` and
177    /// the `cleanup` orphan reconciler. `RefConflict` (referencing descendants)
178    /// is still surfaced.
179    pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
180        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
181            .await
182            .map_err(|e| OmniError::Lance(e.to_string()))?;
183        match ds.force_delete_branch(name).await {
184            Ok(()) => {}
185            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
186            Err(e) => return Err(OmniError::Lance(e.to_string())),
187        }
188        self.refresh().await
189    }
190
191    /// List the named branches present on the commit-graph dataset. The
192    /// `cleanup` reconciler diffs this against the manifest branch set to find
193    /// orphaned commit-graph branches to reclaim.
194    pub async fn list_branches(&self) -> Result<Vec<String>> {
195        let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
196            .await
197            .map_err(|e| OmniError::Lance(e.to_string()))?;
198        let branches = ds
199            .list_branches()
200            .await
201            .map_err(|e| OmniError::Lance(e.to_string()))?;
202        Ok(branches.into_keys().collect())
203    }
204
205    pub async fn append_commit(
206        &mut self,
207        manifest_branch: Option<&str>,
208        manifest_version: u64,
209        actor_id: Option<&str>,
210    ) -> Result<String> {
211        let parent_commit_id = self.head_commit_id().await?;
212        self.append_commit_with_parents(
213            manifest_branch,
214            manifest_version,
215            parent_commit_id.as_deref(),
216            None,
217            actor_id,
218        )
219        .await
220    }
221
222    pub async fn append_merge_commit(
223        &mut self,
224        manifest_branch: Option<&str>,
225        manifest_version: u64,
226        parent_commit_id: &str,
227        merged_parent_commit_id: &str,
228        actor_id: Option<&str>,
229    ) -> Result<String> {
230        self.append_commit_with_parents(
231            manifest_branch,
232            manifest_version,
233            Some(parent_commit_id),
234            Some(merged_parent_commit_id),
235            actor_id,
236        )
237        .await
238    }
239
240    async fn append_commit_with_parents(
241        &mut self,
242        manifest_branch: Option<&str>,
243        manifest_version: u64,
244        parent_commit_id: Option<&str>,
245        merged_parent_commit_id: Option<&str>,
246        actor_id: Option<&str>,
247    ) -> Result<String> {
248        let graph_commit_id = ulid::Ulid::new().to_string();
249        let commit = GraphCommit {
250            graph_commit_id: graph_commit_id.clone(),
251            manifest_branch: manifest_branch.map(|s| s.to_string()),
252            manifest_version,
253            parent_commit_id: parent_commit_id.map(|s| s.to_string()),
254            merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
255            actor_id: actor_id.map(str::to_string),
256            created_at: now_micros()?,
257        };
258
259        let batch = commits_to_batch(&[commit.clone()])?;
260        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
261        let mut ds = self.dataset.clone();
262        ds.append(reader, None)
263            .await
264            .map_err(|e| OmniError::Lance(e.to_string()))?;
265        self.dataset = ds;
266        if let Some(actor_id) = actor_id {
267            self.append_actor(&graph_commit_id, actor_id).await?;
268        }
269        self.commit_by_id
270            .insert(graph_commit_id.clone(), commit.clone());
271        if should_replace_head(self.head_commit.as_ref(), &commit) {
272            self.head_commit = Some(commit);
273        }
274
275        Ok(graph_commit_id)
276    }
277
278    async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
279        if self
280            .actor_by_commit_id
281            .get(graph_commit_id)
282            .is_some_and(|existing| existing == actor_id)
283        {
284            return Ok(());
285        }
286
287        let record = CommitActorRecord {
288            graph_commit_id: graph_commit_id.to_string(),
289            actor_id: actor_id.to_string(),
290            created_at: now_micros()?,
291        };
292        let batch = commit_actors_to_batch(&[record])?;
293        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
294        let mut dataset = match self.actor_dataset.take() {
295            Some(dataset) => dataset,
296            None => create_commit_actor_dataset(&self.root_uri).await?,
297        };
298        dataset
299            .append(reader, None)
300            .await
301            .map_err(|e| OmniError::Lance(e.to_string()))?;
302        self.actor_by_commit_id
303            .insert(graph_commit_id.to_string(), actor_id.to_string());
304        self.actor_dataset = Some(dataset);
305        Ok(())
306    }
307
308    pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
309        Ok(self.head_commit.clone())
310    }
311
312    pub async fn head_commit_id(&self) -> Result<Option<String>> {
313        Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
314    }
315
316    pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
317        let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
318        commits.sort_by(|a, b| {
319            a.manifest_version
320                .cmp(&b.manifest_version)
321                .then_with(|| a.created_at.cmp(&b.created_at))
322                .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
323        });
324        Ok(commits)
325    }
326
327    pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
328        self.commit_by_id.get(commit_id).cloned()
329    }
330
331    pub async fn merge_base(
332        root_uri: &str,
333        source_branch: Option<&str>,
334        target_branch: Option<&str>,
335    ) -> Result<Option<GraphCommit>> {
336        let source = open_for_branch(root_uri, source_branch).await?;
337        let target = open_for_branch(root_uri, target_branch).await?;
338
339        let source_head = match source.head_commit().await? {
340            Some(commit) => commit,
341            None => return Ok(None),
342        };
343        let target_head = match target.head_commit().await? {
344            Some(commit) => commit,
345            None => return Ok(None),
346        };
347
348        let mut commits = HashMap::new();
349        for commit in source.load_commits().await? {
350            commits.insert(commit.graph_commit_id.clone(), commit);
351        }
352        for commit in target.load_commits().await? {
353            commits.insert(commit.graph_commit_id.clone(), commit);
354        }
355
356        let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
357        let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
358
359        let best = source_distances
360            .iter()
361            .filter_map(|(id, source_distance)| {
362                target_distances.get(id).and_then(|target_distance| {
363                    commits.get(id).map(|commit| {
364                        (
365                            (
366                                *source_distance + *target_distance,
367                                u64::MAX - commit.manifest_version,
368                            ),
369                            commit.clone(),
370                        )
371                    })
372                })
373            })
374            .min_by_key(|(score, _)| *score)
375            .map(|(_, commit)| commit);
376
377        Ok(best)
378    }
379}
380
381pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
382    format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
383}
384
385fn graph_commit_actors_uri(root_uri: &str) -> String {
386    format!(
387        "{}/{}",
388        root_uri.trim_end_matches('/'),
389        GRAPH_COMMIT_ACTORS_DIR
390    )
391}
392
393fn commit_graph_schema() -> SchemaRef {
394    Arc::new(Schema::new(vec![
395        Field::new("graph_commit_id", DataType::Utf8, false),
396        Field::new("manifest_branch", DataType::Utf8, true),
397        Field::new("manifest_version", DataType::UInt64, false),
398        Field::new("parent_commit_id", DataType::Utf8, true),
399        Field::new("merged_parent_commit_id", DataType::Utf8, true),
400        Field::new(
401            "created_at",
402            DataType::Timestamp(TimeUnit::Microsecond, None),
403            false,
404        ),
405    ]))
406}
407
408fn commit_actor_schema() -> SchemaRef {
409    Arc::new(Schema::new(vec![
410        Field::new("graph_commit_id", DataType::Utf8, false),
411        Field::new("actor_id", DataType::Utf8, false),
412        Field::new(
413            "created_at",
414            DataType::Timestamp(TimeUnit::Microsecond, None),
415            false,
416        ),
417    ]))
418}
419
/// One row of the actor side table, linking a commit to the actor that made it.
#[derive(Debug, Clone)]
struct CommitActorRecord {
    /// Commit this row annotates.
    graph_commit_id: String,
    /// Actor recorded for that commit.
    actor_id: String,
    /// Time the row was written, in microseconds since the UNIX epoch.
    created_at: i64,
}
426
427async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
428    let uri = graph_commit_actors_uri(root_uri);
429    let batch = RecordBatch::new_empty(commit_actor_schema());
430    let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
431    let params = WriteParams {
432        mode: WriteMode::Create,
433        enable_stable_row_ids: true,
434        data_storage_version: Some(LanceFileVersion::V2_2),
435        auto_cleanup: None,
436        skip_auto_cleanup: true,
437        ..Default::default()
438    };
439    match Dataset::write(reader, &uri as &str, Some(params)).await {
440        Ok(dataset) => Ok(dataset),
441        Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
442            .await
443            .map_err(|open_err| OmniError::Lance(open_err.to_string())),
444        Err(err) => Err(OmniError::Lance(err.to_string())),
445    }
446}
447
448fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
449    let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
450    let branches: Vec<Option<&str>> = commits
451        .iter()
452        .map(|c| c.manifest_branch.as_deref())
453        .collect();
454    let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
455    let parents: Vec<Option<&str>> = commits
456        .iter()
457        .map(|c| c.parent_commit_id.as_deref())
458        .collect();
459    let merged_parents: Vec<Option<&str>> = commits
460        .iter()
461        .map(|c| c.merged_parent_commit_id.as_deref())
462        .collect();
463    let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
464
465    RecordBatch::try_new(
466        commit_graph_schema(),
467        vec![
468            Arc::new(StringArray::from(ids)),
469            Arc::new(StringArray::from(branches)),
470            Arc::new(UInt64Array::from(versions)),
471            Arc::new(StringArray::from(parents)),
472            Arc::new(StringArray::from(merged_parents)),
473            Arc::new(TimestampMicrosecondArray::from(created_at)),
474        ],
475    )
476    .map_err(|e| OmniError::Lance(e.to_string()))
477}
478
479async fn load_commit_cache(
480    dataset: &Dataset,
481    actor_by_commit_id: &HashMap<String, String>,
482) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
483    let batches: Vec<RecordBatch> = dataset
484        .scan()
485        .try_into_stream()
486        .await
487        .map_err(|e| OmniError::Lance(e.to_string()))?
488        .try_collect()
489        .await
490        .map_err(|e| OmniError::Lance(e.to_string()))?;
491
492    let mut commits = load_commits_from_batches(&batches)?;
493    for commit in &mut commits {
494        commit.actor_id = actor_by_commit_id
495            .get(commit.graph_commit_id.as_str())
496            .cloned();
497    }
498    let mut commit_by_id = HashMap::with_capacity(commits.len());
499    let mut head_commit = None;
500    for commit in commits {
501        if should_replace_head(head_commit.as_ref(), &commit) {
502            head_commit = Some(commit.clone());
503        }
504        commit_by_id.insert(commit.graph_commit_id.clone(), commit);
505    }
506    Ok((commit_by_id, head_commit))
507}
508
509async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
510    let batches: Vec<RecordBatch> = dataset
511        .scan()
512        .try_into_stream()
513        .await
514        .map_err(|e| OmniError::Lance(e.to_string()))?
515        .try_collect()
516        .await
517        .map_err(|e| OmniError::Lance(e.to_string()))?;
518
519    let mut actors = HashMap::new();
520    for batch in batches {
521        let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
522        let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
523        for row in 0..batch.num_rows() {
524            actors.insert(
525                commit_ids.value(row).to_string(),
526                actor_ids.value(row).to_string(),
527            );
528        }
529    }
530    Ok(actors)
531}
532
533fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
534    let mut commits = Vec::new();
535    for batch in batches {
536        let ids = string_column(batch, "graph_commit_id", "commit graph")?;
537        let branches = string_column(batch, "manifest_branch", "commit graph")?;
538        let versions = u64_column(batch, "manifest_version", "commit graph")?;
539        let parents = string_column(batch, "parent_commit_id", "commit graph")?;
540        let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
541        let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
542
543        for row in 0..batch.num_rows() {
544            commits.push(GraphCommit {
545                graph_commit_id: ids.value(row).to_string(),
546                manifest_branch: if branches.is_null(row) {
547                    None
548                } else {
549                    Some(branches.value(row).to_string())
550                },
551                manifest_version: versions.value(row),
552                parent_commit_id: if parents.is_null(row) {
553                    None
554                } else {
555                    Some(parents.value(row).to_string())
556                },
557                merged_parent_commit_id: if merged_parents.is_null(row) {
558                    None
559                } else {
560                    Some(merged_parents.value(row).to_string())
561                },
562                actor_id: None,
563                created_at: created.value(row),
564            });
565        }
566    }
567    Ok(commits)
568}
569
570fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
571    let commit_ids: Vec<&str> = records
572        .iter()
573        .map(|record| record.graph_commit_id.as_str())
574        .collect();
575    let actor_ids: Vec<&str> = records
576        .iter()
577        .map(|record| record.actor_id.as_str())
578        .collect();
579    let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
580
581    RecordBatch::try_new(
582        commit_actor_schema(),
583        vec![
584            Arc::new(StringArray::from(commit_ids)),
585            Arc::new(StringArray::from(actor_ids)),
586            Arc::new(TimestampMicrosecondArray::from(created_at)),
587        ],
588    )
589    .map_err(|e| OmniError::Lance(e.to_string()))
590}
591
592fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
593    current.is_none_or(|existing| {
594        candidate
595            .manifest_version
596            .cmp(&existing.manifest_version)
597            .then_with(|| candidate.created_at.cmp(&existing.created_at))
598            .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
599            .is_gt()
600    })
601}
602
603fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
604    batch
605        .column_by_name(name)
606        .ok_or_else(|| {
607            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
608        })?
609        .as_any()
610        .downcast_ref::<StringArray>()
611        .ok_or_else(|| {
612            OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
613        })
614}
615
616fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
617    batch
618        .column_by_name(name)
619        .ok_or_else(|| {
620            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
621        })?
622        .as_any()
623        .downcast_ref::<UInt64Array>()
624        .ok_or_else(|| {
625            OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
626        })
627}
628
629fn timestamp_micros_column<'a>(
630    batch: &'a RecordBatch,
631    name: &str,
632    context: &str,
633) -> Result<&'a TimestampMicrosecondArray> {
634    batch
635        .column_by_name(name)
636        .ok_or_else(|| {
637            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
638        })?
639        .as_any()
640        .downcast_ref::<TimestampMicrosecondArray>()
641        .ok_or_else(|| {
642            OmniError::manifest_internal(format!(
643                "{context} column '{name}' is not Timestamp(Microsecond)"
644            ))
645        })
646}
647
648fn ancestor_distances(
649    start_id: &str,
650    commits: &HashMap<String, GraphCommit>,
651) -> HashMap<String, u64> {
652    let mut distances = HashMap::new();
653    let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
654
655    while let Some((id, distance)) = queue.pop_front() {
656        if let Some(existing) = distances.get(&id) {
657            if *existing <= distance {
658                continue;
659            }
660        }
661        distances.insert(id.clone(), distance);
662
663        if let Some(commit) = commits.get(&id) {
664            if let Some(parent) = &commit.parent_commit_id {
665                queue.push_back((parent.clone(), distance + 1));
666            }
667            if let Some(parent) = &commit.merged_parent_commit_id {
668                queue.push_back((parent.clone(), distance + 1));
669            }
670        }
671    }
672
673    distances
674}
675
676async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
677    match branch {
678        Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
679        _ => CommitGraph::open(root_uri).await,
680    }
681}
682
683fn now_micros() -> Result<i64> {
684    let duration = SystemTime::now()
685        .duration_since(UNIX_EPOCH)
686        .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
687    Ok(duration.as_micros() as i64)
688}
689
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is `UInt64` instead of `Utf8`
    /// must be rejected with an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let micros = DataType::Timestamp(TimeUnit::Microsecond, None);
        let schema = Arc::new(Schema::new(vec![
            Field::new("graph_commit_id", DataType::UInt64, false),
            Field::new("manifest_branch", DataType::Utf8, true),
            Field::new("manifest_version", DataType::UInt64, false),
            Field::new("parent_commit_id", DataType::Utf8, true),
            Field::new("merged_parent_commit_id", DataType::Utf8, true),
            Field::new("created_at", micros, false),
        ]));
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(schema, columns).expect("test batch should be valid");

        let err = load_commits_from_batches(&[batch])
            .expect_err("a UInt64 graph_commit_id column must be rejected");
        let message = err.to_string();
        assert!(
            message.contains("graph_commit_id"),
            "unexpected error: {message}"
        );
    }
}
727}