1use std::fmt::Debug;
2use std::path::{Path, PathBuf};
3use std::pin::Pin;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use futures::Stream;
11
12use crate::Result;
13use crate::cli::LanceArgs;
14use crate::error::Error;
15
16pub type BatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;
18
19#[async_trait]
24pub trait Dataset: Send + Sync + Debug {
25 fn origin(&self) -> &Path;
27
28 fn arrow_schema(&self) -> SchemaRef;
30
31 fn physical_schema_debug(&self, projection: Option<&[String]>) -> Result<String>;
34
35 async fn count_rows(&self) -> Result<u64>;
37
38 async fn scan(&self, projection: Option<&[String]>) -> Result<BatchStream>;
40
41 async fn take(&self, indices: &[u64], projection: Option<&[String]>) -> Result<RecordBatch>;
44
45 fn lance(&self) -> Option<&dyn LanceCapabilities> {
49 None
50 }
51}
52
53#[async_trait]
55pub trait LanceCapabilities: Send + Sync {
56 async fn list_versions(
59 &self,
60 branch: Option<&str>,
61 tagged_only: bool,
62 ) -> Result<Vec<VersionInfo>>;
63
64 async fn list_branches(&self) -> Result<Vec<BranchInfo>>;
66
67 async fn list_indices(&self) -> Result<Vec<IndexInfo>>;
69
70 async fn list_tags(&self) -> Result<Vec<TagInfo>>;
72}
73
74#[derive(Debug, Clone)]
76pub struct VersionInfo {
77 pub version: u64,
78 pub timestamp: DateTime<Utc>,
79 pub tag: Option<String>,
80 pub message: Option<String>,
81}
82
83#[derive(Debug, Clone)]
85pub struct BranchInfo {
86 pub name: String,
87 pub parent_branch: Option<String>,
88 pub parent_version: Option<u64>,
89 pub created_at: Option<DateTime<Utc>>,
90}
91
92#[derive(Debug, Clone)]
94pub struct IndexInfo {
95 pub name: String,
96 pub uuid: String,
97 pub columns: Vec<String>,
98 pub dataset_version: u64,
99 pub created_at: Option<DateTime<Utc>>,
100}
101
/// Description of one tag, as returned by
/// [`LanceCapabilities::list_tags`].
#[derive(Clone, Debug)]
pub struct TagInfo {
    /// Tag name.
    pub name: String,
    /// Branch the tagged version belongs to.
    pub branch: String,
    /// Version number the tag refers to.
    pub version: u64,
}
109
110pub async fn open(path: &Path, lance: Option<&LanceArgs>) -> Result<Arc<dyn Dataset>> {
114 match detect_format(path)? {
115 Format::Lance => {
116 let ds = crate::lance::LanceDataset::open(path, lance).await?;
117 Ok(Arc::new(ds))
118 }
119 }
120}
121
/// On-disk dataset formats that [`detect_format`] can recognise.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Format {
    /// A Lance dataset directory.
    Lance,
}
126
127pub fn detect_format(path: &Path) -> Result<Format> {
131 if is_lance_dataset(path) {
132 Ok(Format::Lance)
133 } else {
134 Err(Error::UnknownFormat {
135 path: path.to_path_buf(),
136 })
137 }
138}
139
/// Returns `true` when `path` has the on-disk shape of a Lance dataset.
///
/// The check is purely structural:
/// - `path` must be a directory;
/// - it must contain a `_versions` subdirectory.
///
/// No file contents are read. Missing paths and regular files yield `false`.
fn is_lance_dataset(path: &Path) -> bool {
    // Query the borrowed `&Path` directly. Copying the input into a fresh
    // `PathBuf` first (as before) allocated for no benefit.
    if !path.is_dir() {
        return false;
    }
    // `join` already produces the owned child path we need.
    let versions_dir: PathBuf = path.join("_versions");
    versions_dir.is_dir()
}