1use std::{
19 collections::HashSet,
20 io::Read,
21 iter::{repeat, Map, Repeat, Zip},
22 sync::Arc,
23};
24
25use apache_avro::{
26 to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
27 Writer as AvroWriter,
28};
29use iceberg_rust_spec::{
30 manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
31 manifest_list::{self, FieldSummary, ManifestListEntry},
32 partition::{PartitionField, PartitionSpec},
33 schema::{Schema, SchemaV1, SchemaV2},
34 table_metadata::{FormatVersion, TableMetadata},
35 util::strip_prefix,
36 values::{Struct, Value},
37};
38use object_store::ObjectStore;
39
40use crate::{error::Error, spec};
41
/// An Avro reader zipped with an endlessly repeated, reference-counted
/// `(Schema, PartitionSpec, FormatVersion)` tuple, so every Avro record is
/// paired with the context needed to decode it.
type ReaderZip<'a, R> = Zip<AvroReader<'a, R>, Repeat<Arc<(Schema, PartitionSpec, FormatVersion)>>>;
/// A [`ReaderZip`] mapped through a plain function pointer that converts each
/// `(avro record result, context)` pair into a `Result<ManifestEntry, Error>`.
///
/// A function pointer (rather than a closure) is used as the mapper, which
/// keeps the whole iterator type nameable.
type ReaderMap<'a, R> = Map<
    ReaderZip<'a, R>,
    fn(
        (
            Result<AvroValue, apache_avro::Error>,
            Arc<(Schema, PartitionSpec, FormatVersion)>,
        ),
    ) -> Result<ManifestEntry, Error>,
>;
52
/// Iterator over the entries of a single Avro-encoded manifest file.
///
/// Each item is a `Result<ManifestEntry, Error>`; decoding happens lazily as
/// the iterator is advanced.
pub(crate) struct ManifestReader<'a, R: Read> {
    /// Avro reader combined with its decoding context and mapped to
    /// `ManifestEntry` results.
    reader: ReaderMap<'a, R>,
}
64
65impl<R: Read> Iterator for ManifestReader<'_, R> {
66 type Item = Result<ManifestEntry, Error>;
67 fn next(&mut self) -> Option<Self::Item> {
68 self.reader.next()
69 }
70}
71
72impl<R: Read> ManifestReader<'_, R> {
73 pub(crate) fn new(reader: R) -> Result<Self, Error> {
91 let reader = AvroReader::new(reader)?;
92 let metadata = reader.user_metadata();
93
94 let format_version: FormatVersion = match metadata
95 .get("format-version")
96 .map(|bytes| String::from_utf8(bytes.clone()))
97 .transpose()?
98 .unwrap_or("1".to_string())
99 .as_str()
100 {
101 "1" => Ok(FormatVersion::V1),
102 "2" => Ok(FormatVersion::V2),
103 _ => Err(Error::InvalidFormat("format version".to_string())),
104 }?;
105
106 let schema: Schema = match format_version {
107 FormatVersion::V1 => TryFrom::<SchemaV1>::try_from(serde_json::from_slice(
108 metadata
109 .get("schema")
110 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
111 )?)?,
112 FormatVersion::V2 => TryFrom::<SchemaV2>::try_from(serde_json::from_slice(
113 metadata
114 .get("schema")
115 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
116 )?)?,
117 };
118
119 let partition_fields: Vec<PartitionField> = serde_json::from_slice(
120 metadata
121 .get("partition-spec")
122 .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
123 )?;
124 let spec_id: i32 = metadata
125 .get("partition-spec-id")
126 .map(|x| String::from_utf8(x.clone()))
127 .transpose()?
128 .unwrap_or("0".to_string())
129 .parse()?;
130 let partition_spec = PartitionSpec::builder()
131 .with_spec_id(spec_id)
132 .with_fields(partition_fields)
133 .build()
134 .map_err(spec::error::Error::from)?;
135 Ok(Self {
136 reader: reader
137 .zip(repeat(Arc::new((schema, partition_spec, format_version))))
138 .map(avro_value_to_manifest_entry),
139 })
140 }
141}
142
/// Builds one manifest file in memory and keeps the matching manifest-list
/// entry up to date while entries are appended.
pub(crate) struct ManifestWriter<'schema, 'metadata> {
    /// Table metadata; consulted for the default partition spec when entries
    /// are appended.
    table_metadata: &'metadata TableMetadata,
    /// Manifest-list entry describing the manifest being written (path,
    /// counters, partition summaries, sequence numbers).
    manifest: ManifestListEntry,
    /// Avro writer that encodes the manifest into an in-memory `Vec<u8>`.
    writer: AvroWriter<'schema, Vec<u8>>,
}
161
162impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
163 pub(crate) fn new(
181 manifest_location: &str,
182 snapshot_id: i64,
183 schema: &'schema AvroSchema,
184 table_metadata: &'metadata TableMetadata,
185 branch: Option<&str>,
186 ) -> Result<Self, Error> {
187 let mut writer = AvroWriter::new(schema, Vec::new());
188
189 writer.add_user_metadata(
190 "format-version".to_string(),
191 match table_metadata.format_version {
192 FormatVersion::V1 => "1".as_bytes(),
193 FormatVersion::V2 => "2".as_bytes(),
194 },
195 )?;
196
197 writer.add_user_metadata(
198 "schema".to_string(),
199 match table_metadata.format_version {
200 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
201 table_metadata.current_schema(branch)?.clone(),
202 ))?,
203 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
204 table_metadata.current_schema(branch)?.clone(),
205 ))?,
206 },
207 )?;
208
209 writer.add_user_metadata(
210 "schema-id".to_string(),
211 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
212 )?;
213
214 let spec_id = table_metadata.default_spec_id;
215
216 writer.add_user_metadata(
217 "partition-spec".to_string(),
218 serde_json::to_string(
219 &table_metadata
220 .partition_specs
221 .get(&spec_id)
222 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
223 .fields(),
224 )?,
225 )?;
226
227 writer.add_user_metadata(
228 "partition-spec-id".to_string(),
229 serde_json::to_string(&spec_id)?,
230 )?;
231
232 writer.add_user_metadata("content".to_string(), "data")?;
233
234 let manifest = ManifestListEntry {
235 format_version: table_metadata.format_version,
236 manifest_path: manifest_location.to_owned(),
237 manifest_length: 0,
238 partition_spec_id: table_metadata.default_spec_id,
239 content: manifest_list::Content::Data,
240 sequence_number: table_metadata.last_sequence_number + 1,
241 min_sequence_number: table_metadata.last_sequence_number + 1,
242 added_snapshot_id: snapshot_id,
243 added_files_count: Some(0),
244 existing_files_count: Some(0),
245 deleted_files_count: Some(0),
246 added_rows_count: Some(0),
247 existing_rows_count: Some(0),
248 deleted_rows_count: Some(0),
249 partitions: None,
250 key_metadata: None,
251 };
252
253 Ok(ManifestWriter {
254 manifest,
255 writer,
256 table_metadata,
257 })
258 }
259
260 pub(crate) fn from_existing(
283 manifest_reader: impl Iterator<Item = Result<ManifestEntry, Error>>,
284 mut manifest: ManifestListEntry,
285 schema: &'schema AvroSchema,
286 table_metadata: &'metadata TableMetadata,
287 branch: Option<&str>,
288 ) -> Result<Self, Error> {
289 let mut writer = AvroWriter::new(schema, Vec::new());
290
291 writer.add_user_metadata(
292 "format-version".to_string(),
293 match table_metadata.format_version {
294 FormatVersion::V1 => "1".as_bytes(),
295 FormatVersion::V2 => "2".as_bytes(),
296 },
297 )?;
298
299 writer.add_user_metadata(
300 "schema".to_string(),
301 match table_metadata.format_version {
302 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
303 table_metadata.current_schema(branch)?.clone(),
304 ))?,
305 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
306 table_metadata.current_schema(branch)?.clone(),
307 ))?,
308 },
309 )?;
310
311 writer.add_user_metadata(
312 "schema-id".to_string(),
313 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
314 )?;
315
316 let spec_id = table_metadata.default_spec_id;
317
318 writer.add_user_metadata(
319 "partition-spec".to_string(),
320 serde_json::to_string(
321 &table_metadata
322 .partition_specs
323 .get(&spec_id)
324 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
325 .fields(),
326 )?,
327 )?;
328
329 writer.add_user_metadata(
330 "partition-spec-id".to_string(),
331 serde_json::to_string(&spec_id)?,
332 )?;
333
334 writer.add_user_metadata("content".to_string(), "data")?;
335
336 writer.extend(
337 manifest_reader
338 .map(|entry| {
339 let mut entry = entry
340 .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))?;
341 *entry.status_mut() = Status::Existing;
342 if entry.sequence_number().is_none() {
343 *entry.sequence_number_mut() = Some(manifest.sequence_number);
344 }
345 if entry.snapshot_id().is_none() {
346 *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
347 }
348 to_value(entry)
349 })
350 .filter_map(Result::ok),
351 )?;
352
353 manifest.sequence_number = table_metadata.last_sequence_number + 1;
354
355 manifest.existing_files_count = Some(
356 manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
357 );
358
359 manifest.added_files_count = None;
360
361 Ok(ManifestWriter {
362 manifest,
363 writer,
364 table_metadata,
365 })
366 }
367
368 pub(crate) fn from_existing_with_filter(
404 bytes: &[u8],
405 mut manifest: ManifestListEntry,
406 filter: &HashSet<String>,
407 schema: &'schema AvroSchema,
408 table_metadata: &'metadata TableMetadata,
409 branch: Option<&str>,
410 ) -> Result<Self, Error> {
411 let manifest_reader = ManifestReader::new(bytes)?;
412
413 let mut writer = AvroWriter::new(schema, Vec::new());
414
415 writer.add_user_metadata(
416 "format-version".to_string(),
417 match table_metadata.format_version {
418 FormatVersion::V1 => "1".as_bytes(),
419 FormatVersion::V2 => "2".as_bytes(),
420 },
421 )?;
422
423 writer.add_user_metadata(
424 "schema".to_string(),
425 match table_metadata.format_version {
426 FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
427 table_metadata.current_schema(branch)?.clone(),
428 ))?,
429 FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
430 table_metadata.current_schema(branch)?.clone(),
431 ))?,
432 },
433 )?;
434
435 writer.add_user_metadata(
436 "schema-id".to_string(),
437 serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
438 )?;
439
440 let spec_id = table_metadata.default_spec_id;
441
442 writer.add_user_metadata(
443 "partition-spec".to_string(),
444 serde_json::to_string(
445 &table_metadata
446 .partition_specs
447 .get(&spec_id)
448 .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
449 .fields(),
450 )?,
451 )?;
452
453 writer.add_user_metadata(
454 "partition-spec-id".to_string(),
455 serde_json::to_string(&spec_id)?,
456 )?;
457
458 writer.add_user_metadata("content".to_string(), "data")?;
459
460 writer.extend(manifest_reader.filter_map(|entry| {
461 let mut entry = entry
462 .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))
463 .unwrap();
464 if !filter.contains(entry.data_file().file_path()) {
465 *entry.status_mut() = Status::Existing;
466 if entry.sequence_number().is_none() {
467 *entry.sequence_number_mut() = Some(manifest.sequence_number);
468 }
469 if entry.snapshot_id().is_none() {
470 *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
471 }
472 Some(to_value(entry).unwrap())
473 } else {
474 None
475 }
476 }))?;
477
478 manifest.sequence_number = table_metadata.last_sequence_number + 1;
479
480 manifest.existing_files_count = Some(
481 manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
482 );
483
484 manifest.added_files_count = None;
485
486 Ok(ManifestWriter {
487 manifest,
488 writer,
489 table_metadata,
490 })
491 }
492
493 pub(crate) fn append(&mut self, manifest_entry: ManifestEntry) -> Result<(), Error> {
513 let mut added_rows_count = 0;
514 let mut deleted_rows_count = 0;
515
516 if self.manifest.partitions.is_none() {
517 self.manifest.partitions = Some(
518 self.table_metadata
519 .default_partition_spec()?
520 .fields()
521 .iter()
522 .map(|_| FieldSummary {
523 contains_null: false,
524 contains_nan: None,
525 lower_bound: None,
526 upper_bound: None,
527 })
528 .collect::<Vec<FieldSummary>>(),
529 );
530 }
531
532 match manifest_entry.data_file().content() {
533 Content::Data => {
534 added_rows_count += manifest_entry.data_file().record_count();
535 }
536 Content::EqualityDeletes => {
537 deleted_rows_count += manifest_entry.data_file().record_count();
538 }
539 _ => (),
540 }
541 let status = *manifest_entry.status();
542
543 update_partitions(
544 self.manifest.partitions.as_mut().unwrap(),
545 manifest_entry.data_file().partition(),
546 self.table_metadata.default_partition_spec()?.fields(),
547 )?;
548
549 if let Some(sequence_number) = manifest_entry.sequence_number() {
550 if self.manifest.min_sequence_number > *sequence_number {
551 self.manifest.min_sequence_number = *sequence_number;
552 }
553 };
554
555 self.writer.append_ser(manifest_entry)?;
556
557 match status {
558 Status::Added => {
559 self.manifest.added_files_count = match self.manifest.added_files_count {
560 Some(count) => Some(count + 1),
561 None => Some(1),
562 };
563 }
564 Status::Existing => {
565 self.manifest.existing_files_count = match self.manifest.existing_files_count {
566 Some(count) => Some(count + 1),
567 None => Some(1),
568 };
569 }
570 Status::Deleted => (),
571 }
572
573 self.manifest.added_rows_count = match self.manifest.added_rows_count {
574 Some(count) => Some(count + added_rows_count),
575 None => Some(added_rows_count),
576 };
577
578 self.manifest.deleted_rows_count = match self.manifest.deleted_rows_count {
579 Some(count) => Some(count + deleted_rows_count),
580 None => Some(deleted_rows_count),
581 };
582
583 Ok(())
584 }
585
586 pub(crate) async fn finish(
604 mut self,
605 object_store: Arc<dyn ObjectStore>,
606 ) -> Result<ManifestListEntry, Error> {
607 let manifest_bytes = self.writer.into_inner()?;
608
609 let manifest_length: i64 = manifest_bytes.len() as i64;
610
611 self.manifest.manifest_length += manifest_length;
612
613 object_store
614 .put(
615 &strip_prefix(&self.manifest.manifest_path).as_str().into(),
616 manifest_bytes.into(),
617 )
618 .await?;
619 Ok(self.manifest)
620 }
621}
622
623#[allow(clippy::type_complexity)]
624fn avro_value_to_manifest_entry(
626 value: (
627 Result<AvroValue, apache_avro::Error>,
628 Arc<(Schema, PartitionSpec, FormatVersion)>,
629 ),
630) -> Result<ManifestEntry, Error> {
631 let entry = value.0?;
632 let schema = &value.1 .0;
633 let partition_spec = &value.1 .1;
634 let format_version = &value.1 .2;
635 match format_version {
636 FormatVersion::V2 => ManifestEntry::try_from_v2(
637 apache_avro::from_value::<ManifestEntryV2>(&entry)?,
638 schema,
639 partition_spec,
640 )
641 .map_err(Error::from),
642 FormatVersion::V1 => ManifestEntry::try_from_v1(
643 apache_avro::from_value::<ManifestEntryV1>(&entry)?,
644 schema,
645 partition_spec,
646 )
647 .map_err(Error::from),
648 }
649}
650
651fn update_partitions(
652 partitions: &mut [FieldSummary],
653 partition_values: &Struct,
654 partition_columns: &[PartitionField],
655) -> Result<(), Error> {
656 for (field, summary) in partition_columns.iter().zip(partitions.iter_mut()) {
657 let value = partition_values.get(field.name()).and_then(|x| x.as_ref());
658 if let Some(value) = value {
659 if summary.lower_bound.is_none() {
660 summary.lower_bound = Some(value.clone());
661 } else if let Some(lower_bound) = &mut summary.lower_bound {
662 match (value, lower_bound) {
663 (Value::Int(val), Value::Int(current)) => {
664 if *current > *val {
665 *current = *val
666 }
667 }
668 (Value::LongInt(val), Value::LongInt(current)) => {
669 if *current > *val {
670 *current = *val
671 }
672 }
673 (Value::Float(val), Value::Float(current)) => {
674 if *current > *val {
675 *current = *val
676 }
677 }
678 (Value::Double(val), Value::Double(current)) => {
679 if *current > *val {
680 *current = *val
681 }
682 }
683 (Value::Date(val), Value::Date(current)) => {
684 if *current > *val {
685 *current = *val
686 }
687 }
688 (Value::Time(val), Value::Time(current)) => {
689 if *current > *val {
690 *current = *val
691 }
692 }
693 (Value::Timestamp(val), Value::Timestamp(current)) => {
694 if *current > *val {
695 *current = *val
696 }
697 }
698 (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
699 if *current > *val {
700 *current = *val
701 }
702 }
703 _ => {}
704 }
705 }
706 if summary.upper_bound.is_none() {
707 summary.upper_bound = Some(value.clone());
708 } else if let Some(upper_bound) = &mut summary.upper_bound {
709 match (value, upper_bound) {
710 (Value::Int(val), Value::Int(current)) => {
711 if *current < *val {
712 *current = *val
713 }
714 }
715 (Value::LongInt(val), Value::LongInt(current)) => {
716 if *current < *val {
717 *current = *val
718 }
719 }
720 (Value::Float(val), Value::Float(current)) => {
721 if *current < *val {
722 *current = *val
723 }
724 }
725 (Value::Double(val), Value::Double(current)) => {
726 if *current < *val {
727 *current = *val
728 }
729 }
730 (Value::Date(val), Value::Date(current)) => {
731 if *current < *val {
732 *current = *val
733 }
734 }
735 (Value::Time(val), Value::Time(current)) => {
736 if *current < *val {
737 *current = *val
738 }
739 }
740 (Value::Timestamp(val), Value::Timestamp(current)) => {
741 if *current < *val {
742 *current = *val
743 }
744 }
745 (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
746 if *current < *val {
747 *current = *val
748 }
749 }
750 _ => {}
751 }
752 }
753 }
754 }
755 Ok(())
756}
757
// Placeholder for this module's unit tests; it currently contains none.
#[cfg(test)]
mod tests {}