Skip to main content

polars_io/utils/
file.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3
4#[cfg(feature = "cloud")]
5pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
6use polars_core::config;
7use polars_error::{PolarsError, PolarsResult, feature_gated, polars_err};
8use polars_utils::create_file;
9use polars_utils::file::close_file;
10use polars_utils::mmap::ensure_not_mapped;
11use polars_utils::pl_path::{PlRefPath, format_file_uri};
12
13use super::sync_on_close::SyncOnCloseType;
14use crate::cloud::CloudOptions;
15use crate::resolve_homedir;
16
// TODO document precise contract.
/// A synchronous writer that can additionally be synced and explicitly closed.
///
/// Boxed implementations of this trait back the `Writeable::Dyn` variant.
pub trait WriteableTrait: io::Write {
    /// Closes the writer. `Writeable::close` calls this as its final step.
    fn close(&mut self) -> io::Result<()>;

    /// Counterpart of `std::fs::File::sync_all` for this writer.
    // NOTE(review): the exact durability guarantee is implementation-defined — confirm per impl.
    fn sync_all(&self) -> io::Result<()>;

    /// Counterpart of `std::fs::File::sync_data` for this writer.
    // NOTE(review): the exact durability guarantee is implementation-defined — confirm per impl.
    fn sync_data(&self) -> io::Result<()>;
}
23
24/// Holds a non-async writeable file, abstracted over local files or cloud files.
25///
26/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
27///
28/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
29#[allow(clippy::large_enum_variant)] // It will be boxed
30pub enum Writeable {
31    /// An abstract implementation for writable.
32    ///
33    /// This is used to implement writing to in-memory and arbitrary file descriptors.
34    Dyn(Box<dyn WriteableTrait + Send>),
35    Local(std::fs::File),
36    #[cfg(feature = "cloud")]
37    Cloud(crate::cloud::BlockingCloudWriter),
38}
39
40impl Writeable {
41    pub fn try_new(
42        path: PlRefPath,
43        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
44        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
45        #[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_max_concurrency: usize,
46    ) -> PolarsResult<Self> {
47        Ok(if path.has_scheme() {
48            feature_gated!("cloud", {
49                use crate::cloud::BlockingCloudWriter;
50
51                let writer =
52                    crate::pl_async::get_runtime().block_in_place_on(BlockingCloudWriter::new(
53                        path,
54                        cloud_options,
55                        cloud_upload_chunk_size,
56                        cloud_upload_max_concurrency,
57                    ))?;
58
59                Self::Cloud(writer)
60            })
61        } else if config::force_async() {
62            feature_gated!("cloud", {
63                use crate::cloud::BlockingCloudWriter;
64
65                let path = resolve_homedir(path.as_std_path());
66                create_file(&path)?;
67                let path = std::fs::canonicalize(&path)?;
68
69                ensure_not_mapped(&path.metadata()?)?;
70
71                let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
72                let path = format_file_uri(path);
73
74                let writer =
75                    crate::pl_async::get_runtime().block_in_place_on(BlockingCloudWriter::new(
76                        path,
77                        cloud_options,
78                        cloud_upload_chunk_size,
79                        cloud_upload_max_concurrency,
80                    ))?;
81
82                Self::Cloud(writer)
83            })
84        } else {
85            let path = resolve_homedir(path.as_std_path());
86            create_file(&path)?;
87
88            Self::Local(polars_utils::open_file_write(&path)?)
89        })
90    }
91
92    /// This returns `Result<>` - if a write was performed before calling this,
93    /// `CloudWriter` can be in an Err(_) state.
94    #[cfg(feature = "cloud")]
95    pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
96        use self::async_writeable::AsyncDynWriteable;
97
98        match self {
99            Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
100            Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
101            // Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
102            // `BlockingCloudWriter` has a `Drop` impl that we don't want.
103            Self::Cloud(v) => v
104                .try_into_inner()
105                .map(AsyncWriteable::Cloud)
106                .map_err(PolarsError::from),
107        }
108    }
109
110    pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
111        match self {
112            Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
113            Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
114            #[cfg(feature = "cloud")]
115            Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
116        }
117    }
118
119    pub fn sync_all(&self) -> io::Result<()> {
120        match self {
121            Self::Dyn(v) => v.sync_all(),
122            Self::Local(v) => v.sync_all(),
123            #[cfg(feature = "cloud")]
124            Self::Cloud(v) => v.sync_all(),
125        }
126    }
127
128    pub fn sync_data(&self) -> io::Result<()> {
129        match self {
130            Self::Dyn(v) => v.sync_data(),
131            Self::Local(v) => v.sync_data(),
132            #[cfg(feature = "cloud")]
133            Self::Cloud(v) => v.sync_data(),
134        }
135    }
136
137    pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
138        match sync {
139            SyncOnCloseType::All => self.sync_all()?,
140            SyncOnCloseType::Data => self.sync_data()?,
141            SyncOnCloseType::None => {},
142        }
143
144        match self {
145            Self::Dyn(mut v) => v.close(),
146            Self::Local(v) => close_file(v),
147            #[cfg(feature = "cloud")]
148            Self::Cloud(mut v) => v.close(),
149        }
150    }
151}
152
153impl io::Write for Writeable {
154    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
155        match self {
156            Self::Dyn(v) => v.write(buf),
157            Self::Local(v) => v.write(buf),
158            #[cfg(feature = "cloud")]
159            Self::Cloud(v) => v.write(buf),
160        }
161    }
162
163    fn flush(&mut self) -> io::Result<()> {
164        self.sync_all()
165    }
166}
167
168impl Deref for Writeable {
169    type Target = dyn io::Write + Send;
170
171    fn deref(&self) -> &Self::Target {
172        match self {
173            Self::Dyn(v) => v,
174            Self::Local(v) => v,
175            #[cfg(feature = "cloud")]
176            Self::Cloud(v) => v,
177        }
178    }
179}
180
181impl DerefMut for Writeable {
182    fn deref_mut(&mut self) -> &mut Self::Target {
183        match self {
184            Self::Dyn(v) => v,
185            Self::Local(v) => v,
186            #[cfg(feature = "cloud")]
187            Self::Cloud(v) => v,
188        }
189    }
190}
191
/// Avoid BufWriter wrapping on writers that already have internal buffering.
///
/// Both variants dereference to the same `dyn io::Write + Send` trait object, so
/// callers can write through either without caring which one they hold.
///
/// Note: dropping a [`std::io::BufWriter`] discards any error from its final flush,
/// so call `flush()` explicitly before dropping a `BufWriter` variant.
pub enum BufferedWriteable<'a> {
    /// The target wrapped in an additional [`std::io::BufWriter`].
    BufWriter(io::BufWriter<&'a mut (dyn io::Write + Send)>),
    /// The target used as-is, without an extra buffer.
    Direct(&'a mut (dyn io::Write + Send)),
}

impl<'a> Deref for BufferedWriteable<'a> {
    type Target = dyn io::Write + Send + 'a;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &Self::Target,
            Self::Direct(direct) => &**direct,
        }
    }
}

impl DerefMut for BufferedWriteable<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        match self {
            Self::BufWriter(buffered) => buffered as &mut Self::Target,
            Self::Direct(direct) => &mut **direct,
        }
    }
}
217#[cfg(feature = "cloud")]
218mod async_writeable {
219    use std::io;
220    use std::ops::{Deref, DerefMut};
221    use std::pin::Pin;
222    use std::task::{Context, Poll};
223
224    use polars_error::{PolarsError, PolarsResult};
225    use polars_utils::file::close_file;
226    use polars_utils::pl_path::PlRefPath;
227    use tokio::io::AsyncWriteExt;
228    use tokio::task;
229
230    use super::{Writeable, WriteableTrait};
231    use crate::cloud::CloudOptions;
232    use crate::utils::sync_on_close::SyncOnCloseType;
233
234    /// Turn an abstract io::Write into an abstract tokio::io::AsyncWrite.
235    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);
236
237    impl tokio::io::AsyncWrite for AsyncDynWriteable {
238        fn poll_write(
239            self: Pin<&mut Self>,
240            _cx: &mut Context<'_>,
241            buf: &[u8],
242        ) -> Poll<io::Result<usize>> {
243            let result = task::block_in_place(|| self.get_mut().0.write(buf));
244            Poll::Ready(result)
245        }
246
247        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
248            let result = task::block_in_place(|| self.get_mut().0.flush());
249            Poll::Ready(result)
250        }
251
252        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
253            self.poll_flush(cx)
254        }
255    }
256
257    /// Holds an async writeable file, abstracted over local files or cloud files.
258    ///
259    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
260    ///
261    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
262    /// You should instead call the [`AsyncWriteable::close`] at the end.
263    pub enum AsyncWriteable {
264        Dyn(AsyncDynWriteable),
265        Local(tokio::fs::File),
266        Cloud(object_store::buffered::BufWriter),
267    }
268
269    impl AsyncWriteable {
270        pub async fn try_new(
271            path: PlRefPath,
272            cloud_options: Option<&CloudOptions>,
273            cloud_upload_chunk_size: usize,
274            cloud_upload_max_concurrency: usize,
275        ) -> PolarsResult<Self> {
276            // TODO: Native async impl
277            Writeable::try_new(
278                path,
279                cloud_options,
280                cloud_upload_chunk_size,
281                cloud_upload_max_concurrency,
282            )
283            .and_then(|x| x.try_into_async_writeable())
284        }
285
286        pub async fn sync_all(&mut self) -> io::Result<()> {
287            match self {
288                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
289                Self::Local(v) => v.sync_all().await,
290                Self::Cloud(_) => Ok(()),
291            }
292        }
293
294        pub async fn sync_data(&mut self) -> io::Result<()> {
295            match self {
296                Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
297                Self::Local(v) => v.sync_data().await,
298                Self::Cloud(_) => Ok(()),
299            }
300        }
301
302        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
303            match sync {
304                SyncOnCloseType::All => self.sync_all().await?,
305                SyncOnCloseType::Data => self.sync_data().await?,
306                SyncOnCloseType::None => {},
307            }
308
309            match self {
310                Self::Dyn(mut v) => {
311                    v.shutdown().await.map_err(PolarsError::from)?;
312                    Ok(task::block_in_place(|| v.0.close())?)
313                },
314                Self::Local(v) => async {
315                    let f = v.into_std().await;
316                    close_file(f)
317                }
318                .await
319                .map_err(PolarsError::from),
320                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
321            }
322        }
323    }
324
325    impl Deref for AsyncWriteable {
326        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;
327
328        fn deref(&self) -> &Self::Target {
329            match self {
330                Self::Dyn(v) => v,
331                Self::Local(v) => v,
332                Self::Cloud(v) => v,
333            }
334        }
335    }
336
337    impl DerefMut for AsyncWriteable {
338        fn deref_mut(&mut self) -> &mut Self::Target {
339            match self {
340                Self::Dyn(v) => v,
341                Self::Local(v) => v,
342                Self::Cloud(v) => v,
343            }
344        }
345    }
346}