Skip to main content

arrs/
dataset.rs

1use std::fmt::Debug;
2use std::path::{Path, PathBuf};
3use std::pin::Pin;
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_schema::SchemaRef;
8use async_trait::async_trait;
9use chrono::{DateTime, Utc};
10use futures::Stream;
11
12use crate::Result;
13use crate::cli::LanceArgs;
14use crate::error::Error;
15
16/// Stream of `RecordBatch` results produced by a scan.
17pub type BatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;
18
19/// Format-agnostic dataset view used by every command.
20///
21/// Input-format adapters (Lance today, potentially others in the future) implement this trait.
22/// Commands are written against the trait only — they never see format-specific types.
23#[async_trait]
24pub trait Dataset: Send + Sync + Debug {
25    /// Path or URI the dataset was opened from.
26    fn origin(&self) -> &Path;
27
28    /// Logical arrow schema of the dataset.
29    fn arrow_schema(&self) -> SchemaRef;
30
31    /// Pretty-printed format-native schema (for `schema --type physical`), optionally
32    /// projected to a subset of columns.
33    fn physical_schema_debug(&self, projection: Option<&[String]>) -> Result<String>;
34
35    /// Total row count.
36    async fn count_rows(&self) -> Result<u64>;
37
38    /// Stream all rows, optionally projected to the given columns.
39    async fn scan(&self, projection: Option<&[String]>) -> Result<BatchStream>;
40
41    /// Materialise a `RecordBatch` containing only the rows at the given indices,
42    /// in the order given. `indices` must all be < `count_rows()`.
43    async fn take(&self, indices: &[u64], projection: Option<&[String]>) -> Result<RecordBatch>;
44
45    /// Returns `Some(...)` when this dataset is backed by a format that supports
46    /// Lance-specific operations (versions, branches, indices). The default
47    /// `None` covers any future format that doesn't.
48    fn lance(&self) -> Option<&dyn LanceCapabilities> {
49        None
50    }
51}
52
53/// Lance-specific operations exposed beyond the format-agnostic `Dataset` trait.
54#[async_trait]
55pub trait LanceCapabilities: Send + Sync {
56    /// List versions on `branch` (defaults to `main` when `None`). When
57    /// `tagged_only` is true, drops untagged versions from the result.
58    async fn list_versions(
59        &self,
60        branch: Option<&str>,
61        tagged_only: bool,
62    ) -> Result<Vec<VersionInfo>>;
63
64    /// List every branch the dataset has, including the default `main`.
65    async fn list_branches(&self) -> Result<Vec<BranchInfo>>;
66
67    /// List indices defined on the active version of the dataset.
68    async fn list_indices(&self) -> Result<Vec<IndexInfo>>;
69
70    /// List every tag in the dataset, regardless of branch.
71    async fn list_tags(&self) -> Result<Vec<TagInfo>>;
72}
73
74/// One row in `arrs versions` output.
75#[derive(Debug, Clone)]
76pub struct VersionInfo {
77    pub version: u64,
78    pub timestamp: DateTime<Utc>,
79    pub tag: Option<String>,
80    pub message: Option<String>,
81}
82
83/// One row in `arrs branches` output.
84#[derive(Debug, Clone)]
85pub struct BranchInfo {
86    pub name: String,
87    pub parent_branch: Option<String>,
88    pub parent_version: Option<u64>,
89    pub created_at: Option<DateTime<Utc>>,
90}
91
92/// One row in `arrs indices` output.
93#[derive(Debug, Clone)]
94pub struct IndexInfo {
95    pub name: String,
96    pub uuid: String,
97    pub columns: Vec<String>,
98    pub dataset_version: u64,
99    pub created_at: Option<DateTime<Utc>>,
100}
101
/// One row in `arrs tags` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TagInfo {
    /// Tag name.
    pub name: String,
    /// Branch the tagged version belongs to.
    pub branch: String,
    /// Version the tag points at.
    pub version: u64,
}
109
110/// Open a dataset at `path`, optionally checking out a specific Lance
111/// branch/version/tag. Returns an error if the dataset is not Lance and any
112/// `LanceArgs` field is set.
113pub async fn open(path: &Path, lance: Option<&LanceArgs>) -> Result<Arc<dyn Dataset>> {
114    match detect_format(path)? {
115        Format::Lance => {
116            let ds = crate::lance::LanceDataset::open(path, lance).await?;
117            Ok(Arc::new(ds))
118        }
119    }
120}
121
/// Dataset formats that [`detect_format`] can recognise and [`open`] can
/// dispatch to an adapter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Format {
    /// A Lance dataset: a directory containing a `_versions/` subdirectory.
    Lance,
}
126
127/// Determine which adapter should be used for `path`. Today we only support
128/// Lance; the function exists in match shape so a Parquet (or other) arm can
129/// be added without touching call sites.
130pub fn detect_format(path: &Path) -> Result<Format> {
131    if is_lance_dataset(path) {
132        Ok(Format::Lance)
133    } else {
134        Err(Error::UnknownFormat {
135            path: path.to_path_buf(),
136        })
137    }
138}
139
/// Returns `true` when `path` looks like a Lance dataset on the local filesystem.
///
/// A Lance dataset is a directory that contains a `_versions/` subdirectory.
/// (`_transactions/` is the other typical marker, but it is not always present
/// in freshly written datasets, so it is not required here.)
///
/// This does blocking filesystem metadata lookups, and symlinks are followed.
/// Any lookup error (missing path, permission denied, ...) is reported as `false`.
/// Only the local filesystem is consulted, so object-store URIs are never matched.
fn is_lance_dataset(path: &Path) -> bool {
    // Borrow `path` directly instead of copying it into a PathBuf first;
    // `join` already allocates the only owned path we need.
    let versions_dir: PathBuf = path.join("_versions");
    path.is_dir() && versions_dir.is_dir()
}