1use arrow::array::*;
4use std::collections::HashMap;
5use std::fs::{File, OpenOptions};
6use std::io::{BufReader, Read, Seek, SeekFrom, Write};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use crate::common_config::{CONFIG, StrategicConfig};
11use crate::{ChunkMeta, FileMeta, decompress_archive};
12use anyhow::{Context, Result};
13use arrow::array::{
14 Array, ArrayBuilder, ArrayRef, BooleanArray, BooleanBuilder, FixedSizeBinaryArray, ListArray,
15 ListBuilder, StringArray, StringBuilder, StructArray, StructBuilder, UInt64Array,
16 UInt64Builder, make_builder,
17};
18use arrow::datatypes::{DataType, Field, Fields, Schema};
19use arrow::ipc::reader::FileReader;
20use arrow::record_batch::RecordBatch;
21use once_cell::sync::Lazy;
22
23pub static ZNIPPY_INDEX_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
24 Arc::new(Schema::new(vec![
25 Field::new("relative_path", DataType::Utf8, false),
26 Field::new("compressed", DataType::Boolean, true),
27 Field::new("uncompressed_size", DataType::UInt64, false),
28 Field::new(
29 "chunks",
30 DataType::List(Arc::new(Field::new(
31 "item",
32 DataType::Struct(Fields::from(vec![
33 Field::new("zdata_offset", DataType::UInt64, false),
34 Field::new("fdata_offset", DataType::UInt64, false),
35 Field::new("length", DataType::UInt64, false),
36 Field::new("chunk_seq", DataType::UInt32, false),
37 Field::new("checksum_group", DataType::UInt8, false),
38 ])),
39 true,
40 ))),
41 true,
42 ),
43 ]))
44});
45
46pub fn build_arrow_metadata_for_checksums_and_config(
47 checksums: &[[u8; 32]],
48 config: &StrategicConfig,
49) -> HashMap<String, String> {
50 let mut metadata = HashMap::new();
51
52 for (i, hash) in checksums.iter().enumerate() {
54 metadata.insert(format!("checksum_group_{}", i), hex::encode(hash));
55 }
56
57 metadata.insert(
59 "max_core_in_flight".into(),
60 config.max_core_in_flight.to_string(),
61 );
62 metadata.insert(
63 "max_core_in_compress".into(),
64 config.max_core_in_compress.to_string(),
65 );
66 metadata.insert("max_mem_allowed".into(), config.max_mem_allowed.to_string());
67 metadata.insert(
68 "min_free_memory_ratio".into(),
69 config.min_free_memory_ratio.to_string(),
70 );
71 metadata.insert(
72 "file_split_block_size".into(),
73 config.file_split_block_size.to_string(),
74 );
75 metadata.insert("max_chunks".into(), config.max_chunks.to_string());
76 metadata.insert(
77 "compression_level".into(),
78 config.compression_level.to_string(),
79 );
80 metadata.insert(
81 "zstd_output_buffer_size".into(),
82 config.zstd_output_buffer_size.to_string(),
83 );
84
85 metadata
86}
87
88pub fn extract_config_from_arrow_metadata(
89 metadata: &std::collections::HashMap<String, String>,
90) -> anyhow::Result<StrategicConfig> {
91 Ok(StrategicConfig {
92 max_core_allowed: 0,
93 max_core_in_flight: metadata
94 .get("max_core_in_flight")
95 .ok_or_else(|| anyhow::anyhow!("Missing 'max_core_in_flight' in metadata"))?
96 .parse()?,
97
98 max_core_in_compress: metadata
99 .get("max_core_in_compress")
100 .ok_or_else(|| anyhow::anyhow!("Missing 'max_core_in_compress' in metadata"))?
101 .parse()?,
102
103 max_mem_allowed: metadata
104 .get("max_mem_allowed")
105 .ok_or_else(|| anyhow::anyhow!("Missing 'max_mem_allowed' in metadata"))?
106 .parse()?,
107
108 min_free_memory_ratio: metadata
109 .get("min_free_memory_ratio")
110 .ok_or_else(|| anyhow::anyhow!("Missing 'min_free_memory_ratio' in metadata"))?
111 .parse()?,
112
113 file_split_block_size: metadata
114 .get("file_split_block_size")
115 .ok_or_else(|| anyhow::anyhow!("Missing 'file_split_block_size' in metadata"))?
116 .parse()?,
117
118 max_chunks: metadata
119 .get("max_chunks")
120 .ok_or_else(|| anyhow::anyhow!("Missing 'max_chunks' in metadata"))?
121 .parse()?,
122
123 compression_level: metadata
124 .get("compression_level")
125 .ok_or_else(|| anyhow::anyhow!("Missing 'compression_level' in metadata"))?
126 .parse()?,
127
128 zstd_output_buffer_size: metadata
129 .get("zstd_output_buffer_size")
130 .ok_or_else(|| anyhow::anyhow!("Missing 'zstd_output_buffer_size' in metadata"))?
131 .parse()?,
132 })
133}
134
/// Accessor for the lazily-initialized global index schema
/// (`ZNIPPY_INDEX_SCHEMA`).
pub fn znippy_index_schema() -> &'static Arc<Schema> {
    &ZNIPPY_INDEX_SCHEMA
}
138
/// Heuristic: does the file extension indicate data that is already
/// compressed or otherwise unlikely to shrink further?
///
/// The comparison is case-insensitive. A path without a UTF-8 extension
/// returns `false`.
pub fn is_probably_compressed(path: &Path) -> bool {
    // Archive/compression formats plus already-encoded media containers,
    // and Znippy's own output formats.
    const PRECOMPRESSED_EXTENSIONS: &[&str] = &[
        "zip", "gz", "bz2", "xz", "lz", "lzma", "7z", "rar", "cab", "jar", "war", "ear", "zst",
        "sz", "lz4", "tgz", "txz", "tbz", "apk", "dmg", "deb", "rpm", "arrow", "mpeg", "mpg",
        "jpeg", "jpg", "gif", "bmp", "png", "znippy", "zdata", "parquet", "webp", "webm",
    ];

    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.to_ascii_lowercase())
        .map_or(false, |ext| PRECOMPRESSED_EXTENSIONS.contains(&ext.as_str()))
}
184
/// Decide whether compression should be skipped for `path`.
///
/// Currently a pure alias for [`is_probably_compressed`] — presumably so
/// that already-compressed inputs are stored without recompression.
pub fn should_skip_compression(path: &Path) -> bool {
    is_probably_compressed(path)
}
188
189pub fn attach_metadata(
190 rb: RecordBatch,
191 metadata: HashMap<String, String>,
192) -> arrow::error::Result<RecordBatch> {
193 let old_schema = rb.schema();
194
195 let new_schema = Schema::new_with_metadata(old_schema.fields().to_vec(), metadata);
197
198 RecordBatch::try_new(Arc::new(new_schema), rb.columns().to_vec())
200}
201pub fn build_arrow_batch_from_files(
202 files: &[FileMeta],
203 input_dir: &Path,
204) -> arrow::error::Result<RecordBatch> {
205 let schema = ZNIPPY_INDEX_SCHEMA.as_ref().clone();
206
207 let mut relative_path_builder = StringBuilder::new();
208 let mut compressed_builder = BooleanBuilder::new();
209 let mut uncompressed_size_builder = UInt64Builder::new();
210
211 let chunk_struct_builder = StructBuilder::new(
213 vec![
214 Field::new("zdata_offset", DataType::UInt64, false),
215 Field::new("fdata_offset", DataType::UInt64, false),
216 Field::new("length", DataType::UInt64, false),
217 Field::new("chunk_seq", DataType::UInt32, false),
218 Field::new("checksum_group", DataType::UInt8, false),
219 ],
220 vec![
221 Box::new(UInt64Builder::new()),
222 Box::new(UInt64Builder::new()),
223 Box::new(UInt64Builder::new()),
224 Box::new(UInt32Builder::new()),
225 Box::new(UInt8Builder::new()),
226 ],
227 );
228
229 let mut chunks_builder = ListBuilder::new(chunk_struct_builder);
230
231 for file in files {
232 let full_path = Path::new(&file.relative_path);
235 let rel_path = full_path
236 .strip_prefix(input_dir)
237 .expect("❌ build_arrow_batch_from_files: file path is not under input_dir")
238 .to_string_lossy();
239
240 relative_path_builder.append_value(&rel_path);
241
242 compressed_builder.append_value(file.compressed);
243 uncompressed_size_builder.append_value(file.uncompressed_size);
244
245 let struct_builder = chunks_builder.values();
247
248 if file.chunks.is_empty() {
250 chunks_builder.append_null();
251 continue;
252 }
253 log::debug!(
254 "Batch schema stats → file: {} has {} chunks",
255 file.relative_path,
256 file.chunks.len()
257 );
258
259 for chunk in &file.chunks {
261 let zdata_offset_builder = struct_builder.field_builder::<UInt64Builder>(0).unwrap();
263 zdata_offset_builder.append_value(chunk.zdata_offset);
264
265 let fdata_offset_builder = struct_builder.field_builder::<UInt64Builder>(1).unwrap();
267 fdata_offset_builder.append_value(chunk.fdata_offset);
268
269 let length_builder = struct_builder.field_builder::<UInt64Builder>(2).unwrap();
270 length_builder.append_value(chunk.compressed_size);
271
272 let chunk_seq_builder = struct_builder.field_builder::<UInt32Builder>(3).unwrap();
273 chunk_seq_builder.append_value(chunk.chunk_seq);
274
275 let checksum_group_builder = struct_builder.field_builder::<UInt8Builder>(4).unwrap();
276 checksum_group_builder.append_value(chunk.checksum_group);
277
278 struct_builder.append(true); }
280
281 chunks_builder.append(true); }
283
284 RecordBatch::try_new(
285 Arc::new(schema),
286 vec![
287 Arc::new(relative_path_builder.finish()),
288 Arc::new(compressed_builder.finish()),
289 Arc::new(uncompressed_size_builder.finish()),
290 Arc::new(chunks_builder.finish()),
291 ],
292 )
293}
294
295pub fn read_znippy_index(path: &Path) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
296 let file = File::open(path)?;
297 let reader = FileReader::try_new(BufReader::new(file), None)?;
298 let schema = reader.schema();
299 let batches = reader.collect::<Result<Vec<_>, _>>()?;
300 eprintln!(
301 "Batch schema fields: {:?}",
302 schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
303 );
304
305 Ok((schema, batches))
306}
307
/// Aggregate statistics produced by an archive verification pass.
#[derive(Debug, Default)]
pub struct VerifyReport {
    /// Number of files examined.
    pub total_files: usize,
    /// Files that verified successfully.
    pub verified_files: usize,
    /// Files found corrupt.
    pub corrupt_files: usize,
    /// Total bytes across all examined files.
    pub total_bytes: u64,
    /// Bytes belonging to verified files.
    pub verified_bytes: u64,
    /// Bytes belonging to corrupt files.
    pub corrupt_bytes: u64,
    /// Total number of chunks processed.
    pub chunks: u64,
}
318
319pub fn list_archive_contents(path: &Path) -> Result<()> {
320 let (_schema, batches) = read_znippy_index(path)?;
321 for batch in batches {
322 println!("{:?}", batch);
323 }
324 Ok(())
325}
326
/// Verify the archive at `path` by running a full decompression pass with
/// its output directed at a throwaway location.
///
/// NOTE(review): the discard target is the literal directory path
/// "/dev/null" — Unix-specific, and whether `decompress_archive` actually
/// skips writing (the `false` flag) or attempts to create files under it
/// depends on its implementation; confirm against `decompress_archive`.
pub fn verify_archive_integrity(path: &Path) -> Result<VerifyReport> {
    let out_dir = PathBuf::from("/dev/null");
    decompress_archive(path, false, &out_dir)
}