1use std::fmt::{Debug, Formatter};
2use std::fs::File;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use polars_core::error::{PolarsResult, feature_gated};
7use polars_io::cloud::CloudOptions;
8#[cfg(feature = "cloud")]
9use polars_io::file_cache::FileCacheEntry;
10#[cfg(feature = "cloud")]
11use polars_io::utils::byte_source::{DynByteSource, DynByteSourceBuilder};
12use polars_io::{expand_paths, expand_paths_hive, expanded_from_single_directory};
13use polars_utils::mmap::MemSlice;
14use polars_utils::pl_str::PlSmallStr;
15
16use super::UnifiedScanArgs;
17
/// The set of inputs for a scan.
///
/// All sources held by one value are of the same kind: file paths, already-opened file
/// handles, or in-memory buffers. Cloning only clones the `Arc`, so the underlying data is
/// shared between clones.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone)]
pub enum ScanSources {
    /// Paths to scan. These may be local paths or cloud URLs (see `ScanSources::is_cloud_url`).
    Paths(Arc<[PathBuf]>),

    /// Already-opened file handles. Marked `serde(skip)`, so this variant is never
    /// (de)serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    Files(Arc<[File]>),
    /// In-memory buffers. Marked `serde(skip)`, so this variant is never (de)serialized.
    #[cfg_attr(feature = "serde", serde(skip))]
    Buffers(Arc<[MemSlice]>),
}
32
33impl Debug for ScanSources {
34 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::Paths(p) => write!(f, "paths: {:?}", p.as_ref()),
37 Self::Files(p) => write!(f, "files: {} files", p.len()),
38 Self::Buffers(b) => write!(f, "buffers: {} in-memory-buffers", b.len()),
39 }
40 }
41}
42
/// A borrowed view of a single source, as returned by `ScanSources::get` and yielded by
/// `ScanSources::iter`.
#[derive(Debug, Clone, Copy)]
pub enum ScanSourceRef<'a> {
    /// A borrowed path (local path or cloud URL).
    Path(&'a Path),
    /// A borrowed, already-opened file handle.
    File(&'a File),
    /// A borrowed in-memory buffer.
    Buffer(&'a MemSlice),
}
50
/// An owned single source.
///
/// Convertible to and from a one-element [`ScanSources`] via `ScanSource::into_sources` and
/// `ScanSource::from_sources`.
#[derive(Debug, Clone)]
pub enum ScanSource {
    /// An owned, shared path.
    Path(Arc<Path>),
    /// A shared, already-opened file handle.
    File(Arc<File>),
    /// An in-memory buffer.
    Buffer(MemSlice),
}
58
59impl ScanSource {
60 pub fn from_sources(sources: ScanSources) -> Result<Self, ScanSources> {
61 if sources.len() == 1 {
62 match sources {
63 ScanSources::Paths(ps) => Ok(Self::Path(ps.as_ref()[0].clone().into())),
64 ScanSources::Files(fs) => {
65 assert_eq!(fs.len(), 1);
66 let ptr: *const File = Arc::into_raw(fs) as *const File;
67 let f: Arc<File> = unsafe { Arc::from_raw(ptr) };
69
70 Ok(Self::File(f))
71 },
72 ScanSources::Buffers(bs) => Ok(Self::Buffer(bs.as_ref()[0].clone())),
73 }
74 } else {
75 Err(sources)
76 }
77 }
78
79 pub fn into_sources(self) -> ScanSources {
80 match self {
81 ScanSource::Path(p) => ScanSources::Paths([p.to_path_buf()].into()),
82 ScanSource::File(f) => {
83 let ptr: *const [File] = std::ptr::slice_from_raw_parts(Arc::into_raw(f), 1);
84 let fs: Arc<[File]> = unsafe { Arc::from_raw(ptr) };
86 ScanSources::Files(fs)
87 },
88 ScanSource::Buffer(m) => ScanSources::Buffers([m].into()),
89 }
90 }
91
92 pub fn as_scan_source_ref(&self) -> ScanSourceRef {
93 match self {
94 ScanSource::Path(path) => ScanSourceRef::Path(path.as_ref()),
95 ScanSource::File(file) => ScanSourceRef::File(file.as_ref()),
96 ScanSource::Buffer(mem_slice) => ScanSourceRef::Buffer(mem_slice),
97 }
98 }
99
100 pub fn run_async(&self) -> bool {
101 self.as_scan_source_ref().run_async()
102 }
103
104 pub fn is_cloud_url(&self) -> bool {
105 if let ScanSource::Path(path) = self {
106 polars_io::is_cloud_url(path.as_ref())
107 } else {
108 false
109 }
110 }
111}
112
/// Iterator over the sources of a [`ScanSources`], yielding [`ScanSourceRef`]s.
///
/// Created by `ScanSources::iter`.
pub struct ScanSourceIter<'a> {
    /// The sources being iterated.
    sources: &'a ScanSources,
    /// Index of the next source to yield.
    offset: usize,
}
118
119impl Default for ScanSources {
120 fn default() -> Self {
121 Self::Paths(Arc::default())
124 }
125}
126
127impl std::hash::Hash for ScanSources {
128 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
129 std::mem::discriminant(self).hash(state);
130
131 match self {
137 Self::Paths(paths) => paths.hash(state),
138 Self::Files(files) => files.as_ptr().hash(state),
139 Self::Buffers(buffers) => buffers.as_ptr().hash(state),
140 }
141 }
142}
143
144impl PartialEq for ScanSources {
145 fn eq(&self, other: &Self) -> bool {
146 match (self, other) {
147 (ScanSources::Paths(l), ScanSources::Paths(r)) => l == r,
148 (ScanSources::Files(l), ScanSources::Files(r)) => std::ptr::eq(l.as_ptr(), r.as_ptr()),
149 (ScanSources::Buffers(l), ScanSources::Buffers(r)) => {
150 std::ptr::eq(l.as_ptr(), r.as_ptr())
151 },
152 _ => false,
153 }
154 }
155}
156
157impl Eq for ScanSources {}
158
159impl ScanSources {
160 pub fn expand_paths(
161 &self,
162 scan_args: &UnifiedScanArgs,
163 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
164 ) -> PolarsResult<Self> {
165 match self {
166 Self::Paths(paths) => Ok(Self::Paths(expand_paths(
167 paths,
168 scan_args.glob,
169 cloud_options,
170 )?)),
171 v => Ok(v.clone()),
172 }
173 }
174
175 #[cfg(any(feature = "ipc", feature = "parquet"))]
178 pub fn expand_paths_with_hive_update(
179 &self,
180 scan_args: &mut UnifiedScanArgs,
181 #[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
182 ) -> PolarsResult<Self> {
183 match self {
184 Self::Paths(paths) => {
185 let (expanded_paths, hive_start_idx) = expand_paths_hive(
186 paths,
187 scan_args.glob,
188 cloud_options,
189 scan_args.hive_options.enabled.unwrap_or(false),
190 )?;
191
192 if scan_args.hive_options.enabled.is_none()
193 && expanded_from_single_directory(paths, expanded_paths.as_ref())
194 {
195 scan_args.hive_options.enabled = Some(true);
196 }
197 scan_args.hive_options.hive_start_idx = hive_start_idx;
198
199 Ok(Self::Paths(expanded_paths))
200 },
201 v => Ok(v.clone()),
202 }
203 }
204
205 pub fn iter(&self) -> ScanSourceIter {
206 ScanSourceIter {
207 sources: self,
208 offset: 0,
209 }
210 }
211
212 pub fn is_paths(&self) -> bool {
214 matches!(self, Self::Paths(_))
215 }
216
217 pub fn as_paths(&self) -> Option<&[PathBuf]> {
219 match self {
220 Self::Paths(paths) => Some(paths.as_ref()),
221 Self::Files(_) | Self::Buffers(_) => None,
222 }
223 }
224
225 pub fn into_paths(&self) -> Option<Arc<[PathBuf]>> {
227 match self {
228 Self::Paths(paths) => Some(paths.clone()),
229 Self::Files(_) | Self::Buffers(_) => None,
230 }
231 }
232
233 pub fn first_path(&self) -> Option<&Path> {
235 match self {
236 Self::Paths(paths) => paths.first().map(|p| p.as_path()),
237 Self::Files(_) | Self::Buffers(_) => None,
238 }
239 }
240
241 pub fn is_cloud_url(&self) -> bool {
243 self.first_path().is_some_and(polars_io::is_cloud_url)
244 }
245
246 pub fn len(&self) -> usize {
247 match self {
248 Self::Paths(s) => s.len(),
249 Self::Files(s) => s.len(),
250 Self::Buffers(s) => s.len(),
251 }
252 }
253
254 pub fn is_empty(&self) -> bool {
255 self.len() == 0
256 }
257
258 pub fn first(&self) -> Option<ScanSourceRef> {
259 self.get(0)
260 }
261
262 pub fn id(&self) -> PlSmallStr {
264 if self.is_empty() {
265 return PlSmallStr::from_static("EMPTY");
266 }
267
268 match self {
269 Self::Paths(paths) => {
270 PlSmallStr::from_str(paths.first().unwrap().to_string_lossy().as_ref())
271 },
272 Self::Files(_) => PlSmallStr::from_static("OPEN_FILES"),
273 Self::Buffers(_) => PlSmallStr::from_static("IN_MEMORY"),
274 }
275 }
276
277 pub fn get(&self, idx: usize) -> Option<ScanSourceRef> {
279 match self {
280 Self::Paths(paths) => paths.get(idx).map(|p| ScanSourceRef::Path(p)),
281 Self::Files(files) => files.get(idx).map(ScanSourceRef::File),
282 Self::Buffers(buffers) => buffers.get(idx).map(ScanSourceRef::Buffer),
283 }
284 }
285
286 #[track_caller]
292 pub fn at(&self, idx: usize) -> ScanSourceRef {
293 self.get(idx).unwrap()
294 }
295}
296
297impl ScanSourceRef<'_> {
298 pub fn to_include_path_name(&self) -> &str {
300 match self {
301 Self::Path(path) => path.to_str().unwrap(),
302 Self::File(_) => "open-file",
303 Self::Buffer(_) => "in-mem",
304 }
305 }
306
307 pub fn into_owned(&self) -> PolarsResult<ScanSource> {
309 Ok(match self {
310 ScanSourceRef::Path(path) => ScanSource::Path((*path).into()),
311 ScanSourceRef::File(file) => {
312 if let Ok(file) = file.try_clone() {
313 ScanSource::File(Arc::new(file))
314 } else {
315 ScanSource::Buffer(self.to_memslice()?)
316 }
317 },
318 ScanSourceRef::Buffer(buffer) => ScanSource::Buffer((*buffer).clone()),
319 })
320 }
321
322 pub fn to_memslice(&self) -> PolarsResult<MemSlice> {
324 self.to_memslice_possibly_async(false, None, 0)
325 }
326
327 #[allow(clippy::wrong_self_convention)]
328 #[cfg(feature = "cloud")]
329 fn to_memslice_async<F: Fn(Arc<FileCacheEntry>) -> PolarsResult<std::fs::File>>(
330 &self,
331 assume: F,
332 run_async: bool,
333 ) -> PolarsResult<MemSlice> {
334 match self {
335 ScanSourceRef::Path(path) => {
336 let path_str = path.to_str();
337 let file = if run_async && path_str.is_some() {
338 feature_gated!("cloud", {
339 let entry = polars_io::file_cache::FILE_CACHE.get_entry(path_str.unwrap());
341
342 if let Some(entry) = entry {
343 assume(entry)?
344 } else {
345 polars_utils::open_file(path)?
346 }
347 })
348 } else {
349 polars_utils::open_file(path)?
350 };
351
352 MemSlice::from_file(&file)
353 },
354 ScanSourceRef::File(file) => MemSlice::from_file(file),
355 ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
356 }
357 }
358
359 #[cfg(feature = "cloud")]
360 pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
361 self.to_memslice_async(|entry| entry.try_open_assume_latest(), run_async)
362 }
363
364 #[cfg(feature = "cloud")]
365 pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
366 self.to_memslice_async(|entry| entry.try_open_check_latest(), run_async)
367 }
368
369 #[cfg(not(feature = "cloud"))]
370 fn to_memslice_async(&self, run_async: bool) -> PolarsResult<MemSlice> {
371 match self {
372 ScanSourceRef::Path(path) => {
373 let file = polars_utils::open_file(path)?;
374 MemSlice::from_file(&file)
375 },
376 ScanSourceRef::File(file) => MemSlice::from_file(file),
377 ScanSourceRef::Buffer(buff) => Ok((*buff).clone()),
378 }
379 }
380
381 #[cfg(not(feature = "cloud"))]
382 pub fn to_memslice_async_assume_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
383 self.to_memslice_async(run_async)
384 }
385
386 #[cfg(not(feature = "cloud"))]
387 pub fn to_memslice_async_check_latest(&self, run_async: bool) -> PolarsResult<MemSlice> {
388 self.to_memslice_async(run_async)
389 }
390
391 pub fn to_memslice_possibly_async(
392 &self,
393 run_async: bool,
394 #[cfg(feature = "cloud")] cache_entries: Option<
395 &Vec<Arc<polars_io::file_cache::FileCacheEntry>>,
396 >,
397 #[cfg(not(feature = "cloud"))] cache_entries: Option<&()>,
398 index: usize,
399 ) -> PolarsResult<MemSlice> {
400 match self {
401 Self::Path(path) => {
402 let file = if run_async {
403 feature_gated!("cloud", {
404 cache_entries.unwrap()[index].try_open_check_latest()?
405 })
406 } else {
407 polars_utils::open_file(path)?
408 };
409
410 MemSlice::from_file(&file)
411 },
412 Self::File(file) => MemSlice::from_file(file),
413 Self::Buffer(buff) => Ok((*buff).clone()),
414 }
415 }
416
417 #[cfg(feature = "cloud")]
418 pub async fn to_dyn_byte_source(
419 &self,
420 builder: &DynByteSourceBuilder,
421 cloud_options: Option<&CloudOptions>,
422 ) -> PolarsResult<DynByteSource> {
423 match self {
424 Self::Path(path) => {
425 builder
426 .try_build_from_path(path.to_str().unwrap(), cloud_options)
427 .await
428 },
429 Self::File(file) => Ok(DynByteSource::from(MemSlice::from_file(file)?)),
430 Self::Buffer(buff) => Ok(DynByteSource::from((*buff).clone())),
431 }
432 }
433
434 pub(crate) fn run_async(&self) -> bool {
435 matches!(self, Self::Path(p) if polars_io::is_cloud_url(p) || polars_core::config::force_async())
436 }
437}
438
439impl<'a> Iterator for ScanSourceIter<'a> {
440 type Item = ScanSourceRef<'a>;
441
442 fn next(&mut self) -> Option<Self::Item> {
443 let item = match self.sources {
444 ScanSources::Paths(paths) => ScanSourceRef::Path(paths.get(self.offset)?),
445 ScanSources::Files(files) => ScanSourceRef::File(files.get(self.offset)?),
446 ScanSources::Buffers(buffers) => ScanSourceRef::Buffer(buffers.get(self.offset)?),
447 };
448
449 self.offset += 1;
450 Some(item)
451 }
452
453 fn size_hint(&self) -> (usize, Option<usize>) {
454 let len = self.sources.len() - self.offset;
455 (len, Some(len))
456 }
457}
458
459impl ExactSizeIterator for ScanSourceIter<'_> {}