iceberg_rust/table/
manifest.rs

1//! Provides functionality for reading and writing Iceberg manifest files.
2//!
3//! This module implements the core manifest handling capabilities:
4//! - Reading manifest files via [`ManifestReader`]
5//! - Writing new manifest files via [`ManifestWriter`]
6//! - Converting between manifest formats (V1/V2)
7//! - Managing manifest entries and their metadata
8//! - Tracking partition statistics and file counts
9//!
10//! Manifest files are a key part of Iceberg tables, containing:
11//! - Data file locations and metadata
12//! - Partition information
13//! - File statistics and metrics
14//! - Schema and partition spec references
15//!
16//! The module handles both V1 and V2 manifest formats transparently.
17
18use std::{
19    collections::HashSet,
20    future::Future,
21    io::Read,
22    iter::{repeat, Map, Repeat, Zip},
23    sync::Arc,
24};
25
26use apache_avro::{
27    to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
28    Writer as AvroWriter,
29};
30use futures::TryFutureExt;
31use iceberg_rust_spec::{
32    manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
33    manifest_list::{self, FieldSummary, ManifestListEntry},
34    partition::{PartitionField, PartitionSpec},
35    schema::{Schema, SchemaV1, SchemaV2},
36    table_metadata::{FormatVersion, TableMetadata},
37    util::strip_prefix,
38    values::{Struct, Value},
39};
40use object_store::ObjectStore;
41
42use crate::error::Error;
43
/// Avro record iterator zipped with the shared `(schema, partition spec, format version)`
/// context that every record needs in order to be decoded into a [`ManifestEntry`].
type ReaderZip<'a, R> = Zip<AvroReader<'a, R>, Repeat<Arc<(Schema, PartitionSpec, FormatVersion)>>>;
/// [`ReaderZip`] mapped through a decoding function. The mapper is spelled as a plain `fn`
/// pointer (not a closure type) so the full iterator type can be named as a struct field.
type ReaderMap<'a, R> = Map<
    ReaderZip<'a, R>,
    fn(
        (
            Result<AvroValue, apache_avro::Error>,
            Arc<(Schema, PartitionSpec, FormatVersion)>,
        ),
    ) -> Result<ManifestEntry, Error>,
>;
54
/// A reader for Iceberg manifest files that provides an iterator over manifest entries.
///
/// The reader handles both V1 and V2 manifest formats. The format version, schema and
/// partition spec are read once from the Avro file header (see [`ManifestReader::new`]), and
/// each record is then decoded with the matching V1 or V2 entry layout as it is iterated.
///
/// # Type Parameters
/// * `'a` - The lifetime of the underlying reader
/// * `R` - The type implementing `Read` that provides the manifest data
pub(crate) struct ManifestReader<'a, R: Read> {
    // Avro records zipped with the shared decoding context and mapped to `ManifestEntry`s.
    reader: ReaderMap<'a, R>,
}
66
67impl<R: Read> Iterator for ManifestReader<'_, R> {
68    type Item = Result<ManifestEntry, Error>;
69    fn next(&mut self) -> Option<Self::Item> {
70        self.reader.next()
71    }
72}
73
74impl<R: Read> ManifestReader<'_, R> {
75    /// Creates a new ManifestReader from a reader implementing the Read trait.
76    ///
77    /// This method initializes a reader that can parse both V1 and V2 manifest formats.
78    /// It extracts metadata from the Avro file including format version, schema, and partition spec information.
79    ///
80    /// # Arguments
81    /// * `reader` - A type implementing the `Read` trait that provides access to the manifest file data
82    ///
83    /// # Returns
84    /// * `Result<Self, Error>` - A new ManifestReader instance or an error if initialization fails
85    ///
86    /// # Errors
87    /// Returns an error if:
88    /// * The Avro reader cannot be created
89    /// * Required metadata fields are missing
90    /// * Format version is invalid
91    /// * Schema or partition spec information cannot be parsed
92    pub(crate) fn new(reader: R) -> Result<Self, Error> {
93        let reader = AvroReader::new(reader)?;
94        let metadata = reader.user_metadata();
95
96        let format_version: FormatVersion = match metadata
97            .get("format-version")
98            .map(|bytes| String::from_utf8(bytes.clone()))
99            .transpose()?
100            .unwrap_or("1".to_string())
101            .as_str()
102        {
103            "1" => Ok(FormatVersion::V1),
104            "2" => Ok(FormatVersion::V2),
105            _ => Err(Error::InvalidFormat("format version".to_string())),
106        }?;
107
108        let schema: Schema = match format_version {
109            FormatVersion::V1 => TryFrom::<SchemaV1>::try_from(serde_json::from_slice(
110                metadata
111                    .get("schema")
112                    .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
113            )?)?,
114            FormatVersion::V2 => TryFrom::<SchemaV2>::try_from(serde_json::from_slice(
115                metadata
116                    .get("schema")
117                    .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
118            )?)?,
119        };
120
121        let partition_fields: Vec<PartitionField> = serde_json::from_slice(
122            metadata
123                .get("partition-spec")
124                .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
125        )?;
126        let spec_id: i32 = metadata
127            .get("partition-spec-id")
128            .map(|x| String::from_utf8(x.clone()))
129            .transpose()?
130            .unwrap_or("0".to_string())
131            .parse()?;
132        let partition_spec = PartitionSpec::builder()
133            .with_spec_id(spec_id)
134            .with_fields(partition_fields)
135            .build()?;
136        Ok(Self {
137            reader: reader
138                .zip(repeat(Arc::new((schema, partition_spec, format_version))))
139                .map(avro_value_to_manifest_entry),
140        })
141    }
142}
143
/// A writer for Iceberg manifest files that handles creating and updating manifest entries.
///
/// ManifestWriter manages both creating new manifests and updating existing ones, handling
/// the complexities of manifest metadata, entry tracking, and partition summaries. Entries are
/// serialized into an in-memory buffer; nothing is written to storage until `finish` or
/// `finish_concurrently` is called.
///
/// # Type Parameters
/// * `'schema` - The lifetime of the Avro schema used for writing entries
/// * `'metadata` - The lifetime of the table metadata reference
///
/// # Fields
/// * `table_metadata` - Reference to the table's metadata containing schema and partition information
/// * `manifest` - The manifest list entry being built or modified
/// * `writer` - The underlying Avro writer for serializing manifest entries
pub(crate) struct ManifestWriter<'schema, 'metadata> {
    table_metadata: &'metadata TableMetadata,
    manifest: ManifestListEntry,
    // Buffers the encoded manifest file in memory (`Vec<u8>`) until it is finished.
    writer: AvroWriter<'schema, Vec<u8>>,
}
162
163impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
164    /// Creates a new ManifestWriter for writing manifest entries to a new manifest file.
165    ///
166    /// # Arguments
167    /// * `manifest_location` - The location where the manifest file will be written
168    /// * `snapshot_id` - The ID of the snapshot this manifest belongs to
169    /// * `schema` - The Avro schema used for serializing manifest entries
170    /// * `table_metadata` - The table metadata containing schema and partition information
171    /// * `branch` - Optional branch name to get the current schema from
172    ///
173    /// # Returns
174    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
175    ///
176    /// # Errors
177    /// Returns an error if:
178    /// * The Avro writer cannot be created
179    /// * Required metadata fields cannot be serialized
180    /// * The partition spec ID is not found in table metadata
181    pub(crate) fn new(
182        manifest_location: &str,
183        snapshot_id: i64,
184        schema: &'schema AvroSchema,
185        table_metadata: &'metadata TableMetadata,
186        content: manifest_list::Content,
187        branch: Option<&str>,
188    ) -> Result<Self, Error> {
189        let mut writer = AvroWriter::new(schema, Vec::new());
190
191        writer.add_user_metadata(
192            "format-version".to_string(),
193            match table_metadata.format_version {
194                FormatVersion::V1 => "1".as_bytes(),
195                FormatVersion::V2 => "2".as_bytes(),
196            },
197        )?;
198
199        writer.add_user_metadata(
200            "schema".to_string(),
201            match table_metadata.format_version {
202                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
203                    table_metadata.current_schema(branch)?.clone(),
204                ))?,
205                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
206                    table_metadata.current_schema(branch)?.clone(),
207                ))?,
208            },
209        )?;
210
211        writer.add_user_metadata(
212            "schema-id".to_string(),
213            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
214        )?;
215
216        let spec_id = table_metadata.default_spec_id;
217
218        writer.add_user_metadata(
219            "partition-spec".to_string(),
220            serde_json::to_string(
221                &table_metadata
222                    .partition_specs
223                    .get(&spec_id)
224                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
225                    .fields(),
226            )?,
227        )?;
228
229        writer.add_user_metadata(
230            "partition-spec-id".to_string(),
231            serde_json::to_string(&spec_id)?,
232        )?;
233
234        writer.add_user_metadata(
235            "content".to_string(),
236            match content {
237                manifest_list::Content::Data => "data",
238                manifest_list::Content::Deletes => "deletes",
239            },
240        )?;
241
242        let manifest = ManifestListEntry {
243            format_version: table_metadata.format_version,
244            manifest_path: manifest_location.to_owned(),
245            manifest_length: 0,
246            partition_spec_id: table_metadata.default_spec_id,
247            content,
248            sequence_number: table_metadata.last_sequence_number + 1,
249            min_sequence_number: table_metadata.last_sequence_number + 1,
250            added_snapshot_id: snapshot_id,
251            added_files_count: Some(0),
252            existing_files_count: Some(0),
253            deleted_files_count: Some(0),
254            added_rows_count: Some(0),
255            existing_rows_count: Some(0),
256            deleted_rows_count: Some(0),
257            partitions: None,
258            key_metadata: None,
259        };
260
261        Ok(ManifestWriter {
262            manifest,
263            writer,
264            table_metadata,
265        })
266    }
267
268    /// Creates a ManifestWriter from an existing manifest file, preserving its entries.
269    ///
270    /// This method reads an existing manifest file and creates a new writer that includes
271    /// all the existing entries with their status updated to "Existing". It also updates
272    /// sequence numbers and snapshot IDs as needed.
273    ///
274    /// # Arguments
275    /// * `bytes` - The raw bytes of the existing manifest file
276    /// * `manifest` - The manifest list entry describing the existing manifest
277    /// * `schema` - The Avro schema used for serializing manifest entries
278    /// * `table_metadata` - The table metadata containing schema and partition information
279    /// * `branch` - Optional branch name to get the current schema from
280    ///
281    /// # Returns
282    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
283    ///
284    /// # Errors
285    /// Returns an error if:
286    /// * The existing manifest cannot be read
287    /// * The Avro writer cannot be created
288    /// * Required metadata fields cannot be serialized
289    /// * The partition spec ID is not found in table metadata
290    pub(crate) fn from_existing(
291        manifest_reader: impl Iterator<Item = Result<ManifestEntry, Error>>,
292        mut manifest: ManifestListEntry,
293        schema: &'schema AvroSchema,
294        table_metadata: &'metadata TableMetadata,
295        branch: Option<&str>,
296    ) -> Result<Self, Error> {
297        let mut writer = AvroWriter::new(schema, Vec::new());
298
299        writer.add_user_metadata(
300            "format-version".to_string(),
301            match table_metadata.format_version {
302                FormatVersion::V1 => "1".as_bytes(),
303                FormatVersion::V2 => "2".as_bytes(),
304            },
305        )?;
306
307        writer.add_user_metadata(
308            "schema".to_string(),
309            match table_metadata.format_version {
310                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
311                    table_metadata.current_schema(branch)?.clone(),
312                ))?,
313                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
314                    table_metadata.current_schema(branch)?.clone(),
315                ))?,
316            },
317        )?;
318
319        writer.add_user_metadata(
320            "schema-id".to_string(),
321            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
322        )?;
323
324        let spec_id = table_metadata.default_spec_id;
325
326        writer.add_user_metadata(
327            "partition-spec".to_string(),
328            serde_json::to_string(
329                &table_metadata
330                    .partition_specs
331                    .get(&spec_id)
332                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
333                    .fields(),
334            )?,
335        )?;
336
337        writer.add_user_metadata(
338            "partition-spec-id".to_string(),
339            serde_json::to_string(&spec_id)?,
340        )?;
341
342        writer.add_user_metadata(
343            "content".to_string(),
344            match manifest.content {
345                manifest_list::Content::Data => "data",
346                manifest_list::Content::Deletes => "deletes",
347            },
348        )?;
349
350        writer.extend(
351            manifest_reader
352                .map(|entry| {
353                    let mut entry = entry
354                        .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))?;
355                    *entry.status_mut() = Status::Existing;
356                    if entry.sequence_number().is_none() {
357                        *entry.sequence_number_mut() = Some(manifest.sequence_number);
358                    }
359                    if entry.snapshot_id().is_none() {
360                        *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
361                    }
362                    to_value(entry)
363                })
364                .filter_map(Result::ok),
365        )?;
366
367        manifest.sequence_number = table_metadata.last_sequence_number + 1;
368
369        manifest.existing_files_count = Some(
370            manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
371        );
372
373        manifest.added_files_count = None;
374
375        Ok(ManifestWriter {
376            manifest,
377            writer,
378            table_metadata,
379        })
380    }
381
382    /// Creates a ManifestWriter from an existing manifest file with selective filtering of entries.
383    ///
384    /// This method reads an existing manifest file and creates a new writer that includes
385    /// only the entries whose file paths are NOT in the provided filter set. Entries that
386    /// pass the filter have their status updated to "Existing" and their sequence numbers
387    /// and snapshot IDs updated as needed.
388    ///
389    /// This is particularly useful for overwrite operations where specific files need to be
390    /// excluded from the new manifest while preserving other existing entries.
391    ///
392    /// # Arguments
393    /// * `bytes` - The raw bytes of the existing manifest file
394    /// * `manifest` - The manifest list entry describing the existing manifest
395    /// * `filter` - A set of file paths to exclude from the new manifest
396    /// * `schema` - The Avro schema used for serializing manifest entries
397    /// * `table_metadata` - The table metadata containing schema and partition information
398    /// * `branch` - Optional branch name to get the current schema from
399    ///
400    /// # Returns
401    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
402    ///
403    /// # Errors
404    /// Returns an error if:
405    /// * The existing manifest cannot be read
406    /// * The Avro writer cannot be created
407    /// * Required metadata fields cannot be serialized
408    /// * The partition spec ID is not found in table metadata
409    ///
410    /// # Behavior
411    /// - Entries whose file paths are in the `filter` set are excluded from the new manifest
412    /// - Remaining entries have their status set to `Status::Existing`
413    /// - Sequence numbers are updated for entries that don't have them
414    /// - Snapshot IDs are updated for entries that don't have them
415    /// - The manifest's sequence number is incremented
416    /// - File counts are updated to reflect the filtered entries
417    pub(crate) fn from_existing_with_filter(
418        bytes: &[u8],
419        mut manifest: ManifestListEntry,
420        filter: &HashSet<String>,
421        schema: &'schema AvroSchema,
422        table_metadata: &'metadata TableMetadata,
423        branch: Option<&str>,
424    ) -> Result<Self, Error> {
425        let manifest_reader = ManifestReader::new(bytes)?;
426
427        let mut writer = AvroWriter::new(schema, Vec::new());
428
429        writer.add_user_metadata(
430            "format-version".to_string(),
431            match table_metadata.format_version {
432                FormatVersion::V1 => "1".as_bytes(),
433                FormatVersion::V2 => "2".as_bytes(),
434            },
435        )?;
436
437        writer.add_user_metadata(
438            "schema".to_string(),
439            match table_metadata.format_version {
440                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
441                    table_metadata.current_schema(branch)?.clone(),
442                ))?,
443                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
444                    table_metadata.current_schema(branch)?.clone(),
445                ))?,
446            },
447        )?;
448
449        writer.add_user_metadata(
450            "schema-id".to_string(),
451            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
452        )?;
453
454        let spec_id = table_metadata.default_spec_id;
455
456        writer.add_user_metadata(
457            "partition-spec".to_string(),
458            serde_json::to_string(
459                &table_metadata
460                    .partition_specs
461                    .get(&spec_id)
462                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
463                    .fields(),
464            )?,
465        )?;
466
467        writer.add_user_metadata(
468            "partition-spec-id".to_string(),
469            serde_json::to_string(&spec_id)?,
470        )?;
471
472        writer.add_user_metadata(
473            "content".to_string(),
474            match manifest.content {
475                manifest_list::Content::Data => "data",
476                manifest_list::Content::Deletes => "deletes",
477            },
478        )?;
479
480        writer.extend(manifest_reader.filter_map(|entry| {
481            let mut entry = entry
482                .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))
483                .unwrap();
484            if !filter.contains(entry.data_file().file_path()) {
485                *entry.status_mut() = Status::Existing;
486                if entry.sequence_number().is_none() {
487                    *entry.sequence_number_mut() = Some(manifest.sequence_number);
488                }
489                if entry.snapshot_id().is_none() {
490                    *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
491                }
492                Some(to_value(entry).unwrap())
493            } else {
494                None
495            }
496        }))?;
497
498        manifest.sequence_number = table_metadata.last_sequence_number + 1;
499
500        manifest.existing_files_count = Some(
501            manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
502        );
503
504        manifest.added_files_count = None;
505
506        Ok(ManifestWriter {
507            manifest,
508            writer,
509            table_metadata,
510        })
511    }
512
513    /// Appends a manifest entry to the manifest file and updates summary statistics.
514    ///
515    /// This method adds a new manifest entry while maintaining:
516    /// - Partition statistics (null values, bounds)
517    /// - File counts by status (added, existing, deleted)
518    /// - Row counts (added, deleted)
519    /// - Sequence number tracking
520    ///
521    /// # Arguments
522    /// * `manifest_entry` - The manifest entry to append
523    ///
524    /// # Returns
525    /// * `Result<(), Error>` - Ok if the entry was successfully appended, Error otherwise
526    ///
527    /// # Errors
528    /// Returns an error if:
529    /// * The entry cannot be serialized
530    /// * Partition statistics cannot be updated
531    /// * The default partition spec is not found
532    pub(crate) fn append(&mut self, manifest_entry: ManifestEntry) -> Result<(), Error> {
533        let mut added_rows_count = 0;
534        let mut deleted_rows_count = 0;
535
536        if self.manifest.partitions.is_none() {
537            self.manifest.partitions = Some(
538                self.table_metadata
539                    .default_partition_spec()?
540                    .fields()
541                    .iter()
542                    .map(|_| FieldSummary {
543                        contains_null: false,
544                        contains_nan: None,
545                        lower_bound: None,
546                        upper_bound: None,
547                    })
548                    .collect::<Vec<FieldSummary>>(),
549            );
550        }
551
552        match manifest_entry.data_file().content() {
553            Content::Data => {
554                added_rows_count += manifest_entry.data_file().record_count();
555            }
556            Content::EqualityDeletes => {
557                deleted_rows_count += manifest_entry.data_file().record_count();
558            }
559            _ => (),
560        }
561        let status = *manifest_entry.status();
562
563        update_partitions(
564            self.manifest.partitions.as_mut().unwrap(),
565            manifest_entry.data_file().partition(),
566            self.table_metadata.default_partition_spec()?.fields(),
567        )?;
568
569        if let Some(sequence_number) = manifest_entry.sequence_number() {
570            if self.manifest.min_sequence_number > *sequence_number {
571                self.manifest.min_sequence_number = *sequence_number;
572            }
573        };
574
575        self.writer.append_ser(manifest_entry)?;
576
577        match status {
578            Status::Added => {
579                self.manifest.added_files_count = match self.manifest.added_files_count {
580                    Some(count) => Some(count + 1),
581                    None => Some(1),
582                };
583            }
584            Status::Existing => {
585                self.manifest.existing_files_count = match self.manifest.existing_files_count {
586                    Some(count) => Some(count + 1),
587                    None => Some(1),
588                };
589            }
590            Status::Deleted => (),
591        }
592
593        self.manifest.added_rows_count = match self.manifest.added_rows_count {
594            Some(count) => Some(count + added_rows_count),
595            None => Some(added_rows_count),
596        };
597
598        self.manifest.deleted_rows_count = match self.manifest.deleted_rows_count {
599            Some(count) => Some(count + deleted_rows_count),
600            None => Some(deleted_rows_count),
601        };
602
603        Ok(())
604    }
605
606    /// Finalizes the manifest writer and writes the manifest file to storage.
607    ///
608    /// This method:
609    /// 1. Completes writing all entries
610    /// 2. Updates the manifest length
611    /// 3. Writes the manifest file to the object store
612    ///
613    /// # Arguments
614    /// * `object_store` - The object store to write the manifest file to
615    ///
616    /// # Returns
617    /// * `Result<ManifestListEntry, Error>` - The completed manifest list entry or an error
618    ///
619    /// # Errors
620    /// Returns an error if:
621    /// * The writer cannot be finalized
622    /// * The manifest file cannot be written to storage
623    pub(crate) async fn finish(
624        mut self,
625        object_store: Arc<dyn ObjectStore>,
626    ) -> Result<ManifestListEntry, Error> {
627        let manifest_bytes = self.writer.into_inner()?;
628
629        let manifest_length: i64 = manifest_bytes.len() as i64;
630
631        self.manifest.manifest_length += manifest_length;
632
633        object_store
634            .put(
635                &strip_prefix(&self.manifest.manifest_path).as_str().into(),
636                manifest_bytes.into(),
637            )
638            .await?;
639        Ok(self.manifest)
640    }
641
642    /// Finishes writing the manifest file concurrently.
643    ///
644    /// This method completes the manifest writing process by finalizing the writer
645    /// and returning both the manifest list entry and a future for the actual file upload.
646    /// The upload operation can be awaited separately, allowing for concurrent processing
647    /// of multiple manifest writes.
648    ///
649    /// # Arguments
650    ///
651    /// * `object_store` - The object store implementation used to persist the manifest file
652    ///
653    /// # Returns
654    ///
655    /// Returns a tuple containing:
656    /// - `ManifestListEntry`: The completed manifest entry with updated metadata
657    /// - `impl Future<Output = Result<PutResult, Error>>`: A future that performs the actual file upload
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if:
662    /// - The writer cannot be finalized
663    /// - There are issues preparing the upload operation
664    pub(crate) fn finish_concurrently(
665        mut self,
666        object_store: Arc<dyn ObjectStore>,
667    ) -> Result<(ManifestListEntry, impl Future<Output = Result<(), Error>>), Error> {
668        let manifest_bytes = self.writer.into_inner()?;
669
670        let manifest_length: i64 = manifest_bytes.len() as i64;
671
672        self.manifest.manifest_length += manifest_length;
673
674        let path = strip_prefix(&self.manifest.manifest_path).as_str().into();
675        let future = async move {
676            object_store
677                .put(&path, manifest_bytes.into())
678                .map_ok(|_| ())
679                .map_err(Error::from)
680                .await
681        };
682        Ok((self.manifest, future))
683    }
684}
685
686#[allow(clippy::type_complexity)]
687/// Convert avro value to ManifestEntry based on the format version of the table.
688fn avro_value_to_manifest_entry(
689    value: (
690        Result<AvroValue, apache_avro::Error>,
691        Arc<(Schema, PartitionSpec, FormatVersion)>,
692    ),
693) -> Result<ManifestEntry, Error> {
694    let entry = value.0?;
695    let schema = &value.1 .0;
696    let partition_spec = &value.1 .1;
697    let format_version = &value.1 .2;
698    match format_version {
699        FormatVersion::V2 => ManifestEntry::try_from_v2(
700            apache_avro::from_value::<ManifestEntryV2>(&entry)?,
701            schema,
702            partition_spec,
703        )
704        .map_err(Error::from),
705        FormatVersion::V1 => ManifestEntry::try_from_v1(
706            apache_avro::from_value::<ManifestEntryV1>(&entry)?,
707            schema,
708            partition_spec,
709        )
710        .map_err(Error::from),
711    }
712}
713
714fn update_partitions(
715    partitions: &mut [FieldSummary],
716    partition_values: &Struct,
717    partition_columns: &[PartitionField],
718) -> Result<(), Error> {
719    for (field, summary) in partition_columns.iter().zip(partitions.iter_mut()) {
720        let value = partition_values.get(field.name()).and_then(|x| x.as_ref());
721        if let Some(value) = value {
722            if summary.lower_bound.is_none() {
723                summary.lower_bound = Some(value.clone());
724            } else if let Some(lower_bound) = &mut summary.lower_bound {
725                match (value, lower_bound) {
726                    (Value::Int(val), Value::Int(current)) => {
727                        if *current > *val {
728                            *current = *val
729                        }
730                    }
731                    (Value::LongInt(val), Value::LongInt(current)) => {
732                        if *current > *val {
733                            *current = *val
734                        }
735                    }
736                    (Value::Float(val), Value::Float(current)) => {
737                        if *current > *val {
738                            *current = *val
739                        }
740                    }
741                    (Value::Double(val), Value::Double(current)) => {
742                        if *current > *val {
743                            *current = *val
744                        }
745                    }
746                    (Value::Date(val), Value::Date(current)) => {
747                        if *current > *val {
748                            *current = *val
749                        }
750                    }
751                    (Value::Time(val), Value::Time(current)) => {
752                        if *current > *val {
753                            *current = *val
754                        }
755                    }
756                    (Value::Timestamp(val), Value::Timestamp(current)) => {
757                        if *current > *val {
758                            *current = *val
759                        }
760                    }
761                    (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
762                        if *current > *val {
763                            *current = *val
764                        }
765                    }
766                    _ => {}
767                }
768            }
769            if summary.upper_bound.is_none() {
770                summary.upper_bound = Some(value.clone());
771            } else if let Some(upper_bound) = &mut summary.upper_bound {
772                match (value, upper_bound) {
773                    (Value::Int(val), Value::Int(current)) => {
774                        if *current < *val {
775                            *current = *val
776                        }
777                    }
778                    (Value::LongInt(val), Value::LongInt(current)) => {
779                        if *current < *val {
780                            *current = *val
781                        }
782                    }
783                    (Value::Float(val), Value::Float(current)) => {
784                        if *current < *val {
785                            *current = *val
786                        }
787                    }
788                    (Value::Double(val), Value::Double(current)) => {
789                        if *current < *val {
790                            *current = *val
791                        }
792                    }
793                    (Value::Date(val), Value::Date(current)) => {
794                        if *current < *val {
795                            *current = *val
796                        }
797                    }
798                    (Value::Time(val), Value::Time(current)) => {
799                        if *current < *val {
800                            *current = *val
801                        }
802                    }
803                    (Value::Timestamp(val), Value::Timestamp(current)) => {
804                        if *current < *val {
805                            *current = *val
806                        }
807                    }
808                    (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
809                        if *current < *val {
810                            *current = *val
811                        }
812                    }
813                    _ => {}
814                }
815            }
816        }
817    }
818    Ok(())
819}
820
// TODO: add unit tests, e.g. a write/read round trip through `ManifestWriter` and
// `ManifestReader`, and partition-summary updates via `update_partitions`.
#[cfg(test)]
mod tests {}