// lance/dataset.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance Dataset
5//!
6
7use arrow_array::{RecordBatch, RecordBatchReader};
8use arrow_schema::DataType;
9use byteorder::{ByteOrder, LittleEndian};
10use chrono::{prelude::*, Duration};
11use deepsize::DeepSizeOf;
12use futures::future::BoxFuture;
13use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
14use futures::{FutureExt, Stream};
15
16use crate::dataset::blob::blob_version_from_config;
17use crate::dataset::metadata::UpdateFieldMetadataBuilder;
18use crate::dataset::transaction::translate_schema_metadata_updates;
19use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey};
20use crate::session::index_caches::DSIndexCache;
21use itertools::Itertools;
22use lance_core::datatypes::{
23    BlobVersion, Field, OnMissing, OnTypeMismatch, Projectable, Projection,
24};
25use lance_core::traits::DatasetTakeRows;
26use lance_core::utils::address::RowAddress;
27use lance_core::utils::tracing::{
28    DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT,
29    TRACE_DATASET_EVENTS,
30};
31use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID_FIELD};
32use lance_datafusion::projection::ProjectionPlan;
33use lance_file::datatypes::populate_schema_dictionary;
34use lance_file::reader::FileReaderOptions;
35use lance_file::version::LanceFileVersion;
36use lance_index::DatasetIndexExt;
37use lance_io::object_store::{
38    LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions,
39    StorageOptionsAccessor, StorageOptionsProvider,
40};
41use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
42use lance_namespace::LanceNamespace;
43use lance_table::format::{
44    pb, DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta,
45};
46use lance_table::io::commit::{
47    migrate_scheme_to_v2, write_manifest_file_to_path, CommitConfig, CommitError, CommitHandler,
48    CommitLock, ManifestLocation, ManifestNamingScheme, VERSIONS_DIR,
49};
50use lance_table::io::manifest::{read_manifest, read_manifest_indexes};
51use object_store::path::Path;
52use prost::Message;
53use roaring::RoaringBitmap;
54use rowids::get_row_id_index;
55use serde::{Deserialize, Serialize};
56use snafu::location;
57use std::borrow::Cow;
58use std::collections::{BTreeMap, HashMap, HashSet};
59use std::fmt::Debug;
60use std::ops::Range;
61use std::pin::Pin;
62use std::sync::Arc;
63use take::row_offsets_to_row_addresses;
64use tracing::{info, instrument};
65
66pub(crate) mod blob;
67mod branch_location;
68pub mod builder;
69pub mod cleanup;
70pub mod delta;
71pub mod fragment;
72mod hash_joiner;
73pub mod index;
74mod metadata;
75pub mod optimize;
76pub mod progress;
77pub mod refs;
78pub(crate) mod rowids;
79pub mod scanner;
80mod schema_evolution;
81pub mod sql;
82pub mod statistics;
83mod take;
84pub mod transaction;
85pub mod udtf;
86pub mod updater;
87mod utils;
88pub mod write;
89
90use self::builder::DatasetBuilder;
91use self::cleanup::RemovalStats;
92use self::fragment::FileFragment;
93use self::refs::Refs;
94use self::scanner::{DatasetRecordBatchStream, Scanner};
95use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry};
96use self::write::write_fragments_internal;
97use crate::dataset::branch_location::BranchLocation;
98use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder};
99use crate::dataset::refs::{BranchContents, Branches, Tags};
100use crate::dataset::sql::SqlQueryBuilder;
101use crate::datatypes::Schema;
102use crate::index::retain_supported_indices;
103use crate::io::commit::{
104    commit_detached_transaction, commit_new_dataset, commit_transaction,
105    detect_overlapping_fragments,
106};
107use crate::session::Session;
108use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime};
109use crate::{Error, Result};
110pub use blob::BlobFile;
111use hash_joiner::HashJoiner;
112use lance_core::box_error;
113pub use lance_core::ROW_ID;
114use lance_namespace::models::{
115    CreateEmptyTableRequest, DeclareTableRequest, DeclareTableResponse, DescribeTableRequest,
116};
117use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
118use lance_table::io::deletion::{relative_deletion_file_path, DELETIONS_DIR};
119pub use schema_evolution::{
120    BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
121};
122pub use take::TakeBuilder;
123pub use write::merge_insert::{
124    MergeInsertBuilder, MergeInsertJob, MergeStats, UncommittedMergeInsert, WhenMatched,
125    WhenNotMatched, WhenNotMatchedBySource,
126};
127
128pub use write::update::{UpdateBuilder, UpdateJob};
129#[allow(deprecated)]
130pub use write::{
131    write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, InsertBuilder,
132    WriteDestination, WriteMode, WriteParams,
133};
134
// Sub-directory names used inside a dataset's root path.
pub(crate) const INDICES_DIR: &str = "_indices";
pub(crate) const DATA_DIR: &str = "data";
pub(crate) const TRANSACTIONS_DIR: &str = "_transactions";

/// Default size of the index cache in bytes.
///
/// We default to 6GB for the index cache, since indices are often large but
/// worth caching.
pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024;
/// Default size of the metadata cache in bytes.
///
/// Default to 1 GiB for the metadata cache. Column metadata can be like 40MB,
/// so this should be enough for a few hundred columns. Other metadata is much
/// smaller.
pub const DEFAULT_METADATA_CACHE_SIZE: usize = 1024 * 1024 * 1024;
146
/// Lance Dataset
#[derive(Clone)]
pub struct Dataset {
    /// Store used to read and write the dataset's files.
    pub object_store: Arc<ObjectStore>,
    /// Strategy used to resolve and commit manifest versions.
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    /// Uri of the dataset.
    ///
    /// On cloud storage, we can not use [Dataset::base] to build the full uri because the
    /// `bucket` is swallowed in the inner [ObjectStore].
    uri: String,
    /// Root path of the dataset within the object store.
    pub(crate) base: Path,
    /// Manifest of the currently checked-out version.
    pub manifest: Arc<Manifest>,
    // Path for the manifest that is loaded. Used to get additional information,
    // such as the index metadata.
    pub(crate) manifest_location: ManifestLocation,
    /// Shared session holding the index/metadata caches; may be shared
    /// across multiple datasets (see [ReadParams::session]).
    pub(crate) session: Arc<Session>,
    /// Accessor for this dataset's tags and branches.
    pub refs: Refs,

    // Bitmap of fragment ids in this dataset.
    pub(crate) fragment_bitmap: Arc<RoaringBitmap>,

    // These are references to session caches, but with the dataset URI as a prefix.
    pub(crate) index_cache: Arc<DSIndexCache>,
    pub(crate) metadata_cache: Arc<DSMetadataCache>,

    /// File reader options to use when reading data files.
    pub(crate) file_reader_options: Option<FileReaderOptions>,

    /// Object store parameters used when opening this dataset.
    /// These are used when creating object stores for additional base paths.
    pub(crate) store_params: Option<Box<ObjectStoreParams>>,
}
179
180impl std::fmt::Debug for Dataset {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        f.debug_struct("Dataset")
183            .field("uri", &self.uri)
184            .field("base", &self.base)
185            .field("version", &self.manifest.version)
186            .field("cache_num_items", &self.session.approx_num_items())
187            .finish()
188    }
189}
190
/// Dataset Version
///
/// A lightweight, serializable summary of one committed dataset version,
/// derived from its [Manifest] (see the `From<&Manifest>` impl below).
#[derive(Deserialize, Serialize, Debug)]
pub struct Version {
    /// version number
    pub version: u64,

    /// Timestamp of dataset creation in UTC.
    pub timestamp: DateTime<Utc>,

    /// Key-value pairs of metadata.
    pub metadata: BTreeMap<String, String>,
}
203
204/// Convert Manifest to Data Version.
205impl From<&Manifest> for Version {
206    fn from(m: &Manifest) -> Self {
207        Self {
208            version: m.version,
209            timestamp: m.timestamp(),
210            metadata: m.summary().into(),
211        }
212    }
213}
214
/// Customize read behavior of a dataset.
#[derive(Clone, Debug)]
pub struct ReadParams {
    /// Size of the index cache in bytes. This cache stores index data in memory
    /// for faster lookups. The default is 6 GiB.
    pub index_cache_size_bytes: usize,

    /// Size of the metadata cache in bytes. This cache stores metadata in memory
    /// for faster open table and scans. The default is 1 GiB.
    pub metadata_cache_size_bytes: usize,

    /// If present, dataset will use this shared [`Session`] instead creating a new one.
    ///
    /// This is useful for sharing the same session across multiple datasets.
    pub session: Option<Arc<Session>>,

    /// Optional parameters for the object store used to open the dataset.
    pub store_options: Option<ObjectStoreParams>,

    /// If present, dataset will use this to resolve the latest version
    ///
    /// Lance needs to be able to make atomic updates to the manifest.  This involves
    /// coordination between readers and writers and we can usually rely on the filesystem
    /// to do this coordination for us.
    ///
    /// Some file systems (e.g. S3) do not support atomic operations.  In this case, for
    /// safety, we recommend an external commit mechanism (such as dynamodb) and, on the
    /// read path, we need to reach out to that external mechanism to figure out the latest
    /// version of the dataset.
    ///
    /// If this is not set then a default behavior is chosen that is appropriate for the
    /// filesystem.
    ///
    /// If a custom object store is provided (via store_params.object_store) then this
    /// must also be provided.
    pub commit_handler: Option<Arc<dyn CommitHandler>>,

    /// File reader options to use when reading data files.
    ///
    /// This allows control over features like caching repetition indices and validation.
    pub file_reader_options: Option<FileReaderOptions>,
}
256
257impl ReadParams {
258    /// Set the cache size for indices. Set to zero, to disable the cache.
259    #[deprecated(
260        since = "0.30.0",
261        note = "Use `index_cache_size_bytes` instead, which accepts a size in bytes."
262    )]
263    pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
264        let assumed_entry_size = 20 * 1024 * 1024; // 20 MiB per entry
265        self.index_cache_size_bytes = cache_size * assumed_entry_size;
266        self
267    }
268
269    pub fn index_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
270        self.index_cache_size_bytes = cache_size;
271        self
272    }
273
274    /// Set the cache size for the file metadata. Set to zero to disable this cache.
275    #[deprecated(
276        since = "0.30.0",
277        note = "Use `metadata_cache_size_bytes` instead, which accepts a size in bytes."
278    )]
279    pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
280        let assumed_entry_size = 10 * 1024 * 1024; // 10 MiB per entry
281        self.metadata_cache_size_bytes = cache_size * assumed_entry_size;
282        self
283    }
284
285    /// Set the cache size for the file metadata in bytes.
286    pub fn metadata_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
287        self.metadata_cache_size_bytes = cache_size;
288        self
289    }
290
291    /// Set a shared session for the datasets.
292    pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
293        self.session = Some(session);
294        self
295    }
296
297    /// Use the explicit locking to resolve the latest version
298    pub fn set_commit_lock<T: CommitLock + Send + Sync + 'static>(&mut self, lock: Arc<T>) {
299        self.commit_handler = Some(Arc::new(lock));
300    }
301
302    /// Set the file reader options.
303    pub fn file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
304        self.file_reader_options = Some(options);
305        self
306    }
307}
308
309impl Default for ReadParams {
310    fn default() -> Self {
311        Self {
312            index_cache_size_bytes: DEFAULT_INDEX_CACHE_SIZE,
313            metadata_cache_size_bytes: DEFAULT_METADATA_CACHE_SIZE,
314            session: None,
315            store_options: None,
316            commit_handler: None,
317            file_reader_options: None,
318        }
319    }
320}
321
/// Describes which columns (or derived expressions) a scan should produce.
#[derive(Debug, Clone)]
pub enum ProjectionRequest {
    /// Project the fields of the given schema (may include system columns).
    Schema(Arc<Schema>),
    /// Project SQL expressions as `(output column name, SQL expression)` pairs.
    Sql(Vec<(String, String)>),
}
327
impl ProjectionRequest {
    /// Build a projection request from a list of column names.
    ///
    /// System columns (`_rowid`, `_rowaddr`) are recognized and appended as
    /// virtual fields; all other names are projected from `dataset_schema`.
    /// The output field order follows the order of `columns`.
    ///
    /// # Panics
    ///
    /// Panics if a non-system column cannot be projected from
    /// `dataset_schema` (both `project` and `try_from` are unwrapped).
    pub fn from_columns(
        columns: impl IntoIterator<Item = impl AsRef<str>>,
        dataset_schema: &Schema,
    ) -> Self {
        let columns = columns
            .into_iter()
            .map(|s| s.as_ref().to_string())
            .collect::<Vec<_>>();

        // Separate data columns from system columns
        // System columns need to be added to the schema manually since Schema::project
        // doesn't include them (they're virtual columns)
        let mut data_columns = Vec::new();
        let mut system_fields = Vec::new();

        for col in &columns {
            if lance_core::is_system_column(col) {
                // For now we only support _rowid and _rowaddr in projections
                if col == ROW_ID {
                    system_fields.push(Field::try_from(ROW_ID_FIELD.clone()).unwrap());
                } else if col == ROW_ADDR {
                    system_fields.push(Field::try_from(ROW_ADDR_FIELD.clone()).unwrap());
                }
                // Note: Other system columns like _rowoffset are handled differently
            } else {
                data_columns.push(col.as_str());
            }
        }

        // Project only the data columns
        let mut schema = dataset_schema.project(&data_columns).unwrap();

        // Add system fields in the order they appeared in the original columns list
        // We need to reconstruct the proper order
        //
        // NOTE(review): the lookups below match on the *full* column string.
        // A nested path like "a.b" projects to a top-level field named "a",
        // which would not match here — confirm nested paths are normalized
        // by callers before reaching this function.
        let mut final_fields = Vec::new();
        for col in &columns {
            if lance_core::is_system_column(col) {
                // Find and add the system field
                if let Some(field) = system_fields.iter().find(|f| &f.name == col) {
                    final_fields.push(field.clone());
                }
            } else {
                // Find and add the data field
                if let Some(field) = schema.fields.iter().find(|f| &f.name == col) {
                    final_fields.push(field.clone());
                }
            }
        }

        schema.fields = final_fields;
        Self::Schema(Arc::new(schema))
    }

    /// Build a projection request from an owned schema.
    pub fn from_schema(schema: Schema) -> Self {
        Self::Schema(Arc::new(schema))
    }

    /// Provide a list of projection with SQL transform.
    ///
    /// # Parameters
    /// - `columns`: A list of tuples where the first element is resulted column name and the second
    ///   element is the SQL expression.
    pub fn from_sql(
        columns: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self::Sql(
            columns
                .into_iter()
                .map(|(a, b)| (a.into(), b.into()))
                .collect(),
        )
    }

    /// Resolve this request against `dataset` into an executable [ProjectionPlan].
    ///
    /// Schema requests are validated against the dataset schema unless they
    /// contain system columns, which are handled by `ProjectionPlan` itself.
    pub fn into_projection_plan(self, dataset: Arc<Dataset>) -> Result<ProjectionPlan> {
        let blob_version = dataset.blob_version();
        match self {
            Self::Schema(schema) => {
                // The schema might contain system columns (_rowid, _rowaddr) which are not
                // in the dataset schema. We handle these specially in ProjectionPlan::from_schema.
                let system_columns_present = schema
                    .fields
                    .iter()
                    .any(|f| lance_core::is_system_column(&f.name));

                if system_columns_present {
                    // If system columns are present, we can't use project_by_schema directly
                    // Just pass the schema to ProjectionPlan::from_schema which handles it
                    ProjectionPlan::from_schema(dataset, schema.as_ref(), blob_version)
                } else {
                    // No system columns, use normal path with validation
                    let projection = dataset.schema().project_by_schema(
                        schema.as_ref(),
                        OnMissing::Error,
                        OnTypeMismatch::Error,
                    )?;
                    ProjectionPlan::from_schema(dataset, &projection, blob_version)
                }
            }
            Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns, blob_version),
        }
    }
}
431
impl From<Arc<Schema>> for ProjectionRequest {
    /// Wrap an already-shared schema as a projection request.
    fn from(schema: Arc<Schema>) -> Self {
        Self::Schema(schema)
    }
}

impl From<Schema> for ProjectionRequest {
    /// Wrap an owned schema as a projection request.
    fn from(schema: Schema) -> Self {
        Self::from(Arc::new(schema))
    }
}
443
444impl Dataset {
    /// Open an existing dataset.
    ///
    /// Loads the latest version at `uri` using [DatasetBuilder] with default
    /// options. Use [DatasetBuilder] directly for custom read parameters.
    ///
    /// See also [DatasetBuilder].
    #[instrument]
    pub async fn open(uri: &str) -> Result<Self> {
        DatasetBuilder::from_uri(uri).load().await
    }
452
    /// Check out a dataset version with a ref
    ///
    /// The ref may be:
    /// - `Version(branch, version)`: an explicit branch plus an optional
    ///   version number (latest on that branch when `None`),
    /// - `VersionNumber(version)`: a version on the currently checked-out branch,
    /// - `Tag(name)`: a named tag, resolved to its recorded branch and version.
    pub async fn checkout_version(&self, version: impl Into<refs::Ref>) -> Result<Self> {
        let reference: refs::Ref = version.into();
        match reference {
            refs::Ref::Version(branch, version_number) => {
                self.checkout_by_ref(version_number, branch.as_deref())
                    .await
            }
            refs::Ref::VersionNumber(version_number) => {
                // Only a number given: stay on the current branch.
                self.checkout_by_ref(Some(version_number), self.manifest.branch.as_deref())
                    .await
            }
            refs::Ref::Tag(tag_name) => {
                // A tag records both the branch and the version it points at.
                let tag_contents = self.tags().get(tag_name.as_str()).await?;
                self.checkout_by_ref(Some(tag_contents.version), tag_contents.branch.as_deref())
                    .await
            }
        }
    }
472
    /// Access the tags of this dataset.
    pub fn tags(&self) -> Tags<'_> {
        self.refs.tags()
    }

    /// Access the branches of this dataset.
    pub fn branches(&self) -> Branches<'_> {
        self.refs.branches()
    }
480
481    /// Check out the latest version of the dataset
482    pub async fn checkout_latest(&mut self) -> Result<()> {
483        let (manifest, manifest_location) = self.latest_manifest().await?;
484        self.manifest = manifest;
485        self.manifest_location = manifest_location;
486        self.fragment_bitmap = Arc::new(
487            self.manifest
488                .fragments
489                .iter()
490                .map(|f| f.id as u32)
491                .collect(),
492        );
493        Ok(())
494    }
495
    /// Check out the latest version of the branch
    ///
    /// Returns a new [Dataset] checked out at the branch head; `self` is
    /// unchanged.
    pub async fn checkout_branch(&self, branch: &str) -> Result<Self> {
        self.checkout_by_ref(None, Some(branch)).await
    }
500
    /// This is a two-phase operation:
    /// - Create the branch dataset by shallow cloning.
    /// - Create the branch metadata (a.k.a. `BranchContents`).
    ///
    /// These two phases are not atomic. We consider `BranchContents` as the source of truth
    /// for the branch.
    ///
    /// The cleanup procedure should:
    /// - Clean up zombie branch datasets that have no related `BranchContents`.
    /// - Delete broken `BranchContents` entries that have no related branch dataset.
    ///
    /// If `create_branch` stops at phase 1, it may leave a zombie branch dataset,
    /// which can be cleaned up later. Such a zombie dataset may cause a branch creation
    /// failure if we use the same name to `create_branch`. In that case, you need to call
    /// `force_delete_branch` to interactively clean up the zombie dataset.
    pub async fn create_branch(
        &mut self,
        branch: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        // Resolve the source branch/version that the new branch starts from.
        let (source_branch, version_number) = self.resolve_reference(version.into()).await?;
        let branch_location = self.find_branch_location(branch)?;
        // Phase 1: shallow-clone this dataset into the branch location.
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name: source_branch.clone(),
            ref_version: version_number,
            ref_path: String::from(self.uri()),
            branch_name: Some(branch.to_string()),
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str()))
            .with_store_params(store_params.unwrap_or_default())
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let dataset = builder.execute(transaction).await?;

        // Phase 2: create BranchContents after shallow_clone. This entry is
        // the source of truth for the branch (see doc comment above).
        self.branches()
            .create(branch, version_number, source_branch.as_deref())
            .await?;
        Ok(dataset)
    }
546
    /// Delete the branch (fails if its `BranchContents` entry is missing).
    pub async fn delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, false).await
    }

    /// Delete the branch even if the BranchContents is not found.
    /// This could be useful when we have zombie branches and want to clean them up immediately.
    pub async fn force_delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, true).await
    }

    /// List all branches along with their recorded `BranchContents`.
    pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
        self.branches().list().await
    }
560
561    fn already_checked_out(&self, location: &ManifestLocation, branch_name: Option<&str>) -> bool {
562        // We check the e_tag here just in case it has been overwritten. This can
563        // happen if the table has been dropped then re-created recently.
564        self.manifest.branch.as_deref() == branch_name
565            && self.manifest.version == location.version
566            && self.manifest_location.naming_scheme == location.naming_scheme
567            && location.e_tag.as_ref().is_some_and(|e_tag| {
568                self.manifest_location
569                    .e_tag
570                    .as_ref()
571                    .is_some_and(|current_e_tag| e_tag == current_e_tag)
572            })
573    }
574
    /// Resolve and check out a version on an optional branch, returning a
    /// new [Dataset]; `self` is unchanged.
    ///
    /// When `version_number` is `None` the latest version on the branch is
    /// resolved. If the resolved manifest is the one already checked out
    /// (matching version and e_tag), a clone of `self` is returned without
    /// re-reading the manifest.
    async fn checkout_by_ref(
        &self,
        version_number: Option<u64>,
        branch: Option<&str>,
    ) -> Result<Self> {
        let new_location = self.branch_location().find_branch(branch)?;

        // Locate the manifest file for the requested (or latest) version.
        let manifest_location = if let Some(version_number) = version_number {
            self.commit_handler
                .resolve_version_location(
                    &new_location.path,
                    version_number,
                    &self.object_store.inner,
                )
                .await?
        } else {
            self.commit_handler
                .resolve_latest_location(&new_location.path, &self.object_store)
                .await?
        };

        // Fast path: nothing changed, reuse the current state.
        if self.already_checked_out(&manifest_location, branch) {
            return Ok(self.clone());
        }

        let manifest = Self::load_manifest(
            self.object_store.as_ref(),
            &manifest_location,
            &new_location.uri,
            self.session.as_ref(),
        )
        .await?;
        Self::checkout_manifest(
            self.object_store.clone(),
            new_location.path,
            new_location.uri,
            Arc::new(manifest),
            manifest_location,
            self.session.clone(),
            self.commit_handler.clone(),
            self.file_reader_options.clone(),
            self.store_params.as_deref().cloned(),
        )
    }
619
    /// Read and decode the manifest at `manifest_location`.
    ///
    /// Reads the file's last block first; if the manifest message fits
    /// inside it, the manifest is decoded directly from memory without a
    /// second read. Each embedded message is framed as a little-endian u32
    /// length prefix followed by the protobuf bytes. When the index or
    /// transaction sections also land in the last block, they are decoded
    /// eagerly and stored in the session caches keyed by `uri` and version.
    ///
    /// Returns `Error::DatasetNotFound` when the manifest file is missing,
    /// and `Error::NotSupported` when reader feature flags indicate this
    /// Lance version cannot read the dataset.
    pub(crate) async fn load_manifest(
        object_store: &ObjectStore,
        manifest_location: &ManifestLocation,
        uri: &str,
        session: &Session,
    ) -> Result<Manifest> {
        // If the size is already known, skip the HEAD request when opening.
        let object_reader = if let Some(size) = manifest_location.size {
            object_store
                .open_with_size(&manifest_location.path, size as usize)
                .await
        } else {
            object_store.open(&manifest_location.path).await
        };
        // A missing manifest means the dataset itself is missing.
        let object_reader = object_reader.map_err(|e| match &e {
            Error::NotFound { uri, .. } => Error::DatasetNotFound {
                path: uri.clone(),
                source: box_error(e),
                location: location!(),
            },
            _ => e,
        })?;

        let last_block =
            read_last_block(object_reader.as_ref())
                .await
                .map_err(|err| match err {
                    object_store::Error::NotFound { path, source } => Error::DatasetNotFound {
                        path,
                        source,
                        location: location!(),
                    },
                    _ => Error::IO {
                        source: err.into(),
                        location: location!(),
                    },
                })?;
        // Byte offset of the manifest message within the file.
        let offset = read_metadata_offset(&last_block)?;

        // If manifest is in the last block, we can decode directly from memory.
        let manifest_size = object_reader.size().await?;
        let mut manifest = if manifest_size - offset <= last_block.len() {
            let manifest_len = manifest_size - offset;
            let offset_in_block = last_block.len() - manifest_len;
            // u32 LE length prefix, then the protobuf-encoded manifest.
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
        } else {
            // Manifest starts before the last block; read it from the file.
            read_struct(object_reader.as_ref(), offset).await
        }?;

        if !can_read_dataset(manifest.reader_feature_flags) {
            let message = format!(
                "This dataset cannot be read by this version of Lance. \
                 Please upgrade Lance to read this dataset.\n Flags: {}",
                manifest.reader_feature_flags
            );
            return Err(Error::NotSupported {
                source: message.into(),
                location: location!(),
            });
        }

        // If indices were also in the last block, we can take the opportunity to
        // decode them now and cache them.
        if let Some(index_offset) = manifest.index_section {
            if manifest_size - index_offset <= last_block.len() {
                let offset_in_block = last_block.len() - (manifest_size - index_offset);
                let message_len =
                    LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4])
                        as usize;
                let message_data =
                    &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
                let section = lance_table::format::pb::IndexSection::decode(message_data)?;
                let mut indices: Vec<IndexMetadata> = section
                    .indices
                    .into_iter()
                    .map(IndexMetadata::try_from)
                    .collect::<Result<Vec<_>>>()?;
                // Drop index types this build cannot serve.
                retain_supported_indices(&mut indices);
                let ds_index_cache = session.index_cache.for_dataset(uri);
                let metadata_key = crate::session::index_caches::IndexMetadataKey {
                    version: manifest_location.version,
                };
                ds_index_cache
                    .insert_with_key(&metadata_key, Arc::new(indices))
                    .await;
            }
        }

        // If transaction is also in the last block, we can take the opportunity to
        // decode them now and cache them.
        if let Some(transaction_offset) = manifest.transaction_section {
            if manifest_size - transaction_offset <= last_block.len() {
                let offset_in_block = last_block.len() - (manifest_size - transaction_offset);
                let message_len =
                    LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4])
                        as usize;
                let message_data =
                    &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
                let transaction: Transaction =
                    lance_table::format::pb::Transaction::decode(message_data)?.try_into()?;

                let metadata_cache = session.metadata_cache.for_dataset(uri);
                let metadata_key = TransactionKey {
                    version: manifest_location.version,
                };
                metadata_cache
                    .insert_with_key(&metadata_key, Arc::new(transaction))
                    .await;
            }
        }

        // Legacy-format manifests need schema dictionaries populated from
        // the file itself.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
        }

        Ok(manifest)
    }
739
740    #[allow(clippy::too_many_arguments)]
741    fn checkout_manifest(
742        object_store: Arc<ObjectStore>,
743        base_path: Path,
744        uri: String,
745        manifest: Arc<Manifest>,
746        manifest_location: ManifestLocation,
747        session: Arc<Session>,
748        commit_handler: Arc<dyn CommitHandler>,
749        file_reader_options: Option<FileReaderOptions>,
750        store_params: Option<ObjectStoreParams>,
751    ) -> Result<Self> {
752        let refs = Refs::new(
753            object_store.clone(),
754            commit_handler.clone(),
755            BranchLocation {
756                path: base_path.clone(),
757                uri: uri.clone(),
758                branch: manifest.branch.clone(),
759            },
760        );
761        let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
762        let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
763        let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
764        Ok(Self {
765            object_store,
766            base: base_path,
767            uri,
768            manifest,
769            manifest_location,
770            commit_handler,
771            session,
772            refs,
773            fragment_bitmap,
774            metadata_cache,
775            index_cache,
776            file_reader_options,
777            store_params: store_params.map(Box::new),
778        })
779    }
780
781    /// Write to or Create a [Dataset] with a stream of [RecordBatch]s.
782    ///
783    /// `dest` can be a `&str`, `object_store::path::Path` or `Arc<Dataset>`.
784    ///
785    /// Returns the newly created [`Dataset`].
786    /// Or Returns [Error] if the dataset already exists.
787    ///
788    pub async fn write(
789        batches: impl RecordBatchReader + Send + 'static,
790        dest: impl Into<WriteDestination<'_>>,
791        params: Option<WriteParams>,
792    ) -> Result<Self> {
793        let mut builder = InsertBuilder::new(dest);
794        if let Some(params) = &params {
795            builder = builder.with_params(params);
796        }
797        Box::pin(builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>))
798            .await
799    }
800
801    /// Write into a namespace-managed table with automatic credential vending.
802    ///
803    /// For CREATE mode, calls create_empty_table() to initialize the table.
804    /// For other modes, calls describe_table() and opens dataset with namespace credentials.
805    ///
806    /// # Arguments
807    ///
808    /// * `batches` - The record batches to write
809    /// * `namespace` - The namespace to use for table management
810    /// * `table_id` - The table identifier
811    /// * `params` - Write parameters
812    pub async fn write_into_namespace(
813        batches: impl RecordBatchReader + Send + 'static,
814        namespace: Arc<dyn LanceNamespace>,
815        table_id: Vec<String>,
816        mut params: Option<WriteParams>,
817    ) -> Result<Self> {
818        let mut write_params = params.take().unwrap_or_default();
819
820        match write_params.mode {
821            WriteMode::Create => {
822                let declare_request = DeclareTableRequest {
823                    id: Some(table_id.clone()),
824                    ..Default::default()
825                };
826                // Try declare_table first, fall back to deprecated create_empty_table
827                // for backward compatibility with older namespace implementations.
828                // create_empty_table support will be removed in 3.0.0.
829                #[allow(deprecated)]
830                let response = match namespace.declare_table(declare_request).await {
831                    Ok(resp) => resp,
832                    Err(Error::NotSupported { .. }) => {
833                        let fallback_request = CreateEmptyTableRequest {
834                            id: Some(table_id.clone()),
835                            ..Default::default()
836                        };
837                        let fallback_resp = namespace
838                            .create_empty_table(fallback_request)
839                            .await
840                            .map_err(|e| Error::Namespace {
841                                source: Box::new(e),
842                                location: location!(),
843                            })?;
844                        DeclareTableResponse {
845                            transaction_id: fallback_resp.transaction_id,
846                            location: fallback_resp.location,
847                            storage_options: fallback_resp.storage_options,
848                        }
849                    }
850                    Err(e) => {
851                        return Err(Error::Namespace {
852                            source: Box::new(e),
853                            location: location!(),
854                        });
855                    }
856                };
857
858                let uri = response.location.ok_or_else(|| Error::Namespace {
859                    source: Box::new(std::io::Error::other(
860                        "Table location not found in declare_table response",
861                    )),
862                    location: location!(),
863                })?;
864
865                // Set initial credentials and provider from namespace
866                if let Some(namespace_storage_options) = response.storage_options {
867                    let provider: Arc<dyn StorageOptionsProvider> = Arc::new(
868                        LanceNamespaceStorageOptionsProvider::new(namespace, table_id),
869                    );
870
871                    // Merge namespace storage options with any existing options
872                    let mut merged_options = write_params
873                        .store_params
874                        .as_ref()
875                        .and_then(|p| p.storage_options().cloned())
876                        .unwrap_or_default();
877                    merged_options.extend(namespace_storage_options);
878
879                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
880                        merged_options,
881                        provider,
882                    ));
883
884                    let existing_params = write_params.store_params.take().unwrap_or_default();
885                    write_params.store_params = Some(ObjectStoreParams {
886                        storage_options_accessor: Some(accessor),
887                        ..existing_params
888                    });
889                }
890
891                Self::write(batches, uri.as_str(), Some(write_params)).await
892            }
893            WriteMode::Append | WriteMode::Overwrite => {
894                let request = DescribeTableRequest {
895                    id: Some(table_id.clone()),
896                    ..Default::default()
897                };
898                let response =
899                    namespace
900                        .describe_table(request)
901                        .await
902                        .map_err(|e| Error::Namespace {
903                            source: Box::new(e),
904                            location: location!(),
905                        })?;
906
907                let uri = response.location.ok_or_else(|| Error::Namespace {
908                    source: Box::new(std::io::Error::other(
909                        "Table location not found in describe_table response",
910                    )),
911                    location: location!(),
912                })?;
913
914                // Set initial credentials and provider from namespace
915                if let Some(namespace_storage_options) = response.storage_options {
916                    let provider: Arc<dyn StorageOptionsProvider> =
917                        Arc::new(LanceNamespaceStorageOptionsProvider::new(
918                            namespace.clone(),
919                            table_id.clone(),
920                        ));
921
922                    // Merge namespace storage options with any existing options
923                    let mut merged_options = write_params
924                        .store_params
925                        .as_ref()
926                        .and_then(|p| p.storage_options().cloned())
927                        .unwrap_or_default();
928                    merged_options.extend(namespace_storage_options);
929
930                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
931                        merged_options,
932                        provider,
933                    ));
934
935                    let existing_params = write_params.store_params.take().unwrap_or_default();
936                    write_params.store_params = Some(ObjectStoreParams {
937                        storage_options_accessor: Some(accessor),
938                        ..existing_params
939                    });
940                }
941
942                // For APPEND/OVERWRITE modes, we must open the existing dataset first
943                // and pass it to InsertBuilder. If we pass just the URI, InsertBuilder
944                // assumes no dataset exists and converts the mode to CREATE.
945                let mut builder = DatasetBuilder::from_uri(uri.as_str());
946                if let Some(ref store_params) = write_params.store_params {
947                    if let Some(accessor) = &store_params.storage_options_accessor {
948                        builder = builder.with_storage_options_accessor(accessor.clone());
949                    }
950                }
951                let dataset = Arc::new(builder.load().await?);
952
953                Self::write(batches, dataset, Some(write_params)).await
954            }
955        }
956    }
957
958    /// Append to existing [Dataset] with a stream of [RecordBatch]s
959    ///
960    /// Returns void result or Returns [Error]
961    pub async fn append(
962        &mut self,
963        batches: impl RecordBatchReader + Send + 'static,
964        params: Option<WriteParams>,
965    ) -> Result<()> {
966        let write_params = WriteParams {
967            mode: WriteMode::Append,
968            ..params.unwrap_or_default()
969        };
970
971        let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
972            .with_params(&write_params)
973            .execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>)
974            .await?;
975
976        *self = new_dataset;
977
978        Ok(())
979    }
980
981    /// Get the fully qualified URI of this dataset.
982    pub fn uri(&self) -> &str {
983        &self.uri
984    }
985
986    pub fn branch_location(&self) -> BranchLocation {
987        BranchLocation {
988            path: self.base.clone(),
989            uri: self.uri.clone(),
990            branch: self.manifest.branch.clone(),
991        }
992    }
993
994    pub fn find_branch_location(&self, branch_name: &str) -> Result<BranchLocation> {
995        let current_location = BranchLocation {
996            path: self.base.clone(),
997            uri: self.uri.clone(),
998            branch: self.manifest.branch.clone(),
999        };
1000        current_location.find_branch(Some(branch_name))
1001    }
1002
    /// Get the full manifest of the dataset version.
    ///
    /// The manifest describes this checked-out version (schema, fragments,
    /// branch, etc.).
    pub fn manifest(&self) -> &Manifest {
        &self.manifest
    }
1007
    /// The storage location of the manifest backing this dataset handle.
    pub fn manifest_location(&self) -> &ManifestLocation {
        &self.manifest_location
    }
1011
1012    /// Create a [`delta::DatasetDeltaBuilder`] to explore changes between dataset versions.
1013    ///
1014    /// # Example
1015    ///
1016    /// ```
1017    /// # use lance::{Dataset, Result};
1018    /// # async fn example(dataset: &Dataset) -> Result<()> {
1019    /// let delta = dataset.delta()
1020    ///     .compared_against_version(5)
1021    ///     .build()?;
1022    /// let inserted = delta.get_inserted_rows().await?;
1023    /// # Ok(())
1024    /// # }
1025    /// ```
1026    pub fn delta(&self) -> delta::DatasetDeltaBuilder {
1027        delta::DatasetDeltaBuilder::new(self.clone())
1028    }
1029
1030    // TODO: Cache this
1031    pub(crate) fn is_legacy_storage(&self) -> bool {
1032        self.manifest
1033            .data_storage_format
1034            .lance_file_version()
1035            .unwrap()
1036            == LanceFileVersion::Legacy
1037    }
1038
    /// Resolve and return the latest manifest of the dataset plus its location.
    ///
    /// Resolution order:
    /// 1. the session metadata cache (keyed by version + e-tag),
    /// 2. the manifest already checked out by this handle, if it is the latest,
    /// 3. object storage, inserting the freshly read manifest into the cache.
    pub async fn latest_manifest(&self) -> Result<(Arc<Manifest>, ManifestLocation)> {
        let location = self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?;

        // Check if manifest is in cache before reading from storage
        let manifest_key = ManifestKey {
            version: location.version,
            e_tag: location.e_tag.as_deref(),
        };
        let cached_manifest = self.metadata_cache.get_with_key(&manifest_key).await;
        if let Some(cached_manifest) = cached_manifest {
            return Ok((cached_manifest, location));
        }

        // If this handle already has the latest version checked out, reuse it
        // instead of re-reading the manifest from storage.
        if self.already_checked_out(&location, self.manifest.branch.as_deref()) {
            return Ok((self.manifest.clone(), self.manifest_location.clone()));
        }
        let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
        // Legacy-format manifests keep dictionary values outside the schema
        // proto; load them so the schema is fully populated.
        if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
            let reader = if let Some(size) = location.size {
                // A known size lets the reader skip a metadata round-trip.
                self.object_store
                    .open_with_size(&location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&location.path).await?
            };
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let manifest_arc = Arc::new(manifest);
        self.metadata_cache
            .insert_with_key(&manifest_key, manifest_arc.clone())
            .await;
        Ok((manifest_arc, location))
    }
1075
1076    /// Read the transaction file for this version of the dataset.
1077    ///
1078    /// If there was no transaction file written for this version of the dataset
1079    /// then this will return None.
    pub async fn read_transaction(&self) -> Result<Option<Transaction>> {
        // Fast path: per-dataset metadata cache, keyed by manifest version.
        let transaction_key = TransactionKey {
            version: self.manifest.version,
        };
        if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await {
            return Ok(Some((*transaction).clone()));
        }

        // Prefer inline transaction from manifest when available
        let transaction = if let Some(pos) = self.manifest.transaction_section {
            let reader = if let Some(size) = self.manifest_location.size {
                // A known size lets the reader avoid an extra metadata request.
                self.object_store
                    .open_with_size(&self.manifest_location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&self.manifest_location.path).await?
            };

            // The transaction is embedded in the manifest file at offset `pos`.
            let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?;
            Transaction::try_from(tx).map(Some)?
        } else if let Some(path) = &self.manifest.transaction_file {
            // Fallback: read external transaction file if present
            let path = self.transactions_dir().child(path.as_str());
            let data = self.object_store.inner.get(&path).await?.bytes().await?;
            let transaction = lance_table::format::pb::Transaction::decode(data)?;
            Transaction::try_from(transaction).map(Some)?
        } else {
            // No transaction recorded for this version.
            None
        };

        // Populate the cache so subsequent reads skip storage entirely.
        if let Some(tx) = transaction.as_ref() {
            self.metadata_cache
                .insert_with_key(&transaction_key, Arc::new(tx.clone()))
                .await;
        }
        Ok(transaction)
    }
1117
1118    /// Read the transaction file for this version of the dataset.
1119    ///
1120    /// If there was no transaction file written for this version of the dataset
1121    /// then this will return None.
1122    pub async fn read_transaction_by_version(&self, version: u64) -> Result<Option<Transaction>> {
1123        let dataset_version = self.checkout_version(version).await?;
1124        dataset_version.read_transaction().await
1125    }
1126
1127    /// List transactions for the dataset, up to a maximum number.
1128    ///
1129    /// This method iterates through dataset versions, starting from the current version,
1130    /// and collects the transaction for each version. It stops when either `recent_transactions`
1131    /// is reached or there are no more versions.
1132    ///
1133    /// # Arguments
1134    ///
1135    /// * `recent_transactions` - Maximum number of transactions to return
1136    ///
1137    /// # Returns
1138    ///
1139    /// A vector of optional transactions. Each element corresponds to a version,
1140    /// and may be None if no transaction file exists for that version.
1141    pub async fn get_transactions(
1142        &self,
1143        recent_transactions: usize,
1144    ) -> Result<Vec<Option<Transaction>>> {
1145        let mut transactions = vec![];
1146        let mut dataset = self.clone();
1147
1148        loop {
1149            let transaction = dataset.read_transaction().await?;
1150            transactions.push(transaction);
1151
1152            if transactions.len() >= recent_transactions {
1153                break;
1154            } else {
1155                match dataset
1156                    .checkout_version(dataset.version().version - 1)
1157                    .await
1158                {
1159                    Ok(ds) => dataset = ds,
1160                    Err(Error::DatasetNotFound { .. }) => break,
1161                    Err(err) => return Err(err),
1162                }
1163            }
1164        }
1165
1166        Ok(transactions)
1167    }
1168
1169    /// Restore the currently checked out version of the dataset as the latest version.
1170    pub async fn restore(&mut self) -> Result<()> {
1171        let (latest_manifest, _) = self.latest_manifest().await?;
1172        let latest_version = latest_manifest.version;
1173
1174        let transaction = Transaction::new(
1175            latest_version,
1176            Operation::Restore {
1177                version: self.manifest.version,
1178            },
1179            None,
1180        );
1181
1182        self.apply_commit(transaction, &Default::default(), &Default::default())
1183            .await?;
1184
1185        Ok(())
1186    }
1187
1188    /// Removes old versions of the dataset from disk
1189    ///
1190    /// This function will remove all versions of the dataset that are older than the provided
1191    /// timestamp.  This function will not remove the current version of the dataset.
1192    ///
1193    /// Once a version is removed it can no longer be checked out or restored.  Any data unique
1194    /// to that version will be lost.
1195    ///
1196    /// # Arguments
1197    ///
1198    /// * `older_than` - Versions older than this will be deleted.
1199    /// * `delete_unverified` - If false (the default) then files will only be deleted if they
1200    ///                        are listed in at least one manifest.  Otherwise these files will
1201    ///                        be kept since they cannot be distinguished from an in-progress
1202    ///                        transaction.  Set to true to delete these files if you are sure
1203    ///                        there are no other in-progress dataset operations.
1204    ///
1205    /// # Returns
1206    ///
1207    /// * `RemovalStats` - Statistics about the removal operation
1208    #[instrument(level = "debug", skip(self))]
1209    pub fn cleanup_old_versions(
1210        &self,
1211        older_than: Duration,
1212        delete_unverified: Option<bool>,
1213        error_if_tagged_old_versions: Option<bool>,
1214    ) -> BoxFuture<'_, Result<RemovalStats>> {
1215        let mut builder = CleanupPolicyBuilder::default();
1216        builder = builder.before_timestamp(utc_now() - older_than);
1217        if let Some(v) = delete_unverified {
1218            builder = builder.delete_unverified(v);
1219        }
1220        if let Some(v) = error_if_tagged_old_versions {
1221            builder = builder.error_if_tagged_old_versions(v);
1222        }
1223
1224        self.cleanup_with_policy(builder.build())
1225    }
1226
1227    /// Removes old versions of the dataset from storage
1228    ///
1229    /// This function will remove all versions of the dataset that satisfies the given policy.
1230    /// This function will not remove the current version of the dataset.
1231    ///
1232    /// Once a version is removed it can no longer be checked out or restored.  Any data unique
1233    /// to that version will be lost.
1234    ///
1235    /// # Arguments
1236    ///
1237    /// * `policy` - `CleanupPolicy` determines the behaviour of cleanup.
1238    ///
1239    /// # Returns
1240    ///
1241    /// * `RemovalStats` - Statistics about the removal operation
1242    #[instrument(level = "debug", skip(self))]
1243    pub fn cleanup_with_policy(
1244        &self,
1245        policy: CleanupPolicy,
1246    ) -> BoxFuture<'_, Result<RemovalStats>> {
1247        info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri);
1248        cleanup::cleanup_old_versions(self, policy).boxed()
1249    }
1250
1251    #[allow(clippy::too_many_arguments)]
1252    async fn do_commit(
1253        base_uri: WriteDestination<'_>,
1254        operation: Operation,
1255        read_version: Option<u64>,
1256        store_params: Option<ObjectStoreParams>,
1257        commit_handler: Option<Arc<dyn CommitHandler>>,
1258        session: Arc<Session>,
1259        enable_v2_manifest_paths: bool,
1260        detached: bool,
1261    ) -> Result<Self> {
1262        let read_version = read_version.map_or_else(
1263            || match operation {
1264                Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
1265                _ => Err(Error::invalid_input(
1266                    "read_version must be specified for this operation",
1267                    location!(),
1268                )),
1269            },
1270            Ok,
1271        )?;
1272
1273        let transaction = Transaction::new(read_version, operation, None);
1274
1275        let mut builder = CommitBuilder::new(base_uri)
1276            .enable_v2_manifest_paths(enable_v2_manifest_paths)
1277            .with_session(session)
1278            .with_detached(detached);
1279
1280        if let Some(store_params) = store_params {
1281            builder = builder.with_store_params(store_params);
1282        }
1283
1284        if let Some(commit_handler) = commit_handler {
1285            builder = builder.with_commit_handler(commit_handler);
1286        }
1287
1288        builder.execute(transaction).await
1289    }
1290
1291    /// Commit changes to the dataset
1292    ///
1293    /// This operation is not needed if you are using append/write/delete to manipulate the dataset.
1294    /// It is used to commit changes to the dataset that are made externally.  For example, a bulk
1295    /// import tool may import large amounts of new data and write the appropriate lance files
1296    /// directly instead of using the write function.
1297    ///
1298    /// This method can be used to commit this change to the dataset's manifest.  This method will
1299    /// not verify that the provided fragments exist and correct, that is the caller's responsibility.
1300    /// Some validation can be performed using the function
1301    /// [crate::dataset::transaction::validate_operation].
1302    ///
1303    /// If this commit is a change to an existing dataset then it will often need to be based on an
1304    /// existing version of the dataset.  For example, if this change is a `delete` operation then
1305    /// the caller will have read in the existing data (at some version) to determine which fragments
1306    /// need to be deleted.  The base version that the caller used should be supplied as the `read_version`
1307    /// parameter.  Some operations (e.g. Overwrite) do not depend on a previous version and `read_version`
1308    /// can be None.  An error will be returned if the `read_version` is needed for an operation and
1309    /// it is not specified.
1310    ///
1311    /// All operations except Overwrite will fail if the dataset does not already exist.
1312    ///
1313    /// # Arguments
1314    ///
1315    /// * `base_uri` - The base URI of the dataset
1316    /// * `operation` - A description of the change to commit
1317    /// * `read_version` - The version of the dataset that this change is based on
1318    /// * `store_params` Parameters controlling object store access to the manifest
1319    /// * `enable_v2_manifest_paths`: If set to true, and this is a new dataset, uses the new v2 manifest
1320    ///   paths. These allow constant-time lookups for the latest manifest on object storage.
1321    ///   This parameter has no effect on existing datasets. To migrate an existing
1322    ///   dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning
1323    ///   this on will make the dataset unreadable for older versions of Lance
1324    ///   (prior to 0.17.0). Default is False.
1325    pub async fn commit(
1326        dest: impl Into<WriteDestination<'_>>,
1327        operation: Operation,
1328        read_version: Option<u64>,
1329        store_params: Option<ObjectStoreParams>,
1330        commit_handler: Option<Arc<dyn CommitHandler>>,
1331        session: Arc<Session>,
1332        enable_v2_manifest_paths: bool,
1333    ) -> Result<Self> {
1334        Self::do_commit(
1335            dest.into(),
1336            operation,
1337            read_version,
1338            store_params,
1339            commit_handler,
1340            session,
1341            enable_v2_manifest_paths,
1342            /*detached=*/ false,
1343        )
1344        .await
1345    }
1346
1347    /// Commits changes exactly the same as [`Self::commit`] but the commit will
1348    /// not be associated with the dataset lineage.
1349    ///
1350    /// The commit will not show up in the dataset's history and will never be
1351    /// the latest version of the dataset.
1352    ///
1353    /// This can be used to stage changes or to handle "secondary" datasets whose
1354    /// lineage is tracked elsewhere.
1355    pub async fn commit_detached(
1356        dest: impl Into<WriteDestination<'_>>,
1357        operation: Operation,
1358        read_version: Option<u64>,
1359        store_params: Option<ObjectStoreParams>,
1360        commit_handler: Option<Arc<dyn CommitHandler>>,
1361        session: Arc<Session>,
1362        enable_v2_manifest_paths: bool,
1363    ) -> Result<Self> {
1364        Self::do_commit(
1365            dest.into(),
1366            operation,
1367            read_version,
1368            store_params,
1369            commit_handler,
1370            session,
1371            enable_v2_manifest_paths,
1372            /*detached=*/ true,
1373        )
1374        .await
1375    }
1376
1377    pub(crate) async fn apply_commit(
1378        &mut self,
1379        transaction: Transaction,
1380        write_config: &ManifestWriteConfig,
1381        commit_config: &CommitConfig,
1382    ) -> Result<()> {
1383        let (manifest, manifest_location) = commit_transaction(
1384            self,
1385            self.object_store(),
1386            self.commit_handler.as_ref(),
1387            &transaction,
1388            write_config,
1389            commit_config,
1390            self.manifest_location.naming_scheme,
1391            None,
1392        )
1393        .await?;
1394
1395        self.manifest = Arc::new(manifest);
1396        self.manifest_location = manifest_location;
1397        self.fragment_bitmap = Arc::new(
1398            self.manifest
1399                .fragments
1400                .iter()
1401                .map(|f| f.id as u32)
1402                .collect(),
1403        );
1404
1405        Ok(())
1406    }
1407
1408    /// Create a Scanner to scan the dataset.
1409    pub fn scan(&self) -> Scanner {
1410        Scanner::new(Arc::new(self.clone()))
1411    }
1412
1413    /// Count the number of rows in the dataset.
1414    ///
1415    /// It offers a fast path of counting rows by just computing via metadata.
1416    #[instrument(skip_all)]
1417    pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
1418        // TODO: consolidate the count_rows into Scanner plan.
1419        if let Some(filter) = filter {
1420            let mut scanner = self.scan();
1421            scanner.filter(&filter)?;
1422            Ok(scanner
1423                .project::<String>(&[])?
1424                .with_row_id() // TODO: fix scan plan to not require row_id for count_rows.
1425                .count_rows()
1426                .await? as usize)
1427        } else {
1428            self.count_all_rows().await
1429        }
1430    }
1431
1432    pub(crate) async fn count_all_rows(&self) -> Result<usize> {
1433        let cnts = stream::iter(self.get_fragments())
1434            .map(|f| async move { f.count_rows(None).await })
1435            .buffer_unordered(16)
1436            .try_collect::<Vec<_>>()
1437            .await?;
1438        Ok(cnts.iter().sum())
1439    }
1440
1441    /// Take rows by indices.
1442    #[instrument(skip_all, fields(num_rows=row_indices.len()))]
1443    pub async fn take(
1444        &self,
1445        row_indices: &[u64],
1446        projection: impl Into<ProjectionRequest>,
1447    ) -> Result<RecordBatch> {
1448        take::take(self, row_indices, projection.into()).await
1449    }
1450
1451    /// Take Rows by the internal ROW ids.
1452    ///
1453    /// In Lance format, each row has a unique `u64` id, which is used to identify the row globally.
1454    ///
1455    /// ```rust
1456    /// # use std::sync::Arc;
1457    /// # use tokio::runtime::Runtime;
1458    /// # use arrow_array::{RecordBatch, RecordBatchIterator, Int64Array};
1459    /// # use arrow_schema::{Schema, Field, DataType};
1460    /// # use lance::dataset::{WriteParams, Dataset, ProjectionRequest};
1461    /// #
1462    /// # let mut rt = Runtime::new().unwrap();
1463    /// # rt.block_on(async {
1464    /// # let test_dir = tempfile::tempdir().unwrap();
1465    /// # let uri = test_dir.path().to_str().unwrap().to_string();
1466    /// #
1467    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1468    /// # let write_params = WriteParams::default();
1469    /// # let array = Arc::new(Int64Array::from_iter(0..128));
1470    /// # let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
1471    /// # let reader = RecordBatchIterator::new(
1472    /// #    vec![batch].into_iter().map(Ok), schema
1473    /// # );
1474    /// # let dataset = Dataset::write(reader, &uri, Some(write_params)).await.unwrap();
1475    /// #
1476    /// let schema = dataset.schema().clone();
1477    /// let row_ids = vec![0, 4, 7];
1478    /// let rows = dataset.take_rows(&row_ids, schema).await.unwrap();
1479    ///
1480    /// // We can have more fine-grained control over the projection, i.e., SQL projection.
1481    /// let projection = ProjectionRequest::from_sql([("identity", "id * 2")]);
1482    /// let rows = dataset.take_rows(&row_ids, projection).await.unwrap();
1483    /// # });
1484    /// ```
1485    pub async fn take_rows(
1486        &self,
1487        row_ids: &[u64],
1488        projection: impl Into<ProjectionRequest>,
1489    ) -> Result<RecordBatch> {
1490        Arc::new(self.clone())
1491            .take_builder(row_ids, projection)?
1492            .execute()
1493            .await
1494    }
1495
1496    pub fn take_builder(
1497        self: &Arc<Self>,
1498        row_ids: &[u64],
1499        projection: impl Into<ProjectionRequest>,
1500    ) -> Result<TakeBuilder> {
1501        TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into())
1502    }
1503
1504    /// Take [BlobFile] by row IDs.
1505    pub async fn take_blobs(
1506        self: &Arc<Self>,
1507        row_ids: &[u64],
1508        column: impl AsRef<str>,
1509    ) -> Result<Vec<BlobFile>> {
1510        blob::take_blobs(self, row_ids, column.as_ref()).await
1511    }
1512
1513    /// Take [BlobFile] by row addresses.
1514    ///
1515    /// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
1516    /// Use this method when you already have row addresses, for example from
1517    /// a scan with `with_row_address()`. For row IDs (stable identifiers), use
1518    /// [`Self::take_blobs`]. For row indices (offsets), use
1519    /// [`Self::take_blobs_by_indices`].
1520    pub async fn take_blobs_by_addresses(
1521        self: &Arc<Self>,
1522        row_addrs: &[u64],
1523        column: impl AsRef<str>,
1524    ) -> Result<Vec<BlobFile>> {
1525        blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await
1526    }
1527
1528    /// Take [BlobFile] by row indices (offsets in the dataset).
1529    pub async fn take_blobs_by_indices(
1530        self: &Arc<Self>,
1531        row_indices: &[u64],
1532        column: impl AsRef<str>,
1533    ) -> Result<Vec<BlobFile>> {
1534        let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
1535        blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
1536    }
1537
    /// Get a stream of batches based on iterator of ranges of row numbers.
    ///
    /// This is an experimental API. It may change at any time.
    ///
    /// `row_ranges` yields half-open ranges of row numbers; `projection`
    /// selects the columns to read. `batch_readahead` presumably bounds how
    /// many batches are prefetched ahead of the consumer — see
    /// `take::take_scan` for the exact semantics.
    pub fn take_scan(
        &self,
        row_ranges: Pin<Box<dyn Stream<Item = Result<Range<u64>>> + Send>>,
        projection: Arc<Schema>,
        batch_readahead: usize,
    ) -> DatasetRecordBatchStream {
        take::take_scan(self, row_ranges, projection, batch_readahead)
    }
1549
1550    /// Sample `n` rows from the dataset.
1551    pub(crate) async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
1552        use rand::seq::IteratorRandom;
1553        let num_rows = self.count_rows(None).await?;
1554        let ids = (0..num_rows as u64).choose_multiple(&mut rand::rng(), n);
1555        self.take(&ids, projection.clone()).await
1556    }
1557
    /// Delete rows based on a predicate.
    ///
    /// `predicate` is a filter expression evaluated by `write::delete::delete`;
    /// matching rows are removed. A tracing event is emitted before the delete
    /// is executed.
    pub async fn delete(&mut self, predicate: &str) -> Result<()> {
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DELETING_EVENT, uri = &self.uri, predicate=predicate);
        write::delete::delete(self, predicate).await
    }
1563
    /// Truncate the dataset by deleting all rows.
    ///
    /// Implemented as a delete with the always-true predicate `"true"`.
    pub async fn truncate_table(&mut self) -> Result<()> {
        self.delete("true").await
    }
1568
1569    /// Add new base paths to the dataset.
1570    ///
1571    /// This method allows you to register additional storage locations (buckets)
1572    /// that can be used for future data writes. The base paths are added to the
1573    /// dataset's manifest and can be referenced by name in subsequent write operations.
1574    ///
1575    /// # Arguments
1576    ///
1577    /// * `new_bases` - A vector of `lance_table::format::BasePath` objects representing the new storage
1578    ///   locations to add. Each base path should have a unique name and path.
1579    ///
1580    /// # Returns
1581    ///
1582    /// Returns a new `Dataset` instance with the updated manifest containing the
1583    /// new base paths.
1584    pub async fn add_bases(
1585        self: &Arc<Self>,
1586        new_bases: Vec<lance_table::format::BasePath>,
1587        transaction_properties: Option<HashMap<String, String>>,
1588    ) -> Result<Self> {
1589        let operation = Operation::UpdateBases { new_bases };
1590
1591        let transaction = TransactionBuilder::new(self.manifest.version, operation)
1592            .transaction_properties(transaction_properties.map(Arc::new))
1593            .build();
1594
1595        let new_dataset = CommitBuilder::new(self.clone())
1596            .execute(transaction)
1597            .await?;
1598
1599        Ok(new_dataset)
1600    }
1601
1602    pub async fn count_deleted_rows(&self) -> Result<usize> {
1603        futures::stream::iter(self.get_fragments())
1604            .map(|f| async move { f.count_deletions().await })
1605            .buffer_unordered(self.object_store.io_parallelism())
1606            .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
1607            .await
1608    }
1609
    /// Returns the object store backing this dataset.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }
1613
    /// Returns the initial storage options used when opening this dataset, if any.
    ///
    /// This returns the static initial options without triggering any refresh.
    /// For the latest refreshed options, use [`Self::latest_storage_options`].
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
        // Deprecated alias; forwards to the replacement accessor.
        self.initial_storage_options()
    }
1622
1623    /// Returns the initial storage options without triggering any refresh.
1624    ///
1625    /// For the latest refreshed options, use [`Self::latest_storage_options`].
1626    pub fn initial_storage_options(&self) -> Option<&HashMap<String, String>> {
1627        self.store_params
1628            .as_ref()
1629            .and_then(|params| params.storage_options())
1630    }
1631
1632    /// Returns the storage options provider used when opening this dataset, if any.
1633    pub fn storage_options_provider(
1634        &self,
1635    ) -> Option<Arc<dyn lance_io::object_store::StorageOptionsProvider>> {
1636        self.store_params
1637            .as_ref()
1638            .and_then(|params| params.storage_options_accessor.as_ref())
1639            .and_then(|accessor| accessor.provider().cloned())
1640    }
1641
1642    /// Returns the unified storage options accessor for this dataset, if any.
1643    ///
1644    /// The accessor handles both static and dynamic storage options with automatic
1645    /// caching and refresh. Use [`StorageOptionsAccessor::get_storage_options`] to
1646    /// get the latest options.
1647    pub fn storage_options_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
1648        self.store_params
1649            .as_ref()
1650            .and_then(|params| params.get_accessor())
1651    }
1652
1653    /// Returns the latest (possibly refreshed) storage options.
1654    ///
1655    /// If a dynamic storage options provider is configured, this will return
1656    /// the cached options if still valid, or fetch fresh options if expired.
1657    ///
1658    /// For the initial static options without refresh, use [`Self::storage_options`].
1659    ///
1660    /// # Returns
1661    ///
1662    /// - `Ok(Some(options))` - Storage options are available (static or refreshed)
1663    /// - `Ok(None)` - No storage options were configured for this dataset
1664    /// - `Err(...)` - Error occurred while fetching/refreshing options from provider
1665    pub async fn latest_storage_options(&self) -> Result<Option<StorageOptions>> {
1666        // First check if we have an accessor (handles both static and dynamic options)
1667        if let Some(accessor) = self.storage_options_accessor() {
1668            let options = accessor.get_storage_options().await?;
1669            return Ok(Some(options));
1670        }
1671
1672        // Fallback to initial storage options if no accessor
1673        Ok(self.initial_storage_options().cloned().map(StorageOptions))
1674    }
1675
    /// Path to the `DATA_DIR` subdirectory under the dataset base path.
    pub fn data_dir(&self) -> Path {
        self.base.child(DATA_DIR)
    }

    /// Path to the `INDICES_DIR` subdirectory under the dataset base path.
    pub fn indices_dir(&self) -> Path {
        self.base.child(INDICES_DIR)
    }

    /// Path to the `TRANSACTIONS_DIR` subdirectory under the dataset base path.
    pub fn transactions_dir(&self) -> Path {
        self.base.child(TRANSACTIONS_DIR)
    }

    /// Path to the `DELETIONS_DIR` subdirectory under the dataset base path.
    pub fn deletions_dir(&self) -> Path {
        self.base.child(DELETIONS_DIR)
    }

    /// Path to the `VERSIONS_DIR` subdirectory under the dataset base path.
    pub fn versions_dir(&self) -> Path {
        self.base.child(VERSIONS_DIR)
    }
1695
1696    pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
1697        match data_file.base_id.as_ref() {
1698            Some(base_id) => {
1699                let base_paths = &self.manifest.base_paths;
1700                let base_path = base_paths.get(base_id).ok_or_else(|| {
1701                    Error::invalid_input(
1702                        format!(
1703                            "base_path id {} not found for data_file {}",
1704                            base_id, data_file.path
1705                        ),
1706                        location!(),
1707                    )
1708                })?;
1709                let path = base_path.extract_path(self.session.store_registry())?;
1710                if base_path.is_dataset_root {
1711                    Ok(path.child(DATA_DIR))
1712                } else {
1713                    Ok(path)
1714                }
1715            }
1716            None => Ok(self.base.child(DATA_DIR)),
1717        }
1718    }
1719
1720    /// Get the ObjectStore for a specific path based on base_id
1721    pub(crate) async fn object_store_for_base(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
1722        let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
1723            Error::invalid_input(
1724                format!("Dataset base path with ID {} not found", base_id),
1725                Default::default(),
1726            )
1727        })?;
1728
1729        let (store, _) = ObjectStore::from_uri_and_params(
1730            self.session.store_registry(),
1731            &base_path.path,
1732            &self.store_params.as_deref().cloned().unwrap_or_default(),
1733        )
1734        .await?;
1735
1736        Ok(store)
1737    }
1738
1739    pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
1740        match deletion_file.base_id.as_ref() {
1741            Some(base_id) => {
1742                let base_paths = &self.manifest.base_paths;
1743                let base_path = base_paths.get(base_id).ok_or_else(|| {
1744                    Error::invalid_input(
1745                        format!(
1746                            "base_path id {} not found for deletion_file {:?}",
1747                            base_id, deletion_file
1748                        ),
1749                        location!(),
1750                    )
1751                })?;
1752
1753                if !base_path.is_dataset_root {
1754                    return Err(Error::Internal {
1755                        message: format!(
1756                            "base_path id {} is not a dataset root for deletion_file {:?}",
1757                            base_id, deletion_file
1758                        ),
1759                        location: location!(),
1760                    });
1761                }
1762                base_path.extract_path(self.session.store_registry())
1763            }
1764            None => Ok(self.base.clone()),
1765        }
1766    }
1767
1768    /// Get the indices directory for a specific index, considering its base_id
1769    pub(crate) fn indice_files_dir(&self, index: &IndexMetadata) -> Result<Path> {
1770        match index.base_id.as_ref() {
1771            Some(base_id) => {
1772                let base_paths = &self.manifest.base_paths;
1773                let base_path = base_paths.get(base_id).ok_or_else(|| {
1774                    Error::invalid_input(
1775                        format!(
1776                            "base_path id {} not found for index {}",
1777                            base_id, index.uuid
1778                        ),
1779                        location!(),
1780                    )
1781                })?;
1782                let path = base_path.extract_path(self.session.store_registry())?;
1783                if base_path.is_dataset_root {
1784                    Ok(path.child(INDICES_DIR))
1785                } else {
1786                    // For non-dataset-root base paths, we assume the path already points to the indices directory
1787                    Ok(path)
1788                }
1789            }
1790            None => Ok(self.base.child(INDICES_DIR)),
1791        }
1792    }
1793
    /// Returns the session associated with this dataset.
    pub fn session(&self) -> Arc<Session> {
        self.session.clone()
    }

    /// Returns the currently checked-out version, derived from the manifest.
    pub fn version(&self) -> Version {
        Version::from(self.manifest.as_ref())
    }
1801
    /// Get the number of entries currently in the index cache.
    pub async fn index_cache_entry_count(&self) -> usize {
        self.session.index_cache.size().await
    }

    /// Get cache hit ratio.
    pub async fn index_cache_hit_rate(&self) -> f32 {
        let stats = self.session.index_cache_stats().await;
        stats.hit_ratio()
    }

    /// Approximate in-memory size of the session (including its caches) in
    /// bytes, as measured by `deep_size_of`.
    pub fn cache_size_bytes(&self) -> u64 {
        self.session.deep_size_of() as u64
    }
1816
1817    /// Get all versions.
1818    pub async fn versions(&self) -> Result<Vec<Version>> {
1819        let mut versions: Vec<Version> = self
1820            .commit_handler
1821            .list_manifest_locations(&self.base, &self.object_store, false)
1822            .try_filter_map(|location| async move {
1823                match read_manifest(&self.object_store, &location.path, location.size).await {
1824                    Ok(manifest) => Ok(Some(Version::from(&manifest))),
1825                    Err(e) => Err(e),
1826                }
1827            })
1828            .try_collect()
1829            .await?;
1830
1831        // TODO: this API should support pagination
1832        versions.sort_by_key(|v| v.version);
1833
1834        Ok(versions)
1835    }
1836
1837    /// Get the latest version of the dataset
1838    /// This is meant to be a fast path for checking if a dataset has changed. This is why
1839    /// we don't return the full version struct.
1840    pub async fn latest_version_id(&self) -> Result<u64> {
1841        Ok(self
1842            .commit_handler
1843            .resolve_latest_location(&self.base, &self.object_store)
1844            .await?
1845            .version)
1846    }
1847
    /// Get the number of fragments in the current manifest.
    pub fn count_fragments(&self) -> usize {
        self.manifest.fragments.len()
    }

    /// Get the schema of the dataset
    pub fn schema(&self) -> &Schema {
        &self.manifest.schema
    }
1856
    /// Creates a new empty projection into the dataset schema
    ///
    /// The projection starts with no fields selected and carries the
    /// dataset's blob version.
    pub fn empty_projection(self: &Arc<Self>) -> Projection {
        Projection::empty(self.clone()).with_blob_version(self.blob_version())
    }
1862
    /// Creates a projection that includes all columns in the dataset
    ///
    /// The projection carries the dataset's blob version (see
    /// [`Self::empty_projection`] for the empty counterpart).
    pub fn full_projection(self: &Arc<Self>) -> Projection {
        Projection::full(self.clone()).with_blob_version(self.blob_version())
    }
1867
1868    /// Get fragments.
1869    pub fn get_fragments(&self) -> Vec<FileFragment> {
1870        let dataset = Arc::new(self.clone());
1871        self.manifest
1872            .fragments
1873            .iter()
1874            .map(|f| FileFragment::new(dataset.clone(), f.clone()))
1875            .collect()
1876    }
1877
1878    pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
1879        let dataset = Arc::new(self.clone());
1880        let fragment = self
1881            .manifest
1882            .fragments
1883            .iter()
1884            .find(|f| f.id == fragment_id as u64)?;
1885        Some(FileFragment::new(dataset, fragment.clone()))
1886    }
1887
    /// Borrow the raw fragment list from the manifest.
    pub fn fragments(&self) -> &Arc<Vec<Fragment>> {
        &self.manifest.fragments
    }
1891
1892    // Gets a filtered list of fragments from ids in O(N) time instead of using
1893    // `get_fragment` which would require O(N^2) time.
1894    pub fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec<Option<FileFragment>> {
1895        let mut fragments = Vec::with_capacity(ordered_ids.len());
1896        let mut id_iter = ordered_ids.iter();
1897        let mut id = id_iter.next();
1898        // This field is just used to assert the ids are in order
1899        let mut last_id: i64 = -1;
1900        for frag in self.manifest.fragments.iter() {
1901            let mut the_id = if let Some(id) = id { *id } else { break };
1902            // Assert the given ids are, in fact, in order
1903            assert!(the_id as i64 > last_id);
1904            // For any IDs we've passed we can assume that no fragment exists any longer
1905            // with that ID.
1906            while the_id < frag.id as u32 {
1907                fragments.push(None);
1908                last_id = the_id as i64;
1909                id = id_iter.next();
1910                the_id = if let Some(id) = id { *id } else { break };
1911            }
1912
1913            if the_id == frag.id as u32 {
1914                fragments.push(Some(FileFragment::new(
1915                    Arc::new(self.clone()),
1916                    frag.clone(),
1917                )));
1918                last_id = the_id as i64;
1919                id = id_iter.next();
1920            }
1921        }
1922        fragments
1923    }
1924
    // This method filters deleted items from `addr_or_ids` using `addrs` as a reference
    //
    // NOTE(review): the final zip pairs `addr_or_ids[i]` with the survival
    // marker computed for `addrs[i]`, so this assumes the two slices are
    // parallel and equal-length — confirm at call sites.
    async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result<Vec<u64>> {
        if addrs.is_empty() {
            return Ok(Vec::new());
        }

        let mut perm = permutation::sort(addrs);
        // First we sort the addrs, then we transform from Vec<u64> to Vec<Option<u64>> and then
        // we un-sort and use the None values to filter `addr_or_ids`
        let sorted_addrs = perm.apply_slice(addrs);

        // Only collect deletion vectors for the fragments referenced by the given addrs
        let referenced_frag_ids = sorted_addrs
            .iter()
            .map(|addr| RowAddress::from(*addr).fragment_id())
            .dedup()
            .collect::<Vec<_>>();
        let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids);
        // One future per referenced fragment: its deletion vector, or a ready
        // `None` for fragments that no longer exist.
        let dv_futs = frags
            .iter()
            .map(|frag| {
                if let Some(frag) = frag {
                    frag.get_deletion_vector().boxed()
                } else {
                    std::future::ready(Ok(None)).boxed()
                }
            })
            .collect::<Vec<_>>();
        let dvs = stream::iter(dv_futs)
            .buffered(self.object_store.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        // Iterate through the sorted addresses and sorted fragments (and sorted deletion vectors)
        // and filter out addresses that have been deleted
        let mut filtered_sorted_addrs = Vec::with_capacity(sorted_addrs.len());
        // Safe to unwrap: `addrs` (and hence `sorted_addrs`) was checked
        // non-empty above.
        let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from);
        let mut next_addr = sorted_addr_iter.next().unwrap();
        let mut exhausted = false;

        for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) {
            let ((frag, dv), frag_id) = frag_dv;
            if frag.is_some() {
                // Frag exists
                if let Some(dv) = dv.as_ref() {
                    // Deletion vector exists, scan DV
                    for deleted in dv.to_sorted_iter() {
                        // Keep addresses strictly before the next deleted row.
                        while next_addr.fragment_id() == *frag_id
                            && next_addr.row_offset() < deleted
                        {
                            filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                        if exhausted {
                            break;
                        }
                        if next_addr.fragment_id() != *frag_id {
                            break;
                        }
                        // Exact hit on a deleted row: mark it as removed.
                        if next_addr.row_offset() == deleted {
                            filtered_sorted_addrs.push(None);
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                    }
                }
                if exhausted {
                    break;
                }
                // Either no deletion vector, or we've exhausted it, keep everything else
                // in this frag
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            } else {
                // Frag doesn't exist (possibly deleted), delete all items
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(None);
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // filtered_sorted_ids is now a Vec with the same size as sorted_addrs, but with None
        // values where the corresponding address was deleted.  We now need to un-sort it and
        // filter out the deleted addresses.
        perm.apply_inv_slice_in_place(&mut filtered_sorted_addrs);
        Ok(addr_or_ids
            .iter()
            .zip(filtered_sorted_addrs)
            .filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id))
            .collect())
    }
2036
    /// Remove ids whose rows no longer exist (deleted row or missing fragment).
    ///
    /// When a stable row-id index is present, `ids` are first translated to
    /// row addresses; otherwise the ids are used as addresses directly.
    ///
    /// NOTE(review): with a row-id index, ids missing from the index are
    /// silently dropped by the `filter_map`, so `addresses` can be shorter
    /// than `ids` while both are passed to `filter_addr_or_ids` — verify that
    /// the length mismatch is handled as intended there.
    pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result<Vec<u64>> {
        let addresses = if let Some(row_id_index) = get_row_id_index(self).await? {
            let addresses = ids
                .iter()
                .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
                .collect::<Vec<_>>();
            Cow::Owned(addresses)
        } else {
            // No row-id index: ids are already addresses; borrow to avoid a copy.
            Cow::Borrowed(ids)
        };

        self.filter_addr_or_ids(ids, &addresses).await
    }
2050
    /// Gets the number of files that are so small they don't even have a full
    /// group. These are considered too small because reading many of them is
    /// much less efficient than reading a single file because the separate files
    /// split up what would otherwise be single IO requests into multiple.
    ///
    /// NOTE(review): `Err` items from `physical_rows()` pass through
    /// `try_filter` unchanged and are then included by `count()`, so a failed
    /// fragment counts as "small" — confirm this is intended.
    pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.physical_rows().await })
            .buffered(self.object_store.io_parallelism())
            .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
            .count()
            .await
    }
2063
2064    pub async fn validate(&self) -> Result<()> {
2065        // All fragments have unique ids
2066        let id_counts =
2067            self.manifest
2068                .fragments
2069                .iter()
2070                .map(|f| f.id)
2071                .fold(HashMap::new(), |mut acc, id| {
2072                    *acc.entry(id).or_insert(0) += 1;
2073                    acc
2074                });
2075        for (id, count) in id_counts {
2076            if count > 1 {
2077                return Err(Error::corrupt_file(
2078                    self.base.clone(),
2079                    format!(
2080                        "Duplicate fragment id {} found in dataset {:?}",
2081                        id, self.base
2082                    ),
2083                    location!(),
2084                ));
2085            }
2086        }
2087
2088        // Fragments are sorted in increasing fragment id order
2089        self.manifest
2090            .fragments
2091            .iter()
2092            .map(|f| f.id)
2093            .try_fold(0, |prev, id| {
2094                if id < prev {
2095                    Err(Error::corrupt_file(
2096                        self.base.clone(),
2097                        format!(
2098                            "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}",
2099                            id, prev, self.base
2100                        ),
2101                        location!(),
2102                    ))
2103                } else {
2104                    Ok(id)
2105                }
2106            })?;
2107
2108        // All fragments have equal lengths
2109        futures::stream::iter(self.get_fragments())
2110            .map(|f| async move { f.validate().await })
2111            .buffer_unordered(self.object_store.io_parallelism())
2112            .try_collect::<Vec<()>>()
2113            .await?;
2114
2115        // Validate indices
2116        let indices = self.load_indices().await?;
2117        self.validate_indices(&indices)?;
2118
2119        Ok(())
2120    }
2121
2122    fn validate_indices(&self, indices: &[IndexMetadata]) -> Result<()> {
2123        // Make sure there are no duplicate ids
2124        let mut index_ids = HashSet::new();
2125        for index in indices.iter() {
2126            if !index_ids.insert(&index.uuid) {
2127                return Err(Error::corrupt_file(
2128                    self.manifest_location.path.clone(),
2129                    format!(
2130                        "Duplicate index id {} found in dataset {:?}",
2131                        &index.uuid, self.base
2132                    ),
2133                    location!(),
2134                ));
2135            }
2136        }
2137
2138        // For each index name, make sure there is no overlap in fragment bitmaps
2139        if let Err(err) = detect_overlapping_fragments(indices) {
2140            let mut message = "Overlapping fragments detected in dataset.".to_string();
2141            for (index_name, overlapping_frags) in err.bad_indices {
2142                message.push_str(&format!(
2143                    "\nIndex {:?} has overlapping fragments: {:?}",
2144                    index_name, overlapping_frags
2145                ));
2146            }
2147            return Err(Error::corrupt_file(
2148                self.manifest_location.path.clone(),
2149                message,
2150                location!(),
2151            ));
2152        };
2153
2154        Ok(())
2155    }
2156
2157    /// Migrate the dataset to use the new manifest path scheme.
2158    ///
2159    /// This function will rename all V1 manifests to [ManifestNamingScheme::V2].
2160    /// These paths provide more efficient opening of datasets with many versions
2161    /// on object stores.
2162    ///
2163    /// This function is idempotent, and can be run multiple times without
2164    /// changing the state of the object store.
2165    ///
2166    /// However, it should not be run while other concurrent operations are happening.
2167    /// And it should also run until completion before resuming other operations.
2168    ///
2169    /// ```rust
2170    /// # use lance::dataset::Dataset;
2171    /// # use lance_table::io::commit::ManifestNamingScheme;
2172    /// # use lance_datagen::{array, RowCount, BatchCount};
2173    /// # use arrow_array::types::Int32Type;
2174    /// # use lance::dataset::write::WriteParams;
2175    /// # let data = lance_datagen::gen_batch()
2176    /// #  .col("key", array::step::<Int32Type>())
2177    /// #  .into_reader_rows(RowCount::from(10), BatchCount::from(1));
2178    /// # let fut = async {
2179    /// # let params = WriteParams {
2180    /// #     enable_v2_manifest_paths: false,
2181    /// #     ..Default::default()
2182    /// # };
2183    /// let mut dataset = Dataset::write(data, "memory://test", Some(params)).await.unwrap();
2184    /// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V1);
2185    ///
2186    /// dataset.migrate_manifest_paths_v2().await.unwrap();
2187    /// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V2);
2188    /// # };
2189    /// # tokio::runtime::Runtime::new().unwrap().block_on(fut);
2190    /// ```
    pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> {
        // Rename all V1 manifests in the store to the V2 naming scheme.
        migrate_scheme_to_v2(self.object_store(), &self.base).await?;
        // We need to re-open.
        // Check out the latest version so `self` reflects the migrated paths.
        let latest_version = self.latest_version_id().await?;
        *self = self.checkout_version(latest_version).await?;
        Ok(())
    }
2198
2199    /// Shallow clone the target version into a new dataset at target_path.
2200    /// 'target_path': the uri string to clone the dataset into.
2201    /// 'version': the version cloned from, could be a version number or tag.
2202    /// 'store_params': the object store params to use for the new dataset.
2203    pub async fn shallow_clone(
2204        &mut self,
2205        target_path: &str,
2206        version: impl Into<refs::Ref>,
2207        store_params: Option<ObjectStoreParams>,
2208    ) -> Result<Self> {
2209        let (ref_name, version_number) = self.resolve_reference(version.into()).await?;
2210        let clone_op = Operation::Clone {
2211            is_shallow: true,
2212            ref_name,
2213            ref_version: version_number,
2214            ref_path: self.uri.clone(),
2215            branch_name: None,
2216        };
2217        let transaction = Transaction::new(version_number, clone_op, None);
2218
2219        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
2220            .with_store_params(
2221                store_params.unwrap_or(self.store_params.as_deref().cloned().unwrap_or_default()),
2222            )
2223            .with_object_store(Arc::new(self.object_store().clone()))
2224            .with_commit_handler(self.commit_handler.clone())
2225            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
2226        builder.execute(transaction).await
2227    }
2228
2229    /// Deep clone the target version into a new dataset at target_path.
2230    /// This performs a server-side copy of all relevant dataset files (data files,
2231    /// deletion files, and any external row-id files) into the target dataset
2232    /// without loading data into memory.
2233    ///
2234    /// Parameters:
2235    /// - `target_path`: the URI string to clone the dataset into.
2236    /// - `version`: the version cloned from, could be a version number, branch head, or tag.
2237    /// - `store_params`: the object store params to use for the new dataset.
2238    pub async fn deep_clone(
2239        &mut self,
2240        target_path: &str,
2241        version: impl Into<refs::Ref>,
2242        store_params: Option<ObjectStoreParams>,
2243    ) -> Result<Self> {
2244        use futures::StreamExt;
2245
2246        // Resolve source dataset and its manifest using checkout_version
2247        let src_ds = self.checkout_version(version).await?;
2248        let src_paths = src_ds.collect_paths().await?;
2249
2250        // Prepare target object store and base path
2251        let (target_store, target_base) = ObjectStore::from_uri_and_params(
2252            self.session.store_registry(),
2253            target_path,
2254            &store_params.clone().unwrap_or_default(),
2255        )
2256        .await?;
2257
2258        // Prevent cloning into an existing target dataset
2259        if self
2260            .commit_handler
2261            .resolve_latest_location(&target_base, &target_store)
2262            .await
2263            .is_ok()
2264        {
2265            return Err(Error::DatasetAlreadyExists {
2266                uri: target_path.to_string(),
2267                location: location!(),
2268            });
2269        }
2270
2271        let build_absolute_path = |relative_path: &str, base: &Path| -> Path {
2272            let mut path = base.clone();
2273            for seg in relative_path.split('/') {
2274                if !seg.is_empty() {
2275                    path = path.child(seg);
2276                }
2277            }
2278            path
2279        };
2280
2281        // TODO: Leverage object store bulk copy for efficient deep_clone
2282        //
2283        // All cloud storage providers support batch copy APIs that would provide significant
2284        // performance improvements. We use single file copy before we have upstream support.
2285        //
2286        // Tracked by: https://github.com/lance-format/lance/issues/5435
2287        let io_parallelism = self.object_store.io_parallelism();
2288        let copy_futures = src_paths
2289            .iter()
2290            .map(|(relative_path, base)| {
2291                let store = Arc::clone(&target_store);
2292                let src_path = build_absolute_path(relative_path, base);
2293                let target_path = build_absolute_path(relative_path, &target_base);
2294                async move { store.copy(&src_path, &target_path).await.map(|_| ()) }
2295            })
2296            .collect::<Vec<_>>();
2297
2298        futures::stream::iter(copy_futures)
2299            .buffer_unordered(io_parallelism)
2300            .collect::<Vec<_>>()
2301            .await
2302            .into_iter()
2303            .collect::<Result<Vec<_>>>()?;
2304
2305        // Record a Clone operation and commit via CommitBuilder
2306        let ref_name = src_ds.manifest.branch.clone();
2307        let ref_version = src_ds.manifest_location.version;
2308        let clone_op = Operation::Clone {
2309            is_shallow: false,
2310            ref_name,
2311            ref_version,
2312            ref_path: src_ds.uri().to_string(),
2313            branch_name: None,
2314        };
2315        let txn = Transaction::new(ref_version, clone_op, None);
2316        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
2317            .with_store_params(store_params.clone().unwrap_or_default())
2318            .with_object_store(target_store.clone())
2319            .with_commit_handler(self.commit_handler.clone())
2320            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
2321        let new_ds = builder.execute(txn).await?;
2322        Ok(new_ds)
2323    }
2324
2325    async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> {
2326        match reference {
2327            refs::Ref::Version(branch, version_number) => {
2328                if let Some(version_number) = version_number {
2329                    Ok((branch, version_number))
2330                } else {
2331                    let branch_location = self.branch_location().find_branch(branch.as_deref())?;
2332                    let version_number = self
2333                        .commit_handler
2334                        .resolve_latest_location(&branch_location.path, &self.object_store)
2335                        .await?
2336                        .version;
2337                    Ok((branch, version_number))
2338                }
2339            }
2340            refs::Ref::VersionNumber(version_number) => {
2341                Ok((self.manifest.branch.clone(), version_number))
2342            }
2343            refs::Ref::Tag(tag_name) => {
2344                let tag_contents = self.tags().get(tag_name.as_str()).await?;
2345                Ok((tag_contents.branch, tag_contents.version))
2346            }
2347        }
2348    }
2349
    /// Collect all (relative_path, path) of the dataset files.
    ///
    /// Each returned tuple is `(relative_path, base)`: a path relative to the
    /// dataset root plus the base `Path` it should be resolved against. The
    /// base differs from `self.base` for files that live under a registered
    /// base path (multi-base datasets). Covers data files, deletion files,
    /// and every file under each index's directory.
    async fn collect_paths(&self) -> Result<Vec<(String, Path)>> {
        let mut file_paths: Vec<(String, Path)> = Vec::new();
        for fragment in self.manifest.fragments.iter() {
            // Externally-stored row id metadata would also need copying; bail
            // out explicitly rather than silently produce an incomplete list.
            if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta {
                return Err(Error::Internal {
                    message: format!(
                        "External row_id_meta is not supported yet. external file path: {}",
                        external_file.path
                    ),
                    location: location!(),
                });
            }
            // Data files: resolve each file's base (either a registered base
            // path referenced by base_id, or the dataset root).
            for data_file in fragment.files.iter() {
                let base_root = if let Some(base_id) = data_file.base_id {
                    let base_path =
                        self.manifest
                            .base_paths
                            .get(&base_id)
                            .ok_or_else(|| Error::Internal {
                                message: format!("base_id {} not found", base_id),
                                location: location!(),
                            })?;
                    Path::parse(base_path.path.as_str())?
                } else {
                    self.base.clone()
                };
                file_paths.push((
                    format!("{}/{}", DATA_DIR, data_file.path.clone()),
                    base_root,
                ));
            }
            // Deletion file (at most one per fragment), same base resolution.
            if let Some(deletion_file) = &fragment.deletion_file {
                let base_root = if let Some(base_id) = deletion_file.base_id {
                    let base_path =
                        self.manifest
                            .base_paths
                            .get(&base_id)
                            .ok_or_else(|| Error::Internal {
                                message: format!("base_id {} not found", base_id),
                                location: location!(),
                            })?;
                    Path::parse(base_path.path.as_str())?
                } else {
                    self.base.clone()
                };
                file_paths.push((
                    relative_deletion_file_path(fragment.id, deletion_file),
                    base_root,
                ));
            }
        }

        // Index files are not listed in the manifest fragments; read the index
        // metadata and list each index directory from storage.
        let indices = read_manifest_indexes(
            self.object_store.as_ref(),
            &self.manifest_location,
            &self.manifest,
        )
        .await?;

        for index in &indices {
            let base_root = if let Some(base_id) = index.base_id {
                let base_path =
                    self.manifest
                        .base_paths
                        .get(&base_id)
                        .ok_or_else(|| Error::Internal {
                            message: format!("base_id {} not found", base_id),
                            location: location!(),
                        })?;
                Path::parse(base_path.path.as_str())?
            } else {
                self.base.clone()
            };
            // Every file under _indices/<uuid>/ belongs to this index.
            let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string());
            let mut stream = self.object_store.read_dir_all(&index_root, None);
            while let Some(meta) = stream.next().await.transpose()? {
                if let Some(filename) = meta.location.filename() {
                    file_paths.push((
                        format!("{}/{}/{}", INDICES_DIR, index.uuid, filename),
                        base_root.clone(),
                    ));
                }
            }
        }
        Ok(file_paths)
    }
2437
2438    /// Run a SQL query against the dataset.
2439    /// The underlying SQL engine is DataFusion.
2440    /// Please refer to the DataFusion documentation for supported SQL syntax.
2441    pub fn sql(&self, sql: &str) -> SqlQueryBuilder {
2442        SqlQueryBuilder::new(self.clone(), sql)
2443    }
2444
2445    /// Returns true if Lance supports writing this datatype with nulls.
2446    pub(crate) fn lance_supports_nulls(&self, datatype: &DataType) -> bool {
2447        match self
2448            .manifest()
2449            .data_storage_format
2450            .lance_file_version()
2451            .unwrap_or(LanceFileVersion::Legacy)
2452            .resolve()
2453        {
2454            LanceFileVersion::Legacy => matches!(
2455                datatype,
2456                DataType::Utf8
2457                    | DataType::LargeUtf8
2458                    | DataType::Binary
2459                    | DataType::List(_)
2460                    | DataType::FixedSizeBinary(_)
2461                    | DataType::FixedSizeList(_, _)
2462            ),
2463            LanceFileVersion::V2_0 => !matches!(datatype, DataType::Struct(..)),
2464            _ => true,
2465        }
2466    }
2467}
2468
/// Result of [`load_new_transactions`]: the refreshed dataset plus a stream of
/// the transactions committed after the starting version.
pub(crate) struct NewTransactionResult<'a> {
    /// Resolves to the dataset checked out at the latest discovered manifest,
    /// or a clone of the input dataset if no newer manifest was found.
    pub dataset: BoxFuture<'a, Result<Dataset>>,
    /// `(version, transaction)` for each version newer than the input
    /// dataset's; loaded concurrently, so arrival order is not guaranteed.
    pub new_transactions: BoxStream<'a, Result<(u64, Arc<Transaction>)>>,
}
2473
2474pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> {
2475    // Re-use the same list call for getting the latest manifest and the metadata
2476    // for all manifests in between.
2477    let io_parallelism = dataset.object_store().io_parallelism();
2478    let latest_version = dataset.manifest.version;
2479    let locations = dataset
2480        .commit_handler
2481        .list_manifest_locations(&dataset.base, dataset.object_store(), true)
2482        .try_take_while(move |location| {
2483            futures::future::ready(Ok(location.version > latest_version))
2484        });
2485
2486    // Will send the latest manifest via a channel.
2487    let (latest_tx, latest_rx) = tokio::sync::oneshot::channel();
2488    let mut latest_tx = Some(latest_tx);
2489
2490    let manifests = locations
2491        .map_ok(move |location| {
2492            let latest_tx = latest_tx.take();
2493            async move {
2494                let manifest_key = ManifestKey {
2495                    version: location.version,
2496                    e_tag: location.e_tag.as_deref(),
2497                };
2498                let manifest = if let Some(cached) =
2499                    dataset.metadata_cache.get_with_key(&manifest_key).await
2500                {
2501                    cached
2502                } else {
2503                    let loaded = Arc::new(
2504                        Dataset::load_manifest(
2505                            dataset.object_store(),
2506                            &location,
2507                            &dataset.uri,
2508                            dataset.session.as_ref(),
2509                        )
2510                        .await?,
2511                    );
2512                    dataset
2513                        .metadata_cache
2514                        .insert_with_key(&manifest_key, loaded.clone())
2515                        .await;
2516                    loaded
2517                };
2518
2519                if let Some(latest_tx) = latest_tx {
2520                    // We ignore the error, since we don't care if the receiver is dropped.
2521                    let _ = latest_tx.send((manifest.clone(), location.clone()));
2522                }
2523
2524                Ok((manifest, location))
2525            }
2526        })
2527        .try_buffer_unordered(io_parallelism / 2);
2528    let transactions = manifests
2529        .map_ok(move |(manifest, location)| async move {
2530            let manifest_copy = manifest.clone();
2531            let tx_key = TransactionKey {
2532                version: manifest.version,
2533            };
2534            let transaction =
2535                if let Some(cached) = dataset.metadata_cache.get_with_key(&tx_key).await {
2536                    cached
2537                } else {
2538                    let dataset_version = Dataset::checkout_manifest(
2539                        dataset.object_store.clone(),
2540                        dataset.base.clone(),
2541                        dataset.uri.clone(),
2542                        manifest_copy.clone(),
2543                        location,
2544                        dataset.session(),
2545                        dataset.commit_handler.clone(),
2546                        dataset.file_reader_options.clone(),
2547                        dataset.store_params.as_deref().cloned(),
2548                    )?;
2549                    let loaded =
2550                        Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
2551                            Error::Internal {
2552                                message: format!(
2553                                    "Dataset version {} does not have a transaction file",
2554                                    manifest_copy.version
2555                                ),
2556                                location: location!(),
2557                            }
2558                        })?);
2559                    dataset
2560                        .metadata_cache
2561                        .insert_with_key(&tx_key, loaded.clone())
2562                        .await;
2563                    loaded
2564                };
2565            Ok((manifest.version, transaction))
2566        })
2567        .try_buffer_unordered(io_parallelism / 2);
2568
2569    let dataset = async move {
2570        if let Ok((latest_manifest, location)) = latest_rx.await {
2571            // If we got the latest manifest, we can checkout the dataset.
2572            Dataset::checkout_manifest(
2573                dataset.object_store.clone(),
2574                dataset.base.clone(),
2575                dataset.uri.clone(),
2576                latest_manifest,
2577                location,
2578                dataset.session(),
2579                dataset.commit_handler.clone(),
2580                dataset.file_reader_options.clone(),
2581                dataset.store_params.as_deref().cloned(),
2582            )
2583        } else {
2584            // If we didn't get the latest manifest, we can still return the dataset
2585            // with the current manifest.
2586            Ok(dataset.clone())
2587        }
2588    }
2589    .boxed();
2590
2591    let new_transactions = transactions.boxed();
2592
2593    NewTransactionResult {
2594        dataset,
2595        new_transactions,
2596    }
2597}
2598
2599/// # Schema Evolution
2600///
2601/// Lance datasets support evolving the schema. Several operations are
2602/// supported that mirror common SQL operations:
2603///
2604/// - [Self::add_columns()]: Add new columns to the dataset, similar to `ALTER TABLE ADD COLUMN`.
2605/// - [Self::drop_columns()]: Drop columns from the dataset, similar to `ALTER TABLE DROP COLUMN`.
2606/// - [Self::alter_columns()]: Modify columns in the dataset, changing their name, type, or nullability.
2607///   Similar to `ALTER TABLE ALTER COLUMN`.
2608///
2609/// In addition, one operation is unique to Lance: [`merge`](Self::merge). This
2610/// operation allows inserting precomputed data into the dataset.
2611///
2612/// Because these operations change the schema of the dataset, they will conflict
2613/// with most other concurrent operations. Therefore, they should be performed
2614/// when no other write operations are being run.
impl Dataset {
    /// Append new columns to the dataset.
    ///
    /// - `transforms`: describes how the new column values are produced (see
    ///   [`NewColumnTransform`]).
    /// - `read_columns`: existing columns to read as transform input; `None`
    ///   lets the implementation choose.
    /// - `batch_size`: optional read batch size override.
    pub async fn add_columns(
        &mut self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
        batch_size: Option<u32>,
    ) -> Result<()> {
        schema_evolution::add_columns(self, transforms, read_columns, batch_size).await
    }

    /// Modify columns in the dataset, changing their name, type, or nullability.
    ///
    /// If only changing the name or nullability of a column, this is a zero-copy
    /// operation and any indices will be preserved. If changing the type of a
    /// column, the data for that column will be rewritten and any indices will
    /// be dropped. The old column data will not be immediately deleted. To remove
    /// it, call [optimize::compact_files()] and then
    /// [cleanup::cleanup_old_versions()] on the dataset.
    pub async fn alter_columns(&mut self, alterations: &[ColumnAlteration]) -> Result<()> {
        schema_evolution::alter_columns(self, alterations).await
    }

    /// Remove columns from the dataset.
    ///
    /// This is a metadata-only operation and does not remove the data from the
    /// underlying storage. In order to remove the data, you must subsequently
    /// call [optimize::compact_files()] to rewrite the data without the removed columns and
    /// then call [cleanup::cleanup_old_versions()] to remove the old files.
    pub async fn drop_columns(&mut self, columns: &[&str]) -> Result<()> {
        // Emit a tracing event before mutating, so the drop is observable
        // even if the operation later fails.
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DROPPING_COLUMN_EVENT, uri = &self.uri, columns = columns.join(","));
        schema_evolution::drop_columns(self, columns).await
    }

    /// Drop columns from the dataset and return updated dataset. Note that this
    /// is a zero-copy operation and column is not physically removed from the
    /// dataset.
    /// Parameters:
    /// - `columns`: the list of column names to drop.
    #[deprecated(since = "0.9.12", note = "Please use `drop_columns` instead.")]
    pub async fn drop(&mut self, columns: &[&str]) -> Result<()> {
        self.drop_columns(columns).await
    }

    /// Shared implementation of [`Self::merge`]: left-joins `stream` onto this
    /// dataset on `left_on` / `right_on` and commits the result as a Merge
    /// transaction.
    async fn merge_impl(
        &mut self,
        stream: Box<dyn RecordBatchReader + Send>,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        // Sanity check.
        // The left key must be a schema column or one of the implicit
        // row-id / row-address columns.
        if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
            return Err(Error::invalid_input(
                format!("Column {} does not exist in the left side dataset", left_on),
                location!(),
            ));
        };
        let right_schema = stream.schema();
        if right_schema.field_with_name(right_on).is_err() {
            return Err(Error::invalid_input(
                format!(
                    "Column {} does not exist in the right side dataset",
                    right_on
                ),
                location!(),
            ));
        };
        // Reject overlapping column names (other than the join key) so the
        // merged schema stays unambiguous.
        for field in right_schema.fields() {
            if field.name() == right_on {
                // right_on is allowed to exist in the dataset, since it may be
                // the same as left_on.
                continue;
            }
            if self.schema().field(field.name()).is_some() {
                return Err(Error::invalid_input(
                    format!(
                        "Column {} exists in both sides of the dataset",
                        field.name()
                    ),
                    location!(),
                ));
            }
        }

        // Hash join
        let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
        // Final schema is union of current schema, plus the RHS schema without
        // the right_on key.
        let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
        // Assign ids to the newly merged fields, continuing from the current
        // max field id (presumably `set_field_id` numbers fields starting from
        // this value — confirm against Schema::set_field_id).
        new_schema.set_field_id(Some(self.manifest.max_field_id()));

        // Write new data file to each fragment. Parallelism is done over columns,
        // so no parallelism done at this level.
        let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
            .then(|f| {
                let joiner = joiner.clone();
                async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
            })
            .try_collect::<Vec<_>>()
            .await?;

        // Commit the updated fragments and widened schema as a Merge operation.
        let transaction = Transaction::new(
            self.manifest.version,
            Operation::Merge {
                fragments: updated_fragments,
                schema: new_schema,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    /// Merge this dataset with another arrow Table / Dataset, and returns a new version of dataset.
    ///
    /// Parameters:
    ///
    /// - `stream`: the stream of [`RecordBatch`] to merge.
    /// - `left_on`: the column name to join on the left side (self).
    /// - `right_on`: the column name to join on the right side (stream).
    ///
    /// Returns: a new version of dataset.
    ///
    /// It performs a left-join on the two datasets.
    pub async fn merge(
        &mut self,
        stream: impl RecordBatchReader + Send + 'static,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        let stream = Box::new(stream);
        self.merge_impl(stream, left_on, right_on).await
    }
}
2752
2753/// # Dataset metadata APIs
2754///
2755/// There are four kinds of metadata on datasets:
2756///
2757///  - **Schema metadata**: metadata about the data itself.
///  - **Field metadata**: metadata attached to individual fields (columns) of the schema.
2759///  - **Dataset metadata**: metadata about the dataset. For example, this could
2760///    store a created_at date.
2761///  - **Dataset config**: configuration values controlling how engines should
2762///    manage the dataset. This configures things like auto-cleanup.
2763///
/// You can read the current values with [Self::metadata()] and [Self::config()],
/// and modify them with the `update_*` methods below.
impl Dataset {
    /// Get dataset metadata.
    ///
    /// Returns the table-level metadata map stored in the manifest.
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.manifest.table_metadata
    }

    /// Get the dataset config from manifest
    pub fn config(&self) -> &HashMap<String, String> {
        &self.manifest.config
    }

    /// Blob-handling version for this dataset, derived from its config map.
    pub(crate) fn blob_version(&self) -> BlobVersion {
        blob_version_from_config(&self.manifest.config)
    }

    /// Delete keys from the config.
    #[deprecated(
        note = "Use the new update_config(values, replace) method - pass None values to delete keys"
    )]
    pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
        // Deleting is expressed as updating each key to None.
        let updates = delete_keys.iter().map(|key| (*key, None));
        self.update_config(updates).await?;
        Ok(())
    }

    /// Update table metadata.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire metadata map instead of merging.
    ///
    /// Returns the updated metadata map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_metadata([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_metadata([("to_delete", None)]).await?;
    ///
    /// // Clear all metadata
    /// dataset.update_metadata([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full metadata
    /// dataset.update_metadata([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::TableMetadata)
    }

    /// Update config.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire config map instead of merging.
    ///
    /// Returns the updated config map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_config(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_config([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_config([("to_delete", None)]).await?;
    ///
    /// // Clear all config
    /// dataset.update_config([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full config
    /// dataset.update_config([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_config(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::Config)
    }

    /// Update schema metadata.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire schema metadata map instead of merging.
    ///
    /// Returns the updated schema metadata map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_schema_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_schema_metadata([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_schema_metadata([("to_delete", None)]).await?;
    ///
    /// // Clear all schema metadata
    /// dataset.update_schema_metadata([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full schema metadata
    /// dataset.update_schema_metadata([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_schema_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::SchemaMetadata)
    }

    /// Update schema metadata
    #[deprecated(note = "Use the new update_schema_metadata(values).replace() instead")]
    pub async fn replace_schema_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (String, String)>,
    ) -> Result<()> {
        // Wrap each value in Some() to match the upsert form expected by the
        // builder, then replace the whole map.
        let new_values = new_values
            .into_iter()
            .map(|(k, v)| (k, Some(v)))
            .collect::<HashMap<_, _>>();
        self.update_schema_metadata(new_values).replace().await?;
        Ok(())
    }

    /// Update field metadata
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_field_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update metadata by field path
    /// dataset.update_field_metadata()
    ///     .update("path.to_field", [("key", "value")])?
    ///     .await?;
    ///
    /// // Update metadata by field id
    /// dataset.update_field_metadata()
    ///     .update(12, [("key", "value")])?
    ///     .await?;
    ///
    /// // Clear field metadata
    /// dataset.update_field_metadata()
    ///     .replace("path.to_field", [] as [UpdateMapEntry; 0])?
    ///     .replace(12, [] as [UpdateMapEntry; 0])?
    ///     .await?;
    ///
    /// // Replace field metadata
    /// dataset.update_field_metadata()
    ///     .replace("field_name", [("k1", "v1"), ("k2", "v2")])?
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_field_metadata(&mut self) -> UpdateFieldMetadataBuilder<'_> {
        UpdateFieldMetadataBuilder::new(self)
    }

    /// Update field metadata
    ///
    /// Applies the given per-field metadata maps (keyed by field id) in a
    /// single `UpdateConfig` transaction. Fields not listed are unmodified.
    /// If a field id appears more than once in the input, the last entry wins.
    pub async fn replace_field_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
    ) -> Result<()> {
        // Collect into a map first so duplicate field ids are deduplicated.
        let new_values = new_values.into_iter().collect::<HashMap<_, _>>();
        let field_metadata_updates = new_values
            .into_iter()
            .map(|(field_id, metadata)| {
                (
                    field_id as i32,
                    translate_schema_metadata_updates(&metadata),
                )
            })
            .collect();
        metadata::execute_metadata_update(
            self,
            Operation::UpdateConfig {
                config_updates: None,
                table_metadata_updates: None,
                schema_metadata_updates: None,
                field_metadata_updates,
            },
        )
        .await
    }
}
2963
// Allows a `Dataset` to be used through the `DatasetTakeRows` trait from
// lance-core, which only needs schema access and row taking.
#[async_trait::async_trait]
impl DatasetTakeRows for Dataset {
    fn schema(&self) -> &Schema {
        Self::schema(self)
    }

    async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
        // Delegates to the inherent `take_rows`; the projection is cloned
        // because the inherent method takes it by value.
        Self::take_rows(self, row_ids, projection.clone()).await
    }
}
2974
/// Options controlling how a manifest is written during commit
/// (see [`write_manifest_file`]).
#[derive(Debug)]
pub(crate) struct ManifestWriteConfig {
    auto_set_feature_flags: bool,              // default true
    timestamp: Option<SystemTime>,             // default None
    use_stable_row_ids: bool,                  // default false
    use_legacy_format: Option<bool>,           // default None
    storage_format: Option<DataStorageFormat>, // default None
    disable_transaction_file: bool,            // default false
}
2984
2985impl Default for ManifestWriteConfig {
2986    fn default() -> Self {
2987        Self {
2988            auto_set_feature_flags: true,
2989            timestamp: None,
2990            use_stable_row_ids: false,
2991            disable_transaction_file: false,
2992            use_legacy_format: None,
2993            storage_format: None,
2994        }
2995    }
2996}
2997
impl ManifestWriteConfig {
    /// Returns the `disable_transaction_file` flag (default `false`).
    pub fn disable_transaction_file(&self) -> bool {
        self.disable_transaction_file
    }
}
3003
3004/// Commit a manifest file and create a copy at the latest manifest path.
3005#[allow(clippy::too_many_arguments)]
3006pub(crate) async fn write_manifest_file(
3007    object_store: &ObjectStore,
3008    commit_handler: &dyn CommitHandler,
3009    base_path: &Path,
3010    manifest: &mut Manifest,
3011    indices: Option<Vec<IndexMetadata>>,
3012    config: &ManifestWriteConfig,
3013    naming_scheme: ManifestNamingScheme,
3014    mut transaction: Option<&Transaction>,
3015) -> std::result::Result<ManifestLocation, CommitError> {
3016    if config.auto_set_feature_flags {
3017        apply_feature_flags(
3018            manifest,
3019            config.use_stable_row_ids,
3020            config.disable_transaction_file,
3021        )?;
3022    }
3023
3024    manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
3025
3026    manifest.update_max_fragment_id();
3027
3028    commit_handler
3029        .commit(
3030            manifest,
3031            indices,
3032            base_path,
3033            object_store,
3034            write_manifest_file_to_path,
3035            naming_scheme,
3036            transaction.take().map(|tx| tx.into()),
3037        )
3038        .await
3039}
3040
// Lets projection-building utilities in lance-core treat a `Dataset` as a
// schema source (see `lance_core::datatypes::Projectable`).
impl Projectable for Dataset {
    fn schema(&self) -> &Schema {
        self.schema()
    }
}
3046
3047#[cfg(test)]
3048mod tests;