1use std::io::{self as std_io, Read, Write};
31use std::path::Path;
32use tokio::fs;
33use tokio::io::{AsyncReadExt, AsyncWriteExt};
34
35pub use dsq_formats::{serialize, CompressionLevel, DataFormat, FormatWriteOptions, WriteOptions};
39
40pub mod file_writer;
42pub mod memory_writer;
43pub mod traits;
44
45pub mod options;
47
48pub use file_writer::{to_path, to_path_with_format, FileWriter};
50pub use memory_writer::{to_memory, MemoryWriter};
51pub use traits::DataWriter;
52
53pub type Result<T> = std::result::Result<T, Error>;
55
56#[derive(Debug, thiserror::Error)]
58pub enum Error {
59 #[error("IO error: {0}")]
60 Io(#[from] std::io::Error),
61 #[error("Polars error: {0}")]
62 Polars(#[from] polars::error::PolarsError),
63 #[error("Format error: {0}")]
64 Format(String),
65 #[error("Other error: {0}")]
66 Other(String),
67}
68
69impl From<dsq_formats::Error> for Error {
70 fn from(e: dsq_formats::Error) -> Self {
71 Error::Other(e.to_string())
72 }
73}
74
75impl From<anyhow::Error> for Error {
76 fn from(e: anyhow::Error) -> Self {
77 Error::Other(e.to_string())
78 }
79}
80
81impl From<apache_avro::Error> for Error {
82 fn from(e: apache_avro::Error) -> Self {
83 Error::Other(e.to_string())
84 }
85}
86
87impl Error {
88 pub fn operation(msg: impl Into<String>) -> Self {
90 Error::Other(msg.into())
91 }
92}
93
94pub async fn read_file<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
104 fs::read(path).await.map_err(Error::from)
105}
106
107pub async fn write_file<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
117 fs::write(path, data).await.map_err(Error::from)
118}
119
120pub async fn read_stdin() -> Result<Vec<u8>> {
130 let mut buffer = Vec::new();
131 tokio::io::stdin().read_to_end(&mut buffer).await?;
132 Ok(buffer)
133}
134
135pub async fn write_stdout(data: &[u8]) -> Result<()> {
145 let mut stdout = tokio::io::stdout();
146 stdout.write_all(data).await?;
147 stdout.flush().await?;
148 Ok(())
149}
150
151pub async fn write_stderr(data: &[u8]) -> Result<()> {
161 let mut stderr = tokio::io::stderr();
162 stderr.write_all(data).await?;
163 stderr.flush().await?;
164 Ok(())
165}
166
167pub fn read_file_sync<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
170 std::fs::read(path).map_err(Error::from)
171}
172
173pub fn write_file_sync<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
175 std::fs::write(path, data).map_err(Error::from)
176}
177
178pub fn read_stdin_sync() -> Result<Vec<u8>> {
180 let mut buffer = Vec::new();
181 std_io::stdin().read_to_end(&mut buffer)?;
182 Ok(buffer)
183}
184
185pub fn write_stdout_sync(data: &[u8]) -> Result<()> {
187 std_io::stdout().write_all(data)?;
188 std_io::stdout().flush()?;
189 Ok(())
190}
191
192pub fn write_stderr_sync(data: &[u8]) -> Result<()> {
194 std_io::stderr().write_all(data)?;
195 std_io::stderr().flush()?;
196 Ok(())
197}
198
199#[cfg(test)]
200mod tests {
201 use super::*;
202
203 use tempfile::NamedTempFile;
204
205 #[tokio::test]
206 async fn test_read_write_file() {
207 let temp_file = NamedTempFile::new().unwrap();
208 let test_data = b"Hello, world!";
209
210 write_file(temp_file.path(), test_data).await.unwrap();
211 let read_data = read_file(temp_file.path()).await.unwrap();
212
213 assert_eq!(read_data, test_data);
214 }
215
216 #[test]
217 fn test_read_write_file_sync() {
218 let temp_file = NamedTempFile::new().unwrap();
219 let test_data = b"Hello, world!";
220
221 write_file_sync(temp_file.path(), test_data).unwrap();
222 let read_data = read_file_sync(temp_file.path()).unwrap();
223
224 assert_eq!(read_data, test_data);
225 }
226}