datasynth_output/
compressed.rs1use std::fs::File;
7use std::io::{self, BufWriter, Write};
8use std::path::{Path, PathBuf};
9
/// Tuning knobs for [`CompressedWriter`].
///
/// Build one with [`CompressionConfig::default`] and refine it with the
/// `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompressionConfig {
    /// zstd compression level.
    ///
    /// [`CompressionConfig::with_level`] keeps this within `1..=22`; because
    /// the field is public, a value assigned directly is not re-validated here.
    pub level: i32,
    /// Requested number of zstd worker threads; `0` means the worker-count
    /// parameter is never set on the encoder.
    pub threads: u32,
}

impl Default for CompressionConfig {
    /// Level 3 with no worker threads requested.
    fn default() -> Self {
        Self {
            level: 3,
            threads: 0,
        }
    }
}

impl CompressionConfig {
    /// Lowest level accepted by [`CompressionConfig::with_level`].
    const MIN_LEVEL: i32 = 1;
    /// Highest level accepted by [`CompressionConfig::with_level`].
    const MAX_LEVEL: i32 = 22;

    /// Returns this config with the compression level set to `level`,
    /// clamped into `1..=22` (values below 1 become 1, above 22 become 22).
    #[must_use]
    pub fn with_level(mut self, level: i32) -> Self {
        self.level = level.clamp(Self::MIN_LEVEL, Self::MAX_LEVEL);
        self
    }

    /// Returns this config with `threads` worker threads requested
    /// (`0` leaves the worker-count parameter unset).
    #[must_use]
    pub fn with_threads(mut self, threads: u32) -> Self {
        self.threads = threads;
        self
    }
}
41
42pub struct CompressedWriter<'a> {
47 encoder: zstd::Encoder<'a, BufWriter<File>>,
48 bytes_written: u64,
49}
50
51impl<'a> CompressedWriter<'a> {
52 pub fn new(path: &Path, config: &CompressionConfig) -> io::Result<Self> {
54 let file = File::create(path)?;
55 let buf_writer = BufWriter::with_capacity(256 * 1024, file);
56 let mut encoder = zstd::Encoder::new(buf_writer, config.level)?;
57
58 if config.threads > 0 {
60 encoder
61 .set_parameter(zstd::zstd_safe::CParameter::NbWorkers(config.threads))
62 .map_err(|_| io::Error::other("Failed to set zstd worker threads"))?;
63 }
64
65 Ok(Self {
66 encoder,
67 bytes_written: 0,
68 })
69 }
70
71 pub fn bytes_written(&self) -> u64 {
73 self.bytes_written
74 }
75
76 pub fn finish(self) -> io::Result<()> {
78 self.encoder.finish()?;
79 Ok(())
80 }
81}
82
83impl Write for CompressedWriter<'_> {
84 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
85 let n = self.encoder.write(buf)?;
86 self.bytes_written += n as u64;
87 Ok(n)
88 }
89
90 fn flush(&mut self) -> io::Result<()> {
91 self.encoder.flush()
92 }
93}
94
/// Returns `path` with the literal suffix `.zst` appended, e.g.
/// `data.csv` → `data.csv.zst`.
///
/// The suffix is appended to the raw OS string rather than via
/// [`Path::set_extension`], so an existing extension is kept, not replaced.
pub fn compressed_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".zst");
    raw.into()
}
101
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Read;
    use tempfile::tempdir;

    /// Reads the zstd file at `path` and returns its decompressed text.
    fn decompress_file(path: &Path) -> String {
        let compressed = std::fs::read(path).unwrap();
        let mut decoder = zstd::Decoder::new(&compressed[..]).unwrap();
        let mut decompressed = String::new();
        decoder.read_to_string(&mut decompressed).unwrap();
        decompressed
    }

    #[test]
    fn test_compressed_writer_roundtrip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csv.zst");

        let config = CompressionConfig::default();
        let mut writer = CompressedWriter::new(&path, &config).unwrap();

        let data = "id,name,value\n1,hello,42.5\n2,world,99.9\n";
        writer.write_all(data.as_bytes()).unwrap();
        // bytes_written tracks uncompressed input, not on-disk size.
        assert_eq!(writer.bytes_written(), data.len() as u64);
        writer.finish().unwrap();

        assert_eq!(decompress_file(&path), data);
    }

    #[test]
    fn test_compressed_writer_large_data() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("large.csv.zst");

        let config = CompressionConfig::default().with_level(3);
        let mut writer = CompressedWriter::new(&path, &config).unwrap();

        writer.write_all(b"id,name,value\n").unwrap();
        for i in 0..10_000u32 {
            let row = format!("{},item_{},{}.{:02}\n", i, i, i * 100, i % 100);
            writer.write_all(row.as_bytes()).unwrap();
        }
        let bytes_written = writer.bytes_written();
        writer.finish().unwrap();

        let file_size = std::fs::metadata(&path).unwrap().len();
        assert!(
            file_size < bytes_written,
            "Compressed size {} should be less than uncompressed {}",
            file_size,
            bytes_written
        );

        let decompressed = decompress_file(&path);
        assert!(decompressed.starts_with("id,name,value\n"));
        assert_eq!(decompressed.lines().count(), 10_001);
    }

    #[test]
    fn test_compressed_writer_empty() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("empty.csv.zst");

        let writer = CompressedWriter::new(&path, &CompressionConfig::default()).unwrap();
        assert_eq!(writer.bytes_written(), 0);
        writer.finish().unwrap();
        assert!(path.exists());
    }

    #[test]
    fn test_compressed_path() {
        let path = Path::new("/tmp/output/data.csv");
        let cp = compressed_path(path);
        assert_eq!(cp, PathBuf::from("/tmp/output/data.csv.zst"));

        // Relative paths without an extension simply gain the suffix.
        assert_eq!(compressed_path(Path::new("data")), PathBuf::from("data.zst"));
    }

    #[test]
    fn test_compression_config() {
        let config = CompressionConfig::default().with_level(6).with_threads(4);
        assert_eq!(config.level, 6);
        assert_eq!(config.threads, 4);
    }

    #[test]
    fn test_compression_level_clamp() {
        let config = CompressionConfig::default().with_level(50);
        assert_eq!(config.level, 22);

        let config = CompressionConfig::default().with_level(-5);
        assert_eq!(config.level, 1);
    }
}