polars_plan/dsl/file_scan/
deletion.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use polars_core::prelude::PlIndexMap;
5
// Note: there are a lot of single-variant enums here; the intention is that we'll also support
// Delta deletion vectors at some point in the future.
8
9#[derive(Debug, Clone, Eq, PartialEq)]
10#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
11#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
12pub enum DeletionFilesList {
13    // Chose to use IndexMap<usize, Arc<[String]>>:
14    // * There may be data files without deletion files.
15    // * A single data file may have multiple associated deletion files.
16    // * Needs to be sliceable for cloud execution.
17    //
18    // Other possible options:
19    // * ListArray(inner: Utf8Array)
20    /// Iceberg positional deletes
21    IcebergPositionDelete(Arc<PlIndexMap<usize, Arc<[String]>>>),
22}
23
24impl DeletionFilesList {
25    /// Converts `Some(v)` to `None` if `v` is empty.
26    pub fn filter_empty(this: Option<Self>) -> Option<Self> {
27        use DeletionFilesList::*;
28
29        match this {
30            Some(IcebergPositionDelete(paths)) => {
31                (!paths.is_empty()).then_some(IcebergPositionDelete(paths))
32            },
33            None => None,
34        }
35    }
36
37    /// Returns a new DeletionFilesList for the sources within the specified range.
38    pub fn slice(&self, range: Range<usize>) -> Self {
39        use DeletionFilesList::*;
40
41        match self {
42            IcebergPositionDelete(paths) => IcebergPositionDelete(Arc::new(
43                paths.as_slice()[range]
44                    .iter()
45                    .map(|(k, v)| (*k, v.clone()))
46                    .collect(),
47            )),
48        }
49    }
50
51    pub fn num_files_with_deletions(&self) -> usize {
52        use DeletionFilesList::*;
53
54        match self {
55            IcebergPositionDelete(paths) => paths.len(),
56        }
57    }
58}
59
60impl std::hash::Hash for DeletionFilesList {
61    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
62        use DeletionFilesList::*;
63
64        std::mem::discriminant(self).hash(state);
65
66        match self {
67            IcebergPositionDelete(paths) => {
68                let addr = paths
69                    .first()
70                    .map_or(0, |(_, paths)| Arc::as_ptr(paths) as *const () as usize);
71
72                addr.hash(state)
73            },
74        }
75    }
76}
77
78impl std::fmt::Display for DeletionFilesList {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        use DeletionFilesList::*;
81
82        match self {
83            IcebergPositionDelete(paths) => {
84                let s = if paths.len() == 1 { "" } else { "s" };
85                write!(f, "iceberg-position-delete: {} source{s}", paths.len())?;
86            },
87        }
88
89        Ok(())
90    }
91}