dsq_io/
lib.rs

1//! dsq-io: Low-level I/O utilities for dsq
2//!
3//! This crate provides basic functions for reading and writing bytes to/from
4//! files, STDIN, and STDOUT. It handles the low-level I/O operations without
5//! any format-specific logic.
6//!
7//! # Features
8//!
9//! - Synchronous and asynchronous I/O
10//! - File reading/writing
11//! - STDIN/STDOUT handling
12//! - Network I/O (planned)
13//!
14//! # Examples
15//!
16//! Reading from a file:
17//! ```rust,ignore
18//! use dsq_io::read_file;
19//!
20//! let data = read_file("data.txt").await.unwrap();
21//! ```
22//!
23//! Writing to STDOUT:
24//! ```rust,ignore
25//! use dsq_io::write_stdout;
26//!
27//! write_stdout(b"Hello, world!").await.unwrap();
28//! ```
29
30use std::io::{self as std_io, Read, Write};
31use std::path::Path;
32use tokio::fs;
33use tokio::io::{AsyncReadExt, AsyncWriteExt};
34
35// Low-level I/O only - format parsing is in dsq-formats
36
37// Re-export from dsq-formats for convenience
38pub use dsq_formats::{serialize, CompressionLevel, DataFormat, FormatWriteOptions, WriteOptions};
39
40// Writer modules
41pub mod file_writer;
42pub mod memory_writer;
43pub mod traits;
44
45// Options
46pub mod options;
47
48// Re-export writer types
49pub use file_writer::{to_path, to_path_with_format, FileWriter};
50pub use memory_writer::{to_memory, MemoryWriter};
51pub use traits::DataWriter;
52
/// Convenience `Result` alias for I/O operations in this crate.
///
/// The error side is fixed to this crate's [`Error`] type, so signatures can
/// be written as `Result<T>`.
pub type Result<T> = std::result::Result<T, Error>;
55
/// I/O error type
///
/// The `Display` text of each variant is produced by the `#[error(...)]`
/// attribute (via `thiserror`) and prefixes the wrapped value with the kind
/// of failure.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Wraps a [`std::io::Error`]; `#[from]` generates the `From` impl so `?`
    /// converts standard I/O errors automatically.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Wraps a Polars error; `#[from]` generates the `From` impl so `?`
    /// converts it automatically.
    #[error("Polars error: {0}")]
    Polars(#[from] polars::error::PolarsError),
    /// A failure described only by a message, reported as a format error.
    // NOTE(review): nothing in this file constructs this variant; presumably
    // it is used by the writer modules - confirm.
    #[error("Format error: {0}")]
    Format(String),
    /// Catch-all failure carrying only a message. The manual `From` impls in
    /// this file and `Error::operation` produce this variant.
    #[error("Other error: {0}")]
    Other(String),
}
68
69impl From<dsq_formats::Error> for Error {
70    fn from(e: dsq_formats::Error) -> Self {
71        Error::Other(e.to_string())
72    }
73}
74
75impl From<anyhow::Error> for Error {
76    fn from(e: anyhow::Error) -> Self {
77        Error::Other(e.to_string())
78    }
79}
80
81impl From<apache_avro::Error> for Error {
82    fn from(e: apache_avro::Error) -> Self {
83        Error::Other(e.to_string())
84    }
85}
86
87impl Error {
88    /// Create an operation error with a custom message
89    pub fn operation(msg: impl Into<String>) -> Self {
90        Error::Other(msg.into())
91    }
92}
93
94/// Read all bytes from a file asynchronously
95///
96/// # Examples
97///
98/// ```rust,ignore
99/// use dsq_io::read_file;
100///
101/// let data = read_file("data.txt").await.unwrap();
102/// ```
103pub async fn read_file<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
104    fs::read(path).await.map_err(Error::from)
105}
106
107/// Write bytes to a file asynchronously
108///
109/// # Examples
110///
111/// ```rust,ignore
112/// use dsq_io::write_file;
113///
114/// write_file("output.txt", b"Hello, world!").await.unwrap();
115/// ```
116pub async fn write_file<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
117    fs::write(path, data).await.map_err(Error::from)
118}
119
120/// Read all bytes from STDIN asynchronously
121///
122/// # Examples
123///
124/// ```rust,ignore
125/// use dsq_io::read_stdin;
126///
127/// let data = read_stdin().await.unwrap();
128/// ```
129pub async fn read_stdin() -> Result<Vec<u8>> {
130    let mut buffer = Vec::new();
131    tokio::io::stdin().read_to_end(&mut buffer).await?;
132    Ok(buffer)
133}
134
135/// Write bytes to STDOUT asynchronously
136///
137/// # Examples
138///
139/// ```rust,ignore
140/// use dsq_io::write_stdout;
141///
142/// write_stdout(b"Hello, world!").await.unwrap();
143/// ```
144pub async fn write_stdout(data: &[u8]) -> Result<()> {
145    let mut stdout = tokio::io::stdout();
146    stdout.write_all(data).await?;
147    stdout.flush().await?;
148    Ok(())
149}
150
151/// Write bytes to STDERR asynchronously
152///
153/// # Examples
154///
155/// ```rust,ignore
156/// use dsq_io::write_stderr;
157///
158/// write_stderr(b"Error message").await.unwrap();
159/// ```
160pub async fn write_stderr(data: &[u8]) -> Result<()> {
161    let mut stderr = tokio::io::stderr();
162    stderr.write_all(data).await?;
163    stderr.flush().await?;
164    Ok(())
165}
166
167/// Synchronous versions for compatibility
168/// Read all bytes from a file synchronously
169pub fn read_file_sync<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
170    std::fs::read(path).map_err(Error::from)
171}
172
173/// Write bytes to a file synchronously
174pub fn write_file_sync<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
175    std::fs::write(path, data).map_err(Error::from)
176}
177
178/// Read all bytes from STDIN synchronously
179pub fn read_stdin_sync() -> Result<Vec<u8>> {
180    let mut buffer = Vec::new();
181    std_io::stdin().read_to_end(&mut buffer)?;
182    Ok(buffer)
183}
184
185/// Write bytes to STDOUT synchronously
186pub fn write_stdout_sync(data: &[u8]) -> Result<()> {
187    std_io::stdout().write_all(data)?;
188    std_io::stdout().flush()?;
189    Ok(())
190}
191
192/// Write bytes to STDERR synchronously
193pub fn write_stderr_sync(data: &[u8]) -> Result<()> {
194    std_io::stderr().write_all(data)?;
195    std_io::stderr().flush()?;
196    Ok(())
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    use tempfile::NamedTempFile;

    /// Payload written and read back by both round-trip tests.
    const PAYLOAD: &[u8] = b"Hello, world!";

    /// Bytes written with `write_file` come back unchanged from `read_file`.
    #[tokio::test]
    async fn test_read_write_file() {
        let scratch = NamedTempFile::new().unwrap();
        let target = scratch.path();

        write_file(target, PAYLOAD).await.unwrap();
        let round_tripped = read_file(target).await.unwrap();

        assert_eq!(round_tripped, PAYLOAD);
    }

    /// Bytes written with `write_file_sync` come back unchanged from
    /// `read_file_sync`.
    #[test]
    fn test_read_write_file_sync() {
        let scratch = NamedTempFile::new().unwrap();
        let target = scratch.path();

        write_file_sync(target, PAYLOAD).unwrap();
        let round_tripped = read_file_sync(target).unwrap();

        assert_eq!(round_tripped, PAYLOAD);
    }
}
226}