Skip to main content

omnigraph/db/
commit_graph.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6    Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, relative to the graph root, of the Lance dataset that stores commit rows.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory name, relative to the graph root, of the Lance dataset that stores
/// commit-to-actor rows.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// A single node of the commit graph.
///
/// Every field except `actor_id` is stored as a column of the commit dataset;
/// `actor_id` is filled in from the separate actor dataset when commits are loaded.
#[derive(Clone, Debug)]
pub struct GraphCommit {
    /// Unique identifier of the commit (a ULID string when minted by this module).
    pub graph_commit_id: String,
    /// Manifest branch recorded with the commit, if any.
    pub manifest_branch: Option<String>,
    /// Manifest version recorded with the commit.
    pub manifest_version: u64,
    /// Identifier of the first parent; `None` for the genesis commit.
    pub parent_commit_id: Option<String>,
    /// Identifier of the second parent; only set for merge commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor that produced the commit, if one was recorded.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the UNIX epoch.
    pub created_at: i64,
}
29
30pub struct CommitGraph {
31    root_uri: String,
32    dataset: Dataset,
33    actor_dataset: Option<Dataset>,
34    active_branch: Option<String>,
35    actor_by_commit_id: HashMap<String, String>,
36    commit_by_id: HashMap<String, GraphCommit>,
37    head_commit: Option<GraphCommit>,
38}
39
40impl CommitGraph {
41    pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42        let root = root_uri.trim_end_matches('/');
43        let uri = graph_commits_uri(root);
44        let genesis = GraphCommit {
45            graph_commit_id: ulid::Ulid::new().to_string(),
46            manifest_branch: None,
47            manifest_version,
48            parent_commit_id: None,
49            merged_parent_commit_id: None,
50            actor_id: None,
51            created_at: now_micros()?,
52        };
53
54        let batch = commits_to_batch(&[genesis.clone()])?;
55        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56        let params = WriteParams {
57            mode: WriteMode::Create,
58            enable_stable_row_ids: true,
59            data_storage_version: Some(LanceFileVersion::V2_2),
60            ..Default::default()
61        };
62        let dataset = Dataset::write(reader, &uri as &str, Some(params))
63            .await
64            .map_err(|e| OmniError::Lance(e.to_string()))?;
65        let actor_dataset = create_commit_actor_dataset(root).await?;
66
67        Ok(Self {
68            root_uri: root.to_string(),
69            dataset,
70            actor_dataset: Some(actor_dataset),
71            active_branch: None,
72            actor_by_commit_id: HashMap::new(),
73            commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
74            head_commit: Some(genesis),
75        })
76    }
77
78    pub async fn open(root_uri: &str) -> Result<Self> {
79        let root = root_uri.trim_end_matches('/');
80        let dataset = Dataset::open(&graph_commits_uri(root))
81            .await
82            .map_err(|e| OmniError::Lance(e.to_string()))?;
83        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
84        let actor_by_commit_id = match &actor_dataset {
85            Some(dataset) => load_commit_actor_cache(dataset).await?,
86            None => HashMap::new(),
87        };
88        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
89        Ok(Self {
90            root_uri: root.to_string(),
91            dataset,
92            actor_dataset,
93            active_branch: None,
94            actor_by_commit_id,
95            commit_by_id,
96            head_commit,
97        })
98    }
99
100    pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
101        let root = root_uri.trim_end_matches('/');
102        let dataset = Dataset::open(&graph_commits_uri(root))
103            .await
104            .map_err(|e| OmniError::Lance(e.to_string()))?;
105        let dataset = dataset
106            .checkout_branch(branch)
107            .await
108            .map_err(|e| OmniError::Lance(e.to_string()))?;
109        let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
110        let actor_by_commit_id = match &actor_dataset {
111            Some(dataset) => load_commit_actor_cache(dataset).await?,
112            None => HashMap::new(),
113        };
114        let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
115        Ok(Self {
116            root_uri: root.to_string(),
117            dataset,
118            actor_dataset,
119            active_branch: Some(branch.to_string()),
120            actor_by_commit_id,
121            commit_by_id,
122            head_commit,
123        })
124    }
125
126    pub async fn refresh(&mut self) -> Result<()> {
127        let root = self.root_uri.clone();
128        self.dataset = Dataset::open(&graph_commits_uri(&root))
129            .await
130            .map_err(|e| OmniError::Lance(e.to_string()))?;
131        if let Some(branch) = &self.active_branch {
132            self.dataset = self
133                .dataset
134                .checkout_branch(branch)
135                .await
136                .map_err(|e| OmniError::Lance(e.to_string()))?;
137        }
138        self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
139        self.actor_by_commit_id = match &self.actor_dataset {
140            Some(dataset) => load_commit_actor_cache(dataset).await?,
141            None => HashMap::new(),
142        };
143        let (commit_by_id, head_commit) =
144            load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
145        self.commit_by_id = commit_by_id;
146        self.head_commit = head_commit;
147        Ok(())
148    }
149
150    pub fn version(&self) -> u64 {
151        self.dataset.version().version
152    }
153
154    pub async fn create_branch(&mut self, name: &str) -> Result<()> {
155        let mut ds = self.dataset.clone();
156        ds.create_branch(name, self.version(), None)
157            .await
158            .map_err(|e| OmniError::Lance(e.to_string()))?;
159        Ok(())
160    }
161
162    pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
163        let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
164            .await
165            .map_err(|e| OmniError::Lance(e.to_string()))?;
166        ds.delete_branch(name)
167            .await
168            .map_err(|e| OmniError::Lance(e.to_string()))?;
169        self.refresh().await
170    }
171
172    pub async fn append_commit(
173        &mut self,
174        manifest_branch: Option<&str>,
175        manifest_version: u64,
176        actor_id: Option<&str>,
177    ) -> Result<String> {
178        let parent_commit_id = self.head_commit_id().await?;
179        self.append_commit_with_parents(
180            manifest_branch,
181            manifest_version,
182            parent_commit_id.as_deref(),
183            None,
184            actor_id,
185        )
186        .await
187    }
188
189    pub async fn append_merge_commit(
190        &mut self,
191        manifest_branch: Option<&str>,
192        manifest_version: u64,
193        parent_commit_id: &str,
194        merged_parent_commit_id: &str,
195        actor_id: Option<&str>,
196    ) -> Result<String> {
197        self.append_commit_with_parents(
198            manifest_branch,
199            manifest_version,
200            Some(parent_commit_id),
201            Some(merged_parent_commit_id),
202            actor_id,
203        )
204        .await
205    }
206
207    async fn append_commit_with_parents(
208        &mut self,
209        manifest_branch: Option<&str>,
210        manifest_version: u64,
211        parent_commit_id: Option<&str>,
212        merged_parent_commit_id: Option<&str>,
213        actor_id: Option<&str>,
214    ) -> Result<String> {
215        let graph_commit_id = ulid::Ulid::new().to_string();
216        let commit = GraphCommit {
217            graph_commit_id: graph_commit_id.clone(),
218            manifest_branch: manifest_branch.map(|s| s.to_string()),
219            manifest_version,
220            parent_commit_id: parent_commit_id.map(|s| s.to_string()),
221            merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
222            actor_id: actor_id.map(str::to_string),
223            created_at: now_micros()?,
224        };
225
226        let batch = commits_to_batch(&[commit.clone()])?;
227        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
228        let mut ds = self.dataset.clone();
229        ds.append(reader, None)
230            .await
231            .map_err(|e| OmniError::Lance(e.to_string()))?;
232        self.dataset = ds;
233        if let Some(actor_id) = actor_id {
234            self.append_actor(&graph_commit_id, actor_id).await?;
235        }
236        self.commit_by_id
237            .insert(graph_commit_id.clone(), commit.clone());
238        if should_replace_head(self.head_commit.as_ref(), &commit) {
239            self.head_commit = Some(commit);
240        }
241
242        Ok(graph_commit_id)
243    }
244
245    async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
246        if self
247            .actor_by_commit_id
248            .get(graph_commit_id)
249            .is_some_and(|existing| existing == actor_id)
250        {
251            return Ok(());
252        }
253
254        let record = CommitActorRecord {
255            graph_commit_id: graph_commit_id.to_string(),
256            actor_id: actor_id.to_string(),
257            created_at: now_micros()?,
258        };
259        let batch = commit_actors_to_batch(&[record])?;
260        let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
261        let mut dataset = match self.actor_dataset.take() {
262            Some(dataset) => dataset,
263            None => create_commit_actor_dataset(&self.root_uri).await?,
264        };
265        dataset
266            .append(reader, None)
267            .await
268            .map_err(|e| OmniError::Lance(e.to_string()))?;
269        self.actor_by_commit_id
270            .insert(graph_commit_id.to_string(), actor_id.to_string());
271        self.actor_dataset = Some(dataset);
272        Ok(())
273    }
274
275    pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
276        Ok(self.head_commit.clone())
277    }
278
279    pub async fn head_commit_id(&self) -> Result<Option<String>> {
280        Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
281    }
282
283    pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
284        let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
285        commits.sort_by(|a, b| {
286            a.manifest_version
287                .cmp(&b.manifest_version)
288                .then_with(|| a.created_at.cmp(&b.created_at))
289                .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
290        });
291        Ok(commits)
292    }
293
294    pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
295        self.commit_by_id.get(commit_id).cloned()
296    }
297
298    pub async fn merge_base(
299        root_uri: &str,
300        source_branch: Option<&str>,
301        target_branch: Option<&str>,
302    ) -> Result<Option<GraphCommit>> {
303        let source = open_for_branch(root_uri, source_branch).await?;
304        let target = open_for_branch(root_uri, target_branch).await?;
305
306        let source_head = match source.head_commit().await? {
307            Some(commit) => commit,
308            None => return Ok(None),
309        };
310        let target_head = match target.head_commit().await? {
311            Some(commit) => commit,
312            None => return Ok(None),
313        };
314
315        let mut commits = HashMap::new();
316        for commit in source.load_commits().await? {
317            commits.insert(commit.graph_commit_id.clone(), commit);
318        }
319        for commit in target.load_commits().await? {
320            commits.insert(commit.graph_commit_id.clone(), commit);
321        }
322
323        let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
324        let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
325
326        let best = source_distances
327            .iter()
328            .filter_map(|(id, source_distance)| {
329                target_distances.get(id).and_then(|target_distance| {
330                    commits.get(id).map(|commit| {
331                        (
332                            (
333                                *source_distance + *target_distance,
334                                u64::MAX - commit.manifest_version,
335                            ),
336                            commit.clone(),
337                        )
338                    })
339                })
340            })
341            .min_by_key(|(score, _)| *score)
342            .map(|(_, commit)| commit);
343
344        Ok(best)
345    }
346}
347
348fn graph_commits_uri(root_uri: &str) -> String {
349    format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
350}
351
352fn graph_commit_actors_uri(root_uri: &str) -> String {
353    format!(
354        "{}/{}",
355        root_uri.trim_end_matches('/'),
356        GRAPH_COMMIT_ACTORS_DIR
357    )
358}
359
360fn commit_graph_schema() -> SchemaRef {
361    Arc::new(Schema::new(vec![
362        Field::new("graph_commit_id", DataType::Utf8, false),
363        Field::new("manifest_branch", DataType::Utf8, true),
364        Field::new("manifest_version", DataType::UInt64, false),
365        Field::new("parent_commit_id", DataType::Utf8, true),
366        Field::new("merged_parent_commit_id", DataType::Utf8, true),
367        Field::new(
368            "created_at",
369            DataType::Timestamp(TimeUnit::Microsecond, None),
370            false,
371        ),
372    ]))
373}
374
375fn commit_actor_schema() -> SchemaRef {
376    Arc::new(Schema::new(vec![
377        Field::new("graph_commit_id", DataType::Utf8, false),
378        Field::new("actor_id", DataType::Utf8, false),
379        Field::new(
380            "created_at",
381            DataType::Timestamp(TimeUnit::Microsecond, None),
382            false,
383        ),
384    ]))
385}
386
/// One row of the commit-actor dataset: which actor produced which commit.
#[derive(Clone, Debug)]
struct CommitActorRecord {
    /// Identifier of the commit the actor is attached to.
    graph_commit_id: String,
    /// Identifier of the actor.
    actor_id: String,
    /// Time the record was created, in microseconds since the UNIX epoch.
    created_at: i64,
}
393
394async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
395    let uri = graph_commit_actors_uri(root_uri);
396    let batch = RecordBatch::new_empty(commit_actor_schema());
397    let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
398    let params = WriteParams {
399        mode: WriteMode::Create,
400        enable_stable_row_ids: true,
401        data_storage_version: Some(LanceFileVersion::V2_2),
402        ..Default::default()
403    };
404    match Dataset::write(reader, &uri as &str, Some(params)).await {
405        Ok(dataset) => Ok(dataset),
406        Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
407            .await
408            .map_err(|open_err| OmniError::Lance(open_err.to_string())),
409        Err(err) => Err(OmniError::Lance(err.to_string())),
410    }
411}
412
413fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
414    let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
415    let branches: Vec<Option<&str>> = commits
416        .iter()
417        .map(|c| c.manifest_branch.as_deref())
418        .collect();
419    let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
420    let parents: Vec<Option<&str>> = commits
421        .iter()
422        .map(|c| c.parent_commit_id.as_deref())
423        .collect();
424    let merged_parents: Vec<Option<&str>> = commits
425        .iter()
426        .map(|c| c.merged_parent_commit_id.as_deref())
427        .collect();
428    let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
429
430    RecordBatch::try_new(
431        commit_graph_schema(),
432        vec![
433            Arc::new(StringArray::from(ids)),
434            Arc::new(StringArray::from(branches)),
435            Arc::new(UInt64Array::from(versions)),
436            Arc::new(StringArray::from(parents)),
437            Arc::new(StringArray::from(merged_parents)),
438            Arc::new(TimestampMicrosecondArray::from(created_at)),
439        ],
440    )
441    .map_err(|e| OmniError::Lance(e.to_string()))
442}
443
444async fn load_commit_cache(
445    dataset: &Dataset,
446    actor_by_commit_id: &HashMap<String, String>,
447) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
448    let batches: Vec<RecordBatch> = dataset
449        .scan()
450        .try_into_stream()
451        .await
452        .map_err(|e| OmniError::Lance(e.to_string()))?
453        .try_collect()
454        .await
455        .map_err(|e| OmniError::Lance(e.to_string()))?;
456
457    let mut commits = load_commits_from_batches(&batches)?;
458    for commit in &mut commits {
459        commit.actor_id = actor_by_commit_id
460            .get(commit.graph_commit_id.as_str())
461            .cloned();
462    }
463    let mut commit_by_id = HashMap::with_capacity(commits.len());
464    let mut head_commit = None;
465    for commit in commits {
466        if should_replace_head(head_commit.as_ref(), &commit) {
467            head_commit = Some(commit.clone());
468        }
469        commit_by_id.insert(commit.graph_commit_id.clone(), commit);
470    }
471    Ok((commit_by_id, head_commit))
472}
473
474async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
475    let batches: Vec<RecordBatch> = dataset
476        .scan()
477        .try_into_stream()
478        .await
479        .map_err(|e| OmniError::Lance(e.to_string()))?
480        .try_collect()
481        .await
482        .map_err(|e| OmniError::Lance(e.to_string()))?;
483
484    let mut actors = HashMap::new();
485    for batch in batches {
486        let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
487        let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
488        for row in 0..batch.num_rows() {
489            actors.insert(
490                commit_ids.value(row).to_string(),
491                actor_ids.value(row).to_string(),
492            );
493        }
494    }
495    Ok(actors)
496}
497
498fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
499    let mut commits = Vec::new();
500    for batch in batches {
501        let ids = string_column(batch, "graph_commit_id", "commit graph")?;
502        let branches = string_column(batch, "manifest_branch", "commit graph")?;
503        let versions = u64_column(batch, "manifest_version", "commit graph")?;
504        let parents = string_column(batch, "parent_commit_id", "commit graph")?;
505        let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
506        let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
507
508        for row in 0..batch.num_rows() {
509            commits.push(GraphCommit {
510                graph_commit_id: ids.value(row).to_string(),
511                manifest_branch: if branches.is_null(row) {
512                    None
513                } else {
514                    Some(branches.value(row).to_string())
515                },
516                manifest_version: versions.value(row),
517                parent_commit_id: if parents.is_null(row) {
518                    None
519                } else {
520                    Some(parents.value(row).to_string())
521                },
522                merged_parent_commit_id: if merged_parents.is_null(row) {
523                    None
524                } else {
525                    Some(merged_parents.value(row).to_string())
526                },
527                actor_id: None,
528                created_at: created.value(row),
529            });
530        }
531    }
532    Ok(commits)
533}
534
535fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
536    let commit_ids: Vec<&str> = records
537        .iter()
538        .map(|record| record.graph_commit_id.as_str())
539        .collect();
540    let actor_ids: Vec<&str> = records
541        .iter()
542        .map(|record| record.actor_id.as_str())
543        .collect();
544    let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
545
546    RecordBatch::try_new(
547        commit_actor_schema(),
548        vec![
549            Arc::new(StringArray::from(commit_ids)),
550            Arc::new(StringArray::from(actor_ids)),
551            Arc::new(TimestampMicrosecondArray::from(created_at)),
552        ],
553    )
554    .map_err(|e| OmniError::Lance(e.to_string()))
555}
556
557fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
558    current.is_none_or(|existing| {
559        candidate
560            .manifest_version
561            .cmp(&existing.manifest_version)
562            .then_with(|| candidate.created_at.cmp(&existing.created_at))
563            .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
564            .is_gt()
565    })
566}
567
568fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
569    batch
570        .column_by_name(name)
571        .ok_or_else(|| {
572            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
573        })?
574        .as_any()
575        .downcast_ref::<StringArray>()
576        .ok_or_else(|| {
577            OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
578        })
579}
580
581fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
582    batch
583        .column_by_name(name)
584        .ok_or_else(|| {
585            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
586        })?
587        .as_any()
588        .downcast_ref::<UInt64Array>()
589        .ok_or_else(|| {
590            OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
591        })
592}
593
594fn timestamp_micros_column<'a>(
595    batch: &'a RecordBatch,
596    name: &str,
597    context: &str,
598) -> Result<&'a TimestampMicrosecondArray> {
599    batch
600        .column_by_name(name)
601        .ok_or_else(|| {
602            OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
603        })?
604        .as_any()
605        .downcast_ref::<TimestampMicrosecondArray>()
606        .ok_or_else(|| {
607            OmniError::manifest_internal(format!(
608                "{context} column '{name}' is not Timestamp(Microsecond)"
609            ))
610        })
611}
612
613fn ancestor_distances(
614    start_id: &str,
615    commits: &HashMap<String, GraphCommit>,
616) -> HashMap<String, u64> {
617    let mut distances = HashMap::new();
618    let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
619
620    while let Some((id, distance)) = queue.pop_front() {
621        if let Some(existing) = distances.get(&id) {
622            if *existing <= distance {
623                continue;
624            }
625        }
626        distances.insert(id.clone(), distance);
627
628        if let Some(commit) = commits.get(&id) {
629            if let Some(parent) = &commit.parent_commit_id {
630                queue.push_back((parent.clone(), distance + 1));
631            }
632            if let Some(parent) = &commit.merged_parent_commit_id {
633                queue.push_back((parent.clone(), distance + 1));
634            }
635        }
636    }
637
638    distances
639}
640
641async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
642    match branch {
643        Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
644        _ => CommitGraph::open(root_uri).await,
645    }
646}
647
648fn now_micros() -> Result<i64> {
649    let duration = SystemTime::now()
650        .duration_since(UNIX_EPOCH)
651        .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
652    Ok(duration.as_micros() as i64)
653}
654
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is `UInt64` instead of `Utf8`
    /// must be rejected with an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let nullable_utf8 = |name: &str| Field::new(name, DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![
            // Deliberately the wrong type for this column.
            Field::new("graph_commit_id", DataType::UInt64, false),
            nullable_utf8("manifest_branch"),
            Field::new("manifest_version", DataType::UInt64, false),
            nullable_utf8("parent_commit_id"),
            nullable_utf8("merged_parent_commit_id"),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
        ]));

        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(schema, columns).unwrap();

        let err = load_commits_from_batches(&[batch]).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("graph_commit_id"));
    }
}