polars_io/cloud/adaptors.rs

//! Interface with the object_store crate: a blocking writer adapter over async object-store uploads.

use std::sync::Arc;

use object_store::ObjectStore;
use object_store::buffered::BufWriter;
use object_store::path::Path;
use polars_error::PolarsResult;
use polars_utils::pl_path::PlRefPath;
use tokio::io::AsyncWriteExt;

use super::{CloudOptions, object_path_from_str};
use crate::pl_async::get_runtime;
use crate::utils::file::WriteableTrait;

/// Clones a `std::io::Error` by rebuilding it from its kind and message.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    std::io::Error::new(e.kind(), e.to_string())
}

/// Adaptor that wraps [ObjectStore::BufWriter], exposing a synchronous interface
/// that implements `std::io::Write`.
///
/// This allows it to be used in sync code that would otherwise write to a plain file or byte stream,
/// such as `polars::prelude::CsvWriter`.
///
/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
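///
/// # Example
///
/// A minimal sketch, mirroring the local-filesystem test at the bottom of this file; the chunk
/// size and concurrency values are illustrative, not recommendations.
///
/// ```ignore
/// use std::sync::Arc;
///
/// use object_store::ObjectStore;
/// use object_store::local::LocalFileSystem;
/// use polars_core::df;
///
/// use crate::csv::write::CsvWriter;
/// use crate::prelude::SerWriter;
///
/// // Any ObjectStore implementation works; a local store keeps the sketch self-contained.
/// let store: Arc<dyn ObjectStore> = Arc::new(
///     LocalFileSystem::new_with_prefix(std::env::temp_dir()).expect("could not create local store"),
/// );
/// let path: object_store::path::Path = "example.csv".into();
///
/// let mut writer = BlockingCloudWriter::new_with_object_store(
///     store,
///     path,
///     4 * 1024 * 1024, // upload chunk size (illustrative)
///     4,               // max concurrent upload parts (illustrative)
/// )
/// .unwrap();
///
/// // The writer implements `std::io::Write`, so sync writers such as `CsvWriter` can drive it.
/// let mut df = df!("foo" => &[1, 2, 3]).unwrap();
/// CsvWriter::new(&mut writer).finish(&mut df).unwrap();
///
/// // Finish the multipart upload explicitly rather than relying on `Drop`.
/// writer.close().unwrap();
/// ```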
pub struct BlockingCloudWriter {
    state: std::io::Result<BufWriter>,
}

impl BlockingCloudWriter {
    /// Constructs a new BlockingCloudWriter, re-using the given `object_store`.
    ///
    /// The synchronous write path is bridged to the async ObjectStore multipart upload by blocking
    /// on the shared Tokio runtime (see `get_runtime`).
    /// TODO: Naming?
    pub fn new_with_object_store(
        object_store: Arc<dyn ObjectStore>,
        path: Path,
        cloud_upload_chunk_size: usize,
        cloud_upload_max_concurrency: usize,
    ) -> PolarsResult<Self> {
        let writer = BufWriter::with_capacity(object_store, path, cloud_upload_chunk_size)
            .with_max_concurrency(cloud_upload_max_concurrency);
        Ok(BlockingCloudWriter { state: Ok(writer) })
    }

    /// Constructs a new BlockingCloudWriter from a URI and an optional set of CloudOptions.
    ///
    /// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
    /// TODO: Naming?
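    ///
    /// # Example
    ///
    /// A minimal sketch adapted from the `file://` test at the bottom of this file; the URI, chunk
    /// size, and concurrency values are placeholders.
    ///
    /// ```ignore
    /// use polars_utils::pl_path::format_file_uri;
    ///
    /// use crate::pl_async::get_runtime;
    ///
    /// // Bridge into the async constructor from synchronous code.
    /// let mut writer = get_runtime()
    ///     .block_on(BlockingCloudWriter::new(
    ///         format_file_uri("/tmp/example.csv"),
    ///         None,            // no explicit CloudOptions
    ///         4 * 1024 * 1024, // upload chunk size (placeholder)
    ///         4,               // max concurrent upload parts (placeholder)
    ///     ))
    ///     .unwrap();
    /// ```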
    pub async fn new(
        uri: PlRefPath,
        cloud_options: Option<&CloudOptions>,
        cloud_upload_chunk_size: usize,
        cloud_upload_max_concurrency: usize,
    ) -> PolarsResult<Self> {
        let (cloud_location, object_store) =
            crate::cloud::build_object_store(uri, cloud_options, false).await?;
        Self::new_with_object_store(
            object_store.to_dyn_object_store().await,
            object_path_from_str(&cloud_location.prefix)?,
            cloud_upload_chunk_size,
            cloud_upload_max_concurrency,
        )
    }

    /// Returns the underlying [`object_store::buffered::BufWriter`].
    pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
        // We can't just return self.state:
        // * cannot move out of type `adaptors::BlockingCloudWriter`, which implements the `Drop` trait
        std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
    }

    /// Closes the writer, or returns the existing error if there is one. After this function is
    /// called, the writer is guaranteed to be in an error state.
    pub fn close(&mut self) -> std::io::Result<()> {
        match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
            Ok(_) => {
                self.state = Err(std::io::Error::other("closed"));
                Ok(())
            },
            Err(e) => Err(e),
        }
    }

    fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
    where
        F: Fn(&mut BufWriter) -> std::io::Result<O>,
    {
        let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
        match func(writer) {
            Ok(v) => Ok(v),
            Err(e) => {
                self.state = Err(clone_io_err(&e));
                Err(e)
            },
        }
    }
}

impl std::io::Write for BlockingCloudWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // SAFETY:
        // We extend the lifetime of `buf` for the duration of this function. This is safe because
        // we block on the async runtime here: the write future is driven to completion before this
        // function returns, so the 'static reference never outlives `buf`.
        let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };

        self.try_with_writer(|writer| {
            get_runtime()
                .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
        })
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
    }
}

impl WriteableTrait for BlockingCloudWriter {
    fn close(&mut self) -> std::io::Result<()> {
        BlockingCloudWriter::close(self)
    }

    fn sync_all(&self) -> std::io::Result<()> {
        Ok(())
    }

    fn sync_data(&self) -> std::io::Result<()> {
        Ok(())
    }
}

impl Drop for BlockingCloudWriter {
    fn drop(&mut self) {
        if self.state.is_err() {
            return;
        }

        // Note: We should not reach this point - the writer should instead be explicitly closed.
        // But we still have this here as a safety measure to prevent silently dropping errors.
        match self.close() {
            Ok(()) => {},
            e @ Err(_) => {
                if std::thread::panicking() {
                    eprintln!("ERROR: CloudWriter errored on close: {e:?}")
                } else {
                    e.unwrap()
                }
            },
        }
    }
}

#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    use crate::{get_upload_chunk_size, get_upload_concurrency};

    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut df = example_dataframe();

        let object_store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );

        let path: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut cloud_writer = BlockingCloudWriter::new_with_object_store(
            object_store,
            path,
            get_upload_chunk_size(),
            get_upload_concurrency(),
        )
        .unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");
    }

    // Skip this test on Windows since it does not have a convenient /tmp/ location.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use polars_utils::pl_path::format_file_uri;

        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};
        use crate::{SerReader, get_upload_concurrency};

        let mut df = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        std::fs::File::create(path).unwrap();

        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                format_file_uri(path),
                None,
                get_upload_chunk_size(),
                get_upload_concurrency(),
            ))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }
}
241}