polars_plan/dsl/
scan_sources.rs

1use std::fmt::{Debug, Formatter};
2use std::fs::File;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use polars_core::error::{PolarsResult, feature_gated};
7use polars_io::cloud::CloudOptions;
8#[cfg(feature = "cloud")]
9use polars_io::file_cache::FileCacheEntry;
10#[cfg(feature = "cloud")]
11use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
12use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
13use polars_utils::mmap::MemSlice;
14use polars_utils::pl_str::PlSmallStr;
15
16use super::UnifiedScanArgs;
17
18/// Set of sources to scan from
19///
20/// This can either be a list of paths to files, opened files or in-memory buffers. Mixing of
21/// buffers is not currently possible.
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23#[derive(Clone)]
24pub enum ScanSources {
25    Paths(Arc<[PathBuf]>),
26
27    #[cfg_attr(feature = "serde", serde(skip))]
28    Files(Arc<[File]>),
29    #[cfg_attr(feature = "serde", serde(skip))]
30    Buffers(Arc<[MemSlice]>),
31}
32
33impl Debug for ScanSources {
34    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
37            Self::Files(p) => write!(f, "files: {} files", p.len()),
38            Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
39        }
40    }
41}
42
43/// A reference to a single item in [`ScanSources`]
44#[derive(Debug, Clone, Copy)]
45pub enum ScanSourceRef<'a> {
46    Path(&'a Path),
47    File(&'a File),
48    Buffer(&'a MemSlice),
49}
50
51/// A single source to scan from
52#[derive(Debug, Clone)]
53pub enum ScanSource {
54    Path(Arc<Path>),
55    File(Arc<File>),
56    Buffer(MemSlice),
57}
58
59impl ScanSource {
60    pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
61        if sources.len() == 1 {
62            match sources {
63                ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone().into())),
64                ScanSources::Files(fs) => {
65                    assert_eq!(fs.len(), 1);
66                    let ptr: *const File = Arc::into_raw(fs) as *const File;
67                    // SAFETY: A [T] with length 1 can be interpreted as T
68                    let f: Arc<File> = unsafe { Arc::from_raw(ptr) };
69
70                    Ok(Self::File(f))
71                },
72                ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
73            }
74        } else {
75            Err(sources)
76        }
77    }
78
79    pub fn into_sources(self) -> ScanSources {
80        match self {
81            ScanSource::Path(p) => ScanSources::Paths([p.to_path_buf()].into()),
82            ScanSource::File(f) => {
83                let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
84                // SAFETY: A T can be interpreted as [T] with length 1.
85                let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
86                ScanSources::Files(fs)
87            },
88            ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
89        }
90    }
91
92    pub fn as_scan_source_ref(&self) -> ScanSourceRef {
93        match self {
94            ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),
95            ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
96            ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
97        }
98    }
99
100    pub fn run_async(&self) -> bool {
101        self.as_scan_source_ref().run_async()
102    }
103
104    pub fn is_cloud_url(&self) -> bool {
105        if let ScanSource::Path(path) = self {
106            polars_io::is_cloud_url(path.as_ref())
107        } else {
108            false
109        }
110    }
111}
112
113/// An iterator for [`ScanSources`]
114pub struct ScanSourceIter<'a> {
115    sources: &'a ScanSources,
116    offset: usize,
117}
118
119impl Default for ScanSources {
120    fn default() -> Self {
121        // We need to use `Paths` here to avoid erroring when doing hive-partitioned scans of empty
122        // file lists.
123        Self::Paths(Arc::default())
124    }
125}
126
127impl std::hash::Hash for ScanSources {
128    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
129        std::mem::discriminant(self).hash(state);
130
131        // @NOTE: This is a bit crazy
132        //
133        // We don't really want to hash the file descriptors or the whole buffers so for now we
134        // just settle with the fact that the memory behind Arc's does not really move. Therefore,
135        // we can just hash the pointer.
136        match self {
137            Self::Paths(paths) => paths.hash(state),
138            Self::Files(files) => files.as_ptr().hash(state),
139            Self::Buffers(buffers) => buffers.as_ptr().hash(state),
140        }
141    }
142}
143
144impl PartialEq for ScanSources {
145    fn eq(&self, other: &Self) -> bool {
146        match (self, other) {
147            (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
148            (ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
149            (ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
150                std::ptr::eq(l.as_ptr(), r.as_ptr())
151            },
152            _ => false,
153        }
154    }
155}
156
157impl Eq for ScanSources {}
158
159impl ScanSources {
160    pub fn expand_paths(
161        &self,
162        scan_args: &UnifiedScanArgs,
163        #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
164    ) -> PolarsResult<Self> {
165        match self {
166            Self::Paths(paths) => Ok(Self::Paths(expand_paths(
167                paths,
168                scan_args.glob,
169                cloud_options,
170            )?)),
171            v => Ok(v.clone()),
172        }
173    }
174
175    /// This will update `scan_args.hive_options.enabled` to `true` if the existing value is `None`
176    /// and the paths are expanded from a single directory. Otherwise the existing value is maintained.
177    #[cfg(any(feature = "ipc", feature = "parquet"))]
178    pub fn expand_paths_with_hive_update(
179        &self,
180        scan_args: &mut UnifiedScanArgs,
181        #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
182    ) -> PolarsResult<Self> {
183        match self {
184            Self::Paths(paths) => {
185                let (expanded_paths, hive_start_idx) = expand_paths_hive(
186                    paths,
187                    scan_args.glob,
188                    cloud_options,
189                    scan_args.hive_options.enabled.unwrap_or(false),
190                )?;
191
192                if scan_args.hive_options.enabled.is_none()
193                    && expanded_from_single_directory(paths, expanded_paths.as_ref())
194                {
195                    scan_args.hive_options.enabled = Some(true);
196                }
197                scan_args.hive_options.hive_start_idx = hive_start_idx;
198
199                Ok(Self::Paths(expanded_paths))
200            },
201            v => Ok(v.clone()),
202        }
203    }
204
205    pub fn iter(&self) -> ScanSourceIter {
206        ScanSourceIter {
207            sources: self,
208            offset: 0,
209        }
210    }
211
212    /// Are the sources all paths?
213    pub fn is_paths(&self) -> bool {
214        matches!(self, Self::Paths(_))
215    }
216
217    /// Try cast the scan sources to [`ScanSources::Paths`]
218    pub fn as_paths(&self) -> Option<&[PathBuf]> {
219        match self {
220            Self::Paths(paths) => Some(paths.as_ref()),
221            Self::Files(_) | Self::Buffers(_) => None,
222        }
223    }
224
225    /// Try cast the scan sources to [`ScanSources::Paths`] with a clone
226    pub fn into_paths(&self) -> Option<Arc<[PathBuf]>> {
227        match self {
228            Self::Paths(paths) => Some(paths.clone()),
229            Self::Files(_) | Self::Buffers(_) => None,
230        }
231    }
232
233    /// Try get the first path in the scan sources
234    pub fn first_path(&self) -> Option<&Path> {
235        match self {
236            Self::Paths(paths) => paths.first().map(|p| p.as_path()),
237            Self::Files(_) | Self::Buffers(_) => None,
238        }
239    }
240
241    /// Is the first path a cloud URL?
242    pub fn is_cloud_url(&self) -> bool {
243        self.first_path().is_some_and(polars_io::is_cloud_url)
244    }
245
246    pub fn len(&self) -> usize {
247        match self {
248            Self::Paths(s) => s.len(),
249            Self::Files(s) => s.len(),
250            Self::Buffers(s) => s.len(),
251        }
252    }
253
254    pub fn is_empty(&self) -> bool {
255        self.len() == 0
256    }
257
258    pub fn first(&self) -> Option<ScanSourceRef> {
259        self.get(0)
260    }
261
262    /// Turn the [`ScanSources`] into some kind of identifier
263    pub fn id(&self) -> PlSmallStr {
264        if self.is_empty() {
265            return PlSmallStr::from_static("EMPTY");
266        }
267
268        match self {
269            Self::Paths(paths) => {
270                PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref())
271            },
272            Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
273            Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
274        }
275    }
276
277    /// Get the scan source at specific address
278    pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
279        match self {
280            Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)),
281            Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
282            Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
283        }
284    }
285
286    /// Get the scan source at specific address
287    ///
288    /// # Panics
289    ///
290    /// If the `idx` is out of range.
291    #[track_caller]
292    pub fn at(&self, idx: usize) -> ScanSourceRef {
293        self.get(idx).unwrap()
294    }
295}
296
297impl ScanSourceRef<'_> {
298    /// Get the name for `include_paths`
299    pub fn to_include_path_name(&self) -> &str {
300        match self {
301            Self::Path(path) => path.to_str().unwrap(),
302            Self::File(_) => "open-file",
303            Self::Buffer(_) => "in-mem",
304        }
305    }
306
307    // @TODO: I would like to remove this function eventually.
308    pub fn into_owned(&self) -> PolarsResult<ScanSource> {
309        Ok(match self {
310            ScanSourceRef::Path(path) => ScanSource::Path((*path).into()),
311            ScanSourceRef::File(file) => {
312                if let Ok(file) = file.try_clone() {
313                    ScanSource::File(Arc::new(file))
314                } else {
315                    ScanSource::Buffer(self.to_memslice()?)
316                }
317            },
318            ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
319        })
320    }
321
322    /// Turn the scan source into a memory slice
323    pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
324        self.to_memslice_possibly_async(false, None, 0)
325    }
326
327    #[allow(clippy::wrong_self_convention)]
328    #[cfg(feature = "cloud")]
329    fn to_memslice_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(
330        &self,
331        assume: F,
332        run_async: bool,
333    ) -> PolarsResult<MemSlice> {
334        match self {
335            ScanSourceRef::Path(path) => {
336                let path_str = path.to_str();
337                let file = if run_async && path_str.is_some() {
338                    feature_gated!("cloud", {
339                        // This isn't filled if we modified the DSL (e.g. in cloud)
340                        let entry = polars_io::file_cache::FILE_CACHE.get_entry(path_str.unwrap());
341
342                        if let Some(entry) = entry {
343                            assume(entry)?
344                        } else {
345                            polars_utils::open_file(path)?
346                        }
347                    })
348                } else {
349                    polars_utils::open_file(path)?
350                };
351
352                MemSlice::from_file(&file)
353            },
354            ScanSourceRef::File(file) => MemSlice::from_file(file),
355            ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
356        }
357    }
358
359    #[cfg(feature = "cloud")]
360    pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
361        self.to_memslice_async(|entry| entry.try_open_assume_latest(), run_async)
362    }
363
364    #[cfg(feature = "cloud")]
365    pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
366        self.to_memslice_async(|entry| entry.try_open_check_latest(), run_async)
367    }
368
369    #[cfg(not(feature = "cloud"))]
370    fn to_memslice_async(&self, run_async: bool) -> PolarsResult<MemSlice> {
371        match self {
372            ScanSourceRef::Path(path) => {
373                let file = polars_utils::open_file(path)?;
374                MemSlice::from_file(&file)
375            },
376            ScanSourceRef::File(file) => MemSlice::from_file(file),
377            ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
378        }
379    }
380
381    #[cfg(not(feature = "cloud"))]
382    pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
383        self.to_memslice_async(run_async)
384    }
385
386    #[cfg(not(feature = "cloud"))]
387    pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
388        self.to_memslice_async(run_async)
389    }
390
391    pub fn to_memslice_possibly_async(
392        &self,
393        run_async: bool,
394        #[cfg(feature = "cloud")] cache_entries: Option<
395            &Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
396        >,
397        #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
398        index: usize,
399    ) -> PolarsResult<MemSlice> {
400        match self {
401            Self::Path(path) => {
402                let file = if run_async {
403                    feature_gated!("cloud", {
404                        cache_entries.unwrap()[index].try_open_check_latest()?
405                    })
406                } else {
407                    polars_utils::open_file(path)?
408                };
409
410                MemSlice::from_file(&file)
411            },
412            Self::File(file) => MemSlice::from_file(file),
413            Self::Buffer(buff) => Ok((*buff).clone()),
414        }
415    }
416
417    #[cfg(feature = "cloud")]
418    pub async fn to_dyn_byte_source(
419        &self,
420        builder: &DynByteSourceBuilder,
421        cloud_options: Option<&CloudOptions>,
422    ) -> PolarsResult<DynByteSource> {
423        match self {
424            Self::Path(path) => {
425                builder
426                    .try_build_from_path(path.to_str().unwrap(), cloud_options)
427                    .await
428            },
429            Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
430            Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
431        }
432    }
433
434    pub(crate) fn run_async(&self) -> bool {
435        matches!(self, Self::Path(p) if polars_io::is_cloud_url(p) || polars_core::config::force_async())
436    }
437}
438
439impl<'a> Iterator for ScanSourceIter<'a> {
440    type Item = ScanSourceRef<'a>;
441
442    fn next(&mut self) -> Option<Self::Item> {
443        let item = match self.sources {
444            ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
445            ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
446            ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
447        };
448
449        self.offset += 1;
450        Some(item)
451    }
452
453    fn size_hint(&self) -> (usize, Option<usize>) {
454        let len = self.sources.len() - self.offset;
455        (len, Some(len))
456    }
457}
458
459impl ExactSizeIterator for ScanSourceIter<'_> {}