1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6 Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, relative to the root URI, of the Lance dataset holding graph commit rows.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";

/// Directory name, relative to the root URI, of the Lance dataset mapping commits to actors.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// One node of the commit graph.
///
/// Every field except `actor_id` is stored as a column of the commit dataset; `actor_id` is
/// filled in from the separate commit-actor dataset when the cache is loaded.
#[derive(Debug, Clone)]
pub struct GraphCommit {
    /// Unique identifier of this commit (a ULID string when created by this module).
    pub graph_commit_id: String,
    /// Manifest branch the commit was recorded for; `None` for the genesis commit.
    pub manifest_branch: Option<String>,
    /// Manifest version this commit refers to.
    pub manifest_version: u64,
    /// Identifier of the primary parent commit; `None` for the genesis commit.
    pub parent_commit_id: Option<String>,
    /// Identifier of the second parent, set only for merge commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor recorded for this commit, if any.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the UNIX epoch.
    pub created_at: i64,
}
29
30pub struct CommitGraph {
31 root_uri: String,
32 dataset: Dataset,
33 actor_dataset: Option<Dataset>,
34 active_branch: Option<String>,
35 actor_by_commit_id: HashMap<String, String>,
36 commit_by_id: HashMap<String, GraphCommit>,
37 head_commit: Option<GraphCommit>,
38}
39
40impl CommitGraph {
41 pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42 let root = root_uri.trim_end_matches('/');
43 let uri = graph_commits_uri(root);
44 let genesis = GraphCommit {
45 graph_commit_id: ulid::Ulid::new().to_string(),
46 manifest_branch: None,
47 manifest_version,
48 parent_commit_id: None,
49 merged_parent_commit_id: None,
50 actor_id: None,
51 created_at: now_micros()?,
52 };
53
54 let batch = commits_to_batch(&[genesis.clone()])?;
55 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56 let params = WriteParams {
57 mode: WriteMode::Create,
58 enable_stable_row_ids: true,
59 data_storage_version: Some(LanceFileVersion::V2_2),
60 ..Default::default()
61 };
62 let dataset = Dataset::write(reader, &uri as &str, Some(params))
63 .await
64 .map_err(|e| OmniError::Lance(e.to_string()))?;
65 let actor_dataset = create_commit_actor_dataset(root).await?;
66
67 Ok(Self {
68 root_uri: root.to_string(),
69 dataset,
70 actor_dataset: Some(actor_dataset),
71 active_branch: None,
72 actor_by_commit_id: HashMap::new(),
73 commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
74 head_commit: Some(genesis),
75 })
76 }
77
78 pub async fn open(root_uri: &str) -> Result<Self> {
79 let root = root_uri.trim_end_matches('/');
80 let dataset = Dataset::open(&graph_commits_uri(root))
81 .await
82 .map_err(|e| OmniError::Lance(e.to_string()))?;
83 let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
84 let actor_by_commit_id = match &actor_dataset {
85 Some(dataset) => load_commit_actor_cache(dataset).await?,
86 None => HashMap::new(),
87 };
88 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
89 Ok(Self {
90 root_uri: root.to_string(),
91 dataset,
92 actor_dataset,
93 active_branch: None,
94 actor_by_commit_id,
95 commit_by_id,
96 head_commit,
97 })
98 }
99
100 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
101 let root = root_uri.trim_end_matches('/');
102 let dataset = Dataset::open(&graph_commits_uri(root))
103 .await
104 .map_err(|e| OmniError::Lance(e.to_string()))?;
105 let dataset = dataset
106 .checkout_branch(branch)
107 .await
108 .map_err(|e| OmniError::Lance(e.to_string()))?;
109 let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
110 let actor_by_commit_id = match &actor_dataset {
111 Some(dataset) => load_commit_actor_cache(dataset).await?,
112 None => HashMap::new(),
113 };
114 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
115 Ok(Self {
116 root_uri: root.to_string(),
117 dataset,
118 actor_dataset,
119 active_branch: Some(branch.to_string()),
120 actor_by_commit_id,
121 commit_by_id,
122 head_commit,
123 })
124 }
125
126 pub async fn refresh(&mut self) -> Result<()> {
127 let root = self.root_uri.clone();
128 self.dataset = Dataset::open(&graph_commits_uri(&root))
129 .await
130 .map_err(|e| OmniError::Lance(e.to_string()))?;
131 if let Some(branch) = &self.active_branch {
132 self.dataset = self
133 .dataset
134 .checkout_branch(branch)
135 .await
136 .map_err(|e| OmniError::Lance(e.to_string()))?;
137 }
138 self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
139 self.actor_by_commit_id = match &self.actor_dataset {
140 Some(dataset) => load_commit_actor_cache(dataset).await?,
141 None => HashMap::new(),
142 };
143 let (commit_by_id, head_commit) =
144 load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
145 self.commit_by_id = commit_by_id;
146 self.head_commit = head_commit;
147 Ok(())
148 }
149
150 pub fn version(&self) -> u64 {
151 self.dataset.version().version
152 }
153
154 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
155 let mut ds = self.dataset.clone();
156 ds.create_branch(name, self.version(), None)
157 .await
158 .map_err(|e| OmniError::Lance(e.to_string()))?;
159 Ok(())
160 }
161
162 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
163 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
164 .await
165 .map_err(|e| OmniError::Lance(e.to_string()))?;
166 ds.delete_branch(name)
167 .await
168 .map_err(|e| OmniError::Lance(e.to_string()))?;
169 self.refresh().await
170 }
171
172 pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
178 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
179 .await
180 .map_err(|e| OmniError::Lance(e.to_string()))?;
181 match ds.force_delete_branch(name).await {
182 Ok(()) => {}
183 Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
184 Err(e) => return Err(OmniError::Lance(e.to_string())),
185 }
186 self.refresh().await
187 }
188
189 pub async fn list_branches(&self) -> Result<Vec<String>> {
193 let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
194 .await
195 .map_err(|e| OmniError::Lance(e.to_string()))?;
196 let branches = ds
197 .list_branches()
198 .await
199 .map_err(|e| OmniError::Lance(e.to_string()))?;
200 Ok(branches.into_keys().collect())
201 }
202
203 pub async fn append_commit(
204 &mut self,
205 manifest_branch: Option<&str>,
206 manifest_version: u64,
207 actor_id: Option<&str>,
208 ) -> Result<String> {
209 let parent_commit_id = self.head_commit_id().await?;
210 self.append_commit_with_parents(
211 manifest_branch,
212 manifest_version,
213 parent_commit_id.as_deref(),
214 None,
215 actor_id,
216 )
217 .await
218 }
219
220 pub async fn append_merge_commit(
221 &mut self,
222 manifest_branch: Option<&str>,
223 manifest_version: u64,
224 parent_commit_id: &str,
225 merged_parent_commit_id: &str,
226 actor_id: Option<&str>,
227 ) -> Result<String> {
228 self.append_commit_with_parents(
229 manifest_branch,
230 manifest_version,
231 Some(parent_commit_id),
232 Some(merged_parent_commit_id),
233 actor_id,
234 )
235 .await
236 }
237
238 async fn append_commit_with_parents(
239 &mut self,
240 manifest_branch: Option<&str>,
241 manifest_version: u64,
242 parent_commit_id: Option<&str>,
243 merged_parent_commit_id: Option<&str>,
244 actor_id: Option<&str>,
245 ) -> Result<String> {
246 let graph_commit_id = ulid::Ulid::new().to_string();
247 let commit = GraphCommit {
248 graph_commit_id: graph_commit_id.clone(),
249 manifest_branch: manifest_branch.map(|s| s.to_string()),
250 manifest_version,
251 parent_commit_id: parent_commit_id.map(|s| s.to_string()),
252 merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
253 actor_id: actor_id.map(str::to_string),
254 created_at: now_micros()?,
255 };
256
257 let batch = commits_to_batch(&[commit.clone()])?;
258 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
259 let mut ds = self.dataset.clone();
260 ds.append(reader, None)
261 .await
262 .map_err(|e| OmniError::Lance(e.to_string()))?;
263 self.dataset = ds;
264 if let Some(actor_id) = actor_id {
265 self.append_actor(&graph_commit_id, actor_id).await?;
266 }
267 self.commit_by_id
268 .insert(graph_commit_id.clone(), commit.clone());
269 if should_replace_head(self.head_commit.as_ref(), &commit) {
270 self.head_commit = Some(commit);
271 }
272
273 Ok(graph_commit_id)
274 }
275
276 async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
277 if self
278 .actor_by_commit_id
279 .get(graph_commit_id)
280 .is_some_and(|existing| existing == actor_id)
281 {
282 return Ok(());
283 }
284
285 let record = CommitActorRecord {
286 graph_commit_id: graph_commit_id.to_string(),
287 actor_id: actor_id.to_string(),
288 created_at: now_micros()?,
289 };
290 let batch = commit_actors_to_batch(&[record])?;
291 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
292 let mut dataset = match self.actor_dataset.take() {
293 Some(dataset) => dataset,
294 None => create_commit_actor_dataset(&self.root_uri).await?,
295 };
296 dataset
297 .append(reader, None)
298 .await
299 .map_err(|e| OmniError::Lance(e.to_string()))?;
300 self.actor_by_commit_id
301 .insert(graph_commit_id.to_string(), actor_id.to_string());
302 self.actor_dataset = Some(dataset);
303 Ok(())
304 }
305
306 pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
307 Ok(self.head_commit.clone())
308 }
309
310 pub async fn head_commit_id(&self) -> Result<Option<String>> {
311 Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
312 }
313
314 pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
315 let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
316 commits.sort_by(|a, b| {
317 a.manifest_version
318 .cmp(&b.manifest_version)
319 .then_with(|| a.created_at.cmp(&b.created_at))
320 .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
321 });
322 Ok(commits)
323 }
324
325 pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
326 self.commit_by_id.get(commit_id).cloned()
327 }
328
329 pub async fn merge_base(
330 root_uri: &str,
331 source_branch: Option<&str>,
332 target_branch: Option<&str>,
333 ) -> Result<Option<GraphCommit>> {
334 let source = open_for_branch(root_uri, source_branch).await?;
335 let target = open_for_branch(root_uri, target_branch).await?;
336
337 let source_head = match source.head_commit().await? {
338 Some(commit) => commit,
339 None => return Ok(None),
340 };
341 let target_head = match target.head_commit().await? {
342 Some(commit) => commit,
343 None => return Ok(None),
344 };
345
346 let mut commits = HashMap::new();
347 for commit in source.load_commits().await? {
348 commits.insert(commit.graph_commit_id.clone(), commit);
349 }
350 for commit in target.load_commits().await? {
351 commits.insert(commit.graph_commit_id.clone(), commit);
352 }
353
354 let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
355 let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
356
357 let best = source_distances
358 .iter()
359 .filter_map(|(id, source_distance)| {
360 target_distances.get(id).and_then(|target_distance| {
361 commits.get(id).map(|commit| {
362 (
363 (
364 *source_distance + *target_distance,
365 u64::MAX - commit.manifest_version,
366 ),
367 commit.clone(),
368 )
369 })
370 })
371 })
372 .min_by_key(|(score, _)| *score)
373 .map(|(_, commit)| commit);
374
375 Ok(best)
376 }
377}
378
379pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
380 format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
381}
382
383fn graph_commit_actors_uri(root_uri: &str) -> String {
384 format!(
385 "{}/{}",
386 root_uri.trim_end_matches('/'),
387 GRAPH_COMMIT_ACTORS_DIR
388 )
389}
390
391fn commit_graph_schema() -> SchemaRef {
392 Arc::new(Schema::new(vec![
393 Field::new("graph_commit_id", DataType::Utf8, false),
394 Field::new("manifest_branch", DataType::Utf8, true),
395 Field::new("manifest_version", DataType::UInt64, false),
396 Field::new("parent_commit_id", DataType::Utf8, true),
397 Field::new("merged_parent_commit_id", DataType::Utf8, true),
398 Field::new(
399 "created_at",
400 DataType::Timestamp(TimeUnit::Microsecond, None),
401 false,
402 ),
403 ]))
404}
405
406fn commit_actor_schema() -> SchemaRef {
407 Arc::new(Schema::new(vec![
408 Field::new("graph_commit_id", DataType::Utf8, false),
409 Field::new("actor_id", DataType::Utf8, false),
410 Field::new(
411 "created_at",
412 DataType::Timestamp(TimeUnit::Microsecond, None),
413 false,
414 ),
415 ]))
416}
417
/// One row of the commit-actor dataset: which actor is recorded for which commit.
#[derive(Debug, Clone)]
struct CommitActorRecord {
    /// Identifier of the commit the actor is recorded for.
    graph_commit_id: String,
    /// Identifier of the actor.
    actor_id: String,
    /// Time the record was created, in microseconds since the UNIX epoch.
    created_at: i64,
}
424
425async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
426 let uri = graph_commit_actors_uri(root_uri);
427 let batch = RecordBatch::new_empty(commit_actor_schema());
428 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
429 let params = WriteParams {
430 mode: WriteMode::Create,
431 enable_stable_row_ids: true,
432 data_storage_version: Some(LanceFileVersion::V2_2),
433 ..Default::default()
434 };
435 match Dataset::write(reader, &uri as &str, Some(params)).await {
436 Ok(dataset) => Ok(dataset),
437 Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
438 .await
439 .map_err(|open_err| OmniError::Lance(open_err.to_string())),
440 Err(err) => Err(OmniError::Lance(err.to_string())),
441 }
442}
443
444fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
445 let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
446 let branches: Vec<Option<&str>> = commits
447 .iter()
448 .map(|c| c.manifest_branch.as_deref())
449 .collect();
450 let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
451 let parents: Vec<Option<&str>> = commits
452 .iter()
453 .map(|c| c.parent_commit_id.as_deref())
454 .collect();
455 let merged_parents: Vec<Option<&str>> = commits
456 .iter()
457 .map(|c| c.merged_parent_commit_id.as_deref())
458 .collect();
459 let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
460
461 RecordBatch::try_new(
462 commit_graph_schema(),
463 vec![
464 Arc::new(StringArray::from(ids)),
465 Arc::new(StringArray::from(branches)),
466 Arc::new(UInt64Array::from(versions)),
467 Arc::new(StringArray::from(parents)),
468 Arc::new(StringArray::from(merged_parents)),
469 Arc::new(TimestampMicrosecondArray::from(created_at)),
470 ],
471 )
472 .map_err(|e| OmniError::Lance(e.to_string()))
473}
474
475async fn load_commit_cache(
476 dataset: &Dataset,
477 actor_by_commit_id: &HashMap<String, String>,
478) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
479 let batches: Vec<RecordBatch> = dataset
480 .scan()
481 .try_into_stream()
482 .await
483 .map_err(|e| OmniError::Lance(e.to_string()))?
484 .try_collect()
485 .await
486 .map_err(|e| OmniError::Lance(e.to_string()))?;
487
488 let mut commits = load_commits_from_batches(&batches)?;
489 for commit in &mut commits {
490 commit.actor_id = actor_by_commit_id
491 .get(commit.graph_commit_id.as_str())
492 .cloned();
493 }
494 let mut commit_by_id = HashMap::with_capacity(commits.len());
495 let mut head_commit = None;
496 for commit in commits {
497 if should_replace_head(head_commit.as_ref(), &commit) {
498 head_commit = Some(commit.clone());
499 }
500 commit_by_id.insert(commit.graph_commit_id.clone(), commit);
501 }
502 Ok((commit_by_id, head_commit))
503}
504
505async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
506 let batches: Vec<RecordBatch> = dataset
507 .scan()
508 .try_into_stream()
509 .await
510 .map_err(|e| OmniError::Lance(e.to_string()))?
511 .try_collect()
512 .await
513 .map_err(|e| OmniError::Lance(e.to_string()))?;
514
515 let mut actors = HashMap::new();
516 for batch in batches {
517 let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
518 let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
519 for row in 0..batch.num_rows() {
520 actors.insert(
521 commit_ids.value(row).to_string(),
522 actor_ids.value(row).to_string(),
523 );
524 }
525 }
526 Ok(actors)
527}
528
529fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
530 let mut commits = Vec::new();
531 for batch in batches {
532 let ids = string_column(batch, "graph_commit_id", "commit graph")?;
533 let branches = string_column(batch, "manifest_branch", "commit graph")?;
534 let versions = u64_column(batch, "manifest_version", "commit graph")?;
535 let parents = string_column(batch, "parent_commit_id", "commit graph")?;
536 let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
537 let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
538
539 for row in 0..batch.num_rows() {
540 commits.push(GraphCommit {
541 graph_commit_id: ids.value(row).to_string(),
542 manifest_branch: if branches.is_null(row) {
543 None
544 } else {
545 Some(branches.value(row).to_string())
546 },
547 manifest_version: versions.value(row),
548 parent_commit_id: if parents.is_null(row) {
549 None
550 } else {
551 Some(parents.value(row).to_string())
552 },
553 merged_parent_commit_id: if merged_parents.is_null(row) {
554 None
555 } else {
556 Some(merged_parents.value(row).to_string())
557 },
558 actor_id: None,
559 created_at: created.value(row),
560 });
561 }
562 }
563 Ok(commits)
564}
565
566fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
567 let commit_ids: Vec<&str> = records
568 .iter()
569 .map(|record| record.graph_commit_id.as_str())
570 .collect();
571 let actor_ids: Vec<&str> = records
572 .iter()
573 .map(|record| record.actor_id.as_str())
574 .collect();
575 let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
576
577 RecordBatch::try_new(
578 commit_actor_schema(),
579 vec![
580 Arc::new(StringArray::from(commit_ids)),
581 Arc::new(StringArray::from(actor_ids)),
582 Arc::new(TimestampMicrosecondArray::from(created_at)),
583 ],
584 )
585 .map_err(|e| OmniError::Lance(e.to_string()))
586}
587
588fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
589 current.is_none_or(|existing| {
590 candidate
591 .manifest_version
592 .cmp(&existing.manifest_version)
593 .then_with(|| candidate.created_at.cmp(&existing.created_at))
594 .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
595 .is_gt()
596 })
597}
598
599fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
600 batch
601 .column_by_name(name)
602 .ok_or_else(|| {
603 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
604 })?
605 .as_any()
606 .downcast_ref::<StringArray>()
607 .ok_or_else(|| {
608 OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
609 })
610}
611
612fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
613 batch
614 .column_by_name(name)
615 .ok_or_else(|| {
616 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
617 })?
618 .as_any()
619 .downcast_ref::<UInt64Array>()
620 .ok_or_else(|| {
621 OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
622 })
623}
624
625fn timestamp_micros_column<'a>(
626 batch: &'a RecordBatch,
627 name: &str,
628 context: &str,
629) -> Result<&'a TimestampMicrosecondArray> {
630 batch
631 .column_by_name(name)
632 .ok_or_else(|| {
633 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
634 })?
635 .as_any()
636 .downcast_ref::<TimestampMicrosecondArray>()
637 .ok_or_else(|| {
638 OmniError::manifest_internal(format!(
639 "{context} column '{name}' is not Timestamp(Microsecond)"
640 ))
641 })
642}
643
644fn ancestor_distances(
645 start_id: &str,
646 commits: &HashMap<String, GraphCommit>,
647) -> HashMap<String, u64> {
648 let mut distances = HashMap::new();
649 let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
650
651 while let Some((id, distance)) = queue.pop_front() {
652 if let Some(existing) = distances.get(&id) {
653 if *existing <= distance {
654 continue;
655 }
656 }
657 distances.insert(id.clone(), distance);
658
659 if let Some(commit) = commits.get(&id) {
660 if let Some(parent) = &commit.parent_commit_id {
661 queue.push_back((parent.clone(), distance + 1));
662 }
663 if let Some(parent) = &commit.merged_parent_commit_id {
664 queue.push_back((parent.clone(), distance + 1));
665 }
666 }
667 }
668
669 distances
670}
671
672async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
673 match branch {
674 Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
675 _ => CommitGraph::open(root_uri).await,
676 }
677}
678
679fn now_micros() -> Result<i64> {
680 let duration = SystemTime::now()
681 .duration_since(UNIX_EPOCH)
682 .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
683 Ok(duration.as_micros() as i64)
684}
685
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is UInt64 instead of Utf8 must be rejected with
    /// an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let nullable_utf8 = |name: &str| Field::new(name, DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![
            Field::new("graph_commit_id", DataType::UInt64, false),
            nullable_utf8("manifest_branch"),
            Field::new("manifest_version", DataType::UInt64, false),
            nullable_utf8("parent_commit_id"),
            nullable_utf8("merged_parent_commit_id"),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
        ]));
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(StringArray::from(vec![None::<&str>])),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(schema, columns).unwrap();

        let err = load_commits_from_batches(&[batch]).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("graph_commit_id"));
    }
}