1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated, polars_err};
8use polars_utils::create_file;
9use polars_utils::file::close_file;
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::pl_path::{PlRefPath, format_file_uri};
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
/// A synchronous, closable output sink that can be stored in [`Writeable::Dyn`].
///
/// On top of the regular [`io::Write`] operations, implementors expose explicit
/// durability hooks (`sync_all` / `sync_data`) and an explicit, fallible `close`.
pub trait WriteableTrait: io::Write {
    /// Durability hook mirroring `std::fs::File::sync_all`; `Writeable::sync_all`
    /// forwards here for `Dyn` sinks.
    fn sync_all(&self) -> io::Result<()>;

    /// Durability hook mirroring `std::fs::File::sync_data`; `Writeable::sync_data`
    /// forwards here for `Dyn` sinks.
    fn sync_data(&self) -> io::Result<()>;

    /// Releases the sink, reporting any error raised while doing so.
    fn close(&mut self) -> io::Result<()>;
}
23
24#[allow(clippy::large_enum_variant)] pub enum Writeable {
31 Dyn(Box<dyn WriteableTrait + Send>),
35 Local(std::fs::File),
36 #[cfg(feature = "cloud")]
37 Cloud(crate::cloud::BlockingCloudWriter),
38}
39
40impl Writeable {
41 pub fn try_new(
42 path: PlRefPath,
43 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
44 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
45 #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_max_concurrency: usize,
46 ) -> PolarsResult<Self> {
47 Ok(if path.has_scheme() {
48 feature_gated!("cloud", {
49 use crate::cloud::BlockingCloudWriter;
50
51 let writer =
52 crate::pl_async::get_runtime().block_in_place_on(BlockingCloudWriter::new(
53 path,
54 cloud_options,
55 cloud_upload_chunk_size,
56 cloud_upload_max_concurrency,
57 ))?;
58
59 Self::Cloud(writer)
60 })
61 } else if config::force_async() {
62 feature_gated!("cloud", {
63 use crate::cloud::BlockingCloudWriter;
64
65 let path = resolve_homedir(path.as_std_path());
66 create_file(&path)?;
67 let path = std::fs::canonicalize(&path)?;
68
69 ensure_not_mapped(&path.metadata()?)?;
70
71 let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
72 let path = format_file_uri(path);
73
74 let writer =
75 crate::pl_async::get_runtime().block_in_place_on(BlockingCloudWriter::new(
76 path,
77 cloud_options,
78 cloud_upload_chunk_size,
79 cloud_upload_max_concurrency,
80 ))?;
81
82 Self::Cloud(writer)
83 })
84 } else {
85 let path = resolve_homedir(path.as_std_path());
86 create_file(&path)?;
87
88 Self::Local(polars_utils::open_file_write(&path)?)
89 })
90 }
91
92 #[cfg(feature = "cloud")]
95 pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
96 use self::async_writeable::AsyncDynWriteable;
97
98 match self {
99 Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
100 Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
101 Self::Cloud(v) => v
104 .try_into_inner()
105 .map(AsyncWriteable::Cloud)
106 .map_err(PolarsError::from),
107 }
108 }
109
110 pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
111 match self {
112 Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
113 Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
114 #[cfg(feature = "cloud")]
115 Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
116 }
117 }
118
119 pub fn sync_all(&self) -> io::Result<()> {
120 match self {
121 Self::Dyn(v) => v.sync_all(),
122 Self::Local(v) => v.sync_all(),
123 #[cfg(feature = "cloud")]
124 Self::Cloud(v) => v.sync_all(),
125 }
126 }
127
128 pub fn sync_data(&self) -> io::Result<()> {
129 match self {
130 Self::Dyn(v) => v.sync_data(),
131 Self::Local(v) => v.sync_data(),
132 #[cfg(feature = "cloud")]
133 Self::Cloud(v) => v.sync_data(),
134 }
135 }
136
137 pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
138 match sync {
139 SyncOnCloseType::All => self.sync_all()?,
140 SyncOnCloseType::Data => self.sync_data()?,
141 SyncOnCloseType::None => {},
142 }
143
144 match self {
145 Self::Dyn(mut v) => v.close(),
146 Self::Local(v) => close_file(v),
147 #[cfg(feature = "cloud")]
148 Self::Cloud(mut v) => v.close(),
149 }
150 }
151}
152
153impl io::Write for Writeable {
154 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
155 match self {
156 Self::Dyn(v) => v.write(buf),
157 Self::Local(v) => v.write(buf),
158 #[cfg(feature = "cloud")]
159 Self::Cloud(v) => v.write(buf),
160 }
161 }
162
163 fn flush(&mut self) -> io::Result<()> {
164 self.sync_all()
165 }
166}
167
168impl Deref for Writeable {
169 type Target = dyn io::Write + Send;
170
171 fn deref(&self) -> &Self::Target {
172 match self {
173 Self::Dyn(v) => v,
174 Self::Local(v) => v,
175 #[cfg(feature = "cloud")]
176 Self::Cloud(v) => v,
177 }
178 }
179}
180
181impl DerefMut for Writeable {
182 fn deref_mut(&mut self) -> &mut Self::Target {
183 match self {
184 Self::Dyn(v) => v,
185 Self::Local(v) => v,
186 #[cfg(feature = "cloud")]
187 Self::Cloud(v) => v,
188 }
189 }
190}
191
/// A borrowed sink prepared for many small writes (see `Writeable::as_buffered`).
pub enum BufferedWriteable<'a> {
    /// Userspace buffering over the borrowed sink. Bytes reach the sink when the buffer
    /// fills or on flush; `BufWriter`'s drop flushes too but ignores any error, so flush
    /// explicitly when errors matter.
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// Writes go straight to the borrowed sink, without an extra buffer layer.
    Direct(&'a mut (dyn io::Write + Send)),
}
197
198impl<'a> Deref for BufferedWriteable<'a> {
199 type Target = dyn io::Write + Send + 'a;
200
201 fn deref(&self) -> &Self::Target {
202 match self {
203 Self::BufWriter(v) => v as _,
204 Self::Direct(v) => v,
205 }
206 }
207}
208
209impl DerefMut for BufferedWriteable<'_> {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 match self {
212 Self::BufWriter(v) => v as _,
213 Self::Direct(v) => v,
214 }
215 }
216}
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Adapter exposing a blocking [`WriteableTrait`] object as a [`tokio::io::AsyncWrite`].
    ///
    /// Every poll runs the blocking call inline through [`task::block_in_place`] and therefore
    /// always returns `Poll::Ready`.
    ///
    /// NOTE(review): tokio's `block_in_place` panics on a current-thread runtime, so this
    /// adapter assumes it is driven from a multi-threaded runtime — confirm at call sites.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Only flushes; the wrapped writer is closed separately by `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Asynchronous counterpart of [`Writeable`].
    pub enum AsyncWriteable {
        /// A blocking trait-object writer driven through [`AsyncDynWriteable`].
        Dyn(AsyncDynWriteable),
        /// A local file.
        Local(tokio::fs::File),
        /// An object-store upload writer.
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens an async sink for `path`.
        ///
        /// NOTE(review): despite being `async`, this currently opens synchronously through
        /// [`Writeable::try_new`] (which may block on the runtime) and then converts the result.
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_max_concurrency: usize,
        ) -> PolarsResult<Self> {
            Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_max_concurrency,
            )
            .and_then(|x| x.try_into_async_writeable())
        }

        /// Requests a full (data + metadata) sync. It runs in a blocking section for `Dyn`
        /// writers and is a no-op for object-store uploads.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
                Self::Local(v) => v.sync_all().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Requests a data-only sync. It runs in a blocking section for `Dyn` writers and is a
        /// no-op for object-store uploads.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
                Self::Local(v) => v.sync_data().await,
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Optionally syncs according to `sync`, then flushes and closes the sink.
        ///
        /// # Errors
        /// Returns the first error from syncing, flushing/shutting down, or closing.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            match sync {
                SyncOnCloseType::All => self.sync_all().await?,
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::None => {},
            }

            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(mut v) => {
                    // `tokio::fs::File` completes writes in the background. A failure of the
                    // last in-flight write is only reported by a subsequent flush, while
                    // `into_std` merely waits for it. Flush first so such an error reaches the
                    // caller instead of being dropped with the tokio wrapper.
                    v.flush().await.map_err(PolarsError::from)?;
                    let f = v.into_std().await;
                    close_file(f).map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    /// Shared access to the active sink as an `AsyncWrite` trait object.
    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    /// Mutable access to the active sink as an `AsyncWrite` trait object.
    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}