iceberg_rust/table/
manifest.rs

1//! Provides functionality for reading and writing Iceberg manifest files.
2//!
3//! This module implements the core manifest handling capabilities:
4//! - Reading manifest files via [`ManifestReader`]
5//! - Writing new manifest files via [`ManifestWriter`]
6//! - Converting between manifest formats (V1/V2)
7//! - Managing manifest entries and their metadata
8//! - Tracking partition statistics and file counts
9//!
10//! Manifest files are a key part of Iceberg tables, containing:
11//! - Data file locations and metadata
12//! - Partition information
13//! - File statistics and metrics
14//! - Schema and partition spec references
15//!
16//! The module handles both V1 and V2 manifest formats transparently.
17
18use std::{
19    collections::HashSet,
20    io::Read,
21    iter::{repeat, Map, Repeat, Zip},
22    sync::Arc,
23};
24
25use apache_avro::{
26    to_value, types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema,
27    Writer as AvroWriter,
28};
29use iceberg_rust_spec::{
30    manifest::{Content, ManifestEntry, ManifestEntryV1, ManifestEntryV2, Status},
31    manifest_list::{self, FieldSummary, ManifestListEntry},
32    partition::{PartitionField, PartitionSpec},
33    schema::{Schema, SchemaV1, SchemaV2},
34    table_metadata::{FormatVersion, TableMetadata},
35    util::strip_prefix,
36    values::{Struct, Value},
37};
38use object_store::ObjectStore;
39
40use crate::{error::Error, spec};
41
42type ReaderZip<'a, R> = Zip<AvroReader<'a, R>, Repeat<Arc<(Schema, PartitionSpec, FormatVersion)>>>;
43type ReaderMap<'a, R> = Map<
44    ReaderZip<'a, R>,
45    fn(
46        (
47            Result<AvroValue, apache_avro::Error>,
48            Arc<(Schema, PartitionSpec, FormatVersion)>,
49        ),
50    ) -> Result<ManifestEntry, Error>,
51>;
52
53/// A reader for Iceberg manifest files that provides an iterator over manifest entries.
54///
55/// The reader handles both V1 and V2 manifest formats and automatically converts entries
56/// to the appropriate version based on the manifest metadata.
57///
58/// # Type Parameters
59/// * `'a` - The lifetime of the underlying reader
60/// * `R` - The type implementing `Read` that provides the manifest data
61pub(crate) struct ManifestReader<'a, R: Read> {
62    reader: ReaderMap<'a, R>,
63}
64
65impl<R: Read> Iterator for ManifestReader<'_, R> {
66    type Item = Result<ManifestEntry, Error>;
67    fn next(&mut self) -> Option<Self::Item> {
68        self.reader.next()
69    }
70}
71
72impl<R: Read> ManifestReader<'_, R> {
73    /// Creates a new ManifestReader from a reader implementing the Read trait.
74    ///
75    /// This method initializes a reader that can parse both V1 and V2 manifest formats.
76    /// It extracts metadata from the Avro file including format version, schema, and partition spec information.
77    ///
78    /// # Arguments
79    /// * `reader` - A type implementing the `Read` trait that provides access to the manifest file data
80    ///
81    /// # Returns
82    /// * `Result<Self, Error>` - A new ManifestReader instance or an error if initialization fails
83    ///
84    /// # Errors
85    /// Returns an error if:
86    /// * The Avro reader cannot be created
87    /// * Required metadata fields are missing
88    /// * Format version is invalid
89    /// * Schema or partition spec information cannot be parsed
90    pub(crate) fn new(reader: R) -> Result<Self, Error> {
91        let reader = AvroReader::new(reader)?;
92        let metadata = reader.user_metadata();
93
94        let format_version: FormatVersion = match metadata
95            .get("format-version")
96            .map(|bytes| String::from_utf8(bytes.clone()))
97            .transpose()?
98            .unwrap_or("1".to_string())
99            .as_str()
100        {
101            "1" => Ok(FormatVersion::V1),
102            "2" => Ok(FormatVersion::V2),
103            _ => Err(Error::InvalidFormat("format version".to_string())),
104        }?;
105
106        let schema: Schema = match format_version {
107            FormatVersion::V1 => TryFrom::<SchemaV1>::try_from(serde_json::from_slice(
108                metadata
109                    .get("schema")
110                    .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
111            )?)?,
112            FormatVersion::V2 => TryFrom::<SchemaV2>::try_from(serde_json::from_slice(
113                metadata
114                    .get("schema")
115                    .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
116            )?)?,
117        };
118
119        let partition_fields: Vec<PartitionField> = serde_json::from_slice(
120            metadata
121                .get("partition-spec")
122                .ok_or(Error::InvalidFormat("manifest metadata".to_string()))?,
123        )?;
124        let spec_id: i32 = metadata
125            .get("partition-spec-id")
126            .map(|x| String::from_utf8(x.clone()))
127            .transpose()?
128            .unwrap_or("0".to_string())
129            .parse()?;
130        let partition_spec = PartitionSpec::builder()
131            .with_spec_id(spec_id)
132            .with_fields(partition_fields)
133            .build()
134            .map_err(spec::error::Error::from)?;
135        Ok(Self {
136            reader: reader
137                .zip(repeat(Arc::new((schema, partition_spec, format_version))))
138                .map(avro_value_to_manifest_entry),
139        })
140    }
141}
142
143/// A writer for Iceberg manifest files that handles creating and updating manifest entries.
144///
145/// ManifestWriter manages both creating new manifests and updating existing ones, handling
146/// the complexities of manifest metadata, entry tracking, and partition summaries.
147///
148/// # Type Parameters
149/// * `'schema` - The lifetime of the Avro schema used for writing entries
150/// * `'metadata` - The lifetime of the table metadata reference
151///
152/// # Fields
153/// * `table_metadata` - Reference to the table's metadata containing schema and partition information
154/// * `manifest` - The manifest list entry being built or modified
155/// * `writer` - The underlying Avro writer for serializing manifest entries
156pub(crate) struct ManifestWriter<'schema, 'metadata> {
157    table_metadata: &'metadata TableMetadata,
158    manifest: ManifestListEntry,
159    writer: AvroWriter<'schema, Vec<u8>>,
160}
161
162impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
163    /// Creates a new ManifestWriter for writing manifest entries to a new manifest file.
164    ///
165    /// # Arguments
166    /// * `manifest_location` - The location where the manifest file will be written
167    /// * `snapshot_id` - The ID of the snapshot this manifest belongs to
168    /// * `schema` - The Avro schema used for serializing manifest entries
169    /// * `table_metadata` - The table metadata containing schema and partition information
170    /// * `branch` - Optional branch name to get the current schema from
171    ///
172    /// # Returns
173    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
174    ///
175    /// # Errors
176    /// Returns an error if:
177    /// * The Avro writer cannot be created
178    /// * Required metadata fields cannot be serialized
179    /// * The partition spec ID is not found in table metadata
180    pub(crate) fn new(
181        manifest_location: &str,
182        snapshot_id: i64,
183        schema: &'schema AvroSchema,
184        table_metadata: &'metadata TableMetadata,
185        branch: Option<&str>,
186    ) -> Result<Self, Error> {
187        let mut writer = AvroWriter::new(schema, Vec::new());
188
189        writer.add_user_metadata(
190            "format-version".to_string(),
191            match table_metadata.format_version {
192                FormatVersion::V1 => "1".as_bytes(),
193                FormatVersion::V2 => "2".as_bytes(),
194            },
195        )?;
196
197        writer.add_user_metadata(
198            "schema".to_string(),
199            match table_metadata.format_version {
200                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
201                    table_metadata.current_schema(branch)?.clone(),
202                ))?,
203                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
204                    table_metadata.current_schema(branch)?.clone(),
205                ))?,
206            },
207        )?;
208
209        writer.add_user_metadata(
210            "schema-id".to_string(),
211            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
212        )?;
213
214        let spec_id = table_metadata.default_spec_id;
215
216        writer.add_user_metadata(
217            "partition-spec".to_string(),
218            serde_json::to_string(
219                &table_metadata
220                    .partition_specs
221                    .get(&spec_id)
222                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
223                    .fields(),
224            )?,
225        )?;
226
227        writer.add_user_metadata(
228            "partition-spec-id".to_string(),
229            serde_json::to_string(&spec_id)?,
230        )?;
231
232        writer.add_user_metadata("content".to_string(), "data")?;
233
234        let manifest = ManifestListEntry {
235            format_version: table_metadata.format_version,
236            manifest_path: manifest_location.to_owned(),
237            manifest_length: 0,
238            partition_spec_id: table_metadata.default_spec_id,
239            content: manifest_list::Content::Data,
240            sequence_number: table_metadata.last_sequence_number + 1,
241            min_sequence_number: table_metadata.last_sequence_number + 1,
242            added_snapshot_id: snapshot_id,
243            added_files_count: Some(0),
244            existing_files_count: Some(0),
245            deleted_files_count: Some(0),
246            added_rows_count: Some(0),
247            existing_rows_count: Some(0),
248            deleted_rows_count: Some(0),
249            partitions: None,
250            key_metadata: None,
251        };
252
253        Ok(ManifestWriter {
254            manifest,
255            writer,
256            table_metadata,
257        })
258    }
259
260    /// Creates a ManifestWriter from an existing manifest file, preserving its entries.
261    ///
262    /// This method reads an existing manifest file and creates a new writer that includes
263    /// all the existing entries with their status updated to "Existing". It also updates
264    /// sequence numbers and snapshot IDs as needed.
265    ///
266    /// # Arguments
267    /// * `bytes` - The raw bytes of the existing manifest file
268    /// * `manifest` - The manifest list entry describing the existing manifest
269    /// * `schema` - The Avro schema used for serializing manifest entries
270    /// * `table_metadata` - The table metadata containing schema and partition information
271    /// * `branch` - Optional branch name to get the current schema from
272    ///
273    /// # Returns
274    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
275    ///
276    /// # Errors
277    /// Returns an error if:
278    /// * The existing manifest cannot be read
279    /// * The Avro writer cannot be created
280    /// * Required metadata fields cannot be serialized
281    /// * The partition spec ID is not found in table metadata
282    pub(crate) fn from_existing(
283        manifest_reader: impl Iterator<Item = Result<ManifestEntry, Error>>,
284        mut manifest: ManifestListEntry,
285        schema: &'schema AvroSchema,
286        table_metadata: &'metadata TableMetadata,
287        branch: Option<&str>,
288    ) -> Result<Self, Error> {
289        let mut writer = AvroWriter::new(schema, Vec::new());
290
291        writer.add_user_metadata(
292            "format-version".to_string(),
293            match table_metadata.format_version {
294                FormatVersion::V1 => "1".as_bytes(),
295                FormatVersion::V2 => "2".as_bytes(),
296            },
297        )?;
298
299        writer.add_user_metadata(
300            "schema".to_string(),
301            match table_metadata.format_version {
302                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
303                    table_metadata.current_schema(branch)?.clone(),
304                ))?,
305                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
306                    table_metadata.current_schema(branch)?.clone(),
307                ))?,
308            },
309        )?;
310
311        writer.add_user_metadata(
312            "schema-id".to_string(),
313            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
314        )?;
315
316        let spec_id = table_metadata.default_spec_id;
317
318        writer.add_user_metadata(
319            "partition-spec".to_string(),
320            serde_json::to_string(
321                &table_metadata
322                    .partition_specs
323                    .get(&spec_id)
324                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
325                    .fields(),
326            )?,
327        )?;
328
329        writer.add_user_metadata(
330            "partition-spec-id".to_string(),
331            serde_json::to_string(&spec_id)?,
332        )?;
333
334        writer.add_user_metadata("content".to_string(), "data")?;
335
336        writer.extend(
337            manifest_reader
338                .map(|entry| {
339                    let mut entry = entry
340                        .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))?;
341                    *entry.status_mut() = Status::Existing;
342                    if entry.sequence_number().is_none() {
343                        *entry.sequence_number_mut() = Some(manifest.sequence_number);
344                    }
345                    if entry.snapshot_id().is_none() {
346                        *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
347                    }
348                    to_value(entry)
349                })
350                .filter_map(Result::ok),
351        )?;
352
353        manifest.sequence_number = table_metadata.last_sequence_number + 1;
354
355        manifest.existing_files_count = Some(
356            manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
357        );
358
359        manifest.added_files_count = None;
360
361        Ok(ManifestWriter {
362            manifest,
363            writer,
364            table_metadata,
365        })
366    }
367
368    /// Creates a ManifestWriter from an existing manifest file with selective filtering of entries.
369    ///
370    /// This method reads an existing manifest file and creates a new writer that includes
371    /// only the entries whose file paths are NOT in the provided filter set. Entries that
372    /// pass the filter have their status updated to "Existing" and their sequence numbers
373    /// and snapshot IDs updated as needed.
374    ///
375    /// This is particularly useful for overwrite operations where specific files need to be
376    /// excluded from the new manifest while preserving other existing entries.
377    ///
378    /// # Arguments
379    /// * `bytes` - The raw bytes of the existing manifest file
380    /// * `manifest` - The manifest list entry describing the existing manifest
381    /// * `filter` - A set of file paths to exclude from the new manifest
382    /// * `schema` - The Avro schema used for serializing manifest entries
383    /// * `table_metadata` - The table metadata containing schema and partition information
384    /// * `branch` - Optional branch name to get the current schema from
385    ///
386    /// # Returns
387    /// * `Result<Self, Error>` - A new ManifestWriter instance or an error if initialization fails
388    ///
389    /// # Errors
390    /// Returns an error if:
391    /// * The existing manifest cannot be read
392    /// * The Avro writer cannot be created
393    /// * Required metadata fields cannot be serialized
394    /// * The partition spec ID is not found in table metadata
395    ///
396    /// # Behavior
397    /// - Entries whose file paths are in the `filter` set are excluded from the new manifest
398    /// - Remaining entries have their status set to `Status::Existing`
399    /// - Sequence numbers are updated for entries that don't have them
400    /// - Snapshot IDs are updated for entries that don't have them
401    /// - The manifest's sequence number is incremented
402    /// - File counts are updated to reflect the filtered entries
403    pub(crate) fn from_existing_with_filter(
404        bytes: &[u8],
405        mut manifest: ManifestListEntry,
406        filter: &HashSet<String>,
407        schema: &'schema AvroSchema,
408        table_metadata: &'metadata TableMetadata,
409        branch: Option<&str>,
410    ) -> Result<Self, Error> {
411        let manifest_reader = ManifestReader::new(bytes)?;
412
413        let mut writer = AvroWriter::new(schema, Vec::new());
414
415        writer.add_user_metadata(
416            "format-version".to_string(),
417            match table_metadata.format_version {
418                FormatVersion::V1 => "1".as_bytes(),
419                FormatVersion::V2 => "2".as_bytes(),
420            },
421        )?;
422
423        writer.add_user_metadata(
424            "schema".to_string(),
425            match table_metadata.format_version {
426                FormatVersion::V1 => serde_json::to_string(&Into::<SchemaV1>::into(
427                    table_metadata.current_schema(branch)?.clone(),
428                ))?,
429                FormatVersion::V2 => serde_json::to_string(&Into::<SchemaV2>::into(
430                    table_metadata.current_schema(branch)?.clone(),
431                ))?,
432            },
433        )?;
434
435        writer.add_user_metadata(
436            "schema-id".to_string(),
437            serde_json::to_string(&table_metadata.current_schema(branch)?.schema_id())?,
438        )?;
439
440        let spec_id = table_metadata.default_spec_id;
441
442        writer.add_user_metadata(
443            "partition-spec".to_string(),
444            serde_json::to_string(
445                &table_metadata
446                    .partition_specs
447                    .get(&spec_id)
448                    .ok_or(Error::NotFound(format!("Partition spec with id {spec_id}")))?
449                    .fields(),
450            )?,
451        )?;
452
453        writer.add_user_metadata(
454            "partition-spec-id".to_string(),
455            serde_json::to_string(&spec_id)?,
456        )?;
457
458        writer.add_user_metadata("content".to_string(), "data")?;
459
460        writer.extend(manifest_reader.filter_map(|entry| {
461            let mut entry = entry
462                .map_err(|err| apache_avro::Error::DeserializeValue(err.to_string()))
463                .unwrap();
464            if !filter.contains(entry.data_file().file_path()) {
465                *entry.status_mut() = Status::Existing;
466                if entry.sequence_number().is_none() {
467                    *entry.sequence_number_mut() = Some(manifest.sequence_number);
468                }
469                if entry.snapshot_id().is_none() {
470                    *entry.snapshot_id_mut() = Some(manifest.added_snapshot_id);
471                }
472                Some(to_value(entry).unwrap())
473            } else {
474                None
475            }
476        }))?;
477
478        manifest.sequence_number = table_metadata.last_sequence_number + 1;
479
480        manifest.existing_files_count = Some(
481            manifest.existing_files_count.unwrap_or(0) + manifest.added_files_count.unwrap_or(0),
482        );
483
484        manifest.added_files_count = None;
485
486        Ok(ManifestWriter {
487            manifest,
488            writer,
489            table_metadata,
490        })
491    }
492
493    /// Appends a manifest entry to the manifest file and updates summary statistics.
494    ///
495    /// This method adds a new manifest entry while maintaining:
496    /// - Partition statistics (null values, bounds)
497    /// - File counts by status (added, existing, deleted)
498    /// - Row counts (added, deleted)
499    /// - Sequence number tracking
500    ///
501    /// # Arguments
502    /// * `manifest_entry` - The manifest entry to append
503    ///
504    /// # Returns
505    /// * `Result<(), Error>` - Ok if the entry was successfully appended, Error otherwise
506    ///
507    /// # Errors
508    /// Returns an error if:
509    /// * The entry cannot be serialized
510    /// * Partition statistics cannot be updated
511    /// * The default partition spec is not found
512    pub(crate) fn append(&mut self, manifest_entry: ManifestEntry) -> Result<(), Error> {
513        let mut added_rows_count = 0;
514        let mut deleted_rows_count = 0;
515
516        if self.manifest.partitions.is_none() {
517            self.manifest.partitions = Some(
518                self.table_metadata
519                    .default_partition_spec()?
520                    .fields()
521                    .iter()
522                    .map(|_| FieldSummary {
523                        contains_null: false,
524                        contains_nan: None,
525                        lower_bound: None,
526                        upper_bound: None,
527                    })
528                    .collect::<Vec<FieldSummary>>(),
529            );
530        }
531
532        match manifest_entry.data_file().content() {
533            Content::Data => {
534                added_rows_count += manifest_entry.data_file().record_count();
535            }
536            Content::EqualityDeletes => {
537                deleted_rows_count += manifest_entry.data_file().record_count();
538            }
539            _ => (),
540        }
541        let status = *manifest_entry.status();
542
543        update_partitions(
544            self.manifest.partitions.as_mut().unwrap(),
545            manifest_entry.data_file().partition(),
546            self.table_metadata.default_partition_spec()?.fields(),
547        )?;
548
549        if let Some(sequence_number) = manifest_entry.sequence_number() {
550            if self.manifest.min_sequence_number > *sequence_number {
551                self.manifest.min_sequence_number = *sequence_number;
552            }
553        };
554
555        self.writer.append_ser(manifest_entry)?;
556
557        match status {
558            Status::Added => {
559                self.manifest.added_files_count = match self.manifest.added_files_count {
560                    Some(count) => Some(count + 1),
561                    None => Some(1),
562                };
563            }
564            Status::Existing => {
565                self.manifest.existing_files_count = match self.manifest.existing_files_count {
566                    Some(count) => Some(count + 1),
567                    None => Some(1),
568                };
569            }
570            Status::Deleted => (),
571        }
572
573        self.manifest.added_rows_count = match self.manifest.added_rows_count {
574            Some(count) => Some(count + added_rows_count),
575            None => Some(added_rows_count),
576        };
577
578        self.manifest.deleted_rows_count = match self.manifest.deleted_rows_count {
579            Some(count) => Some(count + deleted_rows_count),
580            None => Some(deleted_rows_count),
581        };
582
583        Ok(())
584    }
585
586    /// Finalizes the manifest writer and writes the manifest file to storage.
587    ///
588    /// This method:
589    /// 1. Completes writing all entries
590    /// 2. Updates the manifest length
591    /// 3. Writes the manifest file to the object store
592    ///
593    /// # Arguments
594    /// * `object_store` - The object store to write the manifest file to
595    ///
596    /// # Returns
597    /// * `Result<ManifestListEntry, Error>` - The completed manifest list entry or an error
598    ///
599    /// # Errors
600    /// Returns an error if:
601    /// * The writer cannot be finalized
602    /// * The manifest file cannot be written to storage
603    pub(crate) async fn finish(
604        mut self,
605        object_store: Arc<dyn ObjectStore>,
606    ) -> Result<ManifestListEntry, Error> {
607        let manifest_bytes = self.writer.into_inner()?;
608
609        let manifest_length: i64 = manifest_bytes.len() as i64;
610
611        self.manifest.manifest_length += manifest_length;
612
613        object_store
614            .put(
615                &strip_prefix(&self.manifest.manifest_path).as_str().into(),
616                manifest_bytes.into(),
617            )
618            .await?;
619        Ok(self.manifest)
620    }
621}
622
623#[allow(clippy::type_complexity)]
624/// Convert avro value to ManifestEntry based on the format version of the table.
625fn avro_value_to_manifest_entry(
626    value: (
627        Result<AvroValue, apache_avro::Error>,
628        Arc<(Schema, PartitionSpec, FormatVersion)>,
629    ),
630) -> Result<ManifestEntry, Error> {
631    let entry = value.0?;
632    let schema = &value.1 .0;
633    let partition_spec = &value.1 .1;
634    let format_version = &value.1 .2;
635    match format_version {
636        FormatVersion::V2 => ManifestEntry::try_from_v2(
637            apache_avro::from_value::<ManifestEntryV2>(&entry)?,
638            schema,
639            partition_spec,
640        )
641        .map_err(Error::from),
642        FormatVersion::V1 => ManifestEntry::try_from_v1(
643            apache_avro::from_value::<ManifestEntryV1>(&entry)?,
644            schema,
645            partition_spec,
646        )
647        .map_err(Error::from),
648    }
649}
650
651fn update_partitions(
652    partitions: &mut [FieldSummary],
653    partition_values: &Struct,
654    partition_columns: &[PartitionField],
655) -> Result<(), Error> {
656    for (field, summary) in partition_columns.iter().zip(partitions.iter_mut()) {
657        let value = partition_values.get(field.name()).and_then(|x| x.as_ref());
658        if let Some(value) = value {
659            if summary.lower_bound.is_none() {
660                summary.lower_bound = Some(value.clone());
661            } else if let Some(lower_bound) = &mut summary.lower_bound {
662                match (value, lower_bound) {
663                    (Value::Int(val), Value::Int(current)) => {
664                        if *current > *val {
665                            *current = *val
666                        }
667                    }
668                    (Value::LongInt(val), Value::LongInt(current)) => {
669                        if *current > *val {
670                            *current = *val
671                        }
672                    }
673                    (Value::Float(val), Value::Float(current)) => {
674                        if *current > *val {
675                            *current = *val
676                        }
677                    }
678                    (Value::Double(val), Value::Double(current)) => {
679                        if *current > *val {
680                            *current = *val
681                        }
682                    }
683                    (Value::Date(val), Value::Date(current)) => {
684                        if *current > *val {
685                            *current = *val
686                        }
687                    }
688                    (Value::Time(val), Value::Time(current)) => {
689                        if *current > *val {
690                            *current = *val
691                        }
692                    }
693                    (Value::Timestamp(val), Value::Timestamp(current)) => {
694                        if *current > *val {
695                            *current = *val
696                        }
697                    }
698                    (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
699                        if *current > *val {
700                            *current = *val
701                        }
702                    }
703                    _ => {}
704                }
705            }
706            if summary.upper_bound.is_none() {
707                summary.upper_bound = Some(value.clone());
708            } else if let Some(upper_bound) = &mut summary.upper_bound {
709                match (value, upper_bound) {
710                    (Value::Int(val), Value::Int(current)) => {
711                        if *current < *val {
712                            *current = *val
713                        }
714                    }
715                    (Value::LongInt(val), Value::LongInt(current)) => {
716                        if *current < *val {
717                            *current = *val
718                        }
719                    }
720                    (Value::Float(val), Value::Float(current)) => {
721                        if *current < *val {
722                            *current = *val
723                        }
724                    }
725                    (Value::Double(val), Value::Double(current)) => {
726                        if *current < *val {
727                            *current = *val
728                        }
729                    }
730                    (Value::Date(val), Value::Date(current)) => {
731                        if *current < *val {
732                            *current = *val
733                        }
734                    }
735                    (Value::Time(val), Value::Time(current)) => {
736                        if *current < *val {
737                            *current = *val
738                        }
739                    }
740                    (Value::Timestamp(val), Value::Timestamp(current)) => {
741                        if *current < *val {
742                            *current = *val
743                        }
744                    }
745                    (Value::TimestampTZ(val), Value::TimestampTZ(current)) => {
746                        if *current < *val {
747                            *current = *val
748                        }
749                    }
750                    _ => {}
751                }
752            }
753        }
754    }
755    Ok(())
756}
757
/// TODO: add unit tests for manifest reading, writing and partition summary updates.
759#[cfg(test)]
760mod tests {}