polars_io/cloud/
adaptors.rs1use std::sync::Arc;
4
5use object_store::ObjectStore;
6use object_store::buffered::BufWriter;
7use object_store::path::Path;
8use polars_error::PolarsResult;
9use polars_utils::pl_path::PlRefPath;
10use tokio::io::AsyncWriteExt;
11
12use super::{CloudOptions, object_path_from_str};
13use crate::pl_async::get_runtime;
14use crate::utils::file::WriteableTrait;
15
/// Creates an owned copy of `e`, since [`std::io::Error`] does not implement `Clone`.
///
/// OS errors are re-created from their raw error code, so the copy keeps
/// [`std::io::Error::raw_os_error`] along with the same `kind()` and `Display` text. Any other
/// error becomes a new error with the same [`std::io::ErrorKind`] whose message is the original's
/// `Display` text (the original's source chain is not carried over).
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    match e.raw_os_error() {
        Some(code) => std::io::Error::from_raw_os_error(code),
        None => std::io::Error::new(e.kind(), e.to_string()),
    }
}

20pub struct BlockingCloudWriter {
28 state: std::io::Result<BufWriter>,
29}
30
31impl BlockingCloudWriter {
32 pub fn new_with_object_store(
38 object_store: Arc<dyn ObjectStore>,
39 path: Path,
40 cloud_upload_chunk_size: usize,
41 cloud_upload_max_concurrency: usize,
42 ) -> PolarsResult<Self> {
43 let writer = BufWriter::with_capacity(object_store, path, cloud_upload_chunk_size)
44 .with_max_concurrency(cloud_upload_max_concurrency);
45 Ok(BlockingCloudWriter { state: Ok(writer) })
46 }
47
48 pub async fn new(
53 uri: PlRefPath,
54 cloud_options: Option<&CloudOptions>,
55 cloud_upload_chunk_size: usize,
56 cloud_upload_max_concurrency: usize,
57 ) -> PolarsResult<Self> {
58 let (cloud_location, object_store) =
59 crate::cloud::build_object_store(uri, cloud_options, false).await?;
60 Self::new_with_object_store(
61 object_store.to_dyn_object_store().await,
62 object_path_from_str(&cloud_location.prefix)?,
63 cloud_upload_chunk_size,
64 cloud_upload_max_concurrency,
65 )
66 }
67
68 pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
70 std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
73 }
74
75 pub fn close(&mut self) -> std::io::Result<()> {
78 match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
79 Ok(_) => {
80 self.state = Err(std::io::Error::other("closed"));
81 Ok(())
82 },
83 Err(e) => Err(e),
84 }
85 }
86
87 fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
88 where
89 F: Fn(&mut BufWriter) -> std::io::Result<O>,
90 {
91 let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
92 match func(writer) {
93 Ok(v) => Ok(v),
94 Err(e) => {
95 self.state = Err(clone_io_err(&e));
96 Err(e)
97 },
98 }
99 }
100}
101
102impl std::io::Write for BlockingCloudWriter {
103 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
104 let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
108
109 self.try_with_writer(|writer| {
110 get_runtime()
111 .block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
112 })
113 }
114
115 fn flush(&mut self) -> std::io::Result<()> {
116 self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
117 }
118}
119
120impl WriteableTrait for BlockingCloudWriter {
121 fn close(&mut self) -> std::io::Result<()> {
122 BlockingCloudWriter::close(self)
123 }
124
125 fn sync_all(&self) -> std::io::Result<()> {
126 Ok(())
127 }
128
129 fn sync_data(&self) -> std::io::Result<()> {
130 Ok(())
131 }
132}
133
134impl Drop for BlockingCloudWriter {
135 fn drop(&mut self) {
136 if self.state.is_err() {
137 return;
138 }
139
140 match self.close() {
143 Ok(()) => {},
144 e @ Err(_) => {
145 if std::thread::panicking() {
146 eprintln!("ERROR: CloudWriter errored on close: {e:?}")
147 } else {
148 e.unwrap()
149 }
150 },
151 }
152 }
153}
154
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    use crate::{get_upload_chunk_size, get_upload_concurrency};

    /// Three-row frame with an integer column and a nullable string column.
    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    /// Writes a CSV through a `BlockingCloudWriter` backed by a local-filesystem object store
    /// rooted at the system temp directory. The writer is closed when it is dropped.
    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );
        let target: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut writer = BlockingCloudWriter::new_with_object_store(
            store,
            target,
            get_upload_chunk_size(),
            get_upload_concurrency(),
        )
        .unwrap();

        let mut frame = example_dataframe();
        CsvWriter::new(&mut writer)
            .finish(&mut frame)
            .expect("Could not write DataFrame as CSV to remote location");
    }

    /// Round-trips a CSV through a writer built from a `file://` URI and reads it back.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use polars_utils::pl_path::format_file_uri;

        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut expected = example_dataframe();
        let target = "/tmp/cloud_writer_example2.csv";

        // Create (or truncate) the target file before its URI is resolved.
        std::fs::File::create(target).unwrap();

        let mut writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                format_file_uri(target),
                None,
                get_upload_chunk_size(),
                get_upload_concurrency(),
            ))
            .unwrap();

        CsvWriter::new(&mut writer)
            .finish(&mut expected)
            .expect("Could not write DataFrame as CSV to remote location");
        writer.close().unwrap();

        let read_back = CsvReadOptions::default()
            .try_into_reader_with_file_path(Some(target.into()))
            .unwrap()
            .finish()
            .unwrap();
        assert_eq!(read_back, expected);
    }
}