1use std::fs::{self, File};
14use std::io::Write;
15use std::path::{Path, PathBuf};
16
17#[cfg(feature = "state_machine")]
18pub mod csv;
19#[cfg(feature = "state_machine")]
20pub mod json;
21#[cfg(feature = "state_machine")]
22pub mod parquet;
23#[cfg(feature = "state_machine")]
24pub mod table;
25pub mod value_fmt;
26
27#[cfg(feature = "state_machine")]
28pub use csv::{CSVWriter, StreamingCSVWriter};
29#[cfg(feature = "state_machine")]
30pub use json::{JSONWriter, StreamingJSONWriter};
31#[cfg(feature = "state_machine")]
32#[allow(unused_imports)]
33pub use parquet::{
34 create_streaming_parquet_writer, create_streaming_parquet_writer_from_writer, ParquetWriter,
35};
36#[cfg(feature = "state_machine")]
37#[allow(unused_imports)]
38pub use table::TableWriter;
39#[allow(unused_imports)]
40pub use value_fmt::ValueFormatter;
41
/// Destination that output bytes are written to.
///
/// The default target is [`OutputTarget::Stdout`].
#[derive(Debug, Clone, Default)]
pub enum OutputTarget {
    /// Write to the process's standard output.
    #[default]
    Stdout,
    /// Write to the file at the contained path.
    File(PathBuf),
}

impl OutputTarget {
    /// Returns `true` when this target is a file, `false` for stdout.
    pub fn is_file(&self) -> bool {
        match self {
            Self::File(_) => true,
            Self::Stdout => false,
        }
    }

    /// Returns the destination path for a file target, or `None` for stdout.
    #[allow(dead_code)]
    pub fn path(&self) -> Option<&PathBuf> {
        if let Self::File(file_path) = self {
            Some(file_path)
        } else {
            None
        }
    }
}
71
/// Errors produced while delivering output to its destination.
#[derive(Debug)]
pub enum OutputError {
    /// An underlying I/O operation failed.
    Io(std::io::Error),
    /// The parent directory of the requested output file does not exist.
    DirectoryNotFound(PathBuf),
    /// The output file already exists and overwriting was not requested.
    FileExists(PathBuf),
    /// Parquet output was requested without a file destination.
    ParquetRequiresFile,
}

impl std::fmt::Display for OutputError {
    /// Renders the user-facing message for each error variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Io(err) => write!(f, "I/O error: {}", err),
            Self::DirectoryNotFound(dir) => write!(f, "Directory not found: {}", dir.display()),
            Self::FileExists(existing) => write!(
                f,
                "File already exists: {}. Use --overwrite to replace.",
                existing.display()
            ),
            // Constant text with no placeholders, so it is written verbatim.
            Self::ParquetRequiresFile => f.write_str(
                "Parquet format requires file output.\n\n\
                 Use --output/-o to specify an output file:\n\
                 cqlite --out parquet --output results.parquet -e 'SELECT * FROM table'",
            ),
        }
    }
}

impl std::error::Error for OutputError {
    /// Exposes the wrapped I/O error as the source; other variants have none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(err) => Some(err),
            Self::DirectoryNotFound(_) | Self::FileExists(_) | Self::ParquetRequiresFile => None,
        }
    }
}
119
120#[cfg(feature = "state_machine")]
125use cqlite_core::query::{QueryMetadata, QueryRow};
126
/// Interface for writers that emit query results incrementally.
///
/// Only compiled when the `state_machine` feature is enabled. Implementors
/// must be `Send`.
#[cfg(feature = "state_machine")]
pub trait StreamingWriter: Send {
    /// Writes whatever header the format needs, given the query `metadata`.
    fn write_header(&mut self, metadata: &QueryMetadata) -> Result<(), OutputError>;

    /// Writes one chunk of `rows`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of rows
    /// written from this chunk; confirm against the implementations.
    fn write_chunk(&mut self, rows: &[QueryRow]) -> Result<usize, OutputError>;

    /// Completes the output.
    ///
    /// NOTE(review): presumably expected to be called once after the last
    /// chunk; confirm against the implementations.
    fn finalize(&mut self) -> Result<(), OutputError>;

    /// Returns the writer's running row count.
    #[allow(dead_code)]
    fn rows_written(&self) -> u64;

    /// Returns the writer's running byte count, if it tracks one.
    ///
    /// The provided implementation returns `None`.
    #[allow(dead_code)]
    fn bytes_written(&self) -> Option<u64> {
        Option::None
    }
}
201
/// Progress counters for a streaming write.
#[derive(Debug, Clone, Default)]
#[allow(dead_code)]
pub struct StreamingProgress {
    /// Number of rows written.
    pub rows_written: u64,
    /// Number of bytes written, when available.
    pub bytes_written: Option<u64>,
    /// Elapsed time in milliseconds.
    pub elapsed_ms: u64,
    /// Hint for the total number of rows, when available.
    pub total_rows_hint: Option<u64>,
}

#[allow(dead_code)]
impl StreamingProgress {
    /// Returns `rows_written` as a percentage of `total_rows_hint`.
    ///
    /// Yields `None` when no hint is set and `100.0` when the hint is zero.
    /// The value is not clamped, so it exceeds `100.0` if more rows were
    /// written than the hint.
    pub fn percent(&self) -> Option<f64> {
        let total = self.total_rows_hint?;
        if total == 0 {
            // Avoid dividing by zero: nothing expected counts as complete.
            return Some(100.0);
        }
        let fraction = self.rows_written as f64 / total as f64;
        Some(fraction * 100.0)
    }

    /// Returns the average rows written per second.
    ///
    /// Yields `0.0` when `elapsed_ms` is zero.
    pub fn rows_per_second(&self) -> f64 {
        match self.elapsed_ms {
            0 => 0.0,
            millis => {
                let seconds = millis as f64 / 1000.0;
                (self.rows_written as f64) / seconds
            }
        }
    }
}
238
239pub fn write_to_target(
254 content: &[u8],
255 target: &OutputTarget,
256 overwrite: bool,
257) -> Result<(), OutputError> {
258 match target {
259 OutputTarget::Stdout => {
260 std::io::stdout()
261 .write_all(content)
262 .map_err(OutputError::Io)?;
263 std::io::stdout().flush().map_err(OutputError::Io)
264 }
265 OutputTarget::File(path) => write_atomic(path, content, overwrite),
266 }
267}
268
269fn write_atomic(path: &Path, content: &[u8], overwrite: bool) -> Result<(), OutputError> {
273 if let Some(parent) = path.parent() {
275 if !parent.as_os_str().is_empty() && !parent.exists() {
276 return Err(OutputError::DirectoryNotFound(parent.to_path_buf()));
277 }
278 }
279
280 if !overwrite && path.exists() {
282 return Err(OutputError::FileExists(path.to_path_buf()));
283 }
284
285 let extension = path
287 .extension()
288 .unwrap_or_default()
289 .to_string_lossy()
290 .to_string();
291 let temp_path = path.with_extension(format!("{}.tmp.{}", extension, std::process::id()));
292
293 let write_result = (|| {
295 let mut file = File::create(&temp_path).map_err(OutputError::Io)?;
296 file.write_all(content).map_err(OutputError::Io)?;
297 file.sync_all().map_err(OutputError::Io)?;
298 Ok(())
299 })();
300
301 if let Err(e) = write_result {
303 let _ = fs::remove_file(&temp_path);
304 return Err(e);
305 }
306
307 fs::rename(&temp_path, path).map_err(|e| {
309 let _ = fs::remove_file(&temp_path);
311 OutputError::Io(e)
312 })
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a fresh temporary directory and a path for `name` inside it.
    ///
    /// The returned `TempDir` must be kept alive for as long as the path is
    /// used, since the directory is removed when it is dropped.
    fn scratch(name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(name);
        (dir, path)
    }

    /// The default target is stdout and is not reported as a file.
    #[test]
    fn test_output_target_default_is_stdout() {
        let default_target = OutputTarget::default();
        assert!(matches!(default_target, OutputTarget::Stdout));
        assert!(!default_target.is_file());
    }

    /// A file target reports itself as a file and exposes its path.
    #[test]
    fn test_output_target_file() {
        let expected = PathBuf::from("/tmp/test.json");
        let file_target = OutputTarget::File(expected.clone());
        assert!(file_target.is_file());
        assert_eq!(file_target.path(), Some(&expected));
    }

    /// Writing to a new file creates it with the given content.
    #[test]
    fn test_write_to_file_creates_file() {
        let (_dir, destination) = scratch("output.json");

        write_to_target(
            b"test content",
            &OutputTarget::File(destination.clone()),
            false,
        )
        .unwrap();

        assert!(destination.exists());
        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "test content"
        );
    }

    /// Without `overwrite`, an existing file is rejected and left untouched.
    #[test]
    fn test_write_to_file_respects_overwrite_false() {
        let (_dir, destination) = scratch("existing.json");
        std::fs::write(&destination, "existing content").unwrap();

        let outcome = write_to_target(
            b"new content",
            &OutputTarget::File(destination.clone()),
            false,
        );
        assert!(matches!(outcome, Err(OutputError::FileExists(_))));

        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "existing content"
        );
    }

    /// With `overwrite`, an existing file is replaced by the new content.
    #[test]
    fn test_write_to_file_respects_overwrite_true() {
        let (_dir, destination) = scratch("existing.json");
        std::fs::write(&destination, "existing content").unwrap();

        write_to_target(
            b"new content",
            &OutputTarget::File(destination.clone()),
            true,
        )
        .unwrap();

        assert_eq!(
            std::fs::read_to_string(&destination).unwrap(),
            "new content"
        );
    }

    /// A missing parent directory is reported as `DirectoryNotFound`.
    #[test]
    fn test_write_to_file_missing_directory_error() {
        let missing = OutputTarget::File(PathBuf::from("/nonexistent/directory/output.json"));

        let outcome = write_to_target(b"test content", &missing, false);
        assert!(matches!(outcome, Err(OutputError::DirectoryNotFound(_))));
    }

    /// After a successful write only the final file remains in the directory.
    #[test]
    fn test_write_atomic_no_temp_file_on_success() {
        let (dir, destination) = scratch("output.json");

        write_atomic(&destination, b"test content", false).unwrap();

        let names: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .map(|entry| entry.file_name())
            .collect();
        assert_eq!(names.len(), 1);
        assert_eq!(names[0], "output.json");
    }

    /// Each error variant renders its expected user-facing fragments.
    #[test]
    fn test_output_error_display() {
        let io_message = OutputError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "test error",
        ))
        .to_string();
        assert!(io_message.contains("I/O error"));

        let dir_message = OutputError::DirectoryNotFound(PathBuf::from("/missing")).to_string();
        assert!(dir_message.contains("Directory not found"));

        let exists_message = OutputError::FileExists(PathBuf::from("/existing")).to_string();
        assert!(exists_message.contains("File already exists"));
        assert!(exists_message.contains("--overwrite"));

        let parquet_message = OutputError::ParquetRequiresFile.to_string();
        assert!(parquet_message.contains("Parquet format requires"));
        assert!(parquet_message.contains("--output"));
    }
}