1use arrow_schema::Schema as ArrowSchema;
5use datafusion::execution::SendableRecordBatchStream;
6use futures::{StreamExt, TryStreamExt};
7use lance_core::datatypes::Schema;
8use lance_core::Error;
9use lance_datafusion::chunker::{break_stream, chunk_stream};
10use lance_datafusion::utils::StreamingWriteSource;
11use lance_file::v2::writer::FileWriterOptions;
12use lance_file::version::LanceFileVersion;
13use lance_file::writer::FileWriter;
14use lance_io::object_store::ObjectStore;
15use lance_table::format::{DataFile, Fragment};
16use lance_table::io::manifest::ManifestDescribing;
17use snafu::location;
18use std::borrow::Cow;
19use uuid::Uuid;
20
21use crate::dataset::builder::DatasetBuilder;
22use crate::dataset::write::do_write_fragments;
23use crate::dataset::{WriteMode, WriteParams, DATA_DIR};
24use crate::Result;
25
26pub(crate) fn generate_random_filename() -> String {
44 let uuid = Uuid::new_v4();
45 let bytes = uuid.as_bytes();
46
47 let mut out = String::with_capacity(50);
48
49 for &b in &bytes[..3] {
51 for i in (0..8).rev() {
52 out.push(if (b >> i) & 1 == 1 { '1' } else { '0' });
53 }
54 }
55
56 const HEX: &[u8; 16] = b"0123456789abcdef";
58 for &b in &bytes[3..] {
59 out.push(HEX[(b >> 4) as usize] as char);
60 out.push(HEX[(b & 0xf) as usize] as char);
61 }
62
63 out
64}
65
/// Builder for writing new data fragments into the dataset at a given URI.
pub struct FragmentCreateBuilder<'a> {
    // URI of the target dataset; used to resolve the object store and base path.
    dataset_uri: &'a str,
    // Optional explicit schema. When absent, the schema is resolved from the
    // existing dataset (for appends) or inferred from the source stream.
    schema: Option<&'a Schema>,
    // Optional write parameters; defaults apply when not provided.
    write_params: Option<&'a WriteParams>,
}
74
75impl<'a> FragmentCreateBuilder<'a> {
76 pub fn new(dataset_uri: &'a str) -> Self {
77 Self {
78 dataset_uri,
79 schema: None,
80 write_params: None,
81 }
82 }
83
84 pub fn schema(mut self, schema: &'a Schema) -> Self {
91 self.schema = Some(schema);
92 self
93 }
94
95 pub fn write_params(mut self, params: &'a WriteParams) -> Self {
97 self.write_params = Some(params);
98 self
99 }
100
101 pub async fn write(
103 &self,
104 source: impl StreamingWriteSource,
105 id: Option<u64>,
106 ) -> Result<Fragment> {
107 let (stream, schema) = self.get_stream_and_schema(Box::new(source)).await?;
108 self.write_impl(stream, schema, id).await
109 }
110
111 pub async fn write_fragments(
113 &self,
114 source: impl StreamingWriteSource,
115 ) -> Result<Vec<Fragment>> {
116 let (stream, schema) = self.get_stream_and_schema(Box::new(source)).await?;
117 self.write_fragments_v2_impl(stream, schema).await
118 }
119
120 async fn write_v2_impl(
121 &self,
122 stream: SendableRecordBatchStream,
123 schema: Schema,
124 id: u64,
125 ) -> Result<Fragment> {
126 let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();
127 let progress = params.progress.as_ref();
128
129 Self::validate_schema(&schema, stream.schema().as_ref())?;
130
131 let (object_store, base_path) = ObjectStore::from_uri_and_params(
132 params.store_registry(),
133 self.dataset_uri,
134 ¶ms.store_params.clone().unwrap_or_default(),
135 )
136 .await?;
137 let filename = format!("{}.lance", generate_random_filename());
138 let mut fragment = Fragment::new(id);
139 let full_path = base_path.child(DATA_DIR).child(filename.clone());
140 let obj_writer = object_store.create(&full_path).await?;
141 let mut writer = lance_file::v2::writer::FileWriter::try_new(
142 obj_writer,
143 schema,
144 FileWriterOptions {
145 format_version: params.data_storage_version,
146 ..Default::default()
147 },
148 )?;
149
150 let (major, minor) = writer.version().to_numbers();
151
152 let data_file = DataFile::new_unstarted(filename, major, minor);
153 fragment.files.push(data_file);
154
155 progress.begin(&fragment).await?;
156
157 let break_limit = (128 * 1024).min(params.max_rows_per_file);
158
159 let mut broken_stream = break_stream(stream, break_limit)
160 .map_ok(|batch| vec![batch])
161 .boxed();
162 while let Some(batched_chunk) = broken_stream.next().await {
163 let batch_chunk = batched_chunk?;
164 writer.write_batches(batch_chunk.iter()).await?;
165 }
166
167 fragment.physical_rows = Some(writer.finish().await? as usize);
168
169 if matches!(fragment.physical_rows, Some(0)) {
170 return Err(Error::invalid_input("Input data was empty.", location!()));
171 }
172
173 let field_ids = writer
174 .field_id_to_column_indices()
175 .iter()
176 .map(|(field_id, _)| *field_id as i32)
177 .collect::<Vec<_>>();
178 let column_indices = writer
179 .field_id_to_column_indices()
180 .iter()
181 .map(|(_, column_index)| *column_index as i32)
182 .collect::<Vec<_>>();
183
184 fragment.files[0].fields = field_ids;
185 fragment.files[0].column_indices = column_indices;
186
187 progress.complete(&fragment).await?;
188
189 Ok(fragment)
190 }
191 async fn write_fragments_v2_impl(
192 &self,
193 stream: SendableRecordBatchStream,
194 schema: Schema,
195 ) -> Result<Vec<Fragment>> {
196 let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();
197
198 Self::validate_schema(&schema, stream.schema().as_ref())?;
199
200 let version = params.data_storage_version.unwrap_or_default();
201 let (object_store, base_path) = ObjectStore::from_uri_and_params(
202 params.store_registry(),
203 self.dataset_uri,
204 ¶ms.store_params.clone().unwrap_or_default(),
205 )
206 .await?;
207 do_write_fragments(
208 object_store,
209 &base_path,
210 &schema,
211 stream,
212 params.into_owned(),
213 version,
214 None, )
216 .await
217 }
218
219 async fn write_impl(
220 &self,
221 stream: SendableRecordBatchStream,
222 schema: Schema,
223 id: Option<u64>,
224 ) -> Result<Fragment> {
225 let id = id.unwrap_or_default();
226
227 let params = self.write_params.map(Cow::Borrowed).unwrap_or_default();
228
229 let storage_version = params.storage_version_or_default();
230
231 if storage_version != LanceFileVersion::Legacy {
232 return self.write_v2_impl(stream, schema, id).await;
233 }
234 let progress = params.progress.as_ref();
235
236 Self::validate_schema(&schema, stream.schema().as_ref())?;
237
238 let (object_store, base_path) = ObjectStore::from_uri_and_params(
239 params.store_registry(),
240 self.dataset_uri,
241 ¶ms.store_params.clone().unwrap_or_default(),
242 )
243 .await?;
244 let filename = format!("{}.lance", generate_random_filename());
245 let mut fragment = Fragment::with_file_legacy(id, &filename, &schema, None);
246 let full_path = base_path.child(DATA_DIR).child(filename.clone());
247 let mut writer = FileWriter::<ManifestDescribing>::try_new(
248 &object_store,
249 &full_path,
250 schema,
251 &Default::default(),
252 )
253 .await?;
254
255 progress.begin(&fragment).await?;
256
257 let mut buffered_reader = chunk_stream(stream, params.max_rows_per_group);
258 while let Some(batched_chunk) = buffered_reader.next().await {
259 let batch = batched_chunk?;
260 writer.write(&batch).await?;
261 }
262
263 if writer.is_empty() {
264 return Err(Error::invalid_input("Input data was empty.", location!()));
265 }
266
267 fragment.physical_rows = Some(writer.finish().await?);
268
269 progress.complete(&fragment).await?;
270
271 Ok(fragment)
272 }
273
274 async fn get_stream_and_schema(
275 &self,
276 source: impl StreamingWriteSource,
277 ) -> Result<(SendableRecordBatchStream, Schema)> {
278 if let Some(schema) = self.schema {
279 return Ok((source.into_stream(), schema.clone()));
280 } else if matches!(self.write_params.map(|p| p.mode), Some(WriteMode::Append)) {
281 if let Some(schema) = self.existing_dataset_schema().await? {
282 return Ok((source.into_stream(), schema));
283 }
284 }
285 source.into_stream_and_schema().await
286 }
287
288 async fn existing_dataset_schema(&self) -> Result<Option<Schema>> {
289 let mut builder = DatasetBuilder::from_uri(self.dataset_uri);
290 let storage_options = self
291 .write_params
292 .and_then(|p| p.store_params.as_ref())
293 .and_then(|p| p.storage_options.clone());
294 if let Some(storage_options) = storage_options {
295 builder = builder.with_storage_options(storage_options);
296 }
297 match builder.load().await {
298 Ok(dataset) => {
299 Ok(Some(dataset.schema().clone()))
302 }
303 Err(Error::DatasetNotFound { .. }) => {
304 Ok(None)
307 }
308 Err(e) => Err(e),
309 }
310 }
311
312 fn validate_schema(expected: &Schema, actual: &ArrowSchema) -> Result<()> {
313 if actual.fields().is_empty() {
314 return Err(Error::invalid_input(
315 "Cannot write with an empty schema.",
316 location!(),
317 ));
318 }
319 let actual_lance = Schema::try_from(actual)?;
320 actual_lance.check_compatible(expected, &Default::default())?;
321
322 Ok(())
323 }
324}
325
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{
        Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
    };
    use arrow_schema::{DataType, Field as ArrowField};
    use lance_arrow::SchemaExt;
    use lance_core::utils::tempfile::{TempDir, TempStrDir};
    use rstest::rstest;

    use super::*;

    /// Common test input: a reader with one 3-row batch over columns
    /// ("a": Int64, "b": Utf8).
    fn test_data() -> Box<dyn RecordBatchReader + Send> {
        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("a", DataType::Int64, false),
            ArrowField::new("b", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["a", "b", "c"])),
            ],
        );
        Box::new(RecordBatchIterator::new(vec![batch], schema))
    }

    /// `write` should reject empty schemas, empty data, and sources whose
    /// schema does not match the builder-provided schema.
    #[tokio::test]
    async fn test_fragment_write_validation() {
        // Empty schema -> "Cannot write with an empty schema." error.
        let empty_schema = Arc::new(ArrowSchema::empty());
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], empty_schema));
        let tmp_dir = TempDir::default();
        let result = FragmentCreateBuilder::new(&tmp_dir.path_str())
            .write(empty_reader, None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Cannot write with an empty schema.")),
            "{:?}",
            &result
        );

        // Valid schema but zero batches -> "Input data was empty." error.
        let arrow_schema = test_data().schema();
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], arrow_schema.clone()));
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .write(empty_reader, None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Input data was empty.")),
            "{:?}",
            &result
        );

        // Builder schema with an extra column -> schema mismatch error.
        let wrong_schema = arrow_schema
            .as_ref()
            .try_with_column(ArrowField::new("c", DataType::Utf8, false))
            .unwrap();
        let wrong_schema = Schema::try_from(&wrong_schema).unwrap();
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .schema(&wrong_schema)
            .write(test_data(), None)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::SchemaMismatch { difference, .. }
            if difference.contains("fields did not match")),
            "{:?}",
            &result
        );
    }

    /// Without an explicit schema or id, the fragment gets id 0 and field
    /// ids inferred from the source (0, 1).
    #[tokio::test]
    async fn test_fragment_write_default_schema() {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .write(data, None)
            .await
            .unwrap();

        assert_eq!(fragment.id, 0);
        assert_eq!(fragment.deletion_file, None);
        assert_eq!(fragment.files.len(), 1);
        assert_eq!(fragment.files[0].fields, vec![0, 1]);
    }

    /// A custom schema's field ids and an explicit fragment id are
    /// propagated into the written fragment metadata.
    #[tokio::test]
    async fn test_fragment_write_with_schema() {
        let data = test_data();

        // Remap field ids: column "a" -> 3, column "b" -> 1.
        let arrow_schema = data.schema();
        let mut custom_schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
        custom_schema.mut_field_by_id(0).unwrap().id = 3;
        custom_schema.mut_field_by_id(1).unwrap().id = 1;

        let tmp_dir = TempStrDir::default();
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .schema(&custom_schema)
            .write(data, Some(42))
            .await
            .unwrap();

        assert_eq!(fragment.id, 42);
        assert_eq!(fragment.deletion_file, None);
        assert_eq!(fragment.files.len(), 1);
        assert_eq!(fragment.files[0].fields, vec![3, 1]);
        assert_eq!(fragment.files[0].column_indices, vec![0, 1]);
    }

    /// `write_fragments` rejects empty schemas and mismatched schemas, but
    /// (unlike `write`) returns an empty Vec for empty input data.
    #[tokio::test]
    async fn test_write_fragments_validation() {
        // Empty schema -> error.
        let empty_schema = Arc::new(ArrowSchema::empty());
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], empty_schema));
        let tmp_dir = TempDir::default();
        let result = FragmentCreateBuilder::new(&tmp_dir.path_str())
            .write_fragments(empty_reader)
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::InvalidInput { source, .. }
            if source.to_string().contains("Cannot write with an empty schema.")),
            "{:?}",
            &result
        );

        // Empty data -> Ok with zero fragments.
        let arrow_schema = test_data().schema();
        let empty_reader = Box::new(RecordBatchIterator::new(vec![], arrow_schema.clone()));
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .write_fragments(empty_reader)
            .await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().len(), 0);

        // Mismatched schema -> error.
        let wrong_schema = arrow_schema
            .as_ref()
            .try_with_column(ArrowField::new("c", DataType::Utf8, false))
            .unwrap();
        let wrong_schema = Schema::try_from(&wrong_schema).unwrap();
        let result = FragmentCreateBuilder::new(tmp_dir.std_path().to_str().unwrap())
            .schema(&wrong_schema)
            .write_fragments(test_data())
            .await;
        assert!(result.is_err());
        assert!(
            matches!(result.as_ref().unwrap_err(), Error::SchemaMismatch { difference, .. }
            if difference.contains("fields did not match")),
            "{:?}",
            &result
        );
    }

    /// Default parameters produce a single fragment with inferred field ids.
    #[tokio::test]
    async fn test_write_fragments_default_schema() {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let fragments = FragmentCreateBuilder::new(&tmp_dir)
            .write_fragments(data)
            .await
            .unwrap();

        assert_eq!(fragments.len(), 1);
        assert_eq!(fragments[0].deletion_file, None);
        assert_eq!(fragments[0].files.len(), 1);
        assert_eq!(fragments[0].files[0].fields, vec![0, 1]);
    }

    /// `max_rows_per_file = 1` splits the 3-row input into 3 fragments.
    #[tokio::test]
    async fn test_write_fragments_with_options() {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            max_rows_per_file: 1,
            ..Default::default()
        };
        let fragments = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write_fragments(data)
            .await
            .unwrap();

        assert_eq!(fragments.len(), 3);
        assert_eq!(fragments[0].deletion_file, None);
        assert_eq!(fragments[0].files.len(), 1);
        assert_eq!(fragments[0].files[0].column_indices, vec![0, 1]);
        assert_eq!(fragments[1].deletion_file, None);
        assert_eq!(fragments[1].files.len(), 1);
        assert_eq!(fragments[1].files[0].column_indices, vec![0, 1]);
        assert_eq!(fragments[2].deletion_file, None);
        assert_eq!(fragments[2].files.len(), 1);
        assert_eq!(fragments[2].files[0].column_indices, vec![0, 1]);
    }

    /// `write` records the requested storage version in each data file's
    /// major/minor version fields.
    #[rstest]
    #[tokio::test]
    async fn test_write_with_format_version(
        #[values(
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::Legacy,
            LanceFileVersion::Stable
        )]
        file_version: LanceFileVersion,
    ) {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            data_storage_version: Some(file_version),
            ..Default::default()
        };
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write(data, None)
            .await
            .unwrap();

        assert!(!fragment.files.is_empty());
        fragment.files.iter().for_each(|f| {
            let (major_version, minor_version) = file_version.to_numbers();
            assert_eq!(f.file_major_version, major_version);
            assert_eq!(f.file_minor_version, minor_version);
        })
    }

    /// Same as above, but through the `write_fragments` path.
    #[rstest]
    #[tokio::test]
    async fn test_write_fragments_with_format_version(
        #[values(
            LanceFileVersion::V2_0,
            LanceFileVersion::V2_1,
            LanceFileVersion::Legacy,
            LanceFileVersion::Stable
        )]
        file_version: LanceFileVersion,
    ) {
        let data = test_data();
        let tmp_dir = TempStrDir::default();
        let writer_params = WriteParams {
            data_storage_version: Some(file_version),
            ..Default::default()
        };
        let fragment = FragmentCreateBuilder::new(&tmp_dir)
            .write_params(&writer_params)
            .write_fragments(data)
            .await
            .unwrap();

        assert!(!fragment.is_empty());
        fragment[0].files.iter().for_each(|f| {
            let (major_version, minor_version) = file_version.to_numbers();
            assert_eq!(f.file_major_version, major_version);
            assert_eq!(f.file_minor_version, minor_version);
        })
    }

    /// Generated filenames are 50 chars: 24 binary digits followed by
    /// 26 hex digits, and are unique across invocations.
    #[test]
    fn test_binary_filename_generation() {
        use std::collections::HashSet;

        let mut filenames = HashSet::new();
        for _ in 0..100 {
            let filename = generate_random_filename();

            assert_eq!(filename.len(), 50, "Filename should be 50 characters");

            let binary_part = &filename[0..24];
            assert!(
                binary_part.chars().all(|c| c == '0' || c == '1'),
                "First 24 chars should be binary: {}",
                binary_part
            );

            let hex_part = &filename[24..];
            assert_eq!(hex_part.len(), 26, "Hex part should be 26 characters");
            assert!(
                hex_part.chars().all(|c| c.is_ascii_hexdigit()),
                "Last 26 chars should be hex: {}",
                hex_part
            );

            // Uniqueness: insert returns false on duplicates.
            assert!(filenames.insert(filename.clone()));
        }
    }
}