1use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum Operation {
29 Append,
31 Overwrite,
33 Delete,
35 Replace,
37}
38
39impl std::fmt::Display for Operation {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Operation::Append => write!(f, "append"),
43 Operation::Overwrite => write!(f, "overwrite"),
44 Operation::Delete => write!(f, "delete"),
45 Operation::Replace => write!(f, "replace"),
46 }
47 }
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(rename_all = "kebab-case")]
61pub struct Snapshot {
62 pub snapshot_id: i64,
64
65 #[serde(skip_serializing_if = "Option::is_none")]
68 pub parent_snapshot_id: Option<i64>,
69
70 pub sequence_number: i64,
73
74 pub timestamp_ms: i64,
76
77 pub operation: Operation,
79
80 pub manifest_list: String,
82
83 #[serde(default)]
85 pub summary: HashMap<String, String>,
86
87 pub schema_id: i32,
89}
90
91impl Snapshot {
92 pub fn builder(snapshot_id: i64, manifest_list: impl Into<String>) -> SnapshotBuilder {
94 SnapshotBuilder::new(snapshot_id, manifest_list)
95 }
96
97 pub fn summary_property(&self, key: &str) -> Option<&str> {
99 self.summary.get(key).map(|s| s.as_str())
100 }
101
102 pub fn added_files_count(&self) -> Option<i64> {
104 self.summary_property("added-data-files")
105 .and_then(|s| s.parse().ok())
106 }
107
108 pub fn added_records_count(&self) -> Option<i64> {
110 self.summary_property("added-records")
111 .and_then(|s| s.parse().ok())
112 }
113
114 pub fn total_files_count(&self) -> Option<i64> {
116 self.summary_property("total-data-files")
117 .and_then(|s| s.parse().ok())
118 }
119
120 pub fn total_records_count(&self) -> Option<i64> {
122 self.summary_property("total-records")
123 .and_then(|s| s.parse().ok())
124 }
125
126 pub async fn all_files(
130 &self,
131 storage: &crate::storage::Storage,
132 ) -> anyhow::Result<(Vec<DataFile>, Vec<DataFile>)> {
133 let manifest_list = ManifestList::load(&self.manifest_list, storage).await?;
134 let mut data_files = Vec::new();
135 let mut delete_files = Vec::new();
136
137 for entry in manifest_list.entries {
138 let manifest = ManifestFile::load(&entry.manifest_path, storage).await?;
139 for m_entry in manifest.entries {
140 if m_entry.status != ManifestEntryStatus::Deleted {
141 match m_entry.data_file.content {
142 FileContent::Data => data_files.push(m_entry.data_file),
143 FileContent::PositionDeletes | FileContent::EqualityDeletes => {
144 delete_files.push(m_entry.data_file)
145 }
146 }
147 }
148 }
149 }
150
151 Ok((data_files, delete_files))
152 }
153
154 pub async fn all_data_files(
156 &self,
157 storage: &crate::storage::Storage,
158 ) -> anyhow::Result<Vec<DataFile>> {
159 let (data_files, _) = self.all_files(storage).await?;
160 Ok(data_files)
161 }
162}
163
164pub struct SnapshotBuilder {
166 snapshot_id: i64,
167 manifest_list: String,
168 parent_snapshot_id: Option<i64>,
169 sequence_number: i64,
170 timestamp_ms: i64,
171 operation: Operation,
172 summary: HashMap<String, String>,
173 schema_id: i32,
174}
175
176impl SnapshotBuilder {
177 pub fn new(snapshot_id: i64, manifest_list: impl Into<String>) -> Self {
179 Self {
180 snapshot_id,
181 manifest_list: manifest_list.into(),
182 parent_snapshot_id: None,
183 sequence_number: 0,
184 timestamp_ms: chrono::Utc::now().timestamp_millis(),
185 operation: Operation::Append,
186 summary: HashMap::new(),
187 schema_id: 0,
188 }
189 }
190
191 pub fn with_parent(mut self, parent_id: i64) -> Self {
193 self.parent_snapshot_id = Some(parent_id);
194 self
195 }
196
197 pub fn with_sequence_number(mut self, seq: i64) -> Self {
199 self.sequence_number = seq;
200 self
201 }
202
203 pub fn with_operation(mut self, op: Operation) -> Self {
205 self.operation = op;
206 self
207 }
208
209 pub fn with_schema_id(mut self, schema_id: i32) -> Self {
211 self.schema_id = schema_id;
212 self
213 }
214
215 pub fn with_summary_property(
217 mut self,
218 key: impl Into<String>,
219 value: impl Into<String>,
220 ) -> Self {
221 self.summary.insert(key.into(), value.into());
222 self
223 }
224
225 pub fn build(self) -> Snapshot {
227 Snapshot {
228 snapshot_id: self.snapshot_id,
229 parent_snapshot_id: self.parent_snapshot_id,
230 sequence_number: self.sequence_number,
231 timestamp_ms: self.timestamp_ms,
232 operation: self.operation,
233 manifest_list: self.manifest_list,
234 summary: self.summary,
235 schema_id: self.schema_id,
236 }
237 }
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
242#[serde(rename_all = "kebab-case")]
243pub struct ManifestList {
244 pub entries: Vec<ManifestFileEntry>,
246}
247
248impl ManifestList {
249 pub fn new() -> Self {
251 Self {
252 entries: Vec::new(),
253 }
254 }
255
256 pub fn add_entry(&mut self, entry: ManifestFileEntry) {
258 self.entries.push(entry);
259 }
260
261 pub fn total_data_files(&self) -> i64 {
263 self.entries
264 .iter()
265 .map(|e| e.added_files_count + e.existing_files_count)
266 .sum()
267 }
268
269 pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
271 let data = storage.read(path).await?;
272 Ok(serde_json::from_slice(&data)?)
273 }
274
275 pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
277 let data = serde_json::to_vec(self)?;
278 storage.write(path, data.into()).await?;
279 Ok(())
280 }
281}
282
283impl Default for ManifestList {
284 fn default() -> Self {
285 Self::new()
286 }
287}
288
289#[derive(Debug, Clone, Serialize, Deserialize)]
291#[serde(rename_all = "kebab-case")]
292pub struct ManifestFileEntry {
293 pub manifest_path: String,
295
296 pub manifest_length: i64,
298
299 pub partition_spec_id: i32,
301
302 pub content: ManifestContent,
304
305 pub sequence_number: i64,
307
308 pub min_sequence_number: i64,
310
311 pub added_snapshot_id: i64,
313
314 pub added_files_count: i64,
316
317 pub existing_files_count: i64,
319
320 pub deleted_files_count: i64,
322
323 pub added_rows_count: i64,
325
326 pub existing_rows_count: i64,
328
329 pub deleted_rows_count: i64,
331}
332
333#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
335#[serde(rename_all = "lowercase")]
336pub enum ManifestContent {
337 Data,
339 Deletes,
341}
342
343#[derive(Debug, Clone, Serialize, Deserialize)]
345#[serde(rename_all = "kebab-case")]
346pub struct ManifestFile {
347 pub schema: crate::schema::Schema,
349
350 pub partition_spec_id: i32,
352
353 pub content: ManifestContent,
355
356 pub entries: Vec<ManifestEntry>,
358}
359
360impl ManifestFile {
361 pub async fn load(path: &str, storage: &crate::storage::Storage) -> anyhow::Result<Self> {
363 let data = storage.read(path).await?;
364 Ok(serde_json::from_slice(&data)?)
365 }
366
367 pub async fn save(&self, path: &str, storage: &crate::storage::Storage) -> anyhow::Result<()> {
369 let data = serde_json::to_vec(self)?;
370 storage.write(path, data.into()).await?;
371 Ok(())
372 }
373}
374
375#[derive(Debug, Clone, Serialize, Deserialize)]
377#[serde(rename_all = "kebab-case")]
378pub struct ManifestEntry {
379 pub status: ManifestEntryStatus,
381
382 #[serde(skip_serializing_if = "Option::is_none")]
384 pub snapshot_id: Option<i64>,
385
386 #[serde(skip_serializing_if = "Option::is_none")]
388 pub sequence_number: Option<i64>,
389
390 pub data_file: DataFile,
392}
393
394#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
396#[serde(rename_all = "lowercase")]
397pub enum ManifestEntryStatus {
398 Existing,
400 Added,
402 Deleted,
404}
405
406#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
408#[serde(rename_all = "lowercase")]
409pub enum FileFormat {
410 Parquet,
412 Orc,
414 Avro,
416}
417
418#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
420#[serde(rename_all = "kebab-case")]
421pub enum FileContent {
422 Data,
424 PositionDeletes,
426 EqualityDeletes,
428}
429
430#[derive(Debug, Clone, Serialize, Deserialize)]
432#[serde(rename_all = "kebab-case")]
433pub struct DataFile {
434 pub file_path: String,
436
437 pub file_format: FileFormat,
439
440 pub content: FileContent,
442
443 #[serde(default)]
445 pub partition: HashMap<String, serde_json::Value>,
446
447 pub record_count: i64,
449
450 pub file_size_in_bytes: i64,
452
453 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
455 pub column_sizes: HashMap<i32, i64>,
456
457 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
459 pub value_counts: HashMap<i32, i64>,
460
461 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
463 pub null_value_counts: HashMap<i32, i64>,
464
465 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
467 pub nan_value_counts: HashMap<i32, i64>,
468
469 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
471 pub lower_bounds: HashMap<i32, Vec<u8>>,
472
473 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
475 pub upper_bounds: HashMap<i32, Vec<u8>>,
476
477 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
479 pub statistics: HashMap<i32, crate::statistics::ColumnStats>,
480
481 #[serde(skip_serializing_if = "Option::is_none")]
483 pub sort_order_id: Option<i32>,
484}
485
486impl DataFile {
487 pub fn new(
489 file_path: impl Into<String>,
490 file_format: FileFormat,
491 record_count: i64,
492 file_size_in_bytes: i64,
493 ) -> Self {
494 Self {
495 file_path: file_path.into(),
496 file_format,
497 content: FileContent::Data,
498 partition: HashMap::new(),
499 record_count,
500 file_size_in_bytes,
501 column_sizes: HashMap::new(),
502 value_counts: HashMap::new(),
503 null_value_counts: HashMap::new(),
504 nan_value_counts: HashMap::new(),
505 lower_bounds: HashMap::new(),
506 upper_bounds: HashMap::new(),
507 statistics: HashMap::new(),
508 sort_order_id: None,
509 }
510 }
511
512 pub fn with_partition(mut self, partition: HashMap<String, serde_json::Value>) -> Self {
514 self.partition = partition;
515 self
516 }
517
518 pub fn with_column_stats(
520 mut self,
521 column_id: i32,
522 size: i64,
523 value_count: i64,
524 null_count: i64,
525 ) -> Self {
526 self.column_sizes.insert(column_id, size);
527 self.value_counts.insert(column_id, value_count);
528 self.null_value_counts.insert(column_id, null_count);
529 self
530 }
531}
532
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_snapshot_builder() {
        let snap = Snapshot::builder(1, "/manifest-list.avro")
            .with_operation(Operation::Append)
            .with_sequence_number(5)
            .with_summary_property("added-data-files", "10")
            .with_summary_property("added-records", "1000")
            .build();

        assert_eq!(snap.snapshot_id, 1);
        assert_eq!(snap.operation, Operation::Append);
        assert_eq!(snap.sequence_number, 5);
        // Summary counters are stored as strings and parsed on access.
        assert_eq!(snap.added_files_count(), Some(10));
        assert_eq!(snap.added_records_count(), Some(1000));
    }

    #[test]
    fn test_manifest_list() {
        let entry = ManifestFileEntry {
            manifest_path: "/manifest-1.avro".into(),
            manifest_length: 1024,
            partition_spec_id: 0,
            content: ManifestContent::Data,
            sequence_number: 1,
            min_sequence_number: 1,
            added_snapshot_id: 1,
            added_files_count: 5,
            existing_files_count: 0,
            deleted_files_count: 0,
            added_rows_count: 500,
            existing_rows_count: 0,
            deleted_rows_count: 0,
        };
        let mut list = ManifestList::new();
        list.add_entry(entry);

        // Five added files and none existing in a single data manifest.
        assert_eq!(list.total_data_files(), 5);
    }

    #[test]
    fn test_data_file() {
        let column_bytes = 512 * 1024;
        let file = DataFile::new("/data/file.parquet", FileFormat::Parquet, 1000, 1024 * 1024)
            .with_column_stats(1, column_bytes, 1000, 0);

        assert_eq!(file.record_count, 1000);
        assert_eq!(file.column_sizes.get(&1).copied(), Some(column_bytes));
    }
}