Skip to main content

polars_io/utils/
byte_source.rs

1use std::ops::Range;
2use std::path::Path;
3use std::sync::Arc;
4
5use polars_buffer::Buffer;
6use polars_core::prelude::PlHashMap;
7use polars_error::{PolarsResult, feature_gated};
8use polars_utils::_limit_path_len_io_err;
9use polars_utils::mmap::MMapSemaphore;
10use polars_utils::pl_path::PlRefPath;
11
12use crate::cloud::options::CloudOptions;
13#[cfg(feature = "cloud")]
14use crate::cloud::{
15    CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17
18#[allow(async_fn_in_trait)]
19pub trait ByteSource: Send + Sync {
20    async fn get_size(&self) -> PolarsResult<usize>;
21    /// # Panics
22    /// Panics if `range` is not in bounds.
23    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;
24    /// Note: This will mutably sort ranges for coalescing.
25    async fn get_ranges(
26        &self,
27        ranges: &mut [Range<usize>],
28    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;
29}
30
31/// Byte source backed by a `Buffer`, which can potentially be memory-mapped.
32pub struct BufferByteSource(pub Buffer<u8>);
33
34impl BufferByteSource {
35    async fn try_new_mmap_from_path(
36        path: &Path,
37        _cloud_options: Option<&CloudOptions>,
38    ) -> PolarsResult<Self> {
39        let file = Arc::new(
40            tokio::fs::File::open(path)
41                .await
42                .map_err(|err| _limit_path_len_io_err(path, err))?
43                .into_std()
44                .await,
45        );
46
47        Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(
48            &file,
49        )?)))
50    }
51}
52
53impl ByteSource for BufferByteSource {
54    async fn get_size(&self) -> PolarsResult<usize> {
55        Ok(self.0.as_ref().len())
56    }
57
58    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
59        let out = self.0.clone().sliced(range);
60        Ok(out)
61    }
62
63    async fn get_ranges(
64        &self,
65        ranges: &mut [Range<usize>],
66    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
67        Ok(ranges
68            .iter()
69            .map(|x| (x.start, self.0.clone().sliced(x.clone())))
70            .collect())
71    }
72}
73
/// A byte source that reads a single object through an object store.
#[cfg(feature = "cloud")]
pub struct ObjectStoreByteSource {
    // Store that holds the object.
    store: PolarsObjectStore,
    // Location of the object inside `store`.
    path: ObjectStorePath,
}

#[cfg(feature = "cloud")]
impl ObjectStoreByteSource {
    /// Builds an object store for `path` and converts the location's prefix into
    /// an in-store object path.
    ///
    /// # Errors
    /// Propagates failures from building the store or parsing the object path.
    async fn try_new_from_path(
        path: PlRefPath,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<Self> {
        // NOTE(review): the meaning of the trailing `false` argument to
        // `build_object_store` is not visible here — confirm before changing it.
        let built = build_object_store(path, cloud_options, false).await?;
        let (CloudLocation { prefix, .. }, store) = built;
        let object_path = object_path_from_str(&prefix)?;
        Ok(Self {
            path: object_path,
            store,
        })
    }
}
93
#[cfg(feature = "cloud")]
impl ByteSource for ObjectStoreByteSource {
    /// Returns the object's size in bytes, as reported by the store's `head` call.
    ///
    /// # Errors
    /// Fails if the metadata request fails, or if the reported size does not fit
    /// in `usize` (possible on 32-bit targets). The size is checked rather than
    /// truncated so that large objects are never silently under-reported.
    async fn get_size(&self) -> PolarsResult<usize> {
        let size = self.store.head(&self.path).await?.size;
        usize::try_from(size).map_err(|_| {
            polars_error::polars_err!(
                ComputeError: "object size {} does not fit in usize on this platform",
                size
            )
        })
    }

    /// Fetches `range` of the object from the store.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        self.store.get_range(&self.path, range).await
    }

    /// Fetches all `ranges` through the store's `get_ranges_sort`.
    ///
    /// NOTE(review): judging by its name, `get_ranges_sort` sorts `ranges` in
    /// place — confirm.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        self.store.get_ranges_sort(&self.path, ranges).await
    }
}
111
112/// Dynamic dispatch to async functions.
113pub enum DynByteSource {
114    Buffer(BufferByteSource),
115    #[cfg(feature = "cloud")]
116    Cloud(ObjectStoreByteSource),
117}
118
119impl DynByteSource {
120    pub fn variant_name(&self) -> &str {
121        match self {
122            Self::Buffer(_) => "Buffer",
123            #[cfg(feature = "cloud")]
124            Self::Cloud(_) => "Cloud",
125        }
126    }
127}
128
129impl Default for DynByteSource {
130    fn default() -> Self {
131        Self::Buffer(BufferByteSource(Buffer::new()))
132    }
133}
134
135impl ByteSource for DynByteSource {
136    async fn get_size(&self) -> PolarsResult<usize> {
137        match self {
138            Self::Buffer(v) => v.get_size().await,
139            #[cfg(feature = "cloud")]
140            Self::Cloud(v) => v.get_size().await,
141        }
142    }
143
144    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
145        match self {
146            Self::Buffer(v) => v.get_range(range).await,
147            #[cfg(feature = "cloud")]
148            Self::Cloud(v) => v.get_range(range).await,
149        }
150    }
151
152    async fn get_ranges(
153        &self,
154        ranges: &mut [Range<usize>],
155    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
156        match self {
157            Self::Buffer(v) => v.get_ranges(ranges).await,
158            #[cfg(feature = "cloud")]
159            Self::Cloud(v) => v.get_ranges(ranges).await,
160        }
161    }
162}
163
164impl From<BufferByteSource> for DynByteSource {
165    fn from(value: BufferByteSource) -> Self {
166        Self::Buffer(value)
167    }
168}
169
170#[cfg(feature = "cloud")]
171impl From<ObjectStoreByteSource> for DynByteSource {
172    fn from(value: ObjectStoreByteSource) -> Self {
173        Self::Cloud(value)
174    }
175}
176
177impl From<Buffer<u8>> for DynByteSource {
178    fn from(value: Buffer<u8>) -> Self {
179        Self::Buffer(BufferByteSource(value))
180    }
181}
182
/// Selects how `DynByteSourceBuilder::try_build_from_path` opens a path.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Memory-map the path as a local file.
    Mmap,
    /// Go through an object store: supports both cloud and local files, but
    /// requires the `cloud` feature.
    ObjectStore,
}
189
190impl DynByteSourceBuilder {
191    pub async fn try_build_from_path(
192        &self,
193        path: PlRefPath,
194        cloud_options: Option<&CloudOptions>,
195    ) -> PolarsResult<DynByteSource> {
196        Ok(match self {
197            Self::Mmap => {
198                BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)
199                    .await?
200                    .into()
201            },
202            Self::ObjectStore => feature_gated!(
203                "cloud",
204                ObjectStoreByteSource::try_new_from_path(path, cloud_options)
205                    .await?
206                    .into()
207            ),
208        })
209    }
210}