use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
#[cfg(feature = "state_machine")]
pub mod csv;
#[cfg(feature = "state_machine")]
pub mod json;
#[cfg(feature = "state_machine")]
pub mod parquet;
#[cfg(feature = "state_machine")]
pub mod table;
pub mod value_fmt;
#[cfg(feature = "state_machine")]
pub use csv::{CSVWriter, StreamingCSVWriter};
#[cfg(feature = "state_machine")]
pub use json::{JSONWriter, StreamingJSONWriter};
#[cfg(feature = "state_machine")]
#[allow(unused_imports)]
pub use parquet::{
create_streaming_parquet_writer, create_streaming_parquet_writer_from_writer, ParquetWriter,
};
#[cfg(feature = "state_machine")]
#[allow(unused_imports)]
pub use table::TableWriter;
#[allow(unused_imports)]
pub use value_fmt::ValueFormatter;
/// Destination for rendered output.
///
/// The default value is [`OutputTarget::Stdout`].
#[derive(Debug, Clone, Default)]
pub enum OutputTarget {
    /// Write to the process's standard output.
    #[default]
    Stdout,
    /// Write to the file at the contained path.
    File(PathBuf),
}

impl OutputTarget {
    /// Returns `true` when this target names a file rather than stdout.
    pub fn is_file(&self) -> bool {
        self.path().is_some()
    }

    /// Returns the destination path for a file target, or `None` for stdout.
    // `dead_code` stays allowed so callers may leave this accessor unused
    // without triggering a warning.
    #[allow(dead_code)]
    pub fn path(&self) -> Option<&PathBuf> {
        if let Self::File(file_path) = self {
            Some(file_path)
        } else {
            None
        }
    }
}
/// Errors produced while delivering output to its target.
#[derive(Debug)]
pub enum OutputError {
    /// An underlying I/O operation failed.
    Io(std::io::Error),
    /// The directory that should contain the output file does not exist.
    DirectoryNotFound(PathBuf),
    /// The output file already exists and overwriting was not requested.
    FileExists(PathBuf),
    /// Parquet output was requested without a file destination.
    ParquetRequiresFile,
}

impl std::fmt::Display for OutputError {
    /// Renders the user-facing message for each error variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "I/O error: {err}"),
            Self::DirectoryNotFound(dir) => {
                write!(f, "Directory not found: {}", dir.display())
            }
            Self::FileExists(existing) => write!(
                f,
                "File already exists: {}. Use --overwrite to replace.",
                existing.display()
            ),
            // The trailing backslashes continue the literal and swallow the
            // leading whitespace of the following source line.
            Self::ParquetRequiresFile => f.write_str(
                "Parquet format requires file output.\n\n\
                 Use --output/-o to specify an output file:\n\
                 cqlite --out parquet --output results.parquet -e 'SELECT * FROM table'",
            ),
        }
    }
}

impl std::error::Error for OutputError {
    /// Exposes the wrapped I/O error as the cause; other variants have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::DirectoryNotFound(_) | Self::FileExists(_) | Self::ParquetRequiresFile => None,
        }
    }
}
#[cfg(feature = "state_machine")]
use cqlite_core::query::{QueryMetadata, QueryRow};
/// Interface for writers that emit query output incrementally.
///
/// Only compiled when the `state_machine` feature is enabled. Implementors
/// must be `Send`.
///
/// NOTE(review): the method names suggest a call order of `write_header`,
/// then any number of `write_chunk` calls, then `finalize` — confirm against
/// the implementors; nothing in this trait enforces it.
#[cfg(feature = "state_machine")]
pub trait StreamingWriter: Send {
/// Receives the query metadata for the output about to be written.
fn write_header(&mut self, metadata: &QueryMetadata) -> Result<(), OutputError>;
/// Writes a slice of rows.
///
/// NOTE(review): the returned `usize` is presumably the number of rows
/// written from `rows` — confirm against the implementors.
fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, OutputError>;
/// Completes the output; implementors define what finishing entails.
fn finalize(&mut self) -> Result<(), OutputError>;
/// Reports a row count as a `u64` (presumably the running total written so
/// far — confirm against the implementors).
#[allow(dead_code)]
fn rows_written(&self) -> u64;
/// Reports a byte count when the implementor tracks one.
///
/// The default implementation returns `None`.
#[allow(dead_code)]
fn bytes_written(&self) -> Option<u64> {
None
}
}
/// Snapshot of how far a streaming write has progressed.
// `dead_code` is allowed because nothing in this file constructs or reads
// this type.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct StreamingProgress {
    /// Number of rows written; the numerator for both derived metrics.
    pub rows_written: u64,
    /// Optional byte count; not read by any method on this type.
    pub bytes_written: Option<u64>,
    /// Elapsed time in milliseconds, used by [`Self::rows_per_second`].
    pub elapsed_ms: u64,
    /// Optional expected row total, used as the denominator by
    /// [`Self::percent`].
    pub total_rows_hint: Option<u64>,
}

#[allow(dead_code)]
impl StreamingProgress {
    /// Returns completion as a percentage of `total_rows_hint`.
    ///
    /// Yields `None` when no hint is set and `Some(100.0)` when the hint is
    /// zero. The value is not clamped, so it exceeds 100 if more rows were
    /// written than hinted.
    pub fn percent(&self) -> Option<f64> {
        let expected = self.total_rows_hint?;
        if expected == 0 {
            return Some(100.0);
        }
        Some((self.rows_written as f64 / expected as f64) * 100.0)
    }

    /// Returns the average throughput in rows per second.
    ///
    /// Yields `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn rows_per_second(&self) -> f64 {
        match self.elapsed_ms {
            0 => 0.0,
            millis => {
                let seconds = millis as f64 / 1000.0;
                (self.rows_written as f64) / seconds
            }
        }
    }
}
/// Writes `content` to the given output target.
///
/// * [`OutputTarget::Stdout`] — writes the bytes to standard output and
///   flushes it.
/// * [`OutputTarget::File`] — delegates to `write_atomic`, which honours
///   `overwrite`.
///
/// `overwrite` is ignored for stdout.
///
/// # Errors
///
/// Returns [`OutputError::Io`] if writing to or flushing stdout fails, or
/// whatever error `write_atomic` reports for a file target.
pub fn write_to_target(
    content: &[u8],
    target: &OutputTarget,
    overwrite: bool,
) -> Result<(), OutputError> {
    match target {
        OutputTarget::Stdout => {
            // Take the stdout lock once and hold it across both calls, so no
            // other thread can interleave output between write and flush.
            let mut out = std::io::stdout().lock();
            out.write_all(content).map_err(OutputError::Io)?;
            out.flush().map_err(OutputError::Io)
        }
        OutputTarget::File(path) => write_atomic(path, content, overwrite),
    }
}
/// Writes `content` to `path` via a temporary sibling file and a rename, so
/// the destination only ever appears with the complete content.
///
/// The temporary file sits next to `path` and is named from the original
/// extension plus `.tmp.<pid>`. It is created exclusively, so a pre-existing
/// entry at that name (including a symlink) is never written through; such
/// an entry is removed and creation is retried once.
///
/// When `overwrite` is `false` the destination is checked twice: before any
/// work is done and again immediately before the rename. The second check
/// narrows, but cannot fully close, the window in which another process
/// could create `path` and have it replaced.
///
/// # Errors
///
/// * [`OutputError::DirectoryNotFound`] — the non-empty parent directory of
///   `path` does not exist.
/// * [`OutputError::FileExists`] — `overwrite` is `false` and `path` exists.
/// * [`OutputError::Io`] — creating, writing, syncing or renaming the
///   temporary file failed. The temporary file is removed on a best-effort
///   basis once it has been created.
fn write_atomic(path: &Path, content: &[u8], overwrite: bool) -> Result<(), OutputError> {
    if let Some(parent) = path.parent() {
        // An empty parent means a bare file name, i.e. the current directory.
        if !parent.as_os_str().is_empty() && !parent.exists() {
            return Err(OutputError::DirectoryNotFound(parent.to_path_buf()));
        }
    }
    if !overwrite && path.exists() {
        return Err(OutputError::FileExists(path.to_path_buf()));
    }

    let extension = path.extension().unwrap_or_default().to_string_lossy();
    let temp_path = path.with_extension(format!("{}.tmp.{}", extension, std::process::id()));

    // `create_new` fails instead of following a symlink or truncating a file
    // that already sits at the predictable temp name.
    let open_temp = || File::options().write(true).create_new(true).open(&temp_path);
    let mut file = match open_temp() {
        Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
            // Stale leftover carrying our own pid: remove the entry itself
            // (never its symlink target) and retry once.
            fs::remove_file(&temp_path).map_err(OutputError::Io)?;
            open_temp()
        }
        other => other,
    }
    .map_err(OutputError::Io)?;

    let written = file.write_all(content).and_then(|()| file.sync_all());
    // Close the handle before removing or renaming the file.
    drop(file);
    if let Err(e) = written {
        let _ = fs::remove_file(&temp_path);
        return Err(OutputError::Io(e));
    }

    // `rename` replaces an existing destination, so re-check right before it
    // in case the file appeared while the content was being written.
    if !overwrite && path.exists() {
        let _ = fs::remove_file(&temp_path);
        return Err(OutputError::FileExists(path.to_path_buf()));
    }

    fs::rename(&temp_path, path).map_err(|e| {
        let _ = fs::remove_file(&temp_path);
        OutputError::Io(e)
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// The default target is stdout, which is not a file target.
    #[test]
    fn test_output_target_default_is_stdout() {
        let default_target = OutputTarget::default();
        assert!(matches!(default_target, OutputTarget::Stdout));
        assert!(!default_target.is_file());
    }

    /// A file target reports itself as a file and exposes its path.
    #[test]
    fn test_output_target_file() {
        let expected = PathBuf::from("/tmp/test.json");
        let file_target = OutputTarget::File(PathBuf::from("/tmp/test.json"));
        assert!(file_target.is_file());
        assert_eq!(file_target.path(), Some(&expected));
    }

    /// Writing to a fresh path creates the file with the given content.
    #[test]
    fn test_write_to_file_creates_file() {
        let dir = TempDir::new().unwrap();
        let destination = dir.path().join("output.json");

        write_to_target(
            b"test content",
            &OutputTarget::File(destination.clone()),
            false,
        )
        .unwrap();

        assert!(destination.exists());
        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "test content"
        );
    }

    /// Without overwrite, an existing file is reported and left untouched.
    #[test]
    fn test_write_to_file_respects_overwrite_false() {
        let dir = TempDir::new().unwrap();
        let destination = dir.path().join("existing.json");
        std::fs::write(&destination, "existing content").unwrap();

        let outcome = write_to_target(
            b"new content",
            &OutputTarget::File(destination.clone()),
            false,
        );

        assert!(matches!(outcome, Err(OutputError::FileExists(_))));
        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "existing content"
        );
    }

    /// With overwrite, an existing file is replaced by the new content.
    #[test]
    fn test_write_to_file_respects_overwrite_true() {
        let dir = TempDir::new().unwrap();
        let destination = dir.path().join("existing.json");
        std::fs::write(&destination, "existing content").unwrap();

        write_to_target(
            b"new content",
            &OutputTarget::File(destination.clone()),
            true,
        )
        .unwrap();

        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "new content"
        );
    }

    /// A missing parent directory yields `DirectoryNotFound`.
    #[test]
    fn test_write_to_file_missing_directory_error() {
        let missing = OutputTarget::File(PathBuf::from("/nonexistent/directory/output.json"));
        let outcome = write_to_target(b"test content", &missing, false);
        assert!(matches!(outcome, Err(OutputError::DirectoryNotFound(_))));
    }

    /// After a successful write only the destination file remains.
    #[test]
    fn test_write_atomic_no_temp_file_on_success() {
        let dir = TempDir::new().unwrap();
        let destination = dir.path().join("output.json");

        write_atomic(&destination, b"test content", false).unwrap();

        let names: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .map(|entry| entry.file_name())
            .collect();
        assert_eq!(names.len(), 1);
        assert_eq!(names[0], "output.json");
    }

    /// Each error variant renders its expected message fragments.
    #[test]
    fn test_output_error_display() {
        let io_text = OutputError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "test error",
        ))
        .to_string();
        assert!(io_text.contains("I/O error"));

        let dir_text = OutputError::DirectoryNotFound(PathBuf::from("/missing")).to_string();
        assert!(dir_text.contains("Directory not found"));

        let exists_text = OutputError::FileExists(PathBuf::from("/existing")).to_string();
        assert!(exists_text.contains("File already exists"));
        assert!(exists_text.contains("--overwrite"));

        let parquet_text = OutputError::ParquetRequiresFile.to_string();
        assert!(parquet_text.contains("Parquet format requires"));
        assert!(parquet_text.contains("--output"));
    }
}