use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::Stream;
use crate::Result;
use crate::cli::LanceArgs;
use crate::error::Error;
/// Pinned, boxed, `Send` stream of record batches returned by [`Dataset::scan`].
///
/// Each item is either a decoded batch or the error that interrupted reading.
pub type BatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;

/// Read-only, format-agnostic view over a tabular dataset.
///
/// Implementors must be shareable across threads (`Send + Sync`) and
/// printable for diagnostics (`Debug`).
#[async_trait]
pub trait Dataset: Send + Sync + Debug {
    // ----- synchronous metadata -------------------------------------------

    /// Filesystem path associated with this dataset
    /// (presumably the path it was opened from — confirm per implementation).
    fn origin(&self) -> &Path;

    /// Arrow schema exposed by this dataset.
    fn arrow_schema(&self) -> SchemaRef;

    /// Debug rendering of the underlying format's physical schema.
    ///
    /// `projection`, when given, presumably restricts the output to the named
    /// columns.
    fn physical_schema_debug(&self, projection: Option<&[String]>) -> Result<String>;

    /// Lance-specific operations, when the backing dataset supports them.
    ///
    /// The default reports no support by returning `None`.
    fn lance(&self) -> Option<&dyn LanceCapabilities> {
        None
    }

    // ----- asynchronous data access ---------------------------------------

    /// Number of rows in the dataset.
    async fn count_rows(&self) -> Result<u64>;

    /// Stream the dataset's contents as record batches.
    ///
    /// `projection`, when given, presumably names the columns to read.
    async fn scan(&self, projection: Option<&[String]>) -> Result<BatchStream>;

    /// Fetch the rows identified by `indices` as a single batch.
    ///
    /// `indices` are presumably row offsets; `projection` as for [`Dataset::scan`].
    async fn take(&self, indices: &[u64], projection: Option<&[String]>) -> Result<RecordBatch>;
}
/// Lance-only metadata queries: branches, tags, version history, and indices.
///
/// Obtained through [`Dataset::lance`]; implementors must be `Send + Sync`.
#[async_trait]
pub trait LanceCapabilities: Send + Sync {
    /// List the dataset's branches.
    async fn list_branches(&self) -> Result<Vec<BranchInfo>>;

    /// List the dataset's tags.
    async fn list_tags(&self) -> Result<Vec<TagInfo>>;

    /// List entries from the dataset's version history.
    ///
    /// `branch` selects whose history to read (presumably `None` means the
    /// default branch — confirm in the implementation). `tagged_only`
    /// presumably limits the result to versions that carry a tag.
    async fn list_versions(
        &self,
        branch: Option<&str>,
        tagged_only: bool,
    ) -> Result<Vec<VersionInfo>>;

    /// List the indices defined on the dataset.
    async fn list_indices(&self) -> Result<Vec<IndexInfo>>;
}
/// One entry in a dataset's version history.
///
/// Derives `PartialEq`/`Eq` so listings can be compared directly. Every
/// field type implements `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VersionInfo {
    /// Version number.
    pub version: u64,
    /// Timestamp recorded for this version, in UTC.
    pub timestamp: DateTime<Utc>,
    /// Tag attached to this version, if any.
    pub tag: Option<String>,
    /// Message attached to this version, if any.
    pub message: Option<String>,
}
/// Metadata describing one branch of a dataset.
///
/// Derives `PartialEq`/`Eq` so listings can be compared directly. Every
/// field type implements `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BranchInfo {
    /// Branch name.
    pub name: String,
    /// Name of the parent branch, if recorded.
    pub parent_branch: Option<String>,
    /// Parent version this branch was created from, if recorded.
    pub parent_version: Option<u64>,
    /// Creation time in UTC, if recorded.
    pub created_at: Option<DateTime<Utc>>,
}
/// Metadata describing one index defined on a dataset.
///
/// Derives `PartialEq`/`Eq` so listings can be compared directly. Every
/// field type implements `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexInfo {
    /// Index name.
    pub name: String,
    /// Index UUID, in string form.
    pub uuid: String,
    /// Columns covered by the index.
    pub columns: Vec<String>,
    /// Dataset version associated with the index
    /// (presumably the version it was built at — confirm).
    pub dataset_version: u64,
    /// Creation time in UTC, if recorded.
    pub created_at: Option<DateTime<Utc>>,
}
/// A named tag pointing at a specific version on a branch.
///
/// Derives `PartialEq`/`Eq` so listings can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TagInfo {
    /// Tag name.
    pub name: String,
    /// Branch the tagged version belongs to.
    pub branch: String,
    /// Tagged version number.
    pub version: u64,
}
/// Open the dataset at `path`, dispatching on its detected [`Format`].
///
/// `lance` holds optional Lance-specific CLI arguments. It is forwarded
/// unchanged to the Lance backend.
///
/// # Errors
///
/// Propagates the error from [`detect_format`] when the format is not
/// recognized, and any error raised while the backend opens the dataset.
pub async fn open(path: &Path, lance: Option<&LanceArgs>) -> Result<Arc<dyn Dataset>> {
    let format = detect_format(path)?;
    let dataset: Arc<dyn Dataset> = match format {
        Format::Lance => Arc::new(crate::lance::LanceDataset::open(path, lance).await?),
    };
    Ok(dataset)
}
/// On-disk dataset formats that [`detect_format`] can recognize.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Format {
    /// A Lance dataset: a directory with a `_versions` subdirectory.
    Lance,
}
pub fn detect_format(path: &Path) -> Result<Format> {
if is_lance_dataset(path) {
Ok(Format::Lance)
} else {
Err(Error::UnknownFormat {
path: path.to_path_buf(),
})
}
}
/// Return `true` if `path` looks like the root of a Lance dataset.
///
/// Detection is by layout only: a directory that contains a `_versions`
/// subdirectory. Nonexistent paths, regular files, and directories without
/// `_versions` all yield `false`. Metadata errors (for example, permission
/// problems) also yield `false`, because that is how [`Path::is_dir`] reports
/// them.
fn is_lance_dataset(path: &Path) -> bool {
    // `Path` already provides `is_dir` and `join`, so there is no need to
    // allocate an owned `PathBuf` copy first. Checking `path` itself first
    // short-circuits the common "not a directory" case.
    path.is_dir() && path.join("_versions").is_dir()
}