1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use arrow_array::{
6 Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
7};
8use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
9use futures::TryStreamExt;
10use lance::Dataset;
11use lance::dataset::{WriteMode, WriteParams};
12use lance_file::version::LanceFileVersion;
13
14use crate::error::{OmniError, Result};
15
/// Directory name, joined onto the root URI, of the Lance dataset that stores graph commits.
const GRAPH_COMMITS_DIR: &str = "_graph_commits.lance";
/// Directory name, joined onto the root URI, of the Lance dataset that stores commit actors.
const GRAPH_COMMIT_ACTORS_DIR: &str = "_graph_commit_actors.lance";
18
/// A single node of the commit graph.
///
/// Derives `PartialEq`/`Eq` so commits can be compared directly (for example in tests
/// or when de-duplicating commits loaded from several branches).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphCommit {
    /// Unique identifier of the commit; commits created by this module use a ULID string.
    pub graph_commit_id: String,
    /// Manifest branch the commit was recorded for; `None` when no branch was supplied
    /// (as for the genesis commit).
    pub manifest_branch: Option<String>,
    /// Manifest version this commit refers to.
    pub manifest_version: u64,
    /// Identifier of the first parent; `None` for a commit without a parent.
    pub parent_commit_id: Option<String>,
    /// Identifier of the second parent, set only for merge commits.
    pub merged_parent_commit_id: Option<String>,
    /// Actor recorded for the commit, if any.
    pub actor_id: Option<String>,
    /// Creation time in microseconds since the UNIX epoch.
    pub created_at: i64,
}
29
/// Handle to the persisted commit graph plus in-memory caches of its contents.
pub struct CommitGraph {
    /// Root URI the datasets live under, stored with trailing `/` characters trimmed.
    root_uri: String,
    /// Lance dataset holding the commit rows (checked out at `active_branch` when set).
    dataset: Dataset,
    /// Lance dataset holding commit-actor rows; `None` when it could not be opened.
    actor_dataset: Option<Dataset>,
    /// Branch this graph was opened at; `None` when opened without a branch.
    active_branch: Option<String>,
    /// Cache mapping a graph commit id to its recorded actor id.
    actor_by_commit_id: HashMap<String, String>,
    /// Cache of all loaded commits, keyed by graph commit id.
    commit_by_id: HashMap<String, GraphCommit>,
    /// Cached head: the commit that is greatest by
    /// `(manifest_version, created_at, graph_commit_id)`.
    head_commit: Option<GraphCommit>,
}
39
40impl CommitGraph {
41 pub async fn init(root_uri: &str, manifest_version: u64) -> Result<Self> {
42 let root = root_uri.trim_end_matches('/');
43 let uri = graph_commits_uri(root);
44 let genesis = GraphCommit {
45 graph_commit_id: ulid::Ulid::new().to_string(),
46 manifest_branch: None,
47 manifest_version,
48 parent_commit_id: None,
49 merged_parent_commit_id: None,
50 actor_id: None,
51 created_at: now_micros()?,
52 };
53
54 let batch = commits_to_batch(&[genesis.clone()])?;
55 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
56 let params = WriteParams {
57 mode: WriteMode::Create,
58 enable_stable_row_ids: true,
59 data_storage_version: Some(LanceFileVersion::V2_2),
60 auto_cleanup: None,
61 skip_auto_cleanup: true,
62 ..Default::default()
63 };
64 let dataset = Dataset::write(reader, &uri as &str, Some(params))
65 .await
66 .map_err(|e| OmniError::Lance(e.to_string()))?;
67 let actor_dataset = create_commit_actor_dataset(root).await?;
68
69 Ok(Self {
70 root_uri: root.to_string(),
71 dataset,
72 actor_dataset: Some(actor_dataset),
73 active_branch: None,
74 actor_by_commit_id: HashMap::new(),
75 commit_by_id: HashMap::from([(genesis.graph_commit_id.clone(), genesis.clone())]),
76 head_commit: Some(genesis),
77 })
78 }
79
80 pub async fn open(root_uri: &str) -> Result<Self> {
81 let root = root_uri.trim_end_matches('/');
82 let wrapper = crate::instrumentation::commit_graph_wrapper();
83 let dataset =
84 crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
85 .await?;
86 let actor_dataset =
87 crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
88 .await
89 .ok();
90 let actor_by_commit_id = match &actor_dataset {
91 Some(dataset) => load_commit_actor_cache(dataset).await?,
92 None => HashMap::new(),
93 };
94 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
95 Ok(Self {
96 root_uri: root.to_string(),
97 dataset,
98 actor_dataset,
99 active_branch: None,
100 actor_by_commit_id,
101 commit_by_id,
102 head_commit,
103 })
104 }
105
106 pub async fn open_at_branch(root_uri: &str, branch: &str) -> Result<Self> {
107 let root = root_uri.trim_end_matches('/');
108 let wrapper = crate::instrumentation::commit_graph_wrapper();
109 let dataset =
110 crate::instrumentation::open_dataset_tracked(&graph_commits_uri(root), wrapper.clone())
111 .await?;
112 let dataset = dataset
113 .checkout_branch(branch)
114 .await
115 .map_err(|e| OmniError::Lance(e.to_string()))?;
116 let actor_dataset =
117 crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(root), wrapper)
118 .await
119 .ok();
120 let actor_by_commit_id = match &actor_dataset {
121 Some(dataset) => load_commit_actor_cache(dataset).await?,
122 None => HashMap::new(),
123 };
124 let (commit_by_id, head_commit) = load_commit_cache(&dataset, &actor_by_commit_id).await?;
125 Ok(Self {
126 root_uri: root.to_string(),
127 dataset,
128 actor_dataset,
129 active_branch: Some(branch.to_string()),
130 actor_by_commit_id,
131 commit_by_id,
132 head_commit,
133 })
134 }
135
136 pub async fn refresh(&mut self) -> Result<()> {
137 let root = self.root_uri.clone();
138 let wrapper = crate::instrumentation::commit_graph_wrapper();
139 self.dataset = crate::instrumentation::open_dataset_tracked(
140 &graph_commits_uri(&root),
141 wrapper.clone(),
142 )
143 .await?;
144 if let Some(branch) = &self.active_branch {
145 self.dataset = self
146 .dataset
147 .checkout_branch(branch)
148 .await
149 .map_err(|e| OmniError::Lance(e.to_string()))?;
150 }
151 self.actor_dataset =
152 crate::instrumentation::open_dataset_tracked(&graph_commit_actors_uri(&root), wrapper)
153 .await
154 .ok();
155 self.actor_by_commit_id = match &self.actor_dataset {
156 Some(dataset) => load_commit_actor_cache(dataset).await?,
157 None => HashMap::new(),
158 };
159 let (commit_by_id, head_commit) =
160 load_commit_cache(&self.dataset, &self.actor_by_commit_id).await?;
161 self.commit_by_id = commit_by_id;
162 self.head_commit = head_commit;
163 Ok(())
164 }
165
166 pub fn version(&self) -> u64 {
167 self.dataset.version().version
168 }
169
170 pub async fn create_branch(&mut self, name: &str) -> Result<()> {
171 let mut ds = self.dataset.clone();
172 ds.create_branch(name, self.version(), None)
173 .await
174 .map_err(|e| OmniError::Lance(e.to_string()))?;
175 Ok(())
176 }
177
178 pub async fn delete_branch(&mut self, name: &str) -> Result<()> {
179 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
180 .await
181 .map_err(|e| OmniError::Lance(e.to_string()))?;
182 ds.delete_branch(name)
183 .await
184 .map_err(|e| OmniError::Lance(e.to_string()))?;
185 self.refresh().await
186 }
187
188 pub async fn force_delete_branch(&mut self, name: &str) -> Result<()> {
194 let mut ds = Dataset::open(&graph_commits_uri(&self.root_uri))
195 .await
196 .map_err(|e| OmniError::Lance(e.to_string()))?;
197 match ds.force_delete_branch(name).await {
198 Ok(()) => {}
199 Err(lance::Error::RefNotFound { .. }) | Err(lance::Error::NotFound { .. }) => {}
200 Err(e) => return Err(OmniError::Lance(e.to_string())),
201 }
202 self.refresh().await
203 }
204
205 pub async fn list_branches(&self) -> Result<Vec<String>> {
209 let ds = Dataset::open(&graph_commits_uri(&self.root_uri))
210 .await
211 .map_err(|e| OmniError::Lance(e.to_string()))?;
212 let branches = ds
213 .list_branches()
214 .await
215 .map_err(|e| OmniError::Lance(e.to_string()))?;
216 Ok(branches.into_keys().collect())
217 }
218
219 pub async fn append_commit(
220 &mut self,
221 manifest_branch: Option<&str>,
222 manifest_version: u64,
223 actor_id: Option<&str>,
224 ) -> Result<String> {
225 let parent_commit_id = self.head_commit_id().await?;
226 self.append_commit_with_parents(
227 manifest_branch,
228 manifest_version,
229 parent_commit_id.as_deref(),
230 None,
231 actor_id,
232 )
233 .await
234 }
235
236 pub async fn append_merge_commit(
237 &mut self,
238 manifest_branch: Option<&str>,
239 manifest_version: u64,
240 parent_commit_id: &str,
241 merged_parent_commit_id: &str,
242 actor_id: Option<&str>,
243 ) -> Result<String> {
244 self.append_commit_with_parents(
245 manifest_branch,
246 manifest_version,
247 Some(parent_commit_id),
248 Some(merged_parent_commit_id),
249 actor_id,
250 )
251 .await
252 }
253
254 async fn append_commit_with_parents(
255 &mut self,
256 manifest_branch: Option<&str>,
257 manifest_version: u64,
258 parent_commit_id: Option<&str>,
259 merged_parent_commit_id: Option<&str>,
260 actor_id: Option<&str>,
261 ) -> Result<String> {
262 let graph_commit_id = ulid::Ulid::new().to_string();
263 let commit = GraphCommit {
264 graph_commit_id: graph_commit_id.clone(),
265 manifest_branch: manifest_branch.map(|s| s.to_string()),
266 manifest_version,
267 parent_commit_id: parent_commit_id.map(|s| s.to_string()),
268 merged_parent_commit_id: merged_parent_commit_id.map(|s| s.to_string()),
269 actor_id: actor_id.map(str::to_string),
270 created_at: now_micros()?,
271 };
272
273 let batch = commits_to_batch(&[commit.clone()])?;
274 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_graph_schema());
275 let mut ds = self.dataset.clone();
276 ds.append(reader, None)
277 .await
278 .map_err(|e| OmniError::Lance(e.to_string()))?;
279 self.dataset = ds;
280 if let Some(actor_id) = actor_id {
281 self.append_actor(&graph_commit_id, actor_id).await?;
282 }
283 self.commit_by_id
284 .insert(graph_commit_id.clone(), commit.clone());
285 if should_replace_head(self.head_commit.as_ref(), &commit) {
286 self.head_commit = Some(commit);
287 }
288
289 Ok(graph_commit_id)
290 }
291
292 async fn append_actor(&mut self, graph_commit_id: &str, actor_id: &str) -> Result<()> {
293 if self
294 .actor_by_commit_id
295 .get(graph_commit_id)
296 .is_some_and(|existing| existing == actor_id)
297 {
298 return Ok(());
299 }
300
301 let record = CommitActorRecord {
302 graph_commit_id: graph_commit_id.to_string(),
303 actor_id: actor_id.to_string(),
304 created_at: now_micros()?,
305 };
306 let batch = commit_actors_to_batch(&[record])?;
307 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
308 let mut dataset = match self.actor_dataset.take() {
309 Some(dataset) => dataset,
310 None => create_commit_actor_dataset(&self.root_uri).await?,
311 };
312 dataset
313 .append(reader, None)
314 .await
315 .map_err(|e| OmniError::Lance(e.to_string()))?;
316 self.actor_by_commit_id
317 .insert(graph_commit_id.to_string(), actor_id.to_string());
318 self.actor_dataset = Some(dataset);
319 Ok(())
320 }
321
322 pub async fn head_commit(&self) -> Result<Option<GraphCommit>> {
323 Ok(self.head_commit.clone())
324 }
325
326 pub async fn head_commit_id(&self) -> Result<Option<String>> {
327 Ok(self.head_commit().await?.map(|c| c.graph_commit_id))
328 }
329
330 pub async fn load_commits(&self) -> Result<Vec<GraphCommit>> {
331 let mut commits = self.commit_by_id.values().cloned().collect::<Vec<_>>();
332 commits.sort_by(|a, b| {
333 a.manifest_version
334 .cmp(&b.manifest_version)
335 .then_with(|| a.created_at.cmp(&b.created_at))
336 .then_with(|| a.graph_commit_id.cmp(&b.graph_commit_id))
337 });
338 Ok(commits)
339 }
340
341 pub fn get_commit(&self, commit_id: &str) -> Option<GraphCommit> {
342 self.commit_by_id.get(commit_id).cloned()
343 }
344
345 pub async fn merge_base(
346 root_uri: &str,
347 source_branch: Option<&str>,
348 target_branch: Option<&str>,
349 ) -> Result<Option<GraphCommit>> {
350 let source = open_for_branch(root_uri, source_branch).await?;
351 let target = open_for_branch(root_uri, target_branch).await?;
352
353 let source_head = match source.head_commit().await? {
354 Some(commit) => commit,
355 None => return Ok(None),
356 };
357 let target_head = match target.head_commit().await? {
358 Some(commit) => commit,
359 None => return Ok(None),
360 };
361
362 let mut commits = HashMap::new();
363 for commit in source.load_commits().await? {
364 commits.insert(commit.graph_commit_id.clone(), commit);
365 }
366 for commit in target.load_commits().await? {
367 commits.insert(commit.graph_commit_id.clone(), commit);
368 }
369
370 let source_distances = ancestor_distances(&source_head.graph_commit_id, &commits);
371 let target_distances = ancestor_distances(&target_head.graph_commit_id, &commits);
372
373 let best = source_distances
374 .iter()
375 .filter_map(|(id, source_distance)| {
376 target_distances.get(id).and_then(|target_distance| {
377 commits.get(id).map(|commit| {
378 (
379 (
380 *source_distance + *target_distance,
381 u64::MAX - commit.manifest_version,
382 ),
383 commit.clone(),
384 )
385 })
386 })
387 })
388 .min_by_key(|(score, _)| *score)
389 .map(|(_, commit)| commit);
390
391 Ok(best)
392 }
393}
394
395pub(crate) fn graph_commits_uri(root_uri: &str) -> String {
396 format!("{}/{}", root_uri.trim_end_matches('/'), GRAPH_COMMITS_DIR)
397}
398
399fn graph_commit_actors_uri(root_uri: &str) -> String {
400 format!(
401 "{}/{}",
402 root_uri.trim_end_matches('/'),
403 GRAPH_COMMIT_ACTORS_DIR
404 )
405}
406
407fn commit_graph_schema() -> SchemaRef {
408 Arc::new(Schema::new(vec![
409 Field::new("graph_commit_id", DataType::Utf8, false),
410 Field::new("manifest_branch", DataType::Utf8, true),
411 Field::new("manifest_version", DataType::UInt64, false),
412 Field::new("parent_commit_id", DataType::Utf8, true),
413 Field::new("merged_parent_commit_id", DataType::Utf8, true),
414 Field::new(
415 "created_at",
416 DataType::Timestamp(TimeUnit::Microsecond, None),
417 false,
418 ),
419 ]))
420}
421
422fn commit_actor_schema() -> SchemaRef {
423 Arc::new(Schema::new(vec![
424 Field::new("graph_commit_id", DataType::Utf8, false),
425 Field::new("actor_id", DataType::Utf8, false),
426 Field::new(
427 "created_at",
428 DataType::Timestamp(TimeUnit::Microsecond, None),
429 false,
430 ),
431 ]))
432}
433
/// One row of the commit-actors dataset: links a graph commit to an actor.
#[derive(Debug, Clone)]
struct CommitActorRecord {
    /// Identifier of the graph commit this record refers to.
    graph_commit_id: String,
    /// Identifier of the actor recorded for the commit.
    actor_id: String,
    /// Record creation time in microseconds since the UNIX epoch.
    created_at: i64,
}
440
441async fn create_commit_actor_dataset(root_uri: &str) -> Result<Dataset> {
442 let uri = graph_commit_actors_uri(root_uri);
443 let batch = RecordBatch::new_empty(commit_actor_schema());
444 let reader = RecordBatchIterator::new(vec![Ok(batch)], commit_actor_schema());
445 let params = WriteParams {
446 mode: WriteMode::Create,
447 enable_stable_row_ids: true,
448 data_storage_version: Some(LanceFileVersion::V2_2),
449 auto_cleanup: None,
450 skip_auto_cleanup: true,
451 ..Default::default()
452 };
453 match Dataset::write(reader, &uri as &str, Some(params)).await {
454 Ok(dataset) => Ok(dataset),
455 Err(err) if err.to_string().contains("Dataset already exists") => Dataset::open(&uri)
456 .await
457 .map_err(|open_err| OmniError::Lance(open_err.to_string())),
458 Err(err) => Err(OmniError::Lance(err.to_string())),
459 }
460}
461
462fn commits_to_batch(commits: &[GraphCommit]) -> Result<RecordBatch> {
463 let ids: Vec<&str> = commits.iter().map(|c| c.graph_commit_id.as_str()).collect();
464 let branches: Vec<Option<&str>> = commits
465 .iter()
466 .map(|c| c.manifest_branch.as_deref())
467 .collect();
468 let versions: Vec<u64> = commits.iter().map(|c| c.manifest_version).collect();
469 let parents: Vec<Option<&str>> = commits
470 .iter()
471 .map(|c| c.parent_commit_id.as_deref())
472 .collect();
473 let merged_parents: Vec<Option<&str>> = commits
474 .iter()
475 .map(|c| c.merged_parent_commit_id.as_deref())
476 .collect();
477 let created_at: Vec<i64> = commits.iter().map(|c| c.created_at).collect();
478
479 RecordBatch::try_new(
480 commit_graph_schema(),
481 vec![
482 Arc::new(StringArray::from(ids)),
483 Arc::new(StringArray::from(branches)),
484 Arc::new(UInt64Array::from(versions)),
485 Arc::new(StringArray::from(parents)),
486 Arc::new(StringArray::from(merged_parents)),
487 Arc::new(TimestampMicrosecondArray::from(created_at)),
488 ],
489 )
490 .map_err(|e| OmniError::Lance(e.to_string()))
491}
492
493async fn load_commit_cache(
494 dataset: &Dataset,
495 actor_by_commit_id: &HashMap<String, String>,
496) -> Result<(HashMap<String, GraphCommit>, Option<GraphCommit>)> {
497 let batches: Vec<RecordBatch> = dataset
498 .scan()
499 .try_into_stream()
500 .await
501 .map_err(|e| OmniError::Lance(e.to_string()))?
502 .try_collect()
503 .await
504 .map_err(|e| OmniError::Lance(e.to_string()))?;
505
506 let mut commits = load_commits_from_batches(&batches)?;
507 for commit in &mut commits {
508 commit.actor_id = actor_by_commit_id
509 .get(commit.graph_commit_id.as_str())
510 .cloned();
511 }
512 let mut commit_by_id = HashMap::with_capacity(commits.len());
513 let mut head_commit = None;
514 for commit in commits {
515 if should_replace_head(head_commit.as_ref(), &commit) {
516 head_commit = Some(commit.clone());
517 }
518 commit_by_id.insert(commit.graph_commit_id.clone(), commit);
519 }
520 Ok((commit_by_id, head_commit))
521}
522
523async fn load_commit_actor_cache(dataset: &Dataset) -> Result<HashMap<String, String>> {
524 let batches: Vec<RecordBatch> = dataset
525 .scan()
526 .try_into_stream()
527 .await
528 .map_err(|e| OmniError::Lance(e.to_string()))?
529 .try_collect()
530 .await
531 .map_err(|e| OmniError::Lance(e.to_string()))?;
532
533 let mut actors = HashMap::new();
534 for batch in batches {
535 let commit_ids = string_column(&batch, "graph_commit_id", "commit actor registry")?;
536 let actor_ids = string_column(&batch, "actor_id", "commit actor registry")?;
537 for row in 0..batch.num_rows() {
538 actors.insert(
539 commit_ids.value(row).to_string(),
540 actor_ids.value(row).to_string(),
541 );
542 }
543 }
544 Ok(actors)
545}
546
547fn load_commits_from_batches(batches: &[RecordBatch]) -> Result<Vec<GraphCommit>> {
548 let mut commits = Vec::new();
549 for batch in batches {
550 let ids = string_column(batch, "graph_commit_id", "commit graph")?;
551 let branches = string_column(batch, "manifest_branch", "commit graph")?;
552 let versions = u64_column(batch, "manifest_version", "commit graph")?;
553 let parents = string_column(batch, "parent_commit_id", "commit graph")?;
554 let merged_parents = string_column(batch, "merged_parent_commit_id", "commit graph")?;
555 let created = timestamp_micros_column(batch, "created_at", "commit graph")?;
556
557 for row in 0..batch.num_rows() {
558 commits.push(GraphCommit {
559 graph_commit_id: ids.value(row).to_string(),
560 manifest_branch: if branches.is_null(row) {
561 None
562 } else {
563 Some(branches.value(row).to_string())
564 },
565 manifest_version: versions.value(row),
566 parent_commit_id: if parents.is_null(row) {
567 None
568 } else {
569 Some(parents.value(row).to_string())
570 },
571 merged_parent_commit_id: if merged_parents.is_null(row) {
572 None
573 } else {
574 Some(merged_parents.value(row).to_string())
575 },
576 actor_id: None,
577 created_at: created.value(row),
578 });
579 }
580 }
581 Ok(commits)
582}
583
584fn commit_actors_to_batch(records: &[CommitActorRecord]) -> Result<RecordBatch> {
585 let commit_ids: Vec<&str> = records
586 .iter()
587 .map(|record| record.graph_commit_id.as_str())
588 .collect();
589 let actor_ids: Vec<&str> = records
590 .iter()
591 .map(|record| record.actor_id.as_str())
592 .collect();
593 let created_at: Vec<i64> = records.iter().map(|record| record.created_at).collect();
594
595 RecordBatch::try_new(
596 commit_actor_schema(),
597 vec![
598 Arc::new(StringArray::from(commit_ids)),
599 Arc::new(StringArray::from(actor_ids)),
600 Arc::new(TimestampMicrosecondArray::from(created_at)),
601 ],
602 )
603 .map_err(|e| OmniError::Lance(e.to_string()))
604}
605
606fn should_replace_head(current: Option<&GraphCommit>, candidate: &GraphCommit) -> bool {
607 current.is_none_or(|existing| {
608 candidate
609 .manifest_version
610 .cmp(&existing.manifest_version)
611 .then_with(|| candidate.created_at.cmp(&existing.created_at))
612 .then_with(|| candidate.graph_commit_id.cmp(&existing.graph_commit_id))
613 .is_gt()
614 })
615}
616
617fn string_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a StringArray> {
618 batch
619 .column_by_name(name)
620 .ok_or_else(|| {
621 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
622 })?
623 .as_any()
624 .downcast_ref::<StringArray>()
625 .ok_or_else(|| {
626 OmniError::manifest_internal(format!("{context} column '{name}' is not Utf8"))
627 })
628}
629
630fn u64_column<'a>(batch: &'a RecordBatch, name: &str, context: &str) -> Result<&'a UInt64Array> {
631 batch
632 .column_by_name(name)
633 .ok_or_else(|| {
634 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
635 })?
636 .as_any()
637 .downcast_ref::<UInt64Array>()
638 .ok_or_else(|| {
639 OmniError::manifest_internal(format!("{context} column '{name}' is not UInt64"))
640 })
641}
642
643fn timestamp_micros_column<'a>(
644 batch: &'a RecordBatch,
645 name: &str,
646 context: &str,
647) -> Result<&'a TimestampMicrosecondArray> {
648 batch
649 .column_by_name(name)
650 .ok_or_else(|| {
651 OmniError::manifest_internal(format!("{context} batch missing '{name}' column"))
652 })?
653 .as_any()
654 .downcast_ref::<TimestampMicrosecondArray>()
655 .ok_or_else(|| {
656 OmniError::manifest_internal(format!(
657 "{context} column '{name}' is not Timestamp(Microsecond)"
658 ))
659 })
660}
661
662fn ancestor_distances(
663 start_id: &str,
664 commits: &HashMap<String, GraphCommit>,
665) -> HashMap<String, u64> {
666 let mut distances = HashMap::new();
667 let mut queue = VecDeque::from([(start_id.to_string(), 0u64)]);
668
669 while let Some((id, distance)) = queue.pop_front() {
670 if let Some(existing) = distances.get(&id) {
671 if *existing <= distance {
672 continue;
673 }
674 }
675 distances.insert(id.clone(), distance);
676
677 if let Some(commit) = commits.get(&id) {
678 if let Some(parent) = &commit.parent_commit_id {
679 queue.push_back((parent.clone(), distance + 1));
680 }
681 if let Some(parent) = &commit.merged_parent_commit_id {
682 queue.push_back((parent.clone(), distance + 1));
683 }
684 }
685 }
686
687 distances
688}
689
690async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitGraph> {
691 match branch {
692 Some(branch) if branch != "main" => CommitGraph::open_at_branch(root_uri, branch).await,
693 _ => CommitGraph::open(root_uri).await,
694 }
695}
696
697fn now_micros() -> Result<i64> {
698 let duration = SystemTime::now()
699 .duration_since(UNIX_EPOCH)
700 .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
701 Ok(duration.as_micros() as i64)
702}
703
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};

    use super::*;

    /// A batch whose `graph_commit_id` column is UInt64 instead of Utf8 must be
    /// rejected with an error that names the offending column.
    #[test]
    fn load_commits_from_batches_returns_error_for_bad_schema() {
        let timestamp_type = DataType::Timestamp(TimeUnit::Microsecond, None);
        let schema = Schema::new(vec![
            // Deliberately the wrong type for this column.
            Field::new("graph_commit_id", DataType::UInt64, false),
            Field::new("manifest_branch", DataType::Utf8, true),
            Field::new("manifest_version", DataType::UInt64, false),
            Field::new("parent_commit_id", DataType::Utf8, true),
            Field::new("merged_parent_commit_id", DataType::Utf8, true),
            Field::new("created_at", timestamp_type, false),
        ]);

        let null_text = || -> Arc<dyn Array> { Arc::new(StringArray::from(vec![None::<&str>])) };
        let columns: Vec<Arc<dyn Array>> = vec![
            Arc::new(UInt64Array::from(vec![1_u64])),
            null_text(),
            Arc::new(UInt64Array::from(vec![1_u64])),
            null_text(),
            null_text(),
            Arc::new(TimestampMicrosecondArray::from(vec![1_i64])),
        ];
        let batch = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

        let message = load_commits_from_batches(&[batch])
            .unwrap_err()
            .to_string();
        assert!(message.contains("graph_commit_id"));
    }
}