polars_io/utils/
byte_source.rs1use std::ops::Range;
2use std::path::Path;
3use std::sync::Arc;
4
5use polars_buffer::Buffer;
6use polars_core::prelude::PlHashMap;
7use polars_error::{PolarsResult, feature_gated};
8use polars_utils::_limit_path_len_io_err;
9use polars_utils::mmap::MMapSemaphore;
10use polars_utils::pl_path::PlRefPath;
11
12use crate::cloud::options::CloudOptions;
13#[cfg(feature = "cloud")]
14use crate::cloud::{
15 CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,
16};
17
18#[allow(async_fn_in_trait)]
19pub trait ByteSource: Send + Sync {
20 async fn get_size(&self) -> PolarsResult<usize>;
21 async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;
24 async fn get_ranges(
26 &self,
27 ranges: &mut [Range<usize>],
28 ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;
29}
30
31pub struct BufferByteSource(pub Buffer<u8>);
33
34impl BufferByteSource {
35 async fn try_new_mmap_from_path(
36 path: &Path,
37 _cloud_options: Option<&CloudOptions>,
38 ) -> PolarsResult<Self> {
39 let file = Arc::new(
40 tokio::fs::File::open(path)
41 .await
42 .map_err(|err| _limit_path_len_io_err(path, err))?
43 .into_std()
44 .await,
45 );
46
47 Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(
48 &file,
49 )?)))
50 }
51}
52
53impl ByteSource for BufferByteSource {
54 async fn get_size(&self) -> PolarsResult<usize> {
55 Ok(self.0.as_ref().len())
56 }
57
58 async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
59 let out = self.0.clone().sliced(range);
60 Ok(out)
61 }
62
63 async fn get_ranges(
64 &self,
65 ranges: &mut [Range<usize>],
66 ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
67 Ok(ranges
68 .iter()
69 .map(|x| (x.start, self.0.clone().sliced(x.clone())))
70 .collect())
71 }
72}
73
/// Byte source that reads one object, identified by `path`, through a
/// [`PolarsObjectStore`].
#[cfg(feature = "cloud")]
pub struct ObjectStoreByteSource {
    store: PolarsObjectStore,
    path: ObjectStorePath,
}

#[cfg(feature = "cloud")]
impl ObjectStoreByteSource {
    /// Builds the object store for `path` and records the object path derived
    /// from the resulting location's `prefix`.
    ///
    /// # Errors
    /// Propagates failures from `build_object_store` and
    /// `object_path_from_str`.
    async fn try_new_from_path(
        path: PlRefPath,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<Self> {
        // NOTE(review): the meaning of the trailing `false` is not visible
        // here — confirm against `build_object_store`.
        let built = build_object_store(path, cloud_options, false).await?;
        let (
            CloudLocation {
                prefix: object_prefix,
                ..
            },
            store,
        ) = built;

        let object_path = object_path_from_str(&object_prefix)?;

        Ok(Self {
            store,
            path: object_path,
        })
    }
}
93
#[cfg(feature = "cloud")]
impl ByteSource for ObjectStoreByteSource {
    /// Looks the object up with `head` and returns its reported `size`.
    async fn get_size(&self) -> PolarsResult<usize> {
        let meta = self.store.head(&self.path).await?;
        // NOTE(review): `as` truncates silently if `size` is wider than
        // `usize` on the target platform.
        Ok(meta.size as usize)
    }

    /// Delegates to the store's `get_range` for this object's path.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        let Self { store, path } = self;
        store.get_range(path, range).await
    }

    /// Delegates to the store's `get_ranges_sort` for this object's path.
    ///
    /// `ranges` is handed over mutably; presumably the store sorts it in
    /// place — TODO confirm.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        let Self { store, path } = self;
        store.get_ranges_sort(path, ranges).await
    }
}
111
112pub enum DynByteSource {
114 Buffer(BufferByteSource),
115 #[cfg(feature = "cloud")]
116 Cloud(ObjectStoreByteSource),
117}
118
119impl DynByteSource {
120 pub fn variant_name(&self) -> &str {
121 match self {
122 Self::Buffer(_) => "Buffer",
123 #[cfg(feature = "cloud")]
124 Self::Cloud(_) => "Cloud",
125 }
126 }
127}
128
129impl Default for DynByteSource {
130 fn default() -> Self {
131 Self::Buffer(BufferByteSource(Buffer::new()))
132 }
133}
134
135impl ByteSource for DynByteSource {
136 async fn get_size(&self) -> PolarsResult<usize> {
137 match self {
138 Self::Buffer(v) => v.get_size().await,
139 #[cfg(feature = "cloud")]
140 Self::Cloud(v) => v.get_size().await,
141 }
142 }
143
144 async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
145 match self {
146 Self::Buffer(v) => v.get_range(range).await,
147 #[cfg(feature = "cloud")]
148 Self::Cloud(v) => v.get_range(range).await,
149 }
150 }
151
152 async fn get_ranges(
153 &self,
154 ranges: &mut [Range<usize>],
155 ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
156 match self {
157 Self::Buffer(v) => v.get_ranges(ranges).await,
158 #[cfg(feature = "cloud")]
159 Self::Cloud(v) => v.get_ranges(ranges).await,
160 }
161 }
162}
163
164impl From<BufferByteSource> for DynByteSource {
165 fn from(value: BufferByteSource) -> Self {
166 Self::Buffer(value)
167 }
168}
169
#[cfg(feature = "cloud")]
impl From<ObjectStoreByteSource> for DynByteSource {
    /// Wraps `value` in the `Cloud` variant.
    fn from(value: ObjectStoreByteSource) -> Self {
        DynByteSource::Cloud(value)
    }
}
176
177impl From<Buffer<u8>> for DynByteSource {
178 fn from(value: Buffer<u8>) -> Self {
179 Self::Buffer(BufferByteSource(value))
180 }
181}
182
183#[derive(Clone, Debug)]
184pub enum DynByteSourceBuilder {
185 Mmap,
186 ObjectStore,
188}
189
190impl DynByteSourceBuilder {
191 pub async fn try_build_from_path(
192 &self,
193 path: PlRefPath,
194 cloud_options: Option<&CloudOptions>,
195 ) -> PolarsResult<DynByteSource> {
196 Ok(match self {
197 Self::Mmap => {
198 BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)
199 .await?
200 .into()
201 },
202 Self::ObjectStore => feature_gated!(
203 "cloud",
204 ObjectStoreByteSource::try_new_from_path(path, cloud_options)
205 .await?
206 .into()
207 ),
208 })
209 }
210}