1use std::{
19 collections::HashSet,
20 future::Future,
21 io::Read,
22 iter::{repeat, Map, Repeat, Zip},
23 sync::Arc,
24};
25
26use apache_avro::{
27 to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
28 Writer as AvroWriter,
29};
30use futures::TryFutureExt;
31use iceberg_rust_spec::{
32 manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
33 manifest_list::{self, FieldSummary, ManifestListEntry},
34 partition::{PartitionField, PartitionSpec},
35 schema::{Schema, SchemaV1, SchemaV2},
36 table_metadata::{FormatVersion, TableMetadata},
37 util::strip_prefix,
38 values::{Struct, Value},
39};
40use object_store::ObjectStore;
41
42use crate::error::Error;
43
44type ReaderZip<'a, R> = Zip<AvroReader<'a, R>, Repeat<Arc<(Schema, PartitionSpec, FormatVersion)>>>;
45type ReaderMap<'a, R> = Map<
46 ReaderZip<'a, R>,
47 fn(
48 (
49 Result<AvroValue, apache_avro::Error>,
50 Arc<(Schema, PartitionSpec, FormatVersion)>,
51 ),
52 ) -> Result<ManifestEntry, Error>,
53>;
54
55pub(crate) struct ManifestReader<'a, R: Read> {
64 reader: ReaderMap<'a, R>,
65}
66
67impl<R: Read> Iterator for ManifestReader<'_, R> {
68 type Item = Result<ManifestEntry, Error>;
69 fn next(&mut self) -> Option<Self::Item> {
70 self.reader.next()
71 }
72}
73
74impl<R: Read> ManifestReader<'_, R> {
75 pub(crate) fn new(reader: R) -> Result<Self, Error> {
93 let reader = AvroReader::new(reader)?;
94 let metadata = reader.user_metadata();
95
96 let format_version: FormatVersion = match metadata
97 .get("format-version")
98 .map(|bytes| String::from_utf8(bytes.clone()))
99 .transpose()?
100 .unwrap_or("1".to_string())
101 .as_str()
102 {
103 "1" => Ok(FormatVersion::V1),
104 "2" => Ok(FormatVersion::V2),
105 _ => Err(Error::InvalidFormat("format version".to_string())),
106 }?;
107
108 let schema: Schema = match format_version {
109 FormatVersion::V1 => TryFrom::<SchemaV1>::try_from(serde_json::from_slice(
110 metadata
111 .get("schema")
112 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
113 )?)?,
114 FormatVersion::V2 => TryFrom::<SchemaV2>::try_from(serde_json::from_slice(
115 metadata
116 .get("schema")
117 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
118 )?)?,
119 };
120
121 let partition_fields: Vec<PartitionField> = serde_json::from_slice(
122 metadata
123 .get("partition-spec")
124 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
125 )?;
126 let spec_id: i32 = metadata
127 .get("partition-spec-id")
128 .map(|x| String::from_utf8(x.clone()))
129 .transpose()?
130 .unwrap_or("0".to_string())
131 .parse()?;
132 let partition_spec = PartitionSpec::builder()
133 .with_spec_id(spec_id)
134 .with_fields(partition_fields)
135 .build()?;
136 Ok(Self {
137 reader: reader
138 .zip(repeat(Arc::new((schema, partition_spec, format_version))))
139 .map(avro_value_to_manifest_entry),
140 })
141 }
142}
143
144pub(crate) struct ManifestWriter<'schema, 'metadata> {
158 table_metadata: &'metadata TableMetadata,
159 manifest: ManifestListEntry,
160 writer: AvroWriter<'schema, Vec<u8>>,
161}
162
163impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
164 pub(crate) fn new(
182 manifest_location: &str,
183 snapshot_id: i64,
184 schema: &'schema AvroSchema,
185 table_metadata: &'metadata TableMetadata,
186 content: manifest_list::Content,
187 branch: Option<&str>,
188 ) -> Result<Self, Error> {
189 let mut writer = AvroWriter::new(schema, Vec::new());
190
191 writer.add_user_metadata(
192 "format-version".to_string(),
193 match table_metadata.format_version {
194 FormatVersion::V1 => "1".as_bytes(),
195 FormatVersion::V2 => "2".as_bytes(),
196 },
197 )?;
198
199 writer.add_user_metadata(
200 "schema".to_string(),
201 match table_metadata.format_version {
202 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
203 table_metadata.current_schema(branch)?.clone(),
204 ))?,
205 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
206 table_metadata.current_schema(branch)?.clone(),
207 ))?,
208 },
209 )?;
210
211 writer.add_user_metadata(
212 "schema-id".to_string(),
213 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
214 )?;
215
216 let spec_id = table_metadata.default_spec_id;
217
218 writer.add_user_metadata(
219 "partition-spec".to_string(),
220 serde_json::to_string(
221 &table_metadata
222 .partition_specs
223 .get(&spec_id)
224 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
225 .fields(),
226 )?,
227 )?;
228
229 writer.add_user_metadata(
230 "partition-spec-id".to_string(),
231 serde_json::to_string(&spec_id)?,
232 )?;
233
234 writer.add_user_metadata(
235 "content".to_string(),
236 match content {
237 manifest_list::Content::Data => "data",
238 manifest_list::Content::Deletes => "deletes",
239 },
240 )?;
241
242 let manifest = ManifestListEntry {
243 format_version: table_metadata.format_version,
244 manifest_path: manifest_location.to_owned(),
245 manifest_length: 0,
246 partition_spec_id: table_metadata.default_spec_id,
247 content,
248 sequence_number: table_metadata.last_sequence_number + 1,
249 min_sequence_number: table_metadata.last_sequence_number + 1,
250 added_snapshot_id: snapshot_id,
251 added_files_count: Some(0),
252 existing_files_count: Some(0),
253 deleted_files_count: Some(0),
254 added_rows_count: Some(0),
255 existing_rows_count: Some(0),
256 deleted_rows_count: Some(0),
257 partitions: None,
258 key_metadata: None,
259 };
260
261 Ok(ManifestWriter {
262 manifest,
263 writer,
264 table_metadata,
265 })
266 }
267
268 pub(crate) fn from_existing(
291 manifest_reader: impl Iterator<Item = Result<ManifestEntry, Error>>,
292 mut manifest: ManifestListEntry,
293 schema: &'schema AvroSchema,
294 table_metadata: &'metadata TableMetadata,
295 branch: Option<&str>,
296 ) -> Result<Self, Error> {
297 let mut writer = AvroWriter::new(schema, Vec::new());
298
299 writer.add_user_metadata(
300 "format-version".to_string(),
301 match table_metadata.format_version {
302 FormatVersion::V1 => "1".as_bytes(),
303 FormatVersion::V2 => "2".as_bytes(),
304 },
305 )?;
306
307 writer.add_user_metadata(
308 "schema".to_string(),
309 match table_metadata.format_version {
310 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
311 table_metadata.current_schema(branch)?.clone(),
312 ))?,
313 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
314 table_metadata.current_schema(branch)?.clone(),
315 ))?,
316 },
317 )?;
318
319 writer.add_user_metadata(
320 "schema-id".to_string(),
321 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
322 )?;
323
324 let spec_id = table_metadata.default_spec_id;
325
326 writer.add_user_metadata(
327 "partition-spec".to_string(),
328 serde_json::to_string(
329 &table_metadata
330 .partition_specs
331 .get(&spec_id)
332 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
333 .fields(),
334 )?,
335 )?;
336
337 writer.add_user_metadata(
338 "partition-spec-id".to_string(),
339 serde_json::to_string(&spec_id)?,
340 )?;
341
342 writer.add_user_metadata(
343 "content".to_string(),
344 match manifest.content {
345 manifest_list::Content::Data => "data",
346 manifest_list::Content::Deletes => "deletes",
347 },
348 )?;
349
350 writer.extend(
351 manifest_reader
352 .map(|entry| {
353 let mut entry = entry
354 .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))?;
355 *entry.status_mut() = Status::Existing;
356 if entry.sequence_number().is_none() {
357 *entry.sequence_number_mut() = Some(manifest.sequence_number);
358 }
359 if entry.snapshot_id().is_none() {
360 *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
361 }
362 to_value(entry)
363 })
364 .filter_map(Result::ok),
365 )?;
366
367 manifest.sequence_number = table_metadata.last_sequence_number + 1;
368
369 manifest.existing_files_count = Some(
370 manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
371 );
372
373 manifest.added_files_count = None;
374
375 Ok(ManifestWriter {
376 manifest,
377 writer,
378 table_metadata,
379 })
380 }
381
382 pub(crate) fn from_existing_with_filter(
418 bytes: &[u8],
419 mut manifest: ManifestListEntry,
420 filter: &HashSet<String>,
421 schema: &'schema AvroSchema,
422 table_metadata: &'metadata TableMetadata,
423 branch: Option<&str>,
424 ) -> Result<Self, Error> {
425 let manifest_reader = ManifestReader::new(bytes)?;
426
427 let mut writer = AvroWriter::new(schema, Vec::new());
428
429 writer.add_user_metadata(
430 "format-version".to_string(),
431 match table_metadata.format_version {
432 FormatVersion::V1 => "1".as_bytes(),
433 FormatVersion::V2 => "2".as_bytes(),
434 },
435 )?;
436
437 writer.add_user_metadata(
438 "schema".to_string(),
439 match table_metadata.format_version {
440 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
441 table_metadata.current_schema(branch)?.clone(),
442 ))?,
443 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
444 table_metadata.current_schema(branch)?.clone(),
445 ))?,
446 },
447 )?;
448
449 writer.add_user_metadata(
450 "schema-id".to_string(),
451 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
452 )?;
453
454 let spec_id = table_metadata.default_spec_id;
455
456 writer.add_user_metadata(
457 "partition-spec".to_string(),
458 serde_json::to_string(
459 &table_metadata
460 .partition_specs
461 .get(&spec_id)
462 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
463 .fields(),
464 )?,
465 )?;
466
467 writer.add_user_metadata(
468 "partition-spec-id".to_string(),
469 serde_json::to_string(&spec_id)?,
470 )?;
471
472 writer.add_user_metadata(
473 "content".to_string(),
474 match manifest.content {
475 manifest_list::Content::Data => "data",
476 manifest_list::Content::Deletes => "deletes",
477 },
478 )?;
479
480 writer.extend(manifest_reader.filter_map(|entry| {
481 let mut entry = entry
482 .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))
483 .unwrap();
484 if !filter.contains(entry.data_file().file_path()) {
485 *entry.status_mut() = Status::Existing;
486 if entry.sequence_number().is_none() {
487 *entry.sequence_number_mut() = Some(manifest.sequence_number);
488 }
489 if entry.snapshot_id().is_none() {
490 *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
491 }
492 Some(to_value(entry).unwrap())
493 } else {
494 None
495 }
496 }))?;
497
498 manifest.sequence_number = table_metadata.last_sequence_number + 1;
499
500 manifest.existing_files_count = Some(
501 manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
502 );
503
504 manifest.added_files_count = None;
505
506 Ok(ManifestWriter {
507 manifest,
508 writer,
509 table_metadata,
510 })
511 }
512
513 pub(crate) fn append(&mut self, manifest_entry: ManifestEntry) -> Result<(), Error> {
533 let mut added_rows_count = 0;
534 let mut deleted_rows_count = 0;
535
536 if self.manifest.partitions.is_none() {
537 self.manifest.partitions = Some(
538 self.table_metadata
539 .default_partition_spec()?
540 .fields()
541 .iter()
542 .map(|_| FieldSummary {
543 contains_null: false,
544 contains_nan: None,
545 lower_bound: None,
546 upper_bound: None,
547 })
548 .collect::<Vec<FieldSummary>>(),
549 );
550 }
551
552 match manifest_entry.data_file().content() {
553 Content::Data => {
554 added_rows_count += manifest_entry.data_file().record_count();
555 }
556 Content::EqualityDeletes => {
557 deleted_rows_count += manifest_entry.data_file().record_count();
558 }
559 _ => (),
560 }
561 let status = *manifest_entry.status();
562
563 update_partitions(
564 self.manifest.partitions.as_mut().unwrap(),
565 manifest_entry.data_file().partition(),
566 self.table_metadata.default_partition_spec()?.fields(),
567 )?;
568
569 if let Some(sequence_number) = manifest_entry.sequence_number() {
570 if self.manifest.min_sequence_number > *sequence_number {
571 self.manifest.min_sequence_number = *sequence_number;
572 }
573 };
574
575 self.writer.append_ser(manifest_entry)?;
576
577 match status {
578 Status::Added => {
579 self.manifest.added_files_count = match self.manifest.added_files_count {
580 Some(count) => Some(count + 1),
581 None => Some(1),
582 };
583 }
584 Status::Existing => {
585 self.manifest.existing_files_count = match self.manifest.existing_files_count {
586 Some(count) => Some(count + 1),
587 None => Some(1),
588 };
589 }
590 Status::Deleted => (),
591 }
592
593 self.manifest.added_rows_count = match self.manifest.added_rows_count {
594 Some(count) => Some(count + added_rows_count),
595 None => Some(added_rows_count),
596 };
597
598 self.manifest.deleted_rows_count = match self.manifest.deleted_rows_count {
599 Some(count) => Some(count + deleted_rows_count),
600 None => Some(deleted_rows_count),
601 };
602
603 Ok(())
604 }
605
606 pub(crate) async fn finish(
624 mut self,
625 object_store: Arc<dyn ObjectStore>,
626 ) -> Result<ManifestListEntry, Error> {
627 let manifest_bytes = self.writer.into_inner()?;
628
629 let manifest_length: i64 = manifest_bytes.len() as i64;
630
631 self.manifest.manifest_length += manifest_length;
632
633 object_store
634 .put(
635 &strip_prefix(&self.manifest.manifest_path).as_str().into(),
636 manifest_bytes.into(),
637 )
638 .await?;
639 Ok(self.manifest)
640 }
641
642 pub(crate) fn finish_concurrently(
665 mut self,
666 object_store: Arc<dyn ObjectStore>,
667 ) -> Result<(ManifestListEntry, impl Future<Output = Result<(), Error>>), Error> {
668 let manifest_bytes = self.writer.into_inner()?;
669
670 let manifest_length: i64 = manifest_bytes.len() as i64;
671
672 self.manifest.manifest_length += manifest_length;
673
674 let path = strip_prefix(&self.manifest.manifest_path).as_str().into();
675 let future = async move {
676 object_store
677 .put(&path, manifest_bytes.into())
678 .map_ok(|_| ())
679 .map_err(Error::from)
680 .await
681 };
682 Ok((self.manifest, future))
683 }
684}
685
686#[allow(clippy::type_complexity)]
687fn avro_value_to_manifest_entry(
689 value: (
690 Result<AvroValue, apache_avro::Error>,
691 Arc<(Schema, PartitionSpec, FormatVersion)>,
692 ),
693) -> Result<ManifestEntry, Error> {
694 let entry = value.0?;
695 let schema = &value.1 .0;
696 let partition_spec = &value.1 .1;
697 let format_version = &value.1 .2;
698 match format_version {
699 FormatVersion::V2 => ManifestEntry::try_from_v2(
700 apache_avro::from_value::<ManifestEntryV2>(&entry)?,
701 schema,
702 partition_spec,
703 )
704 .map_err(Error::from),
705 FormatVersion::V1 => ManifestEntry::try_from_v1(
706 apache_avro::from_value::<ManifestEntryV1>(&entry)?,
707 schema,
708 partition_spec,
709 )
710 .map_err(Error::from),
711 }
712}
713
714fn update_partitions(
715 partitions: &mut [FieldSummary],
716 partition_values: &Struct,
717 partition_columns: &[PartitionField],
718) -> Result<(), Error> {
719 for (field, summary) in partition_columns.iter().zip(partitions.iter_mut()) {
720 let value = partition_values.get(field.name()).and_then(|x| x.as_ref());
721 if let Some(value) = value {
722 if summary.lower_bound.is_none() {
723 summary.lower_bound = Some(value.clone());
724 } else if let Some(lower_bound) = &mut summary.lower_bound {
725 match (value, lower_bound) {
726 (Value::Int(val), Value::Int(current)) => {
727 if *current > *val {
728 *current = *val
729 }
730 }
731 (Value::LongInt(val), Value::LongInt(current)) => {
732 if *current > *val {
733 *current = *val
734 }
735 }
736 (Value::Float(val), Value::Float(current)) => {
737 if *current > *val {
738 *current = *val
739 }
740 }
741 (Value::Double(val), Value::Double(current)) => {
742 if *current > *val {
743 *current = *val
744 }
745 }
746 (Value::Date(val), Value::Date(current)) => {
747 if *current > *val {
748 *current = *val
749 }
750 }
751 (Value::Time(val), Value::Time(current)) => {
752 if *current > *val {
753 *current = *val
754 }
755 }
756 (Value::Timestamp(val), Value::Timestamp(current)) => {
757 if *current > *val {
758 *current = *val
759 }
760 }
761 (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
762 if *current > *val {
763 *current = *val
764 }
765 }
766 _ => {}
767 }
768 }
769 if summary.upper_bound.is_none() {
770 summary.upper_bound = Some(value.clone());
771 } else if let Some(upper_bound) = &mut summary.upper_bound {
772 match (value, upper_bound) {
773 (Value::Int(val), Value::Int(current)) => {
774 if *current < *val {
775 *current = *val
776 }
777 }
778 (Value::LongInt(val), Value::LongInt(current)) => {
779 if *current < *val {
780 *current = *val
781 }
782 }
783 (Value::Float(val), Value::Float(current)) => {
784 if *current < *val {
785 *current = *val
786 }
787 }
788 (Value::Double(val), Value::Double(current)) => {
789 if *current < *val {
790 *current = *val
791 }
792 }
793 (Value::Date(val), Value::Date(current)) => {
794 if *current < *val {
795 *current = *val
796 }
797 }
798 (Value::Time(val), Value::Time(current)) => {
799 if *current < *val {
800 *current = *val
801 }
802 }
803 (Value::Timestamp(val), Value::Timestamp(current)) => {
804 if *current < *val {
805 *current = *val
806 }
807 }
808 (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
809 if *current < *val {
810 *current = *val
811 }
812 }
813 _ => {}
814 }
815 }
816 }
817 }
818 Ok(())
819}
820
#[cfg(test)]
mod tests {
    // Intentionally empty: this module currently defines no unit tests.
}