Skip to main content

omnigraph/db/
commit_graph.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6    Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, relative to the graph root, of the Lance dataset holding graph commits.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory name, relative to the graph root, of the Lance dataset that maps
/// `graph_commit_id` to the `actor_id` that created the commit.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// One node of the graph-level commit history.
///
/// A commit records which manifest version (and branch) it points at and
/// links to up to two parents: the regular parent and, for merge commits,
/// the parent that was merged in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphCommit {
    /// Unique commit id (a ULID string when created by `CommitGraph`).
    pub graph_commit_id: String,
    /// Manifest branch the commit was recorded for; the genesis commit uses `None`.
    pub manifest_branch: Option<String>,
    /// Manifest version this commit points at.
    pub manifest_version: u64,
    /// Id of the preceding commit; `None` for the genesis commit.
    pub parent_commit_id: Option<String>,
    /// Second parent for merge commits; `None` for ordinary commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor who created the commit, if recorded. It is stored in the separate
    /// actor-registry dataset, not in the commit row itself.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the Unix epoch.
    pub created_at: i64,
}
29
/// Handle on the commit-graph datasets under one graph root. The commits of
/// the checked-out branch are cached in memory.
pub struct CommitGraph {
    /// Graph root URI with any trailing `/` removed.
    root_uri: String,
    /// Commit dataset, checked out at `active_branch` when one is set.
    dataset: Dataset,
    /// Actor-registry dataset; `None` when it could not be opened.
    actor_dataset: Option<Dataset>,
    /// Branch this handle was opened on; `None` means the dataset's default branch.
    active_branch: Option<String>,
    /// Cached `graph_commit_id -> actor_id` mapping read from the actor registry.
    actor_by_commit_id: HashMap<String, String>,
    /// Every commit loaded from `dataset`, keyed by `graph_commit_id`.
    commit_by_id: HashMap<String, GraphCommit>,
    /// Highest-ranked commit (manifest version, then creation time, then id);
    /// `None` when the dataset has no commits.
    head_commit: Option<GraphCommit>,
}
39
40impl CommitGraph {
41    pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42        let root = root_uri.trim_end_matches('/');
43        let uri = graph_commits_uri(root);
44        let genesis = GraphCommit {
45            graph_commit_id: ulid::Ulid::new().to_string(),
46            manifest_branch: None,
47            manifest_version,
48            parent_commit_id: None,
49            merged_parent_commit_id: None,
50            actor_id: None,
51            created_at: now_micros()?,
52        };
53
54        let batch = commits_to_batch(&[genesis.clone()])?;
55        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56        let params = WriteParams {
57            mode: WriteMode::Create,
58            enable_stable_row_ids: true,
59            data_storage_version: Some(LanceFileVersion::V2_2),
60            ..Default::default()
61        };
62        let dataset = Dataset::write(reader, &uri as &str, Some(params))
63            .await
64            .map_err(|e| OmniError::Lance(e.to_string()))?;
65        let actor_dataset = create_commit_actor_dataset(root).await?;
66
67        Ok(Self {
68            root_uri: root.to_string(),
69            dataset,
70            actor_dataset: Some(actor_dataset),
71            active_branch: None,
72            actor_by_commit_id: HashMap::new(),
73            commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
74            head_commit: Some(genesis),
75        })
76    }
77
78    pub async fn open(root_uri: &str) -> Result<Self> {
79        let root = root_uri.trim_end_matches('/');
80        let dataset = Dataset::open(&graph_commits_uri(root))
81            .await
82            .map_err(|e| OmniError::Lance(e.to_string()))?;
83        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
84        let actor_by_commit_id = match &actor_dataset {
85            Some(dataset) => load_commit_actor_cache(dataset).await?,
86            None => HashMap::new(),
87        };
88        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
89        Ok(Self {
90            root_uri: root.to_string(),
91            dataset,
92            actor_dataset,
93            active_branch: None,
94            actor_by_commit_id,
95            commit_by_id,
96            head_commit,
97        })
98    }
99
100    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
101        let root = root_uri.trim_end_matches('/');
102        let dataset = Dataset::open(&graph_commits_uri(root))
103            .await
104            .map_err(|e| OmniError::Lance(e.to_string()))?;
105        let dataset = dataset
106            .checkout_branch(branch)
107            .await
108            .map_err(|e| OmniError::Lance(e.to_string()))?;
109        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
110        let actor_by_commit_id = match &actor_dataset {
111            Some(dataset) => load_commit_actor_cache(dataset).await?,
112            None => HashMap::new(),
113        };
114        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
115        Ok(Self {
116            root_uri: root.to_string(),
117            dataset,
118            actor_dataset,
119            active_branch: Some(branch.to_string()),
120            actor_by_commit_id,
121            commit_by_id,
122            head_commit,
123        })
124    }
125
126    pub async fn refresh(&mut self) -> Result<()> {
127        let root = self.root_uri.clone();
128        self.dataset = Dataset::open(&graph_commits_uri(&root))
129            .await
130            .map_err(|e| OmniError::Lance(e.to_string()))?;
131        if let Some(branch) = &self.active_branch {
132            self.dataset = self
133                .dataset
134                .checkout_branch(branch)
135                .await
136                .map_err(|e| OmniError::Lance(e.to_string()))?;
137        }
138        self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
139        self.actor_by_commit_id = match &self.actor_dataset {
140            Some(dataset) => load_commit_actor_cache(dataset).await?,
141            None => HashMap::new(),
142        };
143        let (commit_by_id, head_commit) =
144            load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
145        self.commit_by_id = commit_by_id;
146        self.head_commit = head_commit;
147        Ok(())
148    }
149
150    pub fn version(&self) -> u64 {
151        self.dataset.version().version
152    }
153
154    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
155        let mut ds = self.dataset.clone();
156        ds.create_branch(name, self.version(), None)
157            .await
158            .map_err(|e| OmniError::Lance(e.to_string()))?;
159        Ok(())
160    }
161
162    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
163        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
164            .await
165            .map_err(|e| OmniError::Lance(e.to_string()))?;
166        ds.delete_branch(name)
167            .await
168            .map_err(|e| OmniError::Lance(e.to_string()))?;
169        self.refresh().await
170    }
171
172    /// Idempotently drop the commit-graph branch `name`, tolerating an
173    /// already-absent branch (see [`TableStore::force_delete_branch`] for the
174    /// same semantics). Used by the best-effort reclaim in `branch_delete` and
175    /// the `cleanup` orphan reconciler. `RefConflict` (referencing descendants)
176    /// is still surfaced.
177    pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
178        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
179            .await
180            .map_err(|e| OmniError::Lance(e.to_string()))?;
181        match ds.force_delete_branch(name).await {
182            Ok(()) => {}
183            Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
184            Err(e) => return Err(OmniError::Lance(e.to_string())),
185        }
186        self.refresh().await
187    }
188
189    /// List the named branches present on the commit-graph dataset. The
190    /// `cleanup` reconciler diffs this against the manifest branch set to find
191    /// orphaned commit-graph branches to reclaim.
192    pub async fn list_branches(&self) -> Result<Vec<String>> {
193        let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
194            .await
195            .map_err(|e| OmniError::Lance(e.to_string()))?;
196        let branches = ds
197            .list_branches()
198            .await
199            .map_err(|e| OmniError::Lance(e.to_string()))?;
200        Ok(branches.into_keys().collect())
201    }
202
203    pub async fn append_commit(
204        &mut self,
205        manifest_branch: Option<&str>,
206        manifest_version: u64,
207        actor_id: Option<&str>,
208    ) -> Result<String> {
209        let parent_commit_id = self.head_commit_id().await?;
210        self.append_commit_with_parents(
211            manifest_branch,
212            manifest_version,
213            parent_commit_id.as_deref(),
214            None,
215            actor_id,
216        )
217        .await
218    }
219
220    pub async fn append_merge_commit(
221        &mut self,
222        manifest_branch: Option<&str>,
223        manifest_version: u64,
224        parent_commit_id: &str,
225        merged_parent_commit_id: &str,
226        actor_id: Option<&str>,
227    ) -> Result<String> {
228        self.append_commit_with_parents(
229            manifest_branch,
230            manifest_version,
231            Some(parent_commit_id),
232            Some(merged_parent_commit_id),
233            actor_id,
234        )
235        .await
236    }
237
238    async fn append_commit_with_parents(
239        &mut self,
240        manifest_branch: Option<&str>,
241        manifest_version: u64,
242        parent_commit_id: Option<&str>,
243        merged_parent_commit_id: Option<&str>,
244        actor_id: Option<&str>,
245    ) -> Result<String> {
246        let graph_commit_id = ulid::Ulid::new().to_string();
247        let commit = GraphCommit {
248            graph_commit_id: graph_commit_id.clone(),
249            manifest_branch: manifest_branch.map(|s| s.to_string()),
250            manifest_version,
251            parent_commit_id: parent_commit_id.map(|s| s.to_string()),
252            merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
253            actor_id: actor_id.map(str::to_string),
254            created_at: now_micros()?,
255        };
256
257        let batch = commits_to_batch(&[commit.clone()])?;
258        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
259        let mut ds = self.dataset.clone();
260        ds.append(reader, None)
261            .await
262            .map_err(|e| OmniError::Lance(e.to_string()))?;
263        self.dataset = ds;
264        if let Some(actor_id) = actor_id {
265            self.append_actor(&graph_commit_id, actor_id).await?;
266        }
267        self.commit_by_id
268            .insert(graph_commit_id.clone(), commit.clone());
269        if should_replace_head(self.head_commit.as_ref(), &commit) {
270            self.head_commit = Some(commit);
271        }
272
273        Ok(graph_commit_id)
274    }
275
276    async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
277        if self
278            .actor_by_commit_id
279            .get(graph_commit_id)
280            .is_some_and(|existing| existing == actor_id)
281        {
282            return Ok(());
283        }
284
285        let record = CommitActorRecord {
286            graph_commit_id: graph_commit_id.to_string(),
287            actor_id: actor_id.to_string(),
288            created_at: now_micros()?,
289        };
290        let batch = commit_actors_to_batch(&[record])?;
291        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
292        let mut dataset = match self.actor_dataset.take() {
293            Some(dataset) => dataset,
294            None => create_commit_actor_dataset(&self.root_uri).await?,
295        };
296        dataset
297            .append(reader, None)
298            .await
299            .map_err(|e| OmniError::Lance(e.to_string()))?;
300        self.actor_by_commit_id
301            .insert(graph_commit_id.to_string(), actor_id.to_string());
302        self.actor_dataset = Some(dataset);
303        Ok(())
304    }
305
306    pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
307        Ok(self.head_commit.clone())
308    }
309
310    pub async fn head_commit_id(&self) -> Result<Option<String>> {
311        Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
312    }
313
314    pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
315        let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
316        commits.sort_by(|a, b| {
317            a.manifest_version
318                .cmp(&b.manifest_version)
319                .then_with(|| a.created_at.cmp(&b.created_at))
320                .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
321        });
322        Ok(commits)
323    }
324
325    pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
326        self.commit_by_id.get(commit_id).cloned()
327    }
328
329    pub async fn merge_base(
330        root_uri: &str,
331        source_branch: Option<&str>,
332        target_branch: Option<&str>,
333    ) -> Result<Option<GraphCommit>> {
334        let source = open_for_branch(root_uri, source_branch).await?;
335        let target = open_for_branch(root_uri, target_branch).await?;
336
337        let source_head = match source.head_commit().await? {
338            Some(commit) => commit,
339            None => return Ok(None),
340        };
341        let target_head = match target.head_commit().await? {
342            Some(commit) => commit,
343            None => return Ok(None),
344        };
345
346        let mut commits = HashMap::new();
347        for commit in source.load_commits().await? {
348            commits.insert(commit.graph_commit_id.clone(), commit);
349        }
350        for commit in target.load_commits().await? {
351            commits.insert(commit.graph_commit_id.clone(), commit);
352        }
353
354        let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
355        let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
356
357        let best = source_distances
358            .iter()
359            .filter_map(|(id, source_distance)| {
360                target_distances.get(id).and_then(|target_distance| {
361                    commits.get(id).map(|commit| {
362                        (
363                            (
364                                *source_distance + *target_distance,
365                                u64::MAX - commit.manifest_version,
366                            ),
367                            commit.clone(),
368                        )
369                    })
370                })
371            })
372            .min_by_key(|(score, _)| *score)
373            .map(|(_, commit)| commit);
374
375        Ok(best)
376    }
377}
378
379pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
380    format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
381}
382
383fn graph_commit_actors_uri(root_uri: &str) -> String {
384    format!(
385        "{}/{}",
386        root_uri.trim_end_matches('/'),
387        GRAPH_COMMIT_ACTORS_DIR
388    )
389}
390
391fn commit_graph_schema() -> SchemaRef {
392    Arc::new(Schema::new(vec![
393        Field::new("graph_commit_id", DataType::Utf8, false),
394        Field::new("manifest_branch", DataType::Utf8, true),
395        Field::new("manifest_version", DataType::UInt64, false),
396        Field::new("parent_commit_id", DataType::Utf8, true),
397        Field::new("merged_parent_commit_id", DataType::Utf8, true),
398        Field::new(
399            "created_at",
400            DataType::Timestamp(TimeUnit::Microsecond, None),
401            false,
402        ),
403    ]))
404}
405
406fn commit_actor_schema() -> SchemaRef {
407    Arc::new(Schema::new(vec![
408        Field::new("graph_commit_id", DataType::Utf8, false),
409        Field::new("actor_id", DataType::Utf8, false),
410        Field::new(
411            "created_at",
412            DataType::Timestamp(TimeUnit::Microsecond, None),
413            false,
414        ),
415    ]))
416}
417
/// One row of the actor-registry dataset (see `commit_actor_schema`).
#[derive(Debug, Clone)]
struct CommitActorRecord {
    /// Commit the actor is attributed to.
    graph_commit_id: String,
    /// Actor that created the commit.
    actor_id: String,
    /// Time this record was written, in microseconds since the Unix epoch.
    created_at: i64,
}
424
425async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
426    let uri = graph_commit_actors_uri(root_uri);
427    let batch = RecordBatch::new_empty(commit_actor_schema());
428    let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
429    let params = WriteParams {
430        mode: WriteMode::Create,
431        enable_stable_row_ids: true,
432        data_storage_version: Some(LanceFileVersion::V2_2),
433        ..Default::default()
434    };
435    match Dataset::write(reader, &uri as &str, Some(params)).await {
436        Ok(dataset) => Ok(dataset),
437        Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
438            .await
439            .map_err(|open_err| OmniError::Lance(open_err.to_string())),
440        Err(err) => Err(OmniError::Lance(err.to_string())),
441    }
442}
443
444fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
445    let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
446    let branches: Vec<Option<&str>> = commits
447        .iter()
448        .map(|c| c.manifest_branch.as_deref())
449        .collect();
450    let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
451    let parents: Vec<Option<&str>> = commits
452        .iter()
453        .map(|c| c.parent_commit_id.as_deref())
454        .collect();
455    let merged_parents: Vec<Option<&str>> = commits
456        .iter()
457        .map(|c| c.merged_parent_commit_id.as_deref())
458        .collect();
459    let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
460
461    RecordBatch::try_new(
462        commit_graph_schema(),
463        vec![
464            Arc::new(StringArray::from(ids)),
465            Arc::new(StringArray::from(branches)),
466            Arc::new(UInt64Array::from(versions)),
467            Arc::new(StringArray::from(parents)),
468            Arc::new(StringArray::from(merged_parents)),
469            Arc::new(TimestampMicrosecondArray::from(created_at)),
470        ],
471    )
472    .map_err(|e| OmniError::Lance(e.to_string()))
473}
474
475async fn load_commit_cache(
476    dataset: &Dataset,
477    actor_by_commit_id: &HashMap<String, String>,
478) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
479    let batches: Vec<RecordBatch> = dataset
480        .scan()
481        .try_into_stream()
482        .await
483        .map_err(|e| OmniError::Lance(e.to_string()))?
484        .try_collect()
485        .await
486        .map_err(|e| OmniError::Lance(e.to_string()))?;
487
488    let mut commits = load_commits_from_batches(&batches)?;
489    for commit in &mut commits {
490        commit.actor_id = actor_by_commit_id
491            .get(commit.graph_commit_id.as_str())
492            .cloned();
493    }
494    let mut commit_by_id = HashMap::with_capacity(commits.len());
495    let mut head_commit = None;
496    for commit in commits {
497        if should_replace_head(head_commit.as_ref(), &commit) {
498            head_commit = Some(commit.clone());
499        }
500        commit_by_id.insert(commit.graph_commit_id.clone(), commit);
501    }
502    Ok((commit_by_id, head_commit))
503}
504
505async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
506    let batches: Vec<RecordBatch> = dataset
507        .scan()
508        .try_into_stream()
509        .await
510        .map_err(|e| OmniError::Lance(e.to_string()))?
511        .try_collect()
512        .await
513        .map_err(|e| OmniError::Lance(e.to_string()))?;
514
515    let mut actors = HashMap::new();
516    for batch in batches {
517        let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
518        let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
519        for row in 0..batch.num_rows() {
520            actors.insert(
521                commit_ids.value(row).to_string(),
522                actor_ids.value(row).to_string(),
523            );
524        }
525    }
526    Ok(actors)
527}
528
529fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
530    let mut commits = Vec::new();
531    for batch in batches {
532        let ids = string_column(batch, "graph_commit_id", "commit graph")?;
533        let branches = string_column(batch, "manifest_branch", "commit graph")?;
534        let versions = u64_column(batch, "manifest_version", "commit graph")?;
535        let parents = string_column(batch, "parent_commit_id", "commit graph")?;
536        let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
537        let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
538
539        for row in 0..batch.num_rows() {
540            commits.push(GraphCommit {
541                graph_commit_id: ids.value(row).to_string(),
542                manifest_branch: if branches.is_null(row) {
543                    None
544                } else {
545                    Some(branches.value(row).to_string())
546                },
547                manifest_version: versions.value(row),
548                parent_commit_id: if parents.is_null(row) {
549                    None
550                } else {
551                    Some(parents.value(row).to_string())
552                },
553                merged_parent_commit_id: if merged_parents.is_null(row) {
554                    None
555                } else {
556                    Some(merged_parents.value(row).to_string())
557                },
558                actor_id: None,
559                created_at: created.value(row),
560            });
561        }
562    }
563    Ok(commits)
564}
565
566fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
567    let commit_ids: Vec<&str> = records
568        .iter()
569        .map(|record| record.graph_commit_id.as_str())
570        .collect();
571    let actor_ids: Vec<&str> = records
572        .iter()
573        .map(|record| record.actor_id.as_str())
574        .collect();
575    let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
576
577    RecordBatch::try_new(
578        commit_actor_schema(),
579        vec![
580            Arc::new(StringArray::from(commit_ids)),
581            Arc::new(StringArray::from(actor_ids)),
582            Arc::new(TimestampMicrosecondArray::from(created_at)),
583        ],
584    )
585    .map_err(|e| OmniError::Lance(e.to_string()))
586}
587
588fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
589    current.is_none_or(|existing| {
590        candidate
591            .manifest_version
592            .cmp(&existing.manifest_version)
593            .then_with(|| candidate.created_at.cmp(&existing.created_at))
594            .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
595            .is_gt()
596    })
597}
598
599fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
600    batch
601        .column_by_name(name)
602        .ok_or_else(|| {
603            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
604        })?
605        .as_any()
606        .downcast_ref::<StringArray>()
607        .ok_or_else(|| {
608            OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
609        })
610}
611
612fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
613    batch
614        .column_by_name(name)
615        .ok_or_else(|| {
616            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
617        })?
618        .as_any()
619        .downcast_ref::<UInt64Array>()
620        .ok_or_else(|| {
621            OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
622        })
623}
624
625fn timestamp_micros_column<'a>(
626    batch: &'a RecordBatch,
627    name: &str,
628    context: &str,
629) -> Result<&'a TimestampMicrosecondArray> {
630    batch
631        .column_by_name(name)
632        .ok_or_else(|| {
633            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
634        })?
635        .as_any()
636        .downcast_ref::<TimestampMicrosecondArray>()
637        .ok_or_else(|| {
638            OmniError::manifest_internal(format!(
639                "{context} column '{name}' is not Timestamp(Microsecond)"
640            ))
641        })
642}
643
644fn ancestor_distances(
645    start_id: &str,
646    commits: &HashMap<String, GraphCommit>,
647) -> HashMap<String, u64> {
648    let mut distances = HashMap::new();
649    let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
650
651    while let Some((id, distance)) = queue.pop_front() {
652        if let Some(existing) = distances.get(&id) {
653            if *existing <= distance {
654                continue;
655            }
656        }
657        distances.insert(id.clone(), distance);
658
659        if let Some(commit) = commits.get(&id) {
660            if let Some(parent) = &commit.parent_commit_id {
661                queue.push_back((parent.clone(), distance + 1));
662            }
663            if let Some(parent) = &commit.merged_parent_commit_id {
664                queue.push_back((parent.clone(), distance + 1));
665            }
666        }
667    }
668
669    distances
670}
671
672async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
673    match branch {
674        Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
675        _ => CommitGraph::open(root_uri).await,
676    }
677}
678
679fn now_micros() -> Result<i64> {
680    let duration = SystemTime::now()
681        .duration_since(UNIX_EPOCH)
682        .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
683    Ok(duration.as_micros() as i64)
684}
685
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// Build a commit with the fields the ordering and traversal helpers read.
    fn commit(
        id: &str,
        manifest_version: u64,
        created_at: i64,
        parent: Option<&str>,
        merged_parent: Option<&str>,
    ) -> GraphCommit {
        GraphCommit {
            graph_commit_id: id.to_string(),
            manifest_branch: None,
            manifest_version,
            parent_commit_id: parent.map(str::to_string),
            merged_parent_commit_id: merged_parent.map(str::to_string),
            actor_id: None,
            created_at,
        }
    }

    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("graph_commit_id", DataType::UInt64, false),
                Field::new("manifest_branch", DataType::Utf8, true),
                Field::new("manifest_version", DataType::UInt64, false),
                Field::new("parent_commit_id", DataType::Utf8, true),
                Field::new("merged_parent_commit_id", DataType::Utf8, true),
                Field::new(
                    "created_at",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    false,
                ),
            ])),
            vec![
                Arc::new(UInt64Array::from(vec![1_u64])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(UInt64Array::from(vec![1_u64])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(StringArray::from(vec![None::<&str>])),
                Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
            ],
        )
        .unwrap();

        let err = load_commits_from_batches(&[batch]).unwrap_err();
        assert!(err.to_string().contains("graph_commit_id"));
    }

    #[test]
    fn commits_round_trip_through_record_batch() {
        let mut branch_commit = commit("b", 2, 20, Some("a"), Some("x"));
        branch_commit.manifest_branch = Some("feature".to_string());
        branch_commit.actor_id = Some("alice".to_string());
        let commits = vec![commit("a", 1, 10, None, None), branch_commit];

        let batch = commits_to_batch(&commits).unwrap();
        assert_eq!(batch.num_rows(), 2);
        let loaded = load_commits_from_batches(&[batch]).unwrap();

        assert_eq!(loaded.len(), commits.len());
        for (got, want) in loaded.iter().zip(&commits) {
            assert_eq!(got.graph_commit_id, want.graph_commit_id);
            assert_eq!(got.manifest_branch, want.manifest_branch);
            assert_eq!(got.manifest_version, want.manifest_version);
            assert_eq!(got.parent_commit_id, want.parent_commit_id);
            assert_eq!(got.merged_parent_commit_id, want.merged_parent_commit_id);
            assert_eq!(got.created_at, want.created_at);
            // Actor ids live in the separate registry, not in commit rows.
            assert_eq!(got.actor_id, None);
        }
    }

    #[test]
    fn should_replace_head_orders_by_version_then_time_then_id() {
        let base = commit("m", 5, 100, None, None);
        assert!(should_replace_head(None, &base));
        assert!(should_replace_head(Some(&base), &commit("a", 6, 0, None, None)));
        assert!(!should_replace_head(Some(&base), &commit("z", 4, 999, None, None)));
        assert!(should_replace_head(Some(&base), &commit("a", 5, 101, None, None)));
        assert!(should_replace_head(Some(&base), &commit("n", 5, 100, None, None)));
        assert!(!should_replace_head(Some(&base), &commit("l", 5, 100, None, None)));
        assert!(!should_replace_head(Some(&base), &base.clone()));
    }

    #[test]
    fn ancestor_distances_follows_both_parents_with_shortest_hops() {
        let commits: HashMap<String, GraphCommit> = vec![
            commit("a", 1, 1, None, None),
            commit("b", 2, 2, Some("a"), None),
            commit("c", 3, 3, Some("b"), None),
            commit("d", 2, 4, Some("a"), None),
            commit("e", 4, 5, Some("c"), Some("d")),
        ]
        .into_iter()
        .map(|c| (c.graph_commit_id.clone(), c))
        .collect();

        let distances = ancestor_distances("e", &commits);
        assert_eq!(distances.len(), 5);
        assert_eq!(distances["e"], 0);
        assert_eq!(distances["c"], 1);
        assert_eq!(distances["d"], 1);
        assert_eq!(distances["b"], 2);
        assert_eq!(distances["a"], 2);
    }
}