lance/dataset/fragment/write.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_schema::Schema as ArrowSchema;
use datafusion::execution::SendableRecordBatchStream;
use futures::{StreamExt, TryStreamExt};
use lance_core::datatypes::Schema;
use lance_core::Error;
use lance_datafusion::chunker::{break_stream, chunk_stream};
use lance_datafusion::utils::StreamingWriteSource;
use lance_file::v2::writer::FileWriterOptions;
use lance_file::version::LanceFileVersion;
use lance_file::writer::FileWriter;
use lance_io::object_store::ObjectStore;
use lance_table::format::{DataFile, Fragment};
use lance_table::io::manifest::ManifestDescribing;
use snafu::location;
use std::borrow::Cow;
use uuid::Uuid;

use crate::dataset::builder::DatasetBuilder;
use crate::dataset::write::do_write_fragments;
use crate::dataset::{WriteMode, WriteParams, DATA_DIR};
use crate::Result;

/// Generates a filename optimized for S3 throughput using a UUID-based approach.
///
/// This approach follows Apache Iceberg's ObjectStoreLocationProvider pattern:
/// - Takes a UUID (16 bytes total)
/// - Uses the first 3 bytes (24 bits) as a binary string prefix for S3 distribution
/// - Uses the remaining 13 bytes as a hex string for uniqueness
///
/// Format: `<24-bit-binary><remaining-hex>`
/// Example: "101100101101010011010110a1b2c3d4e5f60718293a4b5c6d"
///
/// We use binary instead of hex for the prefix because it helps S3 scale up on the
/// prefix faster, with less throttling and fewer retries. A binary prefix spreads the
/// same randomness over more characters (1 bit per character instead of 4), allowing
/// S3's internal partitioning to more quickly recognize the access pattern and scale
/// appropriately.
///
/// The binary prefix ensures files are distributed evenly across S3 prefixes,
/// minimizing throttling and maximizing throughput, while maintaining uniqueness.
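///
/// Illustrative sketch of the expected shape of the output (the value is random,
/// so this is not a doctest):
///
/// ```ignore
/// let name = generate_random_filename();
/// assert_eq!(name.len(), 50);
/// assert!(name[..24].chars().all(|c| c == '0' || c == '1'));
/// assert!(name[24..].chars().all(|c| c.is_ascii_hexdigit()));
/// ```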
pub(crate) fn generate_random_filename() -> String {
    let uuid = Uuid::new_v4();
    let bytes = uuid.as_bytes();

    let mut out = String::with_capacity(50);

    // Convert first 3 bytes to binary string (24 bits)
    for &b in &bytes[..3] {
        for i in (0..8).rev() {
            out.push(if (b >> i) & 1 == 1 { '1' } else { '0' });
        }
    }

    // Convert remaining 13 bytes to hex string (26 chars)
    const HEX: &[u8; 16] = b"0123456789abcdef";
    for &b in &bytes[3..] {
        out.push(HEX[(b >> 4) as usize] as char);
        out.push(HEX[(b & 0xf) as usize] as char);
    }

    out
}

/// Builder for writing a new fragment.
///
/// This builder can be re-used to write multiple fragments.
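///
/// Minimal usage sketch (the URI is hypothetical and `source` is assumed to be some
/// in-scope value implementing `StreamingWriteSource`, e.g. a `RecordBatchReader`):
///
/// ```ignore
/// let fragment = FragmentCreateBuilder::new("s3://bucket/my_dataset.lance")
///     .write(source, None)
///     .await?;
/// assert_eq!(fragment.id, 0); // the id defaults to 0 when not provided
/// ```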
pub struct FragmentCreateBuilder<'a> {
    dataset_uri: &'a str,
    schema: Option<&'a Schema>,
    write_params: Option<&'a WriteParams>,
}

impl<'a> FragmentCreateBuilder<'a> {
    pub fn new(dataset_uri: &'a str) -> Self {
        Self {
            dataset_uri,
            schema: None,
            write_params: None,
        }
    }

    /// Set the schema of the fragment. If it is not known, it will be inferred.
    ///
    /// If the schema isn't provided, but the `write_mode` is `WriteMode::Append`,
    /// the schema will be inferred from the existing dataset.
    ///
    /// If that fails, the schema will be inferred from the first batch.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Set the write parameters.
    pub fn write_params(mut self, params: &'a WriteParams) -> Self {
        self.write_params = Some(params);
        self
    }

    /// Write a fragment.
    pub async fn write(
        &self,
        source: impl StreamingWriteSource,
        id: Option<u64>,
    ) -> Result<Fragment> {
        let (stream, schema) = self.get_stream_and_schema(Box::new(source)).await?;
        self.write_impl(stream, schema, id).await
    }

    /// Write multiple fragments, splitting the data according to `max_rows_per_file`.
    pub async fn write_fragments(
        &self,
        source: impl StreamingWriteSource,
    ) -> Result<Vec<Fragment>> {
        let (stream, schema) = self.get_stream_and_schema(Box::new(source)).await?;
        self.write_fragments_v2_impl(stream, schema).await
    }

    async fn write_v2_impl(
        &self,
        stream: SendableRecordBatchStream,
        schema: Schema,
        id: u64,
    ) -> Result<Fragment> {
        let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();
        let progress = params.progress.as_ref();

        Self::validate_schema(&schema, stream.schema().as_ref())?;

        let (object_store, base_path) = ObjectStore::from_uri_and_params(
            params.store_registry(),
            self.dataset_uri,
            &params.store_params.clone().unwrap_or_default(),
        )
        .await?;
        let filename = format!("{}.lance", generate_random_filename());
        let mut fragment = Fragment::new(id);
        let full_path = base_path.child(DATA_DIR).child(filename.clone());
        let obj_writer = object_store.create(&full_path).await?;
        let mut writer = lance_file::v2::writer::FileWriter::try_new(
            obj_writer,
            schema,
            FileWriterOptions {
                format_version: params.data_storage_version,
                ..Default::default()
            },
        )?;

        let (major, minor) = writer.version().to_numbers();

        let data_file = DataFile::new_unstarted(filename, major, minor);
        fragment.files.push(data_file);

        progress.begin(&fragment).await?;

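        // Break the stream into chunks of at most 128Ki rows (or `max_rows_per_file`,
        // whichever is smaller) before handing them to the file writer.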
        let break_limit = (128 * 1024).min(params.max_rows_per_file);

        let mut broken_stream = break_stream(stream, break_limit)
            .map_ok(|batch| vec![batch])
            .boxed();
        while let Some(batched_chunk) = broken_stream.next().await {
            let batch_chunk = batched_chunk?;
            writer.write_batches(batch_chunk.iter()).await?;
        }

        fragment.physical_rows = Some(writer.finish().await? as usize);

        if matches!(fragment.physical_rows, Some(0)) {
            return Err(Error::invalid_input("Input data was empty.", location!()));
        }

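        // Record, in matching order, the field ids that were written and the column
        // indices they occupy in the file.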
        let field_ids = writer
            .field_id_to_column_indices()
            .iter()
            .map(|(field_id, _)| *field_id as i32)
            .collect::<Vec<_>>();
        let column_indices = writer
            .field_id_to_column_indices()
            .iter()
            .map(|(_, column_index)| *column_index as i32)
            .collect::<Vec<_>>();

        fragment.files[0].fields = field_ids;
        fragment.files[0].column_indices = column_indices;

        progress.complete(&fragment).await?;

        Ok(fragment)
    }

    async fn write_fragments_v2_impl(
        &self,
        stream: SendableRecordBatchStream,
        schema: Schema,
    ) -> Result<Vec<Fragment>> {
        let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();

        Self::validate_schema(&schema, stream.schema().as_ref())?;

        let version = params.data_storage_version.unwrap_or_default();
        let (object_store, base_path) = ObjectStore::from_uri_and_params(
            params.store_registry(),
            self.dataset_uri,
            &params.store_params.clone().unwrap_or_default(),
        )
        .await?;
        do_write_fragments(
            object_store,
            &base_path,
            &schema,
            stream,
            params.into_owned(),
            version,
            None, // Fragment creation doesn't use target_bases
        )
        .await
    }

    async fn write_impl(
        &self,
        stream: SendableRecordBatchStream,
        schema: Schema,
        id: Option<u64>,
    ) -> Result<Fragment> {
        let id = id.unwrap_or_default();

        let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();

        let storage_version = params.storage_version_or_default();

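        // Any non-legacy storage version goes through the v2 file writer; only the
        // legacy (v1) format falls through to the `FileWriter` path below.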
        if storage_version != LanceFileVersion::Legacy {
            return self.write_v2_impl(stream, schema, id).await;
        }
        let progress = params.progress.as_ref();

        Self::validate_schema(&schema, stream.schema().as_ref())?;

        let (object_store, base_path) = ObjectStore::from_uri_and_params(
            params.store_registry(),
            self.dataset_uri,
            &params.store_params.clone().unwrap_or_default(),
        )
        .await?;
        let filename = format!("{}.lance", generate_random_filename());
        let mut fragment = Fragment::with_file_legacy(id, &filename, &schema, None);
        let full_path = base_path.child(DATA_DIR).child(filename.clone());
        let mut writer = FileWriter::<ManifestDescribing>::try_new(
            &object_store,
            &full_path,
            schema,
            &Default::default(),
        )
        .await?;

        progress.begin(&fragment).await?;

        let mut buffered_reader = chunk_stream(stream, params.max_rows_per_group);
        while let Some(batched_chunk) = buffered_reader.next().await {
            let batch = batched_chunk?;
            writer.write(&batch).await?;
        }

        if writer.is_empty() {
            return Err(Error::invalid_input("Input data was empty.", location!()));
        }

        fragment.physical_rows = Some(writer.finish().await?);

        progress.complete(&fragment).await?;

        Ok(fragment)
    }

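    /// Resolve the schema in priority order: an explicitly provided schema, then (for
    /// appends) the existing dataset's schema, and finally a schema inferred from the
    /// source itself.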
    async fn get_stream_and_schema(
        &self,
        source: impl StreamingWriteSource,
    ) -> Result<(SendableRecordBatchStream, Schema)> {
        if let Some(schema) = self.schema {
            return Ok((source.into_stream(), schema.clone()));
        } else if matches!(self.write_params.map(|p| p.mode), Some(WriteMode::Append)) {
            if let Some(schema) = self.existing_dataset_schema().await? {
                return Ok((source.into_stream(), schema));
            }
        }
        source.into_stream_and_schema().await
    }

    async fn existing_dataset_schema(&self) -> Result<Option<Schema>> {
        let mut builder = DatasetBuilder::from_uri(self.dataset_uri);
        let storage_options = self
            .write_params
            .and_then(|p| p.store_params.as_ref())
            .and_then(|p| p.storage_options.clone());
        if let Some(storage_options) = storage_options {
            builder = builder.with_storage_options(storage_options);
        }
        match builder.load().await {
            Ok(dataset) => {
                // Use the schema from the dataset, because it has the correct
                // field ids.
                Ok(Some(dataset.schema().clone()))
            }
            Err(Error::DatasetNotFound { .. }) => {
                // If the dataset does not exist, we can use the schema from
                // the reader.
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    fn validate_schema(expected: &Schema, actual: &ArrowSchema) -> Result<()> {
        if actual.fields().is_empty() {
            return Err(Error::invalid_input(
                "Cannot write with an empty schema.",
                location!(),
            ));
        }
        let actual_lance = Schema::try_from(actual)?;
        actual_lance.check_compatible(expected, &Default::default())?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
    };
    use arrow_schema::{DataType, Field as ArrowField};
    use lance_arrow::SchemaExt;
    use lance_core::utils::tempfile::{TempDir, TempStrDir};
    use rstest::rstest;

    use super::*;

    fn test_data() -> Box<dyn RecordBatchReader + Send> {
        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int64, false),
            ArrowField::new("b", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        );
        Box::new(RecordBatchIterator::new(vec![batch], schema))
    }

    #[tokio::test]
    async fn test_fragment_write_validation() {
        // Writing with empty schema produces an error
        let empty_schema = Arc::new(ArrowSchema::empty());
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], empty_schema));
        let tmp_dir = TempDir::default();
        let result = FragmentCreateBuilder::new(&tmp_dir.path_str())
            .write(empty_reader, None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Cannot write with an empty schema.")),
            "{:?}",
            &result
        );

        // Writing empty reader produces an error
        let arrow_schema = test_data().schema();
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], arrow_schema.clone()));
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .write(empty_reader, None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Input data was empty.")),
            "{:?}",
            &result
        );

        // Writing with incorrect schema produces an error.
        let wrong_schema = arrow_schema
            .as_ref()
            .try_with_column(ArrowField::new("c", DataType::Utf8, false))
            .unwrap();
        let wrong_schema = Schema::try_from(&wrong_schema).unwrap();
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .schema(&wrong_schema)
            .write(test_data(), None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::SchemaMismatch { difference, .. }
            if difference.contains("fields did not match")),
            "{:?}",
            &result
        );
    }

    #[tokio::test]
    async fn test_fragment_write_default_schema() {
        // Infers schema and uses 0 as default field id
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .write(data, None)
            .await
            .unwrap();

        // If unspecified, the fragment id should be 0.
        assert_eq!(fragment.id, 0);
        assert_eq!(fragment.deletion_file, None);
        assert_eq!(fragment.files.len(), 1);
        assert_eq!(fragment.files[0].fields, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_fragment_write_with_schema() {
        // Uses provided schema. Field ids are correct in fragment metadata.
        let data = test_data();

        let arrow_schema = data.schema();
        let mut custom_schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        custom_schema.mut_field_by_id(0).unwrap().id = 3;
        custom_schema.mut_field_by_id(1).unwrap().id = 1;

        let tmp_dir = TempStrDir::default();
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .schema(&custom_schema)
            .write(data, Some(42))
            .await
            .unwrap();

        assert_eq!(fragment.id, 42);
        assert_eq!(fragment.deletion_file, None);
        assert_eq!(fragment.files.len(), 1);
        assert_eq!(fragment.files[0].fields, vec![3, 1]);
        assert_eq!(fragment.files[0].column_indices, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_write_fragments_validation() {
        // Writing with empty schema produces an error
        let empty_schema = Arc::new(ArrowSchema::empty());
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], empty_schema));
        let tmp_dir = TempDir::default();
        let result = FragmentCreateBuilder::new(&tmp_dir.path_str())
            .write_fragments(empty_reader)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Cannot write with an empty schema.")),
            "{:?}",
            &result
        );

        // Writing an empty reader succeeds but produces no fragments
        let arrow_schema = test_data().schema();
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], arrow_schema.clone()));
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .write_fragments(empty_reader)
            .await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), 0);

        // Writing with incorrect schema produces an error.
        let wrong_schema = arrow_schema
            .as_ref()
            .try_with_column(ArrowField::new("c", DataType::Utf8, false))
            .unwrap();
        let wrong_schema = Schema::try_from(&wrong_schema).unwrap();
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .schema(&wrong_schema)
            .write_fragments(test_data())
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::SchemaMismatch { difference, .. }
            if difference.contains("fields did not match")),
            "{:?}",
            &result
        );
    }

    #[tokio::test]
    async fn test_write_fragments_default_schema() {
        // Infers schema and uses 0 as default field id
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let fragments = FragmentCreateBuilder::new(&tmp_dir)
            .write_fragments(data)
            .await
            .unwrap();

        // If unspecified, the fragment id should be 0.
        assert_eq!(fragments.len(), 1);
        assert_eq!(fragments[0].deletion_file, None);
        assert_eq!(fragments[0].files.len(), 1);
        assert_eq!(fragments[0].files[0].fields, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_write_fragments_with_options() {
        // Uses max_rows_per_file to split the input into multiple fragments.
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            max_rows_per_file: 1,
            ..Default::default()
        };
        let fragments = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write_fragments(data)
            .await
            .unwrap();

        assert_eq!(fragments.len(), 3);
        assert_eq!(fragments[0].deletion_file, None);
        assert_eq!(fragments[0].files.len(), 1);
        assert_eq!(fragments[0].files[0].column_indices, vec![0, 1]);
        assert_eq!(fragments[1].deletion_file, None);
        assert_eq!(fragments[1].files.len(), 1);
        assert_eq!(fragments[1].files[0].column_indices, vec![0, 1]);
        assert_eq!(fragments[2].deletion_file, None);
        assert_eq!(fragments[2].files.len(), 1);
        assert_eq!(fragments[2].files[0].column_indices, vec![0, 1]);
    }

    #[rstest]
    #[tokio::test]
    async fn test_write_with_format_version(
        #[values(
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::Legacy,
            LanceFileVersion::Stable
        )]
        file_version: LanceFileVersion,
    ) {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            data_storage_version: Some(file_version),
            ..Default::default()
        };
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write(data, None)
            .await
            .unwrap();

        assert!(!fragment.files.is_empty());
        fragment.files.iter().for_each(|f| {
            let (major_version, minor_version) = file_version.to_numbers();
            assert_eq!(f.file_major_version, major_version);
            assert_eq!(f.file_minor_version, minor_version);
        })
    }

    #[rstest]
    #[tokio::test]
    async fn test_write_fragments_with_format_version(
        #[values(
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::Legacy,
            LanceFileVersion::Stable
        )]
        file_version: LanceFileVersion,
    ) {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            data_storage_version: Some(file_version),
            ..Default::default()
        };
        let fragments = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write_fragments(data)
            .await
            .unwrap();

        assert!(!fragments.is_empty());
        fragments[0].files.iter().for_each(|f| {
            let (major_version, minor_version) = file_version.to_numbers();
            assert_eq!(f.file_major_version, major_version);
            assert_eq!(f.file_minor_version, minor_version);
        })
    }

    #[test]
    fn test_binary_filename_generation() {
        use std::collections::HashSet;

        // Test format and uniqueness
        let mut filenames = HashSet::new();
        for _ in 0..100 {
            let filename = generate_random_filename();

            // Should be 50 characters: 24 binary + 26 hex
            assert_eq!(filename.len(), 50, "Filename should be 50 characters");

            // First 24 should be binary
            let binary_part = &filename[0..24];
            assert!(
                binary_part.chars().all(|c| c == '0' || c == '1'),
                "First 24 chars should be binary: {}",
                binary_part
            );

            // Last 26 should be hex
            let hex_part = &filename[24..];
            assert_eq!(hex_part.len(), 26, "Hex part should be 26 characters");
            assert!(
                hex_part.chars().all(|c| c.is_ascii_hexdigit()),
                "Last 26 chars should be hex: {}",
                hex_part
            );

            // Should be unique
            assert!(filenames.insert(filename.clone()));
        }
    }
}