1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6 Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, relative to the graph root URI, of the Lance dataset that stores one
/// row per graph commit.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory name, relative to the graph root URI, of the Lance dataset that maps graph
/// commit ids to actor ids (the commit-actor registry).
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// One node of the graph-level commit history.
///
/// Every field except `actor_id` is persisted as a column of the commit graph dataset
/// (see `commit_graph_schema`). `actor_id` lives in the separate commit-actor registry
/// and is joined in when commits are loaded.
#[derive(Debug, Clone)]
pub struct GraphCommit {
    /// Unique id of the commit (a ULID string when created by `CommitGraph`).
    pub graph_commit_id: String,
    /// Manifest branch the commit was recorded for; `None` for the genesis commit and
    /// whenever the caller supplied no branch.
    pub manifest_branch: Option<String>,
    /// Manifest version this commit points at.
    pub manifest_version: u64,
    /// Primary parent commit id, if any.
    pub parent_commit_id: Option<String>,
    /// Second parent, set for merge commits; `None` for ordinary commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor that created the commit, if one is recorded in the actor registry.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the UNIX epoch.
    pub created_at: i64,
}
29
/// Handle on a repository's graph commit history.
///
/// Wraps the Lance commit graph dataset (optionally checked out on a branch) and the
/// separate commit-actor registry, and keeps every visible commit plus the current head
/// cached in memory.
pub struct CommitGraph {
    /// Repository root URI with trailing `/` characters removed.
    root_uri: String,
    /// The commit graph dataset; checked out on `active_branch` when that is set.
    dataset: Dataset,
    /// Commit-actor registry dataset; `None` when it could not be opened.
    actor_dataset: Option<Dataset>,
    /// Branch this handle was opened at; it is checked out again on refresh.
    active_branch: Option<String>,
    /// Cached registry contents: graph commit id -> actor id.
    actor_by_commit_id: HashMap<String, String>,
    /// Cached commits visible through `dataset`, keyed by graph commit id.
    commit_by_id: HashMap<String, GraphCommit>,
    /// Newest commit by (manifest_version, created_at, graph_commit_id); `None` when the
    /// graph holds no commits.
    head_commit: Option<GraphCommit>,
}
39
40impl CommitGraph {
41 pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42 let root = root_uri.trim_end_matches('/');
43 let uri = graph_commits_uri(root);
44 let genesis = GraphCommit {
45 graph_commit_id: ulid::Ulid::new().to_string(),
46 manifest_branch: None,
47 manifest_version,
48 parent_commit_id: None,
49 merged_parent_commit_id: None,
50 actor_id: None,
51 created_at: now_micros()?,
52 };
53
54 let batch = commits_to_batch(&[genesis.clone()])?;
55 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56 let params = WriteParams {
57 mode: WriteMode::Create,
58 enable_stable_row_ids: true,
59 data_storage_version: Some(LanceFileVersion::V2_2),
60 auto_cleanup: None,
61 skip_auto_cleanup: true,
62 ..Default::default()
63 };
64 let dataset = Dataset::write(reader, &uri as &str, Some(params))
65 .await
66 .map_err(|e| OmniError::Lance(e.to_string()))?;
67 let actor_dataset = create_commit_actor_dataset(root).await?;
68
69 Ok(Self {
70 root_uri: root.to_string(),
71 dataset,
72 actor_dataset: Some(actor_dataset),
73 active_branch: None,
74 actor_by_commit_id: HashMap::new(),
75 commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
76 head_commit: Some(genesis),
77 })
78 }
79
80 pub async fn open(root_uri: &str) -> Result<Self> {
81 let root = root_uri.trim_end_matches('/');
82 let dataset = Dataset::open(&graph_commits_uri(root))
83 .await
84 .map_err(|e| OmniError::Lance(e.to_string()))?;
85 let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
86 let actor_by_commit_id = match &actor_dataset {
87 Some(dataset) => load_commit_actor_cache(dataset).await?,
88 None => HashMap::new(),
89 };
90 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
91 Ok(Self {
92 root_uri: root.to_string(),
93 dataset,
94 actor_dataset,
95 active_branch: None,
96 actor_by_commit_id,
97 commit_by_id,
98 head_commit,
99 })
100 }
101
102 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
103 let root = root_uri.trim_end_matches('/');
104 let dataset = Dataset::open(&graph_commits_uri(root))
105 .await
106 .map_err(|e| OmniError::Lance(e.to_string()))?;
107 let dataset = dataset
108 .checkout_branch(branch)
109 .await
110 .map_err(|e| OmniError::Lance(e.to_string()))?;
111 let actor_dataset = Dataset::open(&graph_commit_actors_uri(root)).await.ok();
112 let actor_by_commit_id = match &actor_dataset {
113 Some(dataset) => load_commit_actor_cache(dataset).await?,
114 None => HashMap::new(),
115 };
116 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
117 Ok(Self {
118 root_uri: root.to_string(),
119 dataset,
120 actor_dataset,
121 active_branch: Some(branch.to_string()),
122 actor_by_commit_id,
123 commit_by_id,
124 head_commit,
125 })
126 }
127
128 pub async fn refresh(&mut self) -> Result<()> {
129 let root = self.root_uri.clone();
130 self.dataset = Dataset::open(&graph_commits_uri(&root))
131 .await
132 .map_err(|e| OmniError::Lance(e.to_string()))?;
133 if let Some(branch) = &self.active_branch {
134 self.dataset = self
135 .dataset
136 .checkout_branch(branch)
137 .await
138 .map_err(|e| OmniError::Lance(e.to_string()))?;
139 }
140 self.actor_dataset = Dataset::open(&graph_commit_actors_uri(&root)).await.ok();
141 self.actor_by_commit_id = match &self.actor_dataset {
142 Some(dataset) => load_commit_actor_cache(dataset).await?,
143 None => HashMap::new(),
144 };
145 let (commit_by_id, head_commit) =
146 load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
147 self.commit_by_id = commit_by_id;
148 self.head_commit = head_commit;
149 Ok(())
150 }
151
152 pub fn version(&self) -> u64 {
153 self.dataset.version().version
154 }
155
156 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
157 let mut ds = self.dataset.clone();
158 ds.create_branch(name, self.version(), None)
159 .await
160 .map_err(|e| OmniError::Lance(e.to_string()))?;
161 Ok(())
162 }
163
164 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
165 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
166 .await
167 .map_err(|e| OmniError::Lance(e.to_string()))?;
168 ds.delete_branch(name)
169 .await
170 .map_err(|e| OmniError::Lance(e.to_string()))?;
171 self.refresh().await
172 }
173
174 pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
180 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
181 .await
182 .map_err(|e| OmniError::Lance(e.to_string()))?;
183 match ds.force_delete_branch(name).await {
184 Ok(()) => {}
185 Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
186 Err(e) => return Err(OmniError::Lance(e.to_string())),
187 }
188 self.refresh().await
189 }
190
191 pub async fn list_branches(&self) -> Result<Vec<String>> {
195 let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
196 .await
197 .map_err(|e| OmniError::Lance(e.to_string()))?;
198 let branches = ds
199 .list_branches()
200 .await
201 .map_err(|e| OmniError::Lance(e.to_string()))?;
202 Ok(branches.into_keys().collect())
203 }
204
205 pub async fn append_commit(
206 &mut self,
207 manifest_branch: Option<&str>,
208 manifest_version: u64,
209 actor_id: Option<&str>,
210 ) -> Result<String> {
211 let parent_commit_id = self.head_commit_id().await?;
212 self.append_commit_with_parents(
213 manifest_branch,
214 manifest_version,
215 parent_commit_id.as_deref(),
216 None,
217 actor_id,
218 )
219 .await
220 }
221
222 pub async fn append_merge_commit(
223 &mut self,
224 manifest_branch: Option<&str>,
225 manifest_version: u64,
226 parent_commit_id: &str,
227 merged_parent_commit_id: &str,
228 actor_id: Option<&str>,
229 ) -> Result<String> {
230 self.append_commit_with_parents(
231 manifest_branch,
232 manifest_version,
233 Some(parent_commit_id),
234 Some(merged_parent_commit_id),
235 actor_id,
236 )
237 .await
238 }
239
240 async fn append_commit_with_parents(
241 &mut self,
242 manifest_branch: Option<&str>,
243 manifest_version: u64,
244 parent_commit_id: Option<&str>,
245 merged_parent_commit_id: Option<&str>,
246 actor_id: Option<&str>,
247 ) -> Result<String> {
248 let graph_commit_id = ulid::Ulid::new().to_string();
249 let commit = GraphCommit {
250 graph_commit_id: graph_commit_id.clone(),
251 manifest_branch: manifest_branch.map(|s| s.to_string()),
252 manifest_version,
253 parent_commit_id: parent_commit_id.map(|s| s.to_string()),
254 merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
255 actor_id: actor_id.map(str::to_string),
256 created_at: now_micros()?,
257 };
258
259 let batch = commits_to_batch(&[commit.clone()])?;
260 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
261 let mut ds = self.dataset.clone();
262 ds.append(reader, None)
263 .await
264 .map_err(|e| OmniError::Lance(e.to_string()))?;
265 self.dataset = ds;
266 if let Some(actor_id) = actor_id {
267 self.append_actor(&graph_commit_id, actor_id).await?;
268 }
269 self.commit_by_id
270 .insert(graph_commit_id.clone(), commit.clone());
271 if should_replace_head(self.head_commit.as_ref(), &commit) {
272 self.head_commit = Some(commit);
273 }
274
275 Ok(graph_commit_id)
276 }
277
278 async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
279 if self
280 .actor_by_commit_id
281 .get(graph_commit_id)
282 .is_some_and(|existing| existing == actor_id)
283 {
284 return Ok(());
285 }
286
287 let record = CommitActorRecord {
288 graph_commit_id: graph_commit_id.to_string(),
289 actor_id: actor_id.to_string(),
290 created_at: now_micros()?,
291 };
292 let batch = commit_actors_to_batch(&[record])?;
293 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
294 let mut dataset = match self.actor_dataset.take() {
295 Some(dataset) => dataset,
296 None => create_commit_actor_dataset(&self.root_uri).await?,
297 };
298 dataset
299 .append(reader, None)
300 .await
301 .map_err(|e| OmniError::Lance(e.to_string()))?;
302 self.actor_by_commit_id
303 .insert(graph_commit_id.to_string(), actor_id.to_string());
304 self.actor_dataset = Some(dataset);
305 Ok(())
306 }
307
308 pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
309 Ok(self.head_commit.clone())
310 }
311
312 pub async fn head_commit_id(&self) -> Result<Option<String>> {
313 Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
314 }
315
316 pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
317 let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
318 commits.sort_by(|a, b| {
319 a.manifest_version
320 .cmp(&b.manifest_version)
321 .then_with(|| a.created_at.cmp(&b.created_at))
322 .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
323 });
324 Ok(commits)
325 }
326
327 pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
328 self.commit_by_id.get(commit_id).cloned()
329 }
330
331 pub async fn merge_base(
332 root_uri: &str,
333 source_branch: Option<&str>,
334 target_branch: Option<&str>,
335 ) -> Result<Option<GraphCommit>> {
336 let source = open_for_branch(root_uri, source_branch).await?;
337 let target = open_for_branch(root_uri, target_branch).await?;
338
339 let source_head = match source.head_commit().await? {
340 Some(commit) => commit,
341 None => return Ok(None),
342 };
343 let target_head = match target.head_commit().await? {
344 Some(commit) => commit,
345 None => return Ok(None),
346 };
347
348 let mut commits = HashMap::new();
349 for commit in source.load_commits().await? {
350 commits.insert(commit.graph_commit_id.clone(), commit);
351 }
352 for commit in target.load_commits().await? {
353 commits.insert(commit.graph_commit_id.clone(), commit);
354 }
355
356 let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
357 let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
358
359 let best = source_distances
360 .iter()
361 .filter_map(|(id, source_distance)| {
362 target_distances.get(id).and_then(|target_distance| {
363 commits.get(id).map(|commit| {
364 (
365 (
366 *source_distance + *target_distance,
367 u64::MAX - commit.manifest_version,
368 ),
369 commit.clone(),
370 )
371 })
372 })
373 })
374 .min_by_key(|(score, _)| *score)
375 .map(|(_, commit)| commit);
376
377 Ok(best)
378 }
379}
380
381pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
382 format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
383}
384
385fn graph_commit_actors_uri(root_uri: &str) -> String {
386 format!(
387 "{}/{}",
388 root_uri.trim_end_matches('/'),
389 GRAPH_COMMIT_ACTORS_DIR
390 )
391}
392
393fn commit_graph_schema() -> SchemaRef {
394 Arc::new(Schema::new(vec![
395 Field::new("graph_commit_id", DataType::Utf8, false),
396 Field::new("manifest_branch", DataType::Utf8, true),
397 Field::new("manifest_version", DataType::UInt64, false),
398 Field::new("parent_commit_id", DataType::Utf8, true),
399 Field::new("merged_parent_commit_id", DataType::Utf8, true),
400 Field::new(
401 "created_at",
402 DataType::Timestamp(TimeUnit::Microsecond, None),
403 false,
404 ),
405 ]))
406}
407
408fn commit_actor_schema() -> SchemaRef {
409 Arc::new(Schema::new(vec![
410 Field::new("graph_commit_id", DataType::Utf8, false),
411 Field::new("actor_id", DataType::Utf8, false),
412 Field::new(
413 "created_at",
414 DataType::Timestamp(TimeUnit::Microsecond, None),
415 false,
416 ),
417 ]))
418}
419
/// One row of the commit-actor registry (see `commit_actor_schema`).
#[derive(Debug, Clone)]
struct CommitActorRecord {
    /// Graph commit the actor is attributed to.
    graph_commit_id: String,
    /// Actor that created the commit.
    actor_id: String,
    /// When the mapping was recorded, in microseconds since the UNIX epoch.
    created_at: i64,
}
426
427async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
428 let uri = graph_commit_actors_uri(root_uri);
429 let batch = RecordBatch::new_empty(commit_actor_schema());
430 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
431 let params = WriteParams {
432 mode: WriteMode::Create,
433 enable_stable_row_ids: true,
434 data_storage_version: Some(LanceFileVersion::V2_2),
435 auto_cleanup: None,
436 skip_auto_cleanup: true,
437 ..Default::default()
438 };
439 match Dataset::write(reader, &uri as &str, Some(params)).await {
440 Ok(dataset) => Ok(dataset),
441 Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
442 .await
443 .map_err(|open_err| OmniError::Lance(open_err.to_string())),
444 Err(err) => Err(OmniError::Lance(err.to_string())),
445 }
446}
447
448fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
449 let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
450 let branches: Vec<Option<&str>> = commits
451 .iter()
452 .map(|c| c.manifest_branch.as_deref())
453 .collect();
454 let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
455 let parents: Vec<Option<&str>> = commits
456 .iter()
457 .map(|c| c.parent_commit_id.as_deref())
458 .collect();
459 let merged_parents: Vec<Option<&str>> = commits
460 .iter()
461 .map(|c| c.merged_parent_commit_id.as_deref())
462 .collect();
463 let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
464
465 RecordBatch::try_new(
466 commit_graph_schema(),
467 vec![
468 Arc::new(StringArray::from(ids)),
469 Arc::new(StringArray::from(branches)),
470 Arc::new(UInt64Array::from(versions)),
471 Arc::new(StringArray::from(parents)),
472 Arc::new(StringArray::from(merged_parents)),
473 Arc::new(TimestampMicrosecondArray::from(created_at)),
474 ],
475 )
476 .map_err(|e| OmniError::Lance(e.to_string()))
477}
478
479async fn load_commit_cache(
480 dataset: &Dataset,
481 actor_by_commit_id: &HashMap<String, String>,
482) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
483 let batches: Vec<RecordBatch> = dataset
484 .scan()
485 .try_into_stream()
486 .await
487 .map_err(|e| OmniError::Lance(e.to_string()))?
488 .try_collect()
489 .await
490 .map_err(|e| OmniError::Lance(e.to_string()))?;
491
492 let mut commits = load_commits_from_batches(&batches)?;
493 for commit in &mut commits {
494 commit.actor_id = actor_by_commit_id
495 .get(commit.graph_commit_id.as_str())
496 .cloned();
497 }
498 let mut commit_by_id = HashMap::with_capacity(commits.len());
499 let mut head_commit = None;
500 for commit in commits {
501 if should_replace_head(head_commit.as_ref(), &commit) {
502 head_commit = Some(commit.clone());
503 }
504 commit_by_id.insert(commit.graph_commit_id.clone(), commit);
505 }
506 Ok((commit_by_id, head_commit))
507}
508
509async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
510 let batches: Vec<RecordBatch> = dataset
511 .scan()
512 .try_into_stream()
513 .await
514 .map_err(|e| OmniError::Lance(e.to_string()))?
515 .try_collect()
516 .await
517 .map_err(|e| OmniError::Lance(e.to_string()))?;
518
519 let mut actors = HashMap::new();
520 for batch in batches {
521 let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
522 let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
523 for row in 0..batch.num_rows() {
524 actors.insert(
525 commit_ids.value(row).to_string(),
526 actor_ids.value(row).to_string(),
527 );
528 }
529 }
530 Ok(actors)
531}
532
533fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
534 let mut commits = Vec::new();
535 for batch in batches {
536 let ids = string_column(batch, "graph_commit_id", "commit graph")?;
537 let branches = string_column(batch, "manifest_branch", "commit graph")?;
538 let versions = u64_column(batch, "manifest_version", "commit graph")?;
539 let parents = string_column(batch, "parent_commit_id", "commit graph")?;
540 let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
541 let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
542
543 for row in 0..batch.num_rows() {
544 commits.push(GraphCommit {
545 graph_commit_id: ids.value(row).to_string(),
546 manifest_branch: if branches.is_null(row) {
547 None
548 } else {
549 Some(branches.value(row).to_string())
550 },
551 manifest_version: versions.value(row),
552 parent_commit_id: if parents.is_null(row) {
553 None
554 } else {
555 Some(parents.value(row).to_string())
556 },
557 merged_parent_commit_id: if merged_parents.is_null(row) {
558 None
559 } else {
560 Some(merged_parents.value(row).to_string())
561 },
562 actor_id: None,
563 created_at: created.value(row),
564 });
565 }
566 }
567 Ok(commits)
568}
569
570fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
571 let commit_ids: Vec<&str> = records
572 .iter()
573 .map(|record| record.graph_commit_id.as_str())
574 .collect();
575 let actor_ids: Vec<&str> = records
576 .iter()
577 .map(|record| record.actor_id.as_str())
578 .collect();
579 let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
580
581 RecordBatch::try_new(
582 commit_actor_schema(),
583 vec![
584 Arc::new(StringArray::from(commit_ids)),
585 Arc::new(StringArray::from(actor_ids)),
586 Arc::new(TimestampMicrosecondArray::from(created_at)),
587 ],
588 )
589 .map_err(|e| OmniError::Lance(e.to_string()))
590}
591
592fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
593 current.is_none_or(|existing| {
594 candidate
595 .manifest_version
596 .cmp(&existing.manifest_version)
597 .then_with(|| candidate.created_at.cmp(&existing.created_at))
598 .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
599 .is_gt()
600 })
601}
602
603fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
604 batch
605 .column_by_name(name)
606 .ok_or_else(|| {
607 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
608 })?
609 .as_any()
610 .downcast_ref::<StringArray>()
611 .ok_or_else(|| {
612 OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
613 })
614}
615
616fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
617 batch
618 .column_by_name(name)
619 .ok_or_else(|| {
620 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
621 })?
622 .as_any()
623 .downcast_ref::<UInt64Array>()
624 .ok_or_else(|| {
625 OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
626 })
627}
628
629fn timestamp_micros_column<'a>(
630 batch: &'a RecordBatch,
631 name: &str,
632 context: &str,
633) -> Result<&'a TimestampMicrosecondArray> {
634 batch
635 .column_by_name(name)
636 .ok_or_else(|| {
637 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
638 })?
639 .as_any()
640 .downcast_ref::<TimestampMicrosecondArray>()
641 .ok_or_else(|| {
642 OmniError::manifest_internal(format!(
643 "{context} column '{name}' is not Timestamp(Microsecond)"
644 ))
645 })
646}
647
648fn ancestor_distances(
649 start_id: &str,
650 commits: &HashMap<String, GraphCommit>,
651) -> HashMap<String, u64> {
652 let mut distances = HashMap::new();
653 let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
654
655 while let Some((id, distance)) = queue.pop_front() {
656 if let Some(existing) = distances.get(&id) {
657 if *existing <= distance {
658 continue;
659 }
660 }
661 distances.insert(id.clone(), distance);
662
663 if let Some(commit) = commits.get(&id) {
664 if let Some(parent) = &commit.parent_commit_id {
665 queue.push_back((parent.clone(), distance + 1));
666 }
667 if let Some(parent) = &commit.merged_parent_commit_id {
668 queue.push_back((parent.clone(), distance + 1));
669 }
670 }
671 }
672
673 distances
674}
675
676async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
677 match branch {
678 Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
679 _ => CommitGraph::open(root_uri).await,
680 }
681}
682
683fn now_micros() -> Result<i64> {
684 let duration = SystemTime::now()
685 .duration_since(UNIX_EPOCH)
686 .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
687 Ok(duration.as_micros() as i64)
688}
689
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is UInt64 instead of Utf8 must be rejected
    /// with an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let timestamp_micros = DataType::Timestamp(TimeUnit::Microsecond, None);
        let schema = Schema::new(vec![
            Field::new("graph_commit_id", DataType::UInt64, false),
            Field::new("manifest_branch", DataType::Utf8, true),
            Field::new("manifest_version", DataType::UInt64, false),
            Field::new("parent_commit_id", DataType::Utf8, true),
            Field::new("merged_parent_commit_id", DataType::Utf8, true),
            Field::new("created_at", timestamp_micros, false),
        ]);
        let single_null_string = || StringArray::from(vec![None::<&str>]);
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(single_null_string()),
            Arc::new(UInt64Array::from(vec![1_u64])),
            Arc::new(single_null_string()),
            Arc::new(single_null_string()),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

        let message = load_commits_from_batches(&[batch]).unwrap_err().to_string();
        assert!(message.contains("graph_commit_id"), "unexpected error: {message}");
    }
}