// lance/dataset.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Lance Dataset
5//!
6
7use arrow_array::{RecordBatch, RecordBatchReader};
8use arrow_schema::DataType;
9use byteorder::{ByteOrder, LittleEndian};
10use chrono::{Duration, prelude::*};
11use deepsize::DeepSizeOf;
12use futures::future::BoxFuture;
13use futures::stream::{self, BoxStream, StreamExt, TryStreamExt};
14use futures::{FutureExt, Stream};
15
16use crate::dataset::metadata::UpdateFieldMetadataBuilder;
17use crate::dataset::transaction::translate_schema_metadata_updates;
18use crate::session::caches::{DSMetadataCache, ManifestKey, TransactionKey};
19use crate::session::index_caches::DSIndexCache;
20use itertools::Itertools;
21use lance_core::ROW_ADDR;
22use lance_core::datatypes::{OnMissing, OnTypeMismatch, Projectable, Projection};
23use lance_core::traits::DatasetTakeRows;
24use lance_core::utils::address::RowAddress;
25use lance_core::utils::tracing::{
26    DATASET_CLEANING_EVENT, DATASET_DELETING_EVENT, DATASET_DROPPING_COLUMN_EVENT,
27    TRACE_DATASET_EVENTS,
28};
29use lance_datafusion::projection::ProjectionPlan;
30use lance_file::datatypes::populate_schema_dictionary;
31use lance_file::reader::FileReaderOptions;
32use lance_file::version::LanceFileVersion;
33use lance_index::{DatasetIndexExt, IndexType};
34use lance_io::object_store::{
35    LanceNamespaceStorageOptionsProvider, ObjectStore, ObjectStoreParams, StorageOptions,
36    StorageOptionsAccessor, StorageOptionsProvider,
37};
38use lance_io::utils::{read_last_block, read_message, read_metadata_offset, read_struct};
39use lance_namespace::LanceNamespace;
40use lance_table::format::{
41    DataFile, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, RowIdMeta, pb,
42};
43use lance_table::io::commit::{
44    CommitConfig, CommitError, CommitHandler, CommitLock, ManifestLocation, ManifestNamingScheme,
45    VERSIONS_DIR, external_manifest::ExternalManifestCommitHandler, migrate_scheme_to_v2,
46    write_manifest_file_to_path,
47};
48
49use crate::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
50use lance_table::io::manifest::{read_manifest, read_manifest_indexes};
51use object_store::path::Path;
52use prost::Message;
53use roaring::RoaringBitmap;
54use rowids::get_row_id_index;
55use serde::{Deserialize, Serialize};
56use std::borrow::Cow;
57use std::collections::{BTreeMap, HashMap, HashSet};
58use std::fmt::Debug;
59use std::ops::Range;
60use std::pin::Pin;
61use std::sync::Arc;
62use take::row_offsets_to_row_addresses;
63use tracing::{info, instrument};
64
65pub(crate) mod blob;
66mod branch_location;
67pub mod builder;
68pub mod cleanup;
69pub mod delta;
70pub mod fragment;
71mod hash_joiner;
72pub mod index;
73pub mod mem_wal;
74mod metadata;
75pub mod optimize;
76pub mod progress;
77pub mod refs;
78pub(crate) mod rowids;
79pub mod scanner;
80mod schema_evolution;
81pub mod sql;
82pub mod statistics;
83mod take;
84pub mod transaction;
85pub mod udtf;
86pub mod updater;
87mod utils;
88pub mod write;
89
90use self::builder::DatasetBuilder;
91use self::cleanup::RemovalStats;
92use self::fragment::FileFragment;
93use self::refs::Refs;
94use self::scanner::{DatasetRecordBatchStream, Scanner};
95use self::transaction::{Operation, Transaction, TransactionBuilder, UpdateMapEntry};
96use self::write::write_fragments_internal;
97use crate::dataset::branch_location::BranchLocation;
98use crate::dataset::cleanup::{CleanupPolicy, CleanupPolicyBuilder};
99use crate::dataset::refs::{BranchContents, BranchIdentifier, Branches, Tags};
100use crate::dataset::sql::SqlQueryBuilder;
101use crate::datatypes::Schema;
102use crate::index::retain_supported_indices;
103use crate::io::commit::{
104    commit_detached_transaction, commit_new_dataset, commit_transaction,
105    detect_overlapping_fragments,
106};
107use crate::session::Session;
108use crate::utils::temporal::{SystemTime, timestamp_to_nanos, utc_now};
109use crate::{Error, Result};
110pub use blob::BlobFile;
111use hash_joiner::HashJoiner;
112pub use lance_core::ROW_ID;
113use lance_core::box_error;
114use lance_index::scalar::lance_format::LanceIndexStore;
115use lance_namespace::models::{DeclareTableRequest, DescribeTableRequest};
116use lance_table::feature_flags::{apply_feature_flags, can_read_dataset};
117use lance_table::io::deletion::{DELETIONS_DIR, relative_deletion_file_path};
118pub use schema_evolution::{
119    BatchInfo, BatchUDF, ColumnAlteration, NewColumnTransform, UDFCheckpointStore,
120};
121pub use take::TakeBuilder;
122pub use write::merge_insert::{
123    MergeInsertBuilder, MergeInsertJob, MergeStats, UncommittedMergeInsert, WhenMatched,
124    WhenNotMatched, WhenNotMatchedBySource,
125};
126
127use crate::dataset::index::LanceIndexStoreExt;
128pub use write::update::{UpdateBuilder, UpdateJob};
129#[allow(deprecated)]
130pub use write::{
131    AutoCleanupParams, CommitBuilder, DeleteBuilder, DeleteResult, InsertBuilder, WriteDestination,
132    WriteMode, WriteParams, write_fragments,
133};
134
// Well-known subdirectories under a dataset's root path.
pub(crate) const INDICES_DIR: &str = "_indices";
pub(crate) const DATA_DIR: &str = "data";
pub(crate) const TRANSACTIONS_DIR: &str = "_transactions";

// We default to 6GB for the index cache, since indices are often large but
// worth caching.
pub const DEFAULT_INDEX_CACHE_SIZE: usize = 6 * 1024 * 1024 * 1024;
// Default to 1 GiB for the metadata cache. Column metadata can be like 40MB,
// so this should be enough for a few hundred columns. Other metadata is much
// smaller.
pub const DEFAULT_METADATA_CACHE_SIZE: usize = 1024 * 1024 * 1024;
146
/// Lance Dataset
///
/// A handle to one checked-out version of a dataset: the manifest for that
/// version plus the I/O machinery (object store, commit handler) and the
/// per-dataset views of the session caches needed to read it.
#[derive(Clone)]
pub struct Dataset {
    pub object_store: Arc<ObjectStore>,
    pub(crate) commit_handler: Arc<dyn CommitHandler>,
    /// Uri of the dataset.
    ///
    /// On cloud storage, we can not use [Dataset::base] to build the full uri because the
    /// `bucket` is swallowed in the inner [ObjectStore].
    uri: String,
    /// Root path of the dataset within the object store.
    pub(crate) base: Path,
    /// Manifest of the version this handle is checked out at.
    pub manifest: Arc<Manifest>,
    // Path for the manifest that is loaded. Used to get additional information,
    // such as the index metadata.
    pub(crate) manifest_location: ManifestLocation,
    pub(crate) session: Arc<Session>,
    /// Tag/branch accessors rooted at this dataset's location.
    pub refs: Refs,

    // Bitmap of fragment ids in this dataset.
    pub(crate) fragment_bitmap: Arc<RoaringBitmap>,

    // These are references to session caches, but with the dataset URI as a prefix.
    pub(crate) index_cache: Arc<DSIndexCache>,
    pub(crate) metadata_cache: Arc<DSMetadataCache>,

    /// File reader options to use when reading data files.
    pub(crate) file_reader_options: Option<FileReaderOptions>,

    /// Object store parameters used when opening this dataset.
    /// These are used when creating object stores for additional base paths.
    pub(crate) store_params: Option<Box<ObjectStoreParams>>,
}
179
180impl std::fmt::Debug for Dataset {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        f.debug_struct("Dataset")
183            .field("uri", &self.uri)
184            .field("base", &self.base)
185            .field("version", &self.manifest.version)
186            .field("cache_num_items", &self.session.approx_num_items())
187            .finish()
188    }
189}
190
/// Dataset Version
///
/// A lightweight, serializable summary of one manifest; see the
/// `From<&Manifest>` impl below for how it is derived.
#[derive(Deserialize, Serialize, Debug)]
pub struct Version {
    /// version number
    pub version: u64,

    /// Timestamp of dataset creation in UTC.
    pub timestamp: DateTime<Utc>,

    /// Key-value pairs of metadata.
    pub metadata: BTreeMap<String, String>,
}
203
204/// Convert Manifest to Data Version.
205impl From<&Manifest> for Version {
206    fn from(m: &Manifest) -> Self {
207        Self {
208            version: m.version,
209            timestamp: m.timestamp(),
210            metadata: m.summary().into(),
211        }
212    }
213}
214
/// Customize read behavior of a dataset.
#[derive(Clone, Debug)]
pub struct ReadParams {
    /// Size of the index cache in bytes. This cache stores index data in memory
    /// for faster lookups. The default is 6 GiB.
    pub index_cache_size_bytes: usize,

    /// Size of the metadata cache in bytes. This cache stores metadata in memory
    /// for faster open table and scans. The default is 1 GiB.
    pub metadata_cache_size_bytes: usize,

    /// If present, dataset will use this shared [`Session`] instead creating a new one.
    ///
    /// This is useful for sharing the same session across multiple datasets.
    pub session: Option<Arc<Session>>,

    /// Object store parameters applied when opening the dataset.
    pub store_options: Option<ObjectStoreParams>,

    /// If present, dataset will use this to resolve the latest version
    ///
    /// Lance needs to be able to make atomic updates to the manifest.  This involves
    /// coordination between readers and writers and we can usually rely on the filesystem
    /// to do this coordination for us.
    ///
    /// Some file systems (e.g. S3) do not support atomic operations.  In this case, for
    /// safety, we recommend an external commit mechanism (such as dynamodb) and, on the
    /// read path, we need to reach out to that external mechanism to figure out the latest
    /// version of the dataset.
    ///
    /// If this is not set then a default behavior is chosen that is appropriate for the
    /// filesystem.
    ///
    /// If a custom object store is provided (via store_params.object_store) then this
    /// must also be provided.
    pub commit_handler: Option<Arc<dyn CommitHandler>>,

    /// File reader options to use when reading data files.
    ///
    /// This allows control over features like caching repetition indices and validation.
    pub file_reader_options: Option<FileReaderOptions>,
}
256
257impl ReadParams {
258    /// Set the cache size for indices. Set to zero, to disable the cache.
259    #[deprecated(
260        since = "0.30.0",
261        note = "Use `index_cache_size_bytes` instead, which accepts a size in bytes."
262    )]
263    pub fn index_cache_size(&mut self, cache_size: usize) -> &mut Self {
264        let assumed_entry_size = 20 * 1024 * 1024; // 20 MiB per entry
265        self.index_cache_size_bytes = cache_size * assumed_entry_size;
266        self
267    }
268
269    pub fn index_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
270        self.index_cache_size_bytes = cache_size;
271        self
272    }
273
274    /// Set the cache size for the file metadata. Set to zero to disable this cache.
275    #[deprecated(
276        since = "0.30.0",
277        note = "Use `metadata_cache_size_bytes` instead, which accepts a size in bytes."
278    )]
279    pub fn metadata_cache_size(&mut self, cache_size: usize) -> &mut Self {
280        let assumed_entry_size = 10 * 1024 * 1024; // 10 MiB per entry
281        self.metadata_cache_size_bytes = cache_size * assumed_entry_size;
282        self
283    }
284
285    /// Set the cache size for the file metadata in bytes.
286    pub fn metadata_cache_size_bytes(&mut self, cache_size: usize) -> &mut Self {
287        self.metadata_cache_size_bytes = cache_size;
288        self
289    }
290
291    /// Set a shared session for the datasets.
292    pub fn session(&mut self, session: Arc<Session>) -> &mut Self {
293        self.session = Some(session);
294        self
295    }
296
297    /// Use the explicit locking to resolve the latest version
298    pub fn set_commit_lock<T: CommitLock + Send + Sync + 'static>(&mut self, lock: Arc<T>) {
299        self.commit_handler = Some(Arc::new(lock));
300    }
301
302    /// Set the file reader options.
303    pub fn file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
304        self.file_reader_options = Some(options);
305        self
306    }
307}
308
309impl Default for ReadParams {
310    fn default() -> Self {
311        Self {
312            index_cache_size_bytes: DEFAULT_INDEX_CACHE_SIZE,
313            metadata_cache_size_bytes: DEFAULT_METADATA_CACHE_SIZE,
314            session: None,
315            store_options: None,
316            commit_handler: None,
317            file_reader_options: None,
318        }
319    }
320}
321
/// Describes which columns (and, optionally, what transforms of them) to
/// read from a dataset.
#[derive(Debug, Clone)]
pub enum ProjectionRequest {
    /// Project by an explicit (possibly partial) schema.
    Schema(Arc<Schema>),
    /// Project by SQL expressions: `(output column name, SQL expression)` pairs.
    Sql(Vec<(String, String)>),
}
327
328impl ProjectionRequest {
329    pub fn from_columns(
330        columns: impl IntoIterator<Item = impl AsRef<str>>,
331        dataset_schema: &Schema,
332    ) -> Self {
333        let columns = columns
334            .into_iter()
335            .map(|s| s.as_ref().to_string())
336            .collect::<Vec<_>>();
337
338        let schema = dataset_schema
339            .project_preserve_system_columns(&columns)
340            .unwrap();
341        Self::Schema(Arc::new(schema))
342    }
343
344    pub fn from_schema(schema: Schema) -> Self {
345        Self::Schema(Arc::new(schema))
346    }
347
348    /// Provide a list of projection with SQL transform.
349    ///
350    /// # Parameters
351    /// - `columns`: A list of tuples where the first element is resulted column name and the second
352    ///   element is the SQL expression.
353    pub fn from_sql(
354        columns: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
355    ) -> Self {
356        Self::Sql(
357            columns
358                .into_iter()
359                .map(|(a, b)| (a.into(), b.into()))
360                .collect(),
361        )
362    }
363
364    pub fn into_projection_plan(self, dataset: Arc<Dataset>) -> Result<ProjectionPlan> {
365        match self {
366            Self::Schema(schema) => {
367                // The schema might contain system columns (_rowid, _rowaddr) which are not
368                // in the dataset schema. We handle these specially in ProjectionPlan::from_schema.
369                let system_columns_present = schema
370                    .fields
371                    .iter()
372                    .any(|f| lance_core::is_system_column(&f.name));
373
374                if system_columns_present {
375                    // If system columns are present, we can't use project_by_schema directly
376                    // Just pass the schema to ProjectionPlan::from_schema which handles it
377                    ProjectionPlan::from_schema(dataset, schema.as_ref())
378                } else {
379                    // No system columns, use normal path with validation
380                    let projection = dataset.schema().project_by_schema(
381                        schema.as_ref(),
382                        OnMissing::Error,
383                        OnTypeMismatch::Error,
384                    )?;
385                    ProjectionPlan::from_schema(dataset, &projection)
386                }
387            }
388            Self::Sql(columns) => ProjectionPlan::from_expressions(dataset, &columns),
389        }
390    }
391}
392
// Convenience conversions so APIs can accept a schema anywhere a
// `ProjectionRequest` is expected.
impl From<Arc<Schema>> for ProjectionRequest {
    fn from(schema: Arc<Schema>) -> Self {
        Self::Schema(schema)
    }
}

impl From<Schema> for ProjectionRequest {
    fn from(schema: Schema) -> Self {
        Self::from(Arc::new(schema))
    }
}
404
405impl Dataset {
    /// Open an existing dataset.
    ///
    /// Uses default read parameters; see also [DatasetBuilder] for finer
    /// control over caching, sessions, and commit handling.
    #[instrument]
    pub async fn open(uri: &str) -> Result<Self> {
        DatasetBuilder::from_uri(uri).load().await
    }
413
414    /// Check out a dataset version with a ref
415    pub async fn checkout_version(&self, version: impl Into<refs::Ref>) -> Result<Self> {
416        let reference: refs::Ref = version.into();
417        match reference {
418            refs::Ref::Version(branch, version_number) => {
419                self.checkout_by_ref(version_number, branch.as_deref())
420                    .await
421            }
422            refs::Ref::VersionNumber(version_number) => {
423                self.checkout_by_ref(Some(version_number), self.manifest.branch.as_deref())
424                    .await
425            }
426            refs::Ref::Tag(tag_name) => {
427                let tag_contents = self.tags().get(tag_name.as_str()).await?;
428                self.checkout_by_ref(Some(tag_contents.version), tag_contents.branch.as_deref())
429                    .await
430            }
431        }
432    }
433
    /// Accessor for this dataset's tags.
    pub fn tags(&self) -> Tags<'_> {
        self.refs.tags()
    }

    /// Accessor for this dataset's branches.
    pub fn branches(&self) -> Branches<'_> {
        self.refs.branches()
    }
441
442    /// Check out the latest version of the dataset
443    pub async fn checkout_latest(&mut self) -> Result<()> {
444        let (manifest, manifest_location) = self.latest_manifest().await?;
445        self.manifest = manifest;
446        self.manifest_location = manifest_location;
447        self.fragment_bitmap = Arc::new(
448            self.manifest
449                .fragments
450                .iter()
451                .map(|f| f.id as u32)
452                .collect(),
453        );
454        Ok(())
455    }
456
    /// Check out the latest version of the branch
    ///
    /// Equivalent to `checkout_by_ref(None, Some(branch))`.
    pub async fn checkout_branch(&self, branch: &str) -> Result<Self> {
        self.checkout_by_ref(None, Some(branch)).await
    }
461
    /// This is a two-phase operation:
    /// - Create the branch dataset by shallow cloning.
    /// - Create the branch metadata (a.k.a. `BranchContents`).
    ///
    /// These two phases are not atomic. We consider `BranchContents` as the source of truth
    /// for the branch.
    ///
    /// The cleanup procedure should:
    /// - Clean up zombie branch datasets that have no related `BranchContents`.
    /// - Delete broken `BranchContents` entries that have no related branch dataset.
    ///
    /// If `create_branch` stops at phase 1, it may leave a zombie branch dataset,
    /// which can be cleaned up later. Such a zombie dataset may cause a branch creation
    /// failure if we use the same name to `create_branch`. In that case, you need to call
    /// `force_delete_branch` to interactively clean up the zombie dataset.
    ///
    /// # Arguments
    /// * `branch` - Name of the branch to create.
    /// * `version` - Source ref (branch/version/tag) the branch starts from.
    /// * `store_params` - Optional object store params for the branch location.
    pub async fn create_branch(
        &mut self,
        branch: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let (source_branch, version_number) = self.resolve_reference(version.into()).await?;
        let branch_location = self.branch_location().find_branch(Some(branch))?;
        // Phase 1: commit a shallow-clone transaction at the branch location.
        let clone_op = Operation::Clone {
            is_shallow: true,
            ref_name: source_branch.clone(),
            ref_version: version_number,
            ref_path: String::from(self.uri()),
            branch_name: Some(branch.to_string()),
        };
        let transaction = Transaction::new(version_number, clone_op, None);

        let builder = CommitBuilder::new(WriteDestination::Uri(branch_location.uri.as_str()))
            .with_store_params(store_params.unwrap_or_default())
            .with_object_store(Arc::new(self.object_store().clone()))
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let dataset = builder.execute(transaction).await?;

        // Create BranchContents after shallow_clone
        // (phase 2: record the branch metadata, the source of truth).
        self.branches()
            .create(branch, version_number, source_branch.as_deref())
            .await?;
        Ok(dataset)
    }
507
    /// Delete a branch (fails if its `BranchContents` metadata is missing;
    /// see [`Self::force_delete_branch`] for that case).
    pub async fn delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, false).await
    }

    /// Delete the branch even if the BranchContents is not found.
    /// This could be useful when we have zombie branches and want to clean them up immediately.
    pub async fn force_delete_branch(&mut self, branch: &str) -> Result<()> {
        self.branches().delete(branch, true).await
    }

    /// List all branches of this dataset along with their metadata.
    pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
        self.branches().list().await
    }
521
522    fn already_checked_out(&self, location: &ManifestLocation, branch_name: Option<&str>) -> bool {
523        // We check the e_tag here just in case it has been overwritten. This can
524        // happen if the table has been dropped then re-created recently.
525        self.manifest.branch.as_deref() == branch_name
526            && self.manifest.version == location.version
527            && self.manifest_location.naming_scheme == location.naming_scheme
528            && location.e_tag.as_ref().is_some_and(|e_tag| {
529                self.manifest_location
530                    .e_tag
531                    .as_ref()
532                    .is_some_and(|current_e_tag| e_tag == current_e_tag)
533            })
534    }
535
    /// Check out a specific version and/or branch of this dataset.
    ///
    /// `version_number = None` resolves the latest version on `branch`.
    /// Returns a new handle; `self` is unchanged.
    async fn checkout_by_ref(
        &self,
        version_number: Option<u64>,
        branch: Option<&str>,
    ) -> Result<Self> {
        let new_location = self.branch_location().find_branch(branch)?;

        // Resolve either the exact requested version or the latest one.
        let manifest_location = if let Some(version_number) = version_number {
            self.commit_handler
                .resolve_version_location(
                    &new_location.path,
                    version_number,
                    &self.object_store.inner,
                )
                .await?
        } else {
            self.commit_handler
                .resolve_latest_location(&new_location.path, &self.object_store)
                .await?
        };

        // Fast path: already checked out at exactly this manifest.
        if self.already_checked_out(&manifest_location, branch) {
            return Ok(self.clone());
        }

        let manifest = Self::load_manifest(
            self.object_store.as_ref(),
            &manifest_location,
            &new_location.uri,
            self.session.as_ref(),
        )
        .await?;
        // Reuse this handle's store, session, commit handler, and options;
        // only the location and manifest differ in the new checkout.
        Self::checkout_manifest(
            self.object_store.clone(),
            new_location.path,
            new_location.uri,
            Arc::new(manifest),
            manifest_location,
            self.session.clone(),
            self.commit_handler.clone(),
            self.file_reader_options.clone(),
            self.store_params.as_deref().cloned(),
        )
    }
580
    /// Read and decode the manifest at `manifest_location`.
    ///
    /// Also opportunistically decodes and caches the index section and the
    /// transaction section when they happen to fall inside the file's last
    /// block, saving extra reads later.
    ///
    /// # Errors
    /// Returns `dataset_not_found` when the manifest file is missing, and
    /// `not_supported` when the manifest's reader feature flags are newer
    /// than this build of Lance understands.
    pub(crate) async fn load_manifest(
        object_store: &ObjectStore,
        manifest_location: &ManifestLocation,
        uri: &str,
        session: &Session,
    ) -> Result<Manifest> {
        // Opening with a known size lets the reader skip a size lookup.
        let object_reader = if let Some(size) = manifest_location.size {
            object_store
                .open_with_size(&manifest_location.path, size as usize)
                .await
        } else {
            object_store.open(&manifest_location.path).await
        };
        // A missing manifest file means the dataset itself doesn't exist.
        let object_reader = object_reader.map_err(|e| match &e {
            Error::NotFound { uri, .. } => Error::dataset_not_found(uri.clone(), box_error(e)),
            _ => e,
        })?;

        let last_block =
            read_last_block(object_reader.as_ref())
                .await
                .map_err(|err| match err {
                    object_store::Error::NotFound { path, source } => {
                        Error::dataset_not_found(path, source)
                    }
                    _ => Error::io_source(err.into()),
                })?;
        let offset = read_metadata_offset(&last_block)?;

        // If manifest is in the last block, we can decode directly from memory.
        let manifest_size = object_reader.size().await?;
        let mut manifest = if manifest_size - offset <= last_block.len() {
            let manifest_len = manifest_size - offset;
            let offset_in_block = last_block.len() - manifest_len;
            // Length-prefixed protobuf: 4-byte little-endian length, then message.
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
        } else {
            // Manifest begins before the last block; do a ranged read instead.
            read_struct(object_reader.as_ref(), offset).await
        }?;

        if !can_read_dataset(manifest.reader_feature_flags) {
            let message = format!(
                "This dataset cannot be read by this version of Lance. \
                 Please upgrade Lance to read this dataset.\n Flags: {}",
                manifest.reader_feature_flags
            );
            return Err(Error::not_supported_source(message.into()));
        }

        // If indices were also in the last block, we can take the opportunity to
        // decode them now and cache them.
        if let Some(index_offset) = manifest.index_section
            && manifest_size - index_offset <= last_block.len()
        {
            let offset_in_block = last_block.len() - (manifest_size - index_offset);
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            let section = lance_table::format::pb::IndexSection::decode(message_data)?;
            let mut indices: Vec<IndexMetadata> = section
                .indices
                .into_iter()
                .map(IndexMetadata::try_from)
                .collect::<Result<Vec<_>>>()?;
            // Drop index types this build cannot serve before caching.
            retain_supported_indices(&mut indices);
            let ds_index_cache = session.index_cache.for_dataset(uri);
            let metadata_key = crate::session::index_caches::IndexMetadataKey {
                version: manifest_location.version,
            };
            ds_index_cache
                .insert_with_key(&metadata_key, Arc::new(indices))
                .await;
        }

        // If transaction is also in the last block, we can take the opportunity to
        // decode them now and cache them.
        if let Some(transaction_offset) = manifest.transaction_section
            && manifest_size - transaction_offset <= last_block.len()
        {
            let offset_in_block = last_block.len() - (manifest_size - transaction_offset);
            let message_len =
                LittleEndian::read_u32(&last_block[offset_in_block..offset_in_block + 4]) as usize;
            let message_data = &last_block[offset_in_block + 4..offset_in_block + 4 + message_len];
            let transaction: Transaction =
                lance_table::format::pb::Transaction::decode(message_data)?.try_into()?;

            let metadata_cache = session.metadata_cache.for_dataset(uri);
            let metadata_key = TransactionKey {
                version: manifest_location.version,
            };
            metadata_cache
                .insert_with_key(&metadata_key, Arc::new(transaction))
                .await;
        }

        // Legacy-format files need their schema dictionaries loaded separately.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
        }

        Ok(manifest)
    }
684
    /// Assemble a [`Dataset`] handle from an already-loaded manifest.
    ///
    /// Pure construction: wires up refs, the per-dataset cache views, and the
    /// fragment-id bitmap. Performs no I/O.
    #[allow(clippy::too_many_arguments)]
    fn checkout_manifest(
        object_store: Arc<ObjectStore>,
        base_path: Path,
        uri: String,
        manifest: Arc<Manifest>,
        manifest_location: ManifestLocation,
        session: Arc<Session>,
        commit_handler: Arc<dyn CommitHandler>,
        file_reader_options: Option<FileReaderOptions>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        let refs = Refs::new(
            object_store.clone(),
            commit_handler.clone(),
            BranchLocation {
                path: base_path.clone(),
                uri: uri.clone(),
                branch: manifest.branch.clone(),
            },
        );
        // Session caches are shared; these are views keyed by this dataset's URI.
        let metadata_cache = Arc::new(session.metadata_cache.for_dataset(&uri));
        let index_cache = Arc::new(session.index_cache.for_dataset(&uri));
        let fragment_bitmap = Arc::new(manifest.fragments.iter().map(|f| f.id as u32).collect());
        Ok(Self {
            object_store,
            base: base_path,
            uri,
            manifest,
            manifest_location,
            commit_handler,
            session,
            refs,
            fragment_bitmap,
            metadata_cache,
            index_cache,
            file_reader_options,
            store_params: store_params.map(Box::new),
        })
    }
725
    /// Write to or Create a [Dataset] with a stream of [RecordBatch]s.
    ///
    /// `dest` can be a `&str`, `object_store::path::Path` or `Arc<Dataset>`.
    ///
    /// Returns the newly created [`Dataset`].
    /// Or Returns [Error] if the dataset already exists.
    ///
    /// Thin convenience wrapper over [`InsertBuilder`].
    pub async fn write(
        batches: impl RecordBatchReader + Send + 'static,
        dest: impl Into<WriteDestination<'_>>,
        params: Option<WriteParams>,
    ) -> Result<Self> {
        let mut builder = InsertBuilder::new(dest);
        if let Some(params) = &params {
            builder = builder.with_params(params);
        }
        // NOTE(review): the future is boxed here — presumably to keep this
        // large write future off the caller's stack; confirm before removing.
        Box::pin(builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>))
            .await
    }
745
746    /// Write into a namespace-managed table with automatic credential vending.
747    ///
748    /// For CREATE mode, calls declare_table() to initialize the table.
749    /// For other modes, calls describe_table() and opens dataset with namespace credentials.
750    ///
751    /// # Arguments
752    ///
753    /// * `batches` - The record batches to write
754    /// * `namespace` - The namespace to use for table management
755    /// * `table_id` - The table identifier
756    /// * `params` - Write parameters
757    pub async fn write_into_namespace(
758        batches: impl RecordBatchReader + Send + 'static,
759        namespace: Arc<dyn LanceNamespace>,
760        table_id: Vec<String>,
761        mut params: Option<WriteParams>,
762    ) -> Result<Self> {
763        let mut write_params = params.take().unwrap_or_default();
764
765        match write_params.mode {
766            WriteMode::Create => {
767                let declare_request = DeclareTableRequest {
768                    id: Some(table_id.clone()),
769                    ..Default::default()
770                };
771                let response = namespace
772                    .declare_table(declare_request)
773                    .await
774                    .map_err(|e| Error::namespace_source(Box::new(e)))?;
775
776                let uri = response.location.ok_or_else(|| {
777                    Error::namespace_source(Box::new(std::io::Error::other(
778                        "Table location not found in declare_table response",
779                    )))
780                })?;
781
782                // Set up commit handler when managed_versioning is enabled
783                if response.managed_versioning == Some(true) {
784                    let external_store = LanceNamespaceExternalManifestStore::new(
785                        namespace.clone(),
786                        table_id.clone(),
787                    );
788                    let commit_handler: Arc<dyn CommitHandler> =
789                        Arc::new(ExternalManifestCommitHandler {
790                            external_manifest_store: Arc::new(external_store),
791                        });
792                    write_params.commit_handler = Some(commit_handler);
793                }
794
795                // Set initial credentials and provider from namespace
796                if let Some(namespace_storage_options) = response.storage_options {
797                    let provider: Arc<dyn StorageOptionsProvider> = Arc::new(
798                        LanceNamespaceStorageOptionsProvider::new(namespace, table_id),
799                    );
800
801                    // Merge namespace storage options with any existing options
802                    let mut merged_options = write_params
803                        .store_params
804                        .as_ref()
805                        .and_then(|p| p.storage_options().cloned())
806                        .unwrap_or_default();
807                    merged_options.extend(namespace_storage_options);
808
809                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
810                        merged_options,
811                        provider,
812                    ));
813
814                    let existing_params = write_params.store_params.take().unwrap_or_default();
815                    write_params.store_params = Some(ObjectStoreParams {
816                        storage_options_accessor: Some(accessor),
817                        ..existing_params
818                    });
819                }
820
821                Self::write(batches, uri.as_str(), Some(write_params)).await
822            }
823            WriteMode::Append | WriteMode::Overwrite => {
824                let request = DescribeTableRequest {
825                    id: Some(table_id.clone()),
826                    ..Default::default()
827                };
828                let response = namespace
829                    .describe_table(request)
830                    .await
831                    .map_err(|e| Error::namespace_source(Box::new(e)))?;
832
833                let uri = response.location.ok_or_else(|| {
834                    Error::namespace_source(Box::new(std::io::Error::other(
835                        "Table location not found in describe_table response",
836                    )))
837                })?;
838
839                // Set up commit handler when managed_versioning is enabled
840                if response.managed_versioning == Some(true) {
841                    let external_store = LanceNamespaceExternalManifestStore::new(
842                        namespace.clone(),
843                        table_id.clone(),
844                    );
845                    let commit_handler: Arc<dyn CommitHandler> =
846                        Arc::new(ExternalManifestCommitHandler {
847                            external_manifest_store: Arc::new(external_store),
848                        });
849                    write_params.commit_handler = Some(commit_handler);
850                }
851
852                // Set initial credentials and provider from namespace
853                if let Some(namespace_storage_options) = response.storage_options {
854                    let provider: Arc<dyn StorageOptionsProvider> =
855                        Arc::new(LanceNamespaceStorageOptionsProvider::new(
856                            namespace.clone(),
857                            table_id.clone(),
858                        ));
859
860                    // Merge namespace storage options with any existing options
861                    let mut merged_options = write_params
862                        .store_params
863                        .as_ref()
864                        .and_then(|p| p.storage_options().cloned())
865                        .unwrap_or_default();
866                    merged_options.extend(namespace_storage_options);
867
868                    let accessor = Arc::new(StorageOptionsAccessor::with_initial_and_provider(
869                        merged_options,
870                        provider,
871                    ));
872
873                    let existing_params = write_params.store_params.take().unwrap_or_default();
874                    write_params.store_params = Some(ObjectStoreParams {
875                        storage_options_accessor: Some(accessor),
876                        ..existing_params
877                    });
878                }
879
880                // For APPEND/OVERWRITE modes, we must open the existing dataset first
881                // and pass it to InsertBuilder. If we pass just the URI, InsertBuilder
882                // assumes no dataset exists and converts the mode to CREATE.
883                let mut builder = DatasetBuilder::from_uri(uri.as_str());
884                if let Some(ref store_params) = write_params.store_params
885                    && let Some(accessor) = &store_params.storage_options_accessor
886                {
887                    builder = builder.with_storage_options_accessor(accessor.clone());
888                }
889                let dataset = Arc::new(builder.load().await?);
890
891                Self::write(batches, dataset, Some(write_params)).await
892            }
893        }
894    }
895
896    /// Append to existing [Dataset] with a stream of [RecordBatch]s
897    ///
898    /// Returns void result or Returns [Error]
899    pub async fn append(
900        &mut self,
901        batches: impl RecordBatchReader + Send + 'static,
902        params: Option<WriteParams>,
903    ) -> Result<()> {
904        let write_params = WriteParams {
905            mode: WriteMode::Append,
906            ..params.unwrap_or_default()
907        };
908
909        let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
910            .with_params(&write_params)
911            .execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>)
912            .await?;
913
914        *self = new_dataset;
915
916        Ok(())
917    }
918
919    /// Get the fully qualified URI of this dataset.
920    pub fn uri(&self) -> &str {
921        &self.uri
922    }
923
924    pub fn branch_location(&self) -> BranchLocation {
925        BranchLocation {
926            path: self.base.clone(),
927            uri: self.uri.clone(),
928            branch: self.manifest.branch.clone(),
929        }
930    }
931
932    pub async fn branch_identifier(&self) -> Result<BranchIdentifier> {
933        self.refs
934            .branches()
935            .get_identifier(self.manifest.branch.as_deref())
936            .await
937    }
938
939    /// Get the full manifest of the dataset version.
940    pub fn manifest(&self) -> &Manifest {
941        &self.manifest
942    }
943
    /// The storage location of the manifest backing this dataset view.
    pub fn manifest_location(&self) -> &ManifestLocation {
        &self.manifest_location
    }
947
948    /// Create a [`delta::DatasetDeltaBuilder`] to explore changes between dataset versions.
949    ///
950    /// # Example
951    ///
952    /// ```
953    /// # use lance::{Dataset, Result};
954    /// # async fn example(dataset: &Dataset) -> Result<()> {
955    /// let delta = dataset.delta()
956    ///     .compared_against_version(5)
957    ///     .build()?;
958    /// let inserted = delta.get_inserted_rows().await?;
959    /// # Ok(())
960    /// # }
961    /// ```
962    pub fn delta(&self) -> delta::DatasetDeltaBuilder {
963        delta::DatasetDeltaBuilder::new(self.clone())
964    }
965
966    // TODO: Cache this
967    pub(crate) fn is_legacy_storage(&self) -> bool {
968        self.manifest
969            .data_storage_format
970            .lance_file_version()
971            .unwrap()
972            == LanceFileVersion::Legacy
973    }
974
    /// Fetch the latest manifest for this dataset, along with its location.
    ///
    /// Checks the session metadata cache first; on a miss, reads the manifest
    /// from storage and inserts it into the cache before returning.
    pub async fn latest_manifest(&self) -> Result<(Arc<Manifest>, ManifestLocation)> {
        let location = self
            .commit_handler
            .resolve_latest_location(&self.base, &self.object_store)
            .await?;

        // Check if manifest is in cache before reading from storage
        let manifest_key = ManifestKey {
            version: location.version,
            e_tag: location.e_tag.as_deref(),
        };
        let cached_manifest = self.metadata_cache.get_with_key(&manifest_key).await;
        if let Some(cached_manifest) = cached_manifest {
            return Ok((cached_manifest, location));
        }

        // If the latest version is the one we already have checked out,
        // reuse our in-memory manifest instead of re-reading it.
        if self.already_checked_out(&location, self.manifest.branch.as_deref()) {
            return Ok((self.manifest.clone(), self.manifest_location.clone()));
        }
        let mut manifest = read_manifest(&self.object_store, &location.path, location.size).await?;
        // Legacy-format manifests keep dictionary values out-of-line; load
        // them into the schema before handing the manifest out.
        if manifest.schema.has_dictionary_types() && manifest.should_use_legacy_format() {
            // Pass the known file size when available to avoid an extra HEAD request.
            let reader = if let Some(size) = location.size {
                self.object_store
                    .open_with_size(&location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&location.path).await?
            };
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let manifest_arc = Arc::new(manifest);
        self.metadata_cache
            .insert_with_key(&manifest_key, manifest_arc.clone())
            .await;
        Ok((manifest_arc, location))
    }
1011
    /// Read the transaction file for this version of the dataset.
    ///
    /// If there was no transaction file written for this version of the dataset
    /// then this will return None.
    ///
    /// Results are cached in the metadata cache, keyed by manifest version.
    pub async fn read_transaction(&self) -> Result<Option<Transaction>> {
        let transaction_key = TransactionKey {
            version: self.manifest.version,
        };
        if let Some(transaction) = self.metadata_cache.get_with_key(&transaction_key).await {
            return Ok(Some((*transaction).clone()));
        }

        // Prefer inline transaction from manifest when available
        let transaction = if let Some(pos) = self.manifest.transaction_section {
            // Open the manifest file (with a size hint when known) and decode
            // the protobuf transaction message embedded at offset `pos`.
            let reader = if let Some(size) = self.manifest_location.size {
                self.object_store
                    .open_with_size(&self.manifest_location.path, size as usize)
                    .await?
            } else {
                self.object_store.open(&self.manifest_location.path).await?
            };

            let tx: pb::Transaction = read_message(reader.as_ref(), pos).await?;
            Transaction::try_from(tx).map(Some)?
        } else if let Some(path) = &self.manifest.transaction_file {
            // Fallback: read external transaction file if present
            let path = self.transactions_dir().child(path.as_str());
            let data = self.object_store.inner.get(&path).await?.bytes().await?;
            let transaction = lance_table::format::pb::Transaction::decode(data)?;
            Transaction::try_from(transaction).map(Some)?
        } else {
            None
        };

        // Populate the cache so repeat lookups for this version skip storage.
        if let Some(tx) = transaction.as_ref() {
            self.metadata_cache
                .insert_with_key(&transaction_key, Arc::new(tx.clone()))
                .await;
        }
        Ok(transaction)
    }
1053
1054    /// Read the transaction file for this version of the dataset.
1055    ///
1056    /// If there was no transaction file written for this version of the dataset
1057    /// then this will return None.
1058    pub async fn read_transaction_by_version(&self, version: u64) -> Result<Option<Transaction>> {
1059        let dataset_version = self.checkout_version(version).await?;
1060        dataset_version.read_transaction().await
1061    }
1062
1063    /// List transactions for the dataset, up to a maximum number.
1064    ///
1065    /// This method iterates through dataset versions, starting from the current version,
1066    /// and collects the transaction for each version. It stops when either `recent_transactions`
1067    /// is reached or there are no more versions.
1068    ///
1069    /// # Arguments
1070    ///
1071    /// * `recent_transactions` - Maximum number of transactions to return
1072    ///
1073    /// # Returns
1074    ///
1075    /// A vector of optional transactions. Each element corresponds to a version,
1076    /// and may be None if no transaction file exists for that version.
1077    pub async fn get_transactions(
1078        &self,
1079        recent_transactions: usize,
1080    ) -> Result<Vec<Option<Transaction>>> {
1081        let mut transactions = vec![];
1082        let mut dataset = self.clone();
1083
1084        loop {
1085            let transaction = dataset.read_transaction().await?;
1086            transactions.push(transaction);
1087
1088            if transactions.len() >= recent_transactions {
1089                break;
1090            } else {
1091                match dataset
1092                    .checkout_version(dataset.version().version - 1)
1093                    .await
1094                {
1095                    Ok(ds) => dataset = ds,
1096                    Err(Error::DatasetNotFound { .. }) => break,
1097                    Err(err) => return Err(err),
1098                }
1099            }
1100        }
1101
1102        Ok(transactions)
1103    }
1104
1105    /// Restore the currently checked out version of the dataset as the latest version.
1106    pub async fn restore(&mut self) -> Result<()> {
1107        let (latest_manifest, _) = self.latest_manifest().await?;
1108        let latest_version = latest_manifest.version;
1109
1110        let transaction = Transaction::new(
1111            latest_version,
1112            Operation::Restore {
1113                version: self.manifest.version,
1114            },
1115            None,
1116        );
1117
1118        self.apply_commit(transaction, &Default::default(), &Default::default())
1119            .await?;
1120
1121        Ok(())
1122    }
1123
1124    /// Removes old versions of the dataset from disk
1125    ///
1126    /// This function will remove all versions of the dataset that are older than the provided
1127    /// timestamp.  This function will not remove the current version of the dataset.
1128    ///
1129    /// Once a version is removed it can no longer be checked out or restored.  Any data unique
1130    /// to that version will be lost.
1131    ///
1132    /// # Arguments
1133    ///
1134    /// * `older_than` - Versions older than this will be deleted.
1135    /// * `delete_unverified` - If false (the default) then files will only be deleted if they
1136    ///                        are listed in at least one manifest.  Otherwise these files will
1137    ///                        be kept since they cannot be distinguished from an in-progress
1138    ///                        transaction.  Set to true to delete these files if you are sure
1139    ///                        there are no other in-progress dataset operations.
1140    ///
1141    /// # Returns
1142    ///
1143    /// * `RemovalStats` - Statistics about the removal operation
1144    #[instrument(level = "debug", skip(self))]
1145    pub fn cleanup_old_versions(
1146        &self,
1147        older_than: Duration,
1148        delete_unverified: Option<bool>,
1149        error_if_tagged_old_versions: Option<bool>,
1150    ) -> BoxFuture<'_, Result<RemovalStats>> {
1151        let mut builder = CleanupPolicyBuilder::default();
1152        builder = builder.before_timestamp(utc_now() - older_than);
1153        if let Some(v) = delete_unverified {
1154            builder = builder.delete_unverified(v);
1155        }
1156        if let Some(v) = error_if_tagged_old_versions {
1157            builder = builder.error_if_tagged_old_versions(v);
1158        }
1159
1160        self.cleanup_with_policy(builder.build())
1161    }
1162
    /// Removes old versions of the dataset from storage
    ///
    /// This function will remove all versions of the dataset that satisfies the given policy.
    /// This function will not remove the current version of the dataset.
    ///
    /// Once a version is removed it can no longer be checked out or restored.  Any data unique
    /// to that version will be lost.
    ///
    /// # Arguments
    ///
    /// * `policy` - `CleanupPolicy` determines the behaviour of cleanup.
    ///
    /// # Returns
    ///
    /// * `RemovalStats` - Statistics about the removal operation
    #[instrument(level = "debug", skip(self))]
    pub fn cleanup_with_policy(
        &self,
        policy: CleanupPolicy,
    ) -> BoxFuture<'_, Result<RemovalStats>> {
        // Emit a tracing event before delegating so cleanup activity is
        // observable even if the returned future is never polled to completion.
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_CLEANING_EVENT, uri=&self.uri);
        cleanup::cleanup_old_versions(self, policy).boxed()
    }
1186
1187    #[allow(clippy::too_many_arguments)]
1188    async fn do_commit(
1189        base_uri: WriteDestination<'_>,
1190        operation: Operation,
1191        read_version: Option<u64>,
1192        store_params: Option<ObjectStoreParams>,
1193        commit_handler: Option<Arc<dyn CommitHandler>>,
1194        session: Arc<Session>,
1195        enable_v2_manifest_paths: bool,
1196        detached: bool,
1197    ) -> Result<Self> {
1198        let read_version = read_version.map_or_else(
1199            || match operation {
1200                Operation::Overwrite { .. } | Operation::Restore { .. } => Ok(0),
1201                _ => Err(Error::invalid_input(
1202                    "read_version must be specified for this operation",
1203                )),
1204            },
1205            Ok,
1206        )?;
1207
1208        let transaction = Transaction::new(read_version, operation, None);
1209
1210        let mut builder = CommitBuilder::new(base_uri)
1211            .enable_v2_manifest_paths(enable_v2_manifest_paths)
1212            .with_session(session)
1213            .with_detached(detached);
1214
1215        if let Some(store_params) = store_params {
1216            builder = builder.with_store_params(store_params);
1217        }
1218
1219        if let Some(commit_handler) = commit_handler {
1220            builder = builder.with_commit_handler(commit_handler);
1221        }
1222
1223        builder.execute(transaction).await
1224    }
1225
    /// Commit changes to the dataset
    ///
    /// This operation is not needed if you are using append/write/delete to manipulate the dataset.
    /// It is used to commit changes to the dataset that are made externally.  For example, a bulk
    /// import tool may import large amounts of new data and write the appropriate lance files
    /// directly instead of using the write function.
    ///
    /// This method can be used to commit this change to the dataset's manifest.  This method will
    /// not verify that the provided fragments exist and correct, that is the caller's responsibility.
    /// Some validation can be performed using the function
    /// [crate::dataset::transaction::validate_operation].
    ///
    /// If this commit is a change to an existing dataset then it will often need to be based on an
    /// existing version of the dataset.  For example, if this change is a `delete` operation then
    /// the caller will have read in the existing data (at some version) to determine which fragments
    /// need to be deleted.  The base version that the caller used should be supplied as the `read_version`
    /// parameter.  Some operations (e.g. Overwrite) do not depend on a previous version and `read_version`
    /// can be None.  An error will be returned if the `read_version` is needed for an operation and
    /// it is not specified.
    ///
    /// All operations except Overwrite will fail if the dataset does not already exist.
    ///
    /// # Arguments
    ///
    /// * `dest` - Where to commit: a dataset URI or an existing dataset
    /// * `operation` - A description of the change to commit
    /// * `read_version` - The version of the dataset that this change is based on
    /// * `store_params` Parameters controlling object store access to the manifest
    /// * `commit_handler` - Optional custom [`CommitHandler`] used to perform the commit
    /// * `session` - Session whose caches and state the resulting dataset will use
    /// * `enable_v2_manifest_paths`: If set to true, and this is a new dataset, uses the new v2 manifest
    ///   paths. These allow constant-time lookups for the latest manifest on object storage.
    ///   This parameter has no effect on existing datasets. To migrate an existing
    ///   dataset, use the [`Self::migrate_manifest_paths_v2`] method. WARNING: turning
    ///   this on will make the dataset unreadable for older versions of Lance
    ///   (prior to 0.17.0). Default is False.
    pub async fn commit(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            /*detached=*/ false,
        )
        .await
    }
1281
    /// Commits changes exactly the same as [`Self::commit`] but the commit will
    /// not be associated with the dataset lineage.
    ///
    /// The commit will not show up in the dataset's history and will never be
    /// the latest version of the dataset.
    ///
    /// This can be used to stage changes or to handle "secondary" datasets whose
    /// lineage is tracked elsewhere.
    ///
    /// # Arguments
    ///
    /// Arguments have the same meaning as in [`Self::commit`].
    pub async fn commit_detached(
        dest: impl Into<WriteDestination<'_>>,
        operation: Operation,
        read_version: Option<u64>,
        store_params: Option<ObjectStoreParams>,
        commit_handler: Option<Arc<dyn CommitHandler>>,
        session: Arc<Session>,
        enable_v2_manifest_paths: bool,
    ) -> Result<Self> {
        Self::do_commit(
            dest.into(),
            operation,
            read_version,
            store_params,
            commit_handler,
            session,
            enable_v2_manifest_paths,
            /*detached=*/ true,
        )
        .await
    }
1311
1312    pub(crate) async fn apply_commit(
1313        &mut self,
1314        transaction: Transaction,
1315        write_config: &ManifestWriteConfig,
1316        commit_config: &CommitConfig,
1317    ) -> Result<()> {
1318        let (manifest, manifest_location) = commit_transaction(
1319            self,
1320            self.object_store(),
1321            self.commit_handler.as_ref(),
1322            &transaction,
1323            write_config,
1324            commit_config,
1325            self.manifest_location.naming_scheme,
1326            None,
1327        )
1328        .await?;
1329
1330        self.manifest = Arc::new(manifest);
1331        self.manifest_location = manifest_location;
1332        self.fragment_bitmap = Arc::new(
1333            self.manifest
1334                .fragments
1335                .iter()
1336                .map(|f| f.id as u32)
1337                .collect(),
1338        );
1339
1340        Ok(())
1341    }
1342
1343    /// Create a Scanner to scan the dataset.
1344    pub fn scan(&self) -> Scanner {
1345        Scanner::new(Arc::new(self.clone()))
1346    }
1347
1348    /// Count the number of rows in the dataset.
1349    ///
1350    /// It offers a fast path of counting rows by just computing via metadata.
1351    #[instrument(skip_all)]
1352    pub async fn count_rows(&self, filter: Option<String>) -> Result<usize> {
1353        // TODO: consolidate the count_rows into Scanner plan.
1354        if let Some(filter) = filter {
1355            let mut scanner = self.scan();
1356            scanner.filter(&filter)?;
1357            Ok(scanner
1358                .project::<String>(&[])?
1359                .with_row_id() // TODO: fix scan plan to not require row_id for count_rows.
1360                .count_rows()
1361                .await? as usize)
1362        } else {
1363            self.count_all_rows().await
1364        }
1365    }
1366
1367    pub(crate) async fn count_all_rows(&self) -> Result<usize> {
1368        let cnts = stream::iter(self.get_fragments())
1369            .map(|f| async move { f.count_rows(None).await })
1370            .buffer_unordered(16)
1371            .try_collect::<Vec<_>>()
1372            .await?;
1373        Ok(cnts.iter().sum())
1374    }
1375
    /// Take rows by indices.
    ///
    /// `row_indices` are row offsets within the dataset; for stable row ids
    /// use [`Self::take_rows`] instead.
    #[instrument(skip_all, fields(num_rows=row_indices.len()))]
    pub async fn take(
        &self,
        row_indices: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<RecordBatch> {
        take::take(self, row_indices, projection.into()).await
    }
1385
1386    /// Take Rows by the internal ROW ids.
1387    ///
1388    /// In Lance format, each row has a unique `u64` id, which is used to identify the row globally.
1389    ///
1390    /// ```rust
1391    /// # use std::sync::Arc;
1392    /// # use tokio::runtime::Runtime;
1393    /// # use arrow_array::{RecordBatch, RecordBatchIterator, Int64Array};
1394    /// # use arrow_schema::{Schema, Field, DataType};
1395    /// # use lance::dataset::{WriteParams, Dataset, ProjectionRequest};
1396    /// #
1397    /// # let mut rt = Runtime::new().unwrap();
1398    /// # rt.block_on(async {
1399    /// # let test_dir = tempfile::tempdir().unwrap();
1400    /// # let uri = test_dir.path().to_str().unwrap().to_string();
1401    /// #
1402    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1403    /// # let write_params = WriteParams::default();
1404    /// # let array = Arc::new(Int64Array::from_iter(0..128));
1405    /// # let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
1406    /// # let reader = RecordBatchIterator::new(
1407    /// #    vec![batch].into_iter().map(Ok), schema
1408    /// # );
1409    /// # let dataset = Dataset::write(reader, &uri, Some(write_params)).await.unwrap();
1410    /// #
1411    /// let schema = dataset.schema().clone();
1412    /// let row_ids = vec![0, 4, 7];
1413    /// let rows = dataset.take_rows(&row_ids, schema).await.unwrap();
1414    ///
1415    /// // We can have more fine-grained control over the projection, i.e., SQL projection.
1416    /// let projection = ProjectionRequest::from_sql([("identity", "id * 2")]);
1417    /// let rows = dataset.take_rows(&row_ids, projection).await.unwrap();
1418    /// # });
1419    /// ```
1420    pub async fn take_rows(
1421        &self,
1422        row_ids: &[u64],
1423        projection: impl Into<ProjectionRequest>,
1424    ) -> Result<RecordBatch> {
1425        Arc::new(self.clone())
1426            .take_builder(row_ids, projection)?
1427            .execute()
1428            .await
1429    }
1430
    /// Create a [`TakeBuilder`] for taking the given row ids with the given
    /// projection; run it with `.execute()`.
    pub fn take_builder(
        self: &Arc<Self>,
        row_ids: &[u64],
        projection: impl Into<ProjectionRequest>,
    ) -> Result<TakeBuilder> {
        TakeBuilder::try_new_from_ids(self.clone(), row_ids.to_vec(), projection.into())
    }
1438
    /// Take [BlobFile] by row IDs.
    ///
    /// See [`Self::take_blobs_by_addresses`] for address-based lookup and
    /// [`Self::take_blobs_by_indices`] for offset-based lookup.
    pub async fn take_blobs(
        self: &Arc<Self>,
        row_ids: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs(self, row_ids, column.as_ref()).await
    }
1447
    /// Take [BlobFile] by row addresses.
    ///
    /// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
    /// Use this method when you already have row addresses, for example from
    /// a scan with `with_row_address()`. For row IDs (stable identifiers), use
    /// [`Self::take_blobs`]. For row indices (offsets), use
    /// [`Self::take_blobs_by_indices`].
    pub async fn take_blobs_by_addresses(
        self: &Arc<Self>,
        row_addrs: &[u64],
        column: impl AsRef<str>,
    ) -> Result<Vec<BlobFile>> {
        blob::take_blobs_by_addresses(self, row_addrs, column.as_ref()).await
    }
1462
1463    /// Take [BlobFile] by row indices (offsets in the dataset).
1464    pub async fn take_blobs_by_indices(
1465        self: &Arc<Self>,
1466        row_indices: &[u64],
1467        column: impl AsRef<str>,
1468    ) -> Result<Vec<BlobFile>> {
1469        let row_addrs = row_offsets_to_row_addresses(self, row_indices).await?;
1470        blob::take_blobs_by_addresses(self, &row_addrs, column.as_ref()).await
1471    }
1472
    /// Get a stream of batches based on iterator of ranges of row numbers.
    ///
    /// This is an experimental API. It may change at any time.
    ///
    /// * `row_ranges` - stream of row-number ranges to read
    /// * `projection` - schema of the columns to return
    /// * `batch_readahead` - number of batches to read ahead
    pub fn take_scan(
        &self,
        row_ranges: Pin<Box<dyn Stream<Item = Result<Range<u64>>> + Send>>,
        projection: Arc<Schema>,
        batch_readahead: usize,
    ) -> DatasetRecordBatchStream {
        take::take_scan(self, row_ranges, projection, batch_readahead)
    }
1484
1485    /// Randomly sample `n` rows from the dataset.
1486    ///
1487    /// The returned rows are in row-id order (not random order), which allows
1488    /// the underlying take operation to use an efficient sorted code path.
1489    pub async fn sample(&self, n: usize, projection: &Schema) -> Result<RecordBatch> {
1490        use rand::seq::IteratorRandom;
1491        let num_rows = self.count_rows(None).await?;
1492        let mut ids = (0..num_rows as u64).choose_multiple(&mut rand::rng(), n);
1493        ids.sort_unstable();
1494        self.take(&ids, projection.clone()).await
1495    }
1496
    /// Delete rows based on a predicate.
    ///
    /// Rows matching the boolean `predicate` expression are deleted (e.g.
    /// `"true"` deletes every row; see [`Self::truncate_table`]). Returns a
    /// [`write::delete::DeleteResult`] describing the outcome.
    pub async fn delete(&mut self, predicate: &str) -> Result<write::delete::DeleteResult> {
        // Trace before delegating so the event is recorded even if the delete fails.
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DELETING_EVENT, uri = &self.uri, predicate=predicate);
        write::delete::delete(self, predicate).await
    }
1502
1503    /// Truncate the dataset by deleting all rows.
1504    pub async fn truncate_table(&mut self) -> Result<()> {
1505        self.delete("true").await.map(|_| ())
1506    }
1507
1508    /// Add new base paths to the dataset.
1509    ///
1510    /// This method allows you to register additional storage locations (buckets)
1511    /// that can be used for future data writes. The base paths are added to the
1512    /// dataset's manifest and can be referenced by name in subsequent write operations.
1513    ///
1514    /// # Arguments
1515    ///
1516    /// * `new_bases` - A vector of `lance_table::format::BasePath` objects representing the new storage
1517    ///   locations to add. Each base path should have a unique name and path.
1518    ///
1519    /// # Returns
1520    ///
1521    /// Returns a new `Dataset` instance with the updated manifest containing the
1522    /// new base paths.
1523    pub async fn add_bases(
1524        self: &Arc<Self>,
1525        new_bases: Vec<lance_table::format::BasePath>,
1526        transaction_properties: Option<HashMap<String, String>>,
1527    ) -> Result<Self> {
1528        let operation = Operation::UpdateBases { new_bases };
1529
1530        let transaction = TransactionBuilder::new(self.manifest.version, operation)
1531            .transaction_properties(transaction_properties.map(Arc::new))
1532            .build();
1533
1534        let new_dataset = CommitBuilder::new(self.clone())
1535            .execute(transaction)
1536            .await?;
1537
1538        Ok(new_dataset)
1539    }
1540
1541    pub async fn count_deleted_rows(&self) -> Result<usize> {
1542        futures::stream::iter(self.get_fragments())
1543            .map(|f| async move { f.count_deletions().await })
1544            .buffer_unordered(self.object_store.io_parallelism())
1545            .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
1546            .await
1547    }
1548
    /// Returns a reference to the [`ObjectStore`] this dataset reads and writes through.
    pub fn object_store(&self) -> &ObjectStore {
        &self.object_store
    }
1552
1553    /// Clone this dataset with a different object store binding.
1554    ///
1555    /// The returned dataset shares metadata, session state, and caches with the
1556    /// original dataset, but all subsequent operations on the returned dataset
1557    /// use the supplied object store.
1558    pub fn with_object_store(
1559        &self,
1560        object_store: Arc<ObjectStore>,
1561        store_params: Option<ObjectStoreParams>,
1562    ) -> Self {
1563        let mut cloned = self.clone();
1564        cloned.object_store = object_store;
1565        if let Some(store_params) = store_params {
1566            cloned.store_params = Some(Box::new(store_params));
1567        }
1568        cloned
1569    }
1570
    /// Returns the initial storage options used when opening this dataset, if any.
    ///
    /// Deprecated alias of [`Self::initial_storage_options`].
    ///
    /// This returns the static initial options without triggering any refresh.
    /// For the latest refreshed options, use [`Self::latest_storage_options`].
    #[deprecated(since = "0.25.0", note = "Use initial_storage_options() instead")]
    pub fn storage_options(&self) -> Option<&HashMap<String, String>> {
        self.initial_storage_options()
    }
1579
1580    /// Returns the initial storage options without triggering any refresh.
1581    ///
1582    /// For the latest refreshed options, use [`Self::latest_storage_options`].
1583    pub fn initial_storage_options(&self) -> Option<&HashMap<String, String>> {
1584        self.store_params
1585            .as_ref()
1586            .and_then(|params| params.storage_options())
1587    }
1588
1589    /// Returns the storage options provider used when opening this dataset, if any.
1590    pub fn storage_options_provider(
1591        &self,
1592    ) -> Option<Arc<dyn lance_io::object_store::StorageOptionsProvider>> {
1593        self.store_params
1594            .as_ref()
1595            .and_then(|params| params.storage_options_accessor.as_ref())
1596            .and_then(|accessor| accessor.provider().cloned())
1597    }
1598
1599    /// Returns the unified storage options accessor for this dataset, if any.
1600    ///
1601    /// The accessor handles both static and dynamic storage options with automatic
1602    /// caching and refresh. Use [`StorageOptionsAccessor::get_storage_options`] to
1603    /// get the latest options.
1604    pub fn storage_options_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
1605        self.store_params
1606            .as_ref()
1607            .and_then(|params| params.get_accessor())
1608    }
1609
1610    /// Returns the latest (possibly refreshed) storage options.
1611    ///
1612    /// If a dynamic storage options provider is configured, this will return
1613    /// the cached options if still valid, or fetch fresh options if expired.
1614    ///
1615    /// For the initial static options without refresh, use [`Self::storage_options`].
1616    ///
1617    /// # Returns
1618    ///
1619    /// - `Ok(Some(options))` - Storage options are available (static or refreshed)
1620    /// - `Ok(None)` - No storage options were configured for this dataset
1621    /// - `Err(...)` - Error occurred while fetching/refreshing options from provider
1622    pub async fn latest_storage_options(&self) -> Result<Option<StorageOptions>> {
1623        // First check if we have an accessor (handles both static and dynamic options)
1624        if let Some(accessor) = self.storage_options_accessor() {
1625            let options = accessor.get_storage_options().await?;
1626            return Ok(Some(options));
1627        }
1628
1629        // Fallback to initial storage options if no accessor
1630        Ok(self.initial_storage_options().cloned().map(StorageOptions))
1631    }
1632
    /// Directory under the dataset root holding data files.
    pub fn data_dir(&self) -> Path {
        self.base.child(DATA_DIR)
    }

    /// Directory under the dataset root holding index files.
    pub fn indices_dir(&self) -> Path {
        self.base.child(INDICES_DIR)
    }

    /// Directory under the dataset root holding transaction files.
    pub fn transactions_dir(&self) -> Path {
        self.base.child(TRANSACTIONS_DIR)
    }

    /// Directory under the dataset root holding deletion files.
    pub fn deletions_dir(&self) -> Path {
        self.base.child(DELETIONS_DIR)
    }

    /// Directory under the dataset root holding manifest versions.
    pub fn versions_dir(&self) -> Path {
        self.base.child(VERSIONS_DIR)
    }
1652
1653    pub(crate) fn data_file_dir(&self, data_file: &DataFile) -> Result<Path> {
1654        match data_file.base_id.as_ref() {
1655            Some(base_id) => {
1656                let base_paths = &self.manifest.base_paths;
1657                let base_path = base_paths.get(base_id).ok_or_else(|| {
1658                    Error::invalid_input(format!(
1659                        "base_path id {} not found for data_file {}",
1660                        base_id, data_file.path
1661                    ))
1662                })?;
1663                let path = base_path.extract_path(self.session.store_registry())?;
1664                if base_path.is_dataset_root {
1665                    Ok(path.child(DATA_DIR))
1666                } else {
1667                    Ok(path)
1668                }
1669            }
1670            None => Ok(self.base.child(DATA_DIR)),
1671        }
1672    }
1673
1674    /// Get the ObjectStore for a specific path based on base_id
1675    pub(crate) async fn object_store_for_base(&self, base_id: u32) -> Result<Arc<ObjectStore>> {
1676        let base_path = self.manifest.base_paths.get(&base_id).ok_or_else(|| {
1677            Error::invalid_input(format!("Dataset base path with ID {} not found", base_id))
1678        })?;
1679
1680        let (store, _) = ObjectStore::from_uri_and_params(
1681            self.session.store_registry(),
1682            &base_path.path,
1683            &self.store_params.as_deref().cloned().unwrap_or_default(),
1684        )
1685        .await?;
1686
1687        Ok(store)
1688    }
1689
1690    pub(crate) fn dataset_dir_for_deletion(&self, deletion_file: &DeletionFile) -> Result<Path> {
1691        match deletion_file.base_id.as_ref() {
1692            Some(base_id) => {
1693                let base_paths = &self.manifest.base_paths;
1694                let base_path = base_paths.get(base_id).ok_or_else(|| {
1695                    Error::invalid_input(format!(
1696                        "base_path id {} not found for deletion_file {:?}",
1697                        base_id, deletion_file
1698                    ))
1699                })?;
1700
1701                if !base_path.is_dataset_root {
1702                    return Err(Error::internal(format!(
1703                        "base_path id {} is not a dataset root for deletion_file {:?}",
1704                        base_id, deletion_file
1705                    )));
1706                }
1707                base_path.extract_path(self.session.store_registry())
1708            }
1709            None => Ok(self.base.clone()),
1710        }
1711    }
1712
1713    /// Get the indices directory for a specific index, considering its base_id
1714    pub(crate) fn indice_files_dir(&self, index: &IndexMetadata) -> Result<Path> {
1715        match index.base_id.as_ref() {
1716            Some(base_id) => {
1717                let base_paths = &self.manifest.base_paths;
1718                let base_path = base_paths.get(base_id).ok_or_else(|| {
1719                    Error::invalid_input(format!(
1720                        "base_path id {} not found for index {}",
1721                        base_id, index.uuid
1722                    ))
1723                })?;
1724                let path = base_path.extract_path(self.session.store_registry())?;
1725                if base_path.is_dataset_root {
1726                    Ok(path.child(INDICES_DIR))
1727                } else {
1728                    // For non-dataset-root base paths, we assume the path already points to the indices directory
1729                    Ok(path)
1730                }
1731            }
1732            None => Ok(self.base.child(INDICES_DIR)),
1733        }
1734    }
1735
    /// Returns a handle to the shared [`Session`] (caches, store registry, etc.).
    pub fn session(&self) -> Arc<Session> {
        self.session.clone()
    }

    /// The currently checked-out version of the dataset, derived from the manifest.
    pub fn version(&self) -> Version {
        Version::from(self.manifest.as_ref())
    }
1743
    /// Get the number of entries currently in the index cache.
    pub async fn index_cache_entry_count(&self) -> usize {
        self.session.index_cache.size().await
    }

    /// Get cache hit ratio.
    ///
    /// Computed from the session-wide index cache statistics.
    pub async fn index_cache_hit_rate(&self) -> f32 {
        let stats = self.session.index_cache_stats().await;
        stats.hit_ratio()
    }

    /// Approximate in-memory size of the session (including its caches), in bytes.
    pub fn cache_size_bytes(&self) -> u64 {
        self.session.deep_size_of() as u64
    }
1758
1759    /// Get all versions.
1760    pub async fn versions(&self) -> Result<Vec<Version>> {
1761        let mut versions: Vec<Version> = self
1762            .commit_handler
1763            .list_manifest_locations(&self.base, &self.object_store, false)
1764            .try_filter_map(|location| async move {
1765                match read_manifest(&self.object_store, &location.path, location.size).await {
1766                    Ok(manifest) => Ok(Some(Version::from(&manifest))),
1767                    Err(e) => Err(e),
1768                }
1769            })
1770            .try_collect()
1771            .await?;
1772
1773        // TODO: this API should support pagination
1774        versions.sort_by_key(|v| v.version);
1775
1776        Ok(versions)
1777    }
1778
1779    /// Get the latest version of the dataset
1780    /// This is meant to be a fast path for checking if a dataset has changed. This is why
1781    /// we don't return the full version struct.
1782    pub async fn latest_version_id(&self) -> Result<u64> {
1783        Ok(self
1784            .commit_handler
1785            .resolve_latest_location(&self.base, &self.object_store)
1786            .await?
1787            .version)
1788    }
1789
    /// Number of fragments recorded in the current manifest.
    pub fn count_fragments(&self) -> usize {
        self.manifest.fragments.len()
    }

    /// Get the schema of the dataset
    pub fn schema(&self) -> &Schema {
        &self.manifest.schema
    }
1798
    /// Creates a new empty projection into the dataset schema
    pub fn empty_projection(self: &Arc<Self>) -> Projection {
        Projection::empty(self.clone())
    }

    /// Creates a projection that includes all columns in the dataset
    pub fn full_projection(self: &Arc<Self>) -> Projection {
        Projection::full(self.clone())
    }
1809
1810    /// Get fragments.
1811    pub fn get_fragments(&self) -> Vec<FileFragment> {
1812        let dataset = Arc::new(self.clone());
1813        self.manifest
1814            .fragments
1815            .iter()
1816            .map(|f| FileFragment::new(dataset.clone(), f.clone()))
1817            .collect()
1818    }
1819
1820    pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
1821        let dataset = Arc::new(self.clone());
1822        let fragment = self
1823            .manifest
1824            .fragments
1825            .iter()
1826            .find(|f| f.id == fragment_id as u64)?;
1827        Some(FileFragment::new(dataset, fragment.clone()))
1828    }
1829
    /// Raw fragment metadata from the current manifest (shared, read-only).
    pub fn fragments(&self) -> &Arc<Vec<Fragment>> {
        &self.manifest.fragments
    }
1833
    // Gets a filtered list of fragments from ids in O(N) time instead of using
    // `get_fragment` which would require O(N^2) time.
    //
    // `ordered_ids` MUST be strictly increasing (asserted below). The result is
    // aligned with `ordered_ids`: `None` where no fragment with that id exists.
    //
    // NOTE(review): trailing ids greater than the last fragment id produce no
    // output entries, so the result can be shorter than `ordered_ids` — confirm
    // callers (e.g. filter_addr_or_ids) never pass such ids.
    pub fn get_frags_from_ordered_ids(&self, ordered_ids: &[u32]) -> Vec<Option<FileFragment>> {
        let mut fragments = Vec::with_capacity(ordered_ids.len());
        let mut id_iter = ordered_ids.iter();
        let mut id = id_iter.next();
        // This field is just used to assert the ids are in order
        let mut last_id: i64 = -1;
        // Single pass over the (sorted) manifest fragments; advance the id
        // cursor in lock-step with the fragment cursor.
        for frag in self.manifest.fragments.iter() {
            let mut the_id = if let Some(id) = id { *id } else { break };
            // Assert the given ids are, in fact, in order
            assert!(the_id as i64 > last_id);
            // For any IDs we've passed we can assume that no fragment exists any longer
            // with that ID.
            while the_id < frag.id as u32 {
                fragments.push(None);
                last_id = the_id as i64;
                id = id_iter.next();
                the_id = if let Some(id) = id { *id } else { break };
            }

            // Exact match: materialize a FileFragment and advance the id cursor.
            if the_id == frag.id as u32 {
                fragments.push(Some(FileFragment::new(
                    Arc::new(self.clone()),
                    frag.clone(),
                )));
                last_id = the_id as i64;
                id = id_iter.next();
            }
        }
        fragments
    }
1866
    // This method filters deleted items from `addr_or_ids` using `addrs` as a reference.
    //
    // `addr_or_ids` and `addrs` are parallel slices: `addrs[i]` is the row
    // address for `addr_or_ids[i]` (which may be a stable row id or the same
    // address). Returns the subset of `addr_or_ids` whose address still exists
    // (fragment present and row offset not in the fragment's deletion vector),
    // in the original input order.
    async fn filter_addr_or_ids(&self, addr_or_ids: &[u64], addrs: &[u64]) -> Result<Vec<u64>> {
        if addrs.is_empty() {
            return Ok(Vec::new());
        }

        let mut perm = permutation::sort(addrs);
        // First we sort the addrs, then we transform from Vec<u64> to Vec<Option<u64>> and then
        // we un-sort and use the None values to filter `addr_or_ids`
        let sorted_addrs = perm.apply_slice(addrs);

        // Only collect deletion vectors for the fragments referenced by the given addrs
        let referenced_frag_ids = sorted_addrs
            .iter()
            .map(|addr| RowAddress::from(*addr).fragment_id())
            .dedup()
            .collect::<Vec<_>>();
        let frags = self.get_frags_from_ordered_ids(&referenced_frag_ids);
        // One future per referenced fragment; missing fragments yield a ready
        // `Ok(None)` so the positions stay aligned with `referenced_frag_ids`.
        let dv_futs = frags
            .iter()
            .map(|frag| {
                if let Some(frag) = frag {
                    frag.get_deletion_vector().boxed()
                } else {
                    std::future::ready(Ok(None)).boxed()
                }
            })
            .collect::<Vec<_>>();
        // `buffered` (not `buffer_unordered`) preserves the fragment order.
        let dvs = stream::iter(dv_futs)
            .buffered(self.object_store.io_parallelism())
            .try_collect::<Vec<_>>()
            .await?;

        // Iterate through the sorted addresses and sorted fragments (and sorted deletion vectors)
        // and filter out addresses that have been deleted
        let mut filtered_sorted_addrs = Vec::with_capacity(sorted_addrs.len());
        let mut sorted_addr_iter = sorted_addrs.into_iter().map(RowAddress::from);
        // Safe: `addrs` was checked non-empty above, so there is a first address.
        let mut next_addr = sorted_addr_iter.next().unwrap();
        let mut exhausted = false;

        for frag_dv in frags.iter().zip(dvs).zip(referenced_frag_ids.iter()) {
            let ((frag, dv), frag_id) = frag_dv;
            if frag.is_some() {
                // Frag exists
                if let Some(dv) = dv.as_ref() {
                    // Deletion vector exists, scan DV
                    for deleted in dv.to_sorted_iter() {
                        // Keep all addresses strictly before the next deleted offset.
                        while next_addr.fragment_id() == *frag_id
                            && next_addr.row_offset() < deleted
                        {
                            filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                        if exhausted {
                            break;
                        }
                        if next_addr.fragment_id() != *frag_id {
                            break;
                        }
                        // Address matches a deleted offset: mark it as removed.
                        if next_addr.row_offset() == deleted {
                            filtered_sorted_addrs.push(None);
                            if let Some(next) = sorted_addr_iter.next() {
                                next_addr = next;
                            } else {
                                exhausted = true;
                                break;
                            }
                        }
                    }
                }
                if exhausted {
                    break;
                }
                // Either no deletion vector, or we've exhausted it, keep everything else
                // in this frag
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(Some(u64::from(next_addr)));
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            } else {
                // Frag doesn't exist (possibly deleted), delete all items
                while next_addr.fragment_id() == *frag_id {
                    filtered_sorted_addrs.push(None);
                    if let Some(next) = sorted_addr_iter.next() {
                        next_addr = next;
                    } else {
                        break;
                    }
                }
            }
        }

        // filtered_sorted_ids is now a Vec with the same size as sorted_addrs, but with None
        // values where the corresponding address was deleted.  We now need to un-sort it and
        // filter out the deleted addresses.
        perm.apply_inv_slice_in_place(&mut filtered_sorted_addrs);
        Ok(addr_or_ids
            .iter()
            .zip(filtered_sorted_addrs)
            .filter_map(|(addr_or_id, maybe_addr)| maybe_addr.map(|_| *addr_or_id))
            .collect())
    }
1978
1979    pub(crate) async fn filter_deleted_ids(&self, ids: &[u64]) -> Result<Vec<u64>> {
1980        let addresses = if let Some(row_id_index) = get_row_id_index(self).await? {
1981            let addresses = ids
1982                .iter()
1983                .filter_map(|id| row_id_index.get(*id).map(|address| address.into()))
1984                .collect::<Vec<_>>();
1985            Cow::Owned(addresses)
1986        } else {
1987            Cow::Borrowed(ids)
1988        };
1989
1990        self.filter_addr_or_ids(ids, &addresses).await
1991    }
1992
    /// Gets the number of files that are so small they don't even have a full
    /// group. These are considered too small because reading many of them is
    /// much less efficient than reading a single file because the separate files
    /// split up what would otherwise be single IO requests into multiple.
    ///
    /// NOTE(review): `count()` counts every stream item, including `Err` results
    /// from `physical_rows()`, so fragments whose row count fails to load are
    /// tallied as small files — confirm this is intended.
    pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
        futures::stream::iter(self.get_fragments())
            .map(|f| async move { f.physical_rows().await })
            .buffered(self.object_store.io_parallelism())
            .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
            .count()
            .await
    }
2005
2006    pub async fn validate(&self) -> Result<()> {
2007        // All fragments have unique ids
2008        let id_counts =
2009            self.manifest
2010                .fragments
2011                .iter()
2012                .map(|f| f.id)
2013                .fold(HashMap::new(), |mut acc, id| {
2014                    *acc.entry(id).or_insert(0) += 1;
2015                    acc
2016                });
2017        for (id, count) in id_counts {
2018            if count > 1 {
2019                return Err(Error::corrupt_file(
2020                    self.base.clone(),
2021                    format!(
2022                        "Duplicate fragment id {} found in dataset {:?}",
2023                        id, self.base
2024                    ),
2025                ));
2026            }
2027        }
2028
2029        // Fragments are sorted in increasing fragment id order
2030        self.manifest
2031            .fragments
2032            .iter()
2033            .map(|f| f.id)
2034            .try_fold(0, |prev, id| {
2035                if id < prev {
2036                    Err(Error::corrupt_file(self.base.clone(), format!(
2037                        "Fragment ids are not sorted in increasing fragment-id order. Found {} after {} in dataset {:?}",
2038                        id, prev, self.base
2039                    )))
2040                } else {
2041                    Ok(id)
2042                }
2043            })?;
2044
2045        // All fragments have equal lengths
2046        futures::stream::iter(self.get_fragments())
2047            .map(|f| async move { f.validate().await })
2048            .buffer_unordered(self.object_store.io_parallelism())
2049            .try_collect::<Vec<()>>()
2050            .await?;
2051
2052        // Validate indices
2053        let indices = self.load_indices().await?;
2054        self.validate_indices(&indices)?;
2055
2056        Ok(())
2057    }
2058
2059    fn validate_indices(&self, indices: &[IndexMetadata]) -> Result<()> {
2060        // Make sure there are no duplicate ids
2061        let mut index_ids = HashSet::new();
2062        for index in indices.iter() {
2063            if !index_ids.insert(&index.uuid) {
2064                return Err(Error::corrupt_file(
2065                    self.manifest_location.path.clone(),
2066                    format!(
2067                        "Duplicate index id {} found in dataset {:?}",
2068                        &index.uuid, self.base
2069                    ),
2070                ));
2071            }
2072        }
2073
2074        // For each index name, make sure there is no overlap in fragment bitmaps
2075        if let Err(err) = detect_overlapping_fragments(indices) {
2076            let mut message = "Overlapping fragments detected in dataset.".to_string();
2077            for (index_name, overlapping_frags) in err.bad_indices {
2078                message.push_str(&format!(
2079                    "\nIndex {:?} has overlapping fragments: {:?}",
2080                    index_name, overlapping_frags
2081                ));
2082            }
2083            return Err(Error::corrupt_file(
2084                self.manifest_location.path.clone(),
2085                message,
2086            ));
2087        };
2088
2089        Ok(())
2090    }
2091
    /// Migrate the dataset to use the new manifest path scheme.
    ///
    /// This function will rename all V1 manifests to [ManifestNamingScheme::V2].
    /// These paths provide more efficient opening of datasets with many versions
    /// on object stores.
    ///
    /// This function is idempotent, and can be run multiple times without
    /// changing the state of the object store.
    ///
    /// However, it should not be run while other concurrent operations are happening.
    /// And it should also run until completion before resuming other operations.
    ///
    /// ```rust
    /// # use lance::dataset::Dataset;
    /// # use lance_table::io::commit::ManifestNamingScheme;
    /// # use lance_datagen::{array, RowCount, BatchCount};
    /// # use arrow_array::types::Int32Type;
    /// # use lance::dataset::write::WriteParams;
    /// # let data = lance_datagen::gen_batch()
    /// #  .col("key", array::step::<Int32Type>())
    /// #  .into_reader_rows(RowCount::from(10), BatchCount::from(1));
    /// # let fut = async {
    /// # let params = WriteParams {
    /// #     enable_v2_manifest_paths: false,
    /// #     ..Default::default()
    /// # };
    /// let mut dataset = Dataset::write(data, "memory://test", Some(params)).await.unwrap();
    /// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V1);
    ///
    /// dataset.migrate_manifest_paths_v2().await.unwrap();
    /// assert_eq!(dataset.manifest_location().naming_scheme, ManifestNamingScheme::V2);
    /// # };
    /// # tokio::runtime::Runtime::new().unwrap().block_on(fut);
    /// ```
    pub async fn migrate_manifest_paths_v2(&mut self) -> Result<()> {
        migrate_scheme_to_v2(self.object_store(), &self.base).await?;
        // We need to re-open so `self` reflects the migrated manifest paths.
        let latest_version = self.latest_version_id().await?;
        *self = self.checkout_version(latest_version).await?;
        Ok(())
    }
2133
2134    /// Shallow clone the target version into a new dataset at target_path.
2135    /// 'target_path': the uri string to clone the dataset into.
2136    /// 'version': the version cloned from, could be a version number or tag.
2137    /// 'store_params': the object store params to use for the new dataset.
2138    pub async fn shallow_clone(
2139        &mut self,
2140        target_path: &str,
2141        version: impl Into<refs::Ref>,
2142        store_params: Option<ObjectStoreParams>,
2143    ) -> Result<Self> {
2144        let (ref_name, version_number) = self.resolve_reference(version.into()).await?;
2145        let clone_op = Operation::Clone {
2146            is_shallow: true,
2147            ref_name,
2148            ref_version: version_number,
2149            ref_path: self.uri.clone(),
2150            branch_name: None,
2151        };
2152        let transaction = Transaction::new(version_number, clone_op, None);
2153
2154        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
2155            .with_store_params(
2156                store_params.unwrap_or(self.store_params.as_deref().cloned().unwrap_or_default()),
2157            )
2158            .with_object_store(Arc::new(self.object_store().clone()))
2159            .with_commit_handler(self.commit_handler.clone())
2160            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
2161        builder.execute(transaction).await
2162    }
2163
    /// Deep clone the target version into a new dataset at target_path.
    /// This performs a server-side copy of all relevant dataset files (data files,
    /// deletion files, and any external row-id files) into the target dataset
    /// without loading data into memory.
    ///
    /// Parameters:
    /// - `target_path`: the URI string to clone the dataset into.
    /// - `version`: the version cloned from, could be a version number, branch head, or tag.
    /// - `store_params`: the object store params to use for the new dataset.
    ///
    /// NOTE(review): both the source and destination of each `copy` are issued
    /// against the *target* object store, so the source dataset's files must be
    /// reachable from that store — confirm cross-store deep clones are not
    /// expected to work here.
    pub async fn deep_clone(
        &mut self,
        target_path: &str,
        version: impl Into<refs::Ref>,
        store_params: Option<ObjectStoreParams>,
    ) -> Result<Self> {
        use futures::StreamExt;

        // Resolve source dataset and its manifest using checkout_version
        let src_ds = self.checkout_version(version).await?;
        let src_paths = src_ds.collect_paths().await?;

        // Prepare target object store and base path
        let (target_store, target_base) = ObjectStore::from_uri_and_params(
            self.session.store_registry(),
            target_path,
            &store_params.clone().unwrap_or_default(),
        )
        .await?;

        // Prevent cloning into an existing target dataset
        if self
            .commit_handler
            .resolve_latest_location(&target_base, &target_store)
            .await
            .is_ok()
        {
            return Err(Error::dataset_already_exists(target_path.to_string()));
        }

        // Join a relative path onto a base, segment by segment (skipping empty
        // segments from duplicate slashes).
        let build_absolute_path = |relative_path: &str, base: &Path| -> Path {
            let mut path = base.clone();
            for seg in relative_path.split('/') {
                if !seg.is_empty() {
                    path = path.child(seg);
                }
            }
            path
        };

        // TODO: Leverage object store bulk copy for efficient deep_clone
        //
        // All cloud storage providers support batch copy APIs that would provide significant
        // performance improvements. We use single file copy before we have upstream support.
        //
        // Tracked by: https://github.com/lance-format/lance/issues/5435
        let io_parallelism = self.object_store.io_parallelism();
        let copy_futures = src_paths
            .iter()
            .map(|(relative_path, base)| {
                let store = Arc::clone(&target_store);
                let src_path = build_absolute_path(relative_path, base);
                let target_path = build_absolute_path(relative_path, &target_base);
                async move { store.copy(&src_path, &target_path).await.map(|_| ()) }
            })
            .collect::<Vec<_>>();

        // Run the copies with bounded concurrency; fail if any copy failed.
        futures::stream::iter(copy_futures)
            .buffer_unordered(io_parallelism)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        // Record a Clone operation and commit via CommitBuilder
        let ref_name = src_ds.manifest.branch.clone();
        let ref_version = src_ds.manifest_location.version;
        let clone_op = Operation::Clone {
            is_shallow: false,
            ref_name,
            ref_version,
            ref_path: src_ds.uri().to_string(),
            branch_name: None,
        };
        let txn = Transaction::new(ref_version, clone_op, None);
        let builder = CommitBuilder::new(WriteDestination::Uri(target_path))
            .with_store_params(store_params.clone().unwrap_or_default())
            .with_object_store(target_store.clone())
            .with_commit_handler(self.commit_handler.clone())
            .with_storage_format(self.manifest.data_storage_format.lance_file_version()?);
        let new_ds = builder.execute(txn).await?;
        Ok(new_ds)
    }
2256
2257    async fn resolve_reference(&self, reference: refs::Ref) -> Result<(Option<String>, u64)> {
2258        match reference {
2259            refs::Ref::Version(branch, version_number) => {
2260                if let Some(version_number) = version_number {
2261                    Ok((branch, version_number))
2262                } else {
2263                    let branch_location = self.branch_location().find_branch(branch.as_deref())?;
2264                    let version_number = self
2265                        .commit_handler
2266                        .resolve_latest_location(&branch_location.path, &self.object_store)
2267                        .await?
2268                        .version;
2269                    Ok((branch, version_number))
2270                }
2271            }
2272            refs::Ref::VersionNumber(version_number) => {
2273                Ok((self.manifest.branch.clone(), version_number))
2274            }
2275            refs::Ref::Tag(tag_name) => {
2276                let tag_contents = self.tags().get(tag_name.as_str()).await?;
2277                Ok((tag_contents.branch, tag_contents.version))
2278            }
2279        }
2280    }
2281
2282    /// Collect all (relative_path, path) of the dataset files.
2283    async fn collect_paths(&self) -> Result<Vec<(String, Path)>> {
2284        let mut file_paths: Vec<(String, Path)> = Vec::new();
2285        for fragment in self.manifest.fragments.iter() {
2286            if let Some(RowIdMeta::External(external_file)) = &fragment.row_id_meta {
2287                return Err(Error::internal(format!(
2288                    "External row_id_meta is not supported yet. external file path: {}",
2289                    external_file.path
2290                )));
2291            }
2292            for data_file in fragment.files.iter() {
2293                let base_root = if let Some(base_id) = data_file.base_id {
2294                    let base_path =
2295                        self.manifest.base_paths.get(&base_id).ok_or_else(|| {
2296                            Error::internal(format!("base_id {} not found", base_id))
2297                        })?;
2298                    Path::parse(base_path.path.as_str())?
2299                } else {
2300                    self.base.clone()
2301                };
2302                file_paths.push((
2303                    format!("{}/{}", DATA_DIR, data_file.path.clone()),
2304                    base_root,
2305                ));
2306            }
2307            if let Some(deletion_file) = &fragment.deletion_file {
2308                let base_root = if let Some(base_id) = deletion_file.base_id {
2309                    let base_path =
2310                        self.manifest.base_paths.get(&base_id).ok_or_else(|| {
2311                            Error::internal(format!("base_id {} not found", base_id))
2312                        })?;
2313                    Path::parse(base_path.path.as_str())?
2314                } else {
2315                    self.base.clone()
2316                };
2317                file_paths.push((
2318                    relative_deletion_file_path(fragment.id, deletion_file),
2319                    base_root,
2320                ));
2321            }
2322        }
2323
2324        let indices = read_manifest_indexes(
2325            self.object_store.as_ref(),
2326            &self.manifest_location,
2327            &self.manifest,
2328        )
2329        .await?;
2330
2331        for index in &indices {
2332            let base_root = if let Some(base_id) = index.base_id {
2333                let base_path = self
2334                    .manifest
2335                    .base_paths
2336                    .get(&base_id)
2337                    .ok_or_else(|| Error::internal(format!("base_id {} not found", base_id)))?;
2338                Path::parse(base_path.path.as_str())?
2339            } else {
2340                self.base.clone()
2341            };
2342            let index_root = base_root.child(INDICES_DIR).child(index.uuid.to_string());
2343            let mut stream = self.object_store.read_dir_all(&index_root, None);
2344            while let Some(meta) = stream.next().await.transpose()? {
2345                if let Some(filename) = meta.location.filename() {
2346                    file_paths.push((
2347                        format!("{}/{}/{}", INDICES_DIR, index.uuid, filename),
2348                        base_root.clone(),
2349                    ));
2350                }
2351            }
2352        }
2353        Ok(file_paths)
2354    }
2355
2356    /// Run a SQL query against the dataset.
2357    /// The underlying SQL engine is DataFusion.
2358    /// Please refer to the DataFusion documentation for supported SQL syntax.
2359    pub fn sql(&self, sql: &str) -> SqlQueryBuilder {
2360        SqlQueryBuilder::new(self.clone(), sql)
2361    }
2362
2363    /// Returns true if Lance supports writing this datatype with nulls.
2364    pub(crate) fn lance_supports_nulls(&self, datatype: &DataType) -> bool {
2365        match self
2366            .manifest()
2367            .data_storage_format
2368            .lance_file_version()
2369            .unwrap_or(LanceFileVersion::Legacy)
2370            .resolve()
2371        {
2372            LanceFileVersion::Legacy => matches!(
2373                datatype,
2374                DataType::Utf8
2375                    | DataType::LargeUtf8
2376                    | DataType::Binary
2377                    | DataType::List(_)
2378                    | DataType::FixedSizeBinary(_)
2379                    | DataType::FixedSizeList(_, _)
2380            ),
2381            LanceFileVersion::V2_0 => !matches!(datatype, DataType::Struct(..)),
2382            _ => true,
2383        }
2384    }
2385}
2386
/// Result of [load_new_transactions]: the dataset at the newest discovered
/// version plus the transactions that produced each newer version.
pub(crate) struct NewTransactionResult<'a> {
    // Resolves to the dataset checked out at the newest manifest found, or a
    // clone of the input dataset if no newer version exists.
    pub dataset: BoxFuture<'a, Result<Dataset>>,
    // Stream of (version, transaction) pairs; buffered concurrently, so the
    // items are not guaranteed to arrive in version order.
    pub new_transactions: BoxStream<'a, Result<(u64, Arc<Transaction>)>>,
}
2391
/// Discover all dataset versions newer than `dataset`'s current version and
/// load the transaction that produced each one.
///
/// Manifests and transactions are read through the session metadata cache
/// when possible, and loaded concurrently otherwise.
pub(crate) fn load_new_transactions(dataset: &Dataset) -> NewTransactionResult<'_> {
    // Re-use the same list call for getting the latest manifest and the metadata
    // for all manifests in between.
    let io_parallelism = dataset.object_store().io_parallelism();
    let latest_version = dataset.manifest.version;
    // Only keep manifest locations strictly newer than the current version.
    let locations = dataset
        .commit_handler
        .list_manifest_locations(&dataset.base, dataset.object_store(), true)
        .try_take_while(move |location| {
            futures::future::ready(Ok(location.version > latest_version))
        });

    // Will send the latest manifest via a channel.
    // The sender is consumed by the first manifest processed below; this
    // relies on the listing yielding the newest manifest first — TODO confirm
    // against `list_manifest_locations`' ordering guarantee.
    let (latest_tx, latest_rx) = tokio::sync::oneshot::channel();
    let mut latest_tx = Some(latest_tx);

    // Stage 1: load each manifest (cache hit or object-store read), buffered
    // concurrently.
    // NOTE(review): assumes io_parallelism >= 2; a buffer size of 0 would
    // keep the stream from making progress — confirm.
    let manifests = locations
        .map_ok(move |location| {
            let latest_tx = latest_tx.take();
            async move {
                let manifest_key = ManifestKey {
                    version: location.version,
                    e_tag: location.e_tag.as_deref(),
                };
                let manifest = if let Some(cached) =
                    dataset.metadata_cache.get_with_key(&manifest_key).await
                {
                    cached
                } else {
                    let loaded = Arc::new(
                        Dataset::load_manifest(
                            dataset.object_store(),
                            &location,
                            &dataset.uri,
                            dataset.session.as_ref(),
                        )
                        .await?,
                    );
                    dataset
                        .metadata_cache
                        .insert_with_key(&manifest_key, loaded.clone())
                        .await;
                    loaded
                };

                if let Some(latest_tx) = latest_tx {
                    // We ignore the error, since we don't care if the receiver is dropped.
                    let _ = latest_tx.send((manifest.clone(), location.clone()));
                }

                Ok((manifest, location))
            }
        })
        .try_buffer_unordered(io_parallelism / 2);
    // Stage 2: for each manifest, load its transaction file (again consulting
    // the cache first) by checking out that version and reading its
    // transaction.
    let transactions = manifests
        .map_ok(move |(manifest, location)| async move {
            let manifest_copy = manifest.clone();
            let tx_key = TransactionKey {
                version: manifest.version,
            };
            let transaction =
                if let Some(cached) = dataset.metadata_cache.get_with_key(&tx_key).await {
                    cached
                } else {
                    let dataset_version = Dataset::checkout_manifest(
                        dataset.object_store.clone(),
                        dataset.base.clone(),
                        dataset.uri.clone(),
                        manifest_copy.clone(),
                        location,
                        dataset.session(),
                        dataset.commit_handler.clone(),
                        dataset.file_reader_options.clone(),
                        dataset.store_params.as_deref().cloned(),
                    )?;
                    let loaded =
                        Arc::new(dataset_version.read_transaction().await?.ok_or_else(|| {
                            Error::internal(format!(
                                "Dataset version {} does not have a transaction file",
                                manifest_copy.version
                            ))
                        })?);
                    dataset
                        .metadata_cache
                        .insert_with_key(&tx_key, loaded.clone())
                        .await;
                    loaded
                };
            Ok((manifest.version, transaction))
        })
        .try_buffer_unordered(io_parallelism / 2);

    // The returned dataset future: checkout at the latest manifest if one was
    // received over the channel, otherwise fall back to the input dataset.
    let dataset = async move {
        if let Ok((latest_manifest, location)) = latest_rx.await {
            // If we got the latest manifest, we can checkout the dataset.
            Dataset::checkout_manifest(
                dataset.object_store.clone(),
                dataset.base.clone(),
                dataset.uri.clone(),
                latest_manifest,
                location,
                dataset.session(),
                dataset.commit_handler.clone(),
                dataset.file_reader_options.clone(),
                dataset.store_params.as_deref().cloned(),
            )
        } else {
            // If we didn't get the latest manifest, we can still return the dataset
            // with the current manifest.
            Ok(dataset.clone())
        }
    }
    .boxed();

    let new_transactions = transactions.boxed();

    NewTransactionResult {
        dataset,
        new_transactions,
    }
}
2513
2514/// # Schema Evolution
2515///
2516/// Lance datasets support evolving the schema. Several operations are
2517/// supported that mirror common SQL operations:
2518///
2519/// - [Self::add_columns()]: Add new columns to the dataset, similar to `ALTER TABLE ADD COLUMN`.
2520/// - [Self::drop_columns()]: Drop columns from the dataset, similar to `ALTER TABLE DROP COLUMN`.
2521/// - [Self::alter_columns()]: Modify columns in the dataset, changing their name, type, or nullability.
2522///   Similar to `ALTER TABLE ALTER COLUMN`.
2523///
2524/// In addition, one operation is unique to Lance: [`merge`](Self::merge). This
2525/// operation allows inserting precomputed data into the dataset.
2526///
2527/// Because these operations change the schema of the dataset, they will conflict
2528/// with most other concurrent operations. Therefore, they should be performed
2529/// when no other write operations are being run.
impl Dataset {
    /// Append new columns to the dataset.
    ///
    /// Delegates to [schema_evolution::add_columns]. `read_columns` names the
    /// existing columns read as input for the transform; `batch_size` bounds
    /// the batches processed at a time.
    pub async fn add_columns(
        &mut self,
        transforms: NewColumnTransform,
        read_columns: Option<Vec<String>>,
        batch_size: Option<u32>,
    ) -> Result<()> {
        schema_evolution::add_columns(self, transforms, read_columns, batch_size).await
    }

    /// Modify columns in the dataset, changing their name, type, or nullability.
    ///
    /// If only changing the name or nullability of a column, this is a zero-copy
    /// operation and any indices will be preserved. If changing the type of a
    /// column, the data for that column will be rewritten and any indices will
    /// be dropped. The old column data will not be immediately deleted. To remove
    /// it, call [optimize::compact_files()] and then
    /// [cleanup::cleanup_old_versions()] on the dataset.
    pub async fn alter_columns(&mut self, alterations: &[ColumnAlteration]) -> Result<()> {
        schema_evolution::alter_columns(self, alterations).await
    }

    /// Remove columns from the dataset.
    ///
    /// This is a metadata-only operation and does not remove the data from the
    /// underlying storage. In order to remove the data, you must subsequently
    /// call [optimize::compact_files()] to rewrite the data without the removed columns and
    /// then call [cleanup::cleanup_old_versions()] to remove the old files.
    pub async fn drop_columns(&mut self, columns: &[&str]) -> Result<()> {
        // Emit a tracing event before delegating so column drops are observable.
        info!(target: TRACE_DATASET_EVENTS, event=DATASET_DROPPING_COLUMN_EVENT, uri = &self.uri, columns = columns.join(","));
        schema_evolution::drop_columns(self, columns).await
    }

    /// Drop columns from the dataset and return updated dataset. Note that this
    /// is a zero-copy operation and column is not physically removed from the
    /// dataset.
    /// Parameters:
    /// - `columns`: the list of column names to drop.
    #[deprecated(since = "0.9.12", note = "Please use `drop_columns` instead.")]
    pub async fn drop(&mut self, columns: &[&str]) -> Result<()> {
        self.drop_columns(columns).await
    }

    /// Shared implementation backing [Self::merge].
    ///
    /// Validates the join keys (`left_on` must exist on this dataset or be the
    /// [ROW_ID] / [ROW_ADDR] column; `right_on` must exist on the stream; no
    /// other stream column may collide with an existing column), hash-joins
    /// the stream against every fragment, and commits a `Merge` operation.
    async fn merge_impl(
        &mut self,
        stream: Box<dyn RecordBatchReader + Send>,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        // Sanity check.
        if self.schema().field(left_on).is_none() && left_on != ROW_ID && left_on != ROW_ADDR {
            return Err(Error::invalid_input(format!(
                "Column {} does not exist in the left side dataset",
                left_on
            )));
        };
        let right_schema = stream.schema();
        if right_schema.field_with_name(right_on).is_err() {
            return Err(Error::invalid_input(format!(
                "Column {} does not exist in the right side dataset",
                right_on
            )));
        };
        for field in right_schema.fields() {
            if field.name() == right_on {
                // right_on is allowed to exist in the dataset, since it may be
                // the same as left_on.
                continue;
            }
            if self.schema().field(field.name()).is_some() {
                return Err(Error::invalid_input(format!(
                    "Column {} exists in both sides of the dataset",
                    field.name()
                )));
            }
        }

        // Hash join
        let joiner = Arc::new(HashJoiner::try_new(stream, right_on).await?);
        // Final schema is union of current schema, plus the RHS schema without
        // the right_on key.
        let mut new_schema: Schema = self.schema().merge(joiner.out_schema().as_ref())?;
        // NOTE(review): presumably assigns fresh ids to the merged-in fields
        // starting from the current max field id — confirm `set_field_id`
        // semantics.
        new_schema.set_field_id(Some(self.manifest.max_field_id()));

        // Write new data file to each fragment. Parallelism is done over columns,
        // so no parallelism done at this level.
        let updated_fragments: Vec<Fragment> = stream::iter(self.get_fragments())
            .then(|f| {
                let joiner = joiner.clone();
                async move { f.merge(left_on, &joiner).await.map(|f| f.metadata) }
            })
            .try_collect::<Vec<_>>()
            .await?;

        let transaction = Transaction::new(
            self.manifest.version,
            Operation::Merge {
                fragments: updated_fragments,
                schema: new_schema,
            },
            None,
        );

        self.apply_commit(transaction, &Default::default(), &Default::default())
            .await?;

        Ok(())
    }

    /// Merge this dataset with another arrow Table / Dataset, and returns a new version of dataset.
    ///
    /// Parameters:
    ///
    /// - `stream`: the stream of [`RecordBatch`] to merge.
    /// - `left_on`: the column name to join on the left side (self).
    /// - `right_on`: the column name to join on the right side (stream).
    ///
    /// Returns: a new version of dataset.
    ///
    /// It performs a left-join on the two datasets.
    pub async fn merge(
        &mut self,
        stream: impl RecordBatchReader + Send + 'static,
        left_on: &str,
        right_on: &str,
    ) -> Result<()> {
        let stream = Box::new(stream);
        self.merge_impl(stream, left_on, right_on).await
    }

    /// Merge partially built index files for index `index_uuid` into its
    /// final form.
    ///
    /// - Inverted and BTree indices delegate to their respective
    ///   `merge_index_files` helpers; `batch_readhead` is only passed to the
    ///   BTree merge.
    /// - The IVF vector index family is finalized via
    ///   `finalize_distributed_merge`.
    /// - Any other index type is rejected with an invalid-input error.
    pub async fn merge_index_metadata(
        &self,
        index_uuid: &str,
        index_type: IndexType,
        batch_readhead: Option<usize>,
    ) -> Result<()> {
        let store = LanceIndexStore::from_dataset_for_new(self, index_uuid)?;
        let index_dir = self.indices_dir().child(index_uuid);
        match index_type {
            IndexType::Inverted => {
                // Call merge_index_files function for inverted index
                lance_index::scalar::inverted::builder::merge_index_files(
                    self.object_store(),
                    &index_dir,
                    Arc::new(store),
                )
                .await
            }
            IndexType::BTree => {
                // Call merge_index_files function for btree index
                lance_index::scalar::btree::merge_index_files(
                    self.object_store(),
                    &index_dir,
                    Arc::new(store),
                    batch_readhead,
                )
                .await
            }
            // Precise vector index types: IVF_FLAT, IVF_PQ, IVF_SQ
            IndexType::IvfFlat | IndexType::IvfPq | IndexType::IvfSq | IndexType::Vector => {
                // Merge distributed vector index partials and finalize root index via Lance IVF helper
                crate::index::vector::ivf::finalize_distributed_merge(
                    self.object_store(),
                    &index_dir,
                    Some(index_type),
                )
                .await?;
                Ok(())
            }
            _ => Err(Error::invalid_input_source(Box::new(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                // NOTE(review): "(patched)" looks like leftover debug text in
                // this user-facing message — confirm whether it should stay.
                format!("Unsupported index type (patched): {}", index_type),
            )))),
        }
    }
}
2707
2708/// # Dataset metadata APIs
2709///
2710/// There are four kinds of metadata on datasets:
2711///
2712///  - **Schema metadata**: metadata about the data itself.
///  - **Field metadata**: metadata attached to individual fields (columns) of
///    the schema.
///  - **Dataset metadata**: metadata about the dataset. For example, this could
///    store a created_at date.
///  - **Dataset config**: configuration values controlling how engines should
///    manage the dataset. This configures things like auto-cleanup.
///
/// You can read and update each of these kinds of metadata through the
/// accessor and `update_*` methods below.
impl Dataset {
    /// Get dataset metadata.
    ///
    /// These are the key-value pairs stored in the manifest's table metadata.
    pub fn metadata(&self) -> &HashMap<String, String> {
        &self.manifest.table_metadata
    }

    /// Get the dataset config from manifest
    pub fn config(&self) -> &HashMap<String, String> {
        &self.manifest.config
    }

    /// Delete keys from the config.
    #[deprecated(
        note = "Use the new update_config(values, replace) method - pass None values to delete keys"
    )]
    pub async fn delete_config_keys(&mut self, delete_keys: &[&str]) -> Result<()> {
        // A `None` value marks the key for deletion in `update_config`.
        let updates = delete_keys.iter().map(|key| (*key, None));
        self.update_config(updates).await?;
        Ok(())
    }

    /// Update table metadata.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire metadata map instead of merging.
    ///
    /// Returns the updated metadata map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_metadata([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_metadata([("to_delete", None)]).await?;
    ///
    /// // Clear all metadata
    /// dataset.update_metadata([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full metadata
    /// dataset.update_metadata([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::TableMetadata)
    }

    /// Update config.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire config map instead of merging.
    ///
    /// Returns the updated config map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_config(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_config([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_config([("to_delete", None)]).await?;
    ///
    /// // Clear all config
    /// dataset.update_config([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full config
    /// dataset.update_config([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_config(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::Config)
    }

    /// Update schema metadata.
    ///
    /// Pass `None` for a value to remove that key.
    ///
    /// Use `.replace()` to replace the entire schema metadata map instead of merging.
    ///
    /// Returns the updated schema metadata map after the operation.
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_schema_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update single key
    /// dataset.update_schema_metadata([("key", "value")]).await?;
    ///
    /// // Remove a key
    /// dataset.update_schema_metadata([("to_delete", None)]).await?;
    ///
    /// // Clear all schema metadata
    /// dataset.update_schema_metadata([] as [UpdateMapEntry; 0]).replace().await?;
    ///
    /// // Replace full schema metadata
    /// dataset.update_schema_metadata([("k1", "v1"), ("k2", "v2")]).replace().await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_schema_metadata(
        &mut self,
        values: impl IntoIterator<Item = impl Into<UpdateMapEntry>>,
    ) -> metadata::UpdateMetadataBuilder<'_> {
        metadata::UpdateMetadataBuilder::new(self, values, metadata::MetadataType::SchemaMetadata)
    }

    /// Update schema metadata
    #[deprecated(note = "Use the new update_schema_metadata(values).replace() instead")]
    pub async fn replace_schema_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (String, String)>,
    ) -> Result<()> {
        // Wrap each value in `Some` so the replace keeps every provided key.
        let new_values = new_values
            .into_iter()
            .map(|(k, v)| (k, Some(v)))
            .collect::<HashMap<_, _>>();
        self.update_schema_metadata(new_values).replace().await?;
        Ok(())
    }

    /// Update field metadata
    ///
    /// ```
    /// # use lance::{Dataset, Result};
    /// # use lance::dataset::transaction::UpdateMapEntry;
    /// # async fn test_update_field_metadata(dataset: &mut Dataset) -> Result<()> {
    /// // Update metadata by field path
    /// dataset.update_field_metadata()
    ///     .update("path.to_field", [("key", "value")])?
    ///     .await?;
    ///
    /// // Update metadata by field id
    /// dataset.update_field_metadata()
    ///     .update(12, [("key", "value")])?
    ///     .await?;
    ///
    /// // Clear field metadata
    /// dataset.update_field_metadata()
    ///     .replace("path.to_field", [] as [UpdateMapEntry; 0])?
    ///     .replace(12, [] as [UpdateMapEntry; 0])?
    ///     .await?;
    ///
    /// // Replace field metadata
    /// dataset.update_field_metadata()
    ///     .replace("field_name", [("k1", "v1"), ("k2", "v2")])?
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn update_field_metadata(&mut self) -> UpdateFieldMetadataBuilder<'_> {
        UpdateFieldMetadataBuilder::new(self)
    }

    /// Update field metadata
    ///
    /// Replaces the metadata of each listed field (keyed by field id) by
    /// committing an `UpdateConfig` operation that carries only
    /// `field_metadata_updates`.
    pub async fn replace_field_metadata(
        &mut self,
        new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
    ) -> Result<()> {
        let new_values = new_values.into_iter().collect::<HashMap<_, _>>();
        let field_metadata_updates = new_values
            .into_iter()
            .map(|(field_id, metadata)| {
                (
                    // The transaction operation expects i32 field ids, hence
                    // the cast.
                    field_id as i32,
                    translate_schema_metadata_updates(&metadata),
                )
            })
            .collect();
        metadata::execute_metadata_update(
            self,
            Operation::UpdateConfig {
                config_updates: None,
                table_metadata_updates: None,
                schema_metadata_updates: None,
                field_metadata_updates,
            },
        )
        .await
    }
}
2914
// Trait adapter so a `Dataset` can be used wherever a `DatasetTakeRows` is
// expected; both methods delegate to the inherent implementations.
#[async_trait::async_trait]
impl DatasetTakeRows for Dataset {
    fn schema(&self) -> &Schema {
        // Fully-qualified call to avoid recursing into this trait method.
        Self::schema(self)
    }

    async fn take_rows(&self, row_ids: &[u64], projection: &Schema) -> Result<RecordBatch> {
        // The inherent `take_rows` takes the projection by value, so clone it.
        Self::take_rows(self, row_ids, projection.clone()).await
    }
}
2925
/// Options controlling how a manifest is written during a commit.
///
/// Consumed by [write_manifest_file]; obtain defaults via `Default`.
#[derive(Debug)]
pub(crate) struct ManifestWriteConfig {
    auto_set_feature_flags: bool,              // default true
    timestamp: Option<SystemTime>,             // default None
    use_stable_row_ids: bool,                  // default false
    use_legacy_format: Option<bool>,           // default None
    storage_format: Option<DataStorageFormat>, // default None
    disable_transaction_file: bool,            // default false
}
2935
2936impl Default for ManifestWriteConfig {
2937    fn default() -> Self {
2938        Self {
2939            auto_set_feature_flags: true,
2940            timestamp: None,
2941            use_stable_row_ids: false,
2942            disable_transaction_file: false,
2943            use_legacy_format: None,
2944            storage_format: None,
2945        }
2946    }
2947}
2948
impl ManifestWriteConfig {
    /// Whether writing the per-commit transaction file is disabled.
    pub fn disable_transaction_file(&self) -> bool {
        self.disable_transaction_file
    }
}
2954
2955/// Commit a manifest file and create a copy at the latest manifest path.
2956#[allow(clippy::too_many_arguments)]
2957pub(crate) async fn write_manifest_file(
2958    object_store: &ObjectStore,
2959    commit_handler: &dyn CommitHandler,
2960    base_path: &Path,
2961    manifest: &mut Manifest,
2962    indices: Option<Vec<IndexMetadata>>,
2963    config: &ManifestWriteConfig,
2964    naming_scheme: ManifestNamingScheme,
2965    mut transaction: Option<&Transaction>,
2966) -> std::result::Result<ManifestLocation, CommitError> {
2967    if config.auto_set_feature_flags {
2968        apply_feature_flags(
2969            manifest,
2970            config.use_stable_row_ids,
2971            config.disable_transaction_file,
2972        )?;
2973    }
2974
2975    manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
2976
2977    manifest.update_max_fragment_id();
2978
2979    commit_handler
2980        .commit(
2981            manifest,
2982            indices,
2983            base_path,
2984            object_store,
2985            write_manifest_file_to_path,
2986            naming_scheme,
2987            transaction.take().map(|tx| tx.into()),
2988        )
2989        .await
2990}
2991
// Lets a `Dataset` serve as the schema source when building a projection;
// delegates to the inherent `schema` accessor.
impl Projectable for Dataset {
    fn schema(&self) -> &Schema {
        self.schema()
    }
}
2997
2998#[cfg(test)]
2999mod tests;