// znippy_common/index.rs
//
// index.rs – contains the former file_entry.rs plus the functions exported via lib.rs
2
3use arrow::array::*;
4use std::collections::HashMap;
5use std::fs::{File, OpenOptions};
6use std::io::{BufReader, Read, Seek, SeekFrom, Write};
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use crate::common_config::{CONFIG, StrategicConfig};
11use crate::{ChunkMeta, FileMeta, decompress_archive};
12use anyhow::{Context, Result};
13use arrow::array::{
14    Array, ArrayBuilder, ArrayRef, BooleanArray, BooleanBuilder, FixedSizeBinaryArray, ListArray,
15    ListBuilder, StringArray, StringBuilder, StructArray, StructBuilder, UInt64Array,
16    UInt64Builder, make_builder,
17};
18use arrow::datatypes::{DataType, Field, Fields, Schema};
19use arrow::ipc::reader::FileReader;
20use arrow::record_batch::RecordBatch;
21use once_cell::sync::Lazy;
22
23pub static ZNIPPY_INDEX_SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
24    Arc::new(Schema::new(vec![
25        Field::new("relative_path", DataType::Utf8, false),
26        Field::new("compressed", DataType::Boolean, true),
27        Field::new("uncompressed_size", DataType::UInt64, false),
28        Field::new(
29            "chunks",
30            DataType::List(Arc::new(Field::new(
31                "item",
32                DataType::Struct(Fields::from(vec![
33                    Field::new("zdata_offset", DataType::UInt64, false),
34                    Field::new("fdata_offset", DataType::UInt64, false),
35                    Field::new("length", DataType::UInt64, false),
36                    Field::new("chunk_seq", DataType::UInt32, false),
37                    Field::new("checksum_group", DataType::UInt8, false),
38                ])),
39                true,
40            ))),
41            true,
42        ),
43    ]))
44});
45
46pub fn build_arrow_metadata_for_checksums_and_config(
47    checksums: &[[u8; 32]],
48    config: &StrategicConfig,
49) -> HashMap<String, String> {
50    let mut metadata = HashMap::new();
51
52    // Lägg in checksummor
53    for (i, hash) in checksums.iter().enumerate() {
54        metadata.insert(format!("checksum_group_{}", i), hex::encode(hash));
55    }
56
57    // Lägg in CONFIG-parametrar
58    metadata.insert(
59        "max_core_in_flight".into(),
60        config.max_core_in_flight.to_string(),
61    );
62    metadata.insert(
63        "max_core_in_compress".into(),
64        config.max_core_in_compress.to_string(),
65    );
66    metadata.insert("max_mem_allowed".into(), config.max_mem_allowed.to_string());
67    metadata.insert(
68        "min_free_memory_ratio".into(),
69        config.min_free_memory_ratio.to_string(),
70    );
71    metadata.insert(
72        "file_split_block_size".into(),
73        config.file_split_block_size.to_string(),
74    );
75    metadata.insert("max_chunks".into(), config.max_chunks.to_string());
76    metadata.insert(
77        "compression_level".into(),
78        config.compression_level.to_string(),
79    );
80    metadata.insert(
81        "zstd_output_buffer_size".into(),
82        config.zstd_output_buffer_size.to_string(),
83    );
84
85    metadata
86}
87
88pub fn extract_config_from_arrow_metadata(
89    metadata: &std::collections::HashMap<String, String>,
90) -> anyhow::Result<StrategicConfig> {
91    Ok(StrategicConfig {
92        max_core_allowed: 0,
93        max_core_in_flight: metadata
94            .get("max_core_in_flight")
95            .ok_or_else(|| anyhow::anyhow!("Missing 'max_core_in_flight' in metadata"))?
96            .parse()?,
97
98        max_core_in_compress: metadata
99            .get("max_core_in_compress")
100            .ok_or_else(|| anyhow::anyhow!("Missing 'max_core_in_compress' in metadata"))?
101            .parse()?,
102
103        max_mem_allowed: metadata
104            .get("max_mem_allowed")
105            .ok_or_else(|| anyhow::anyhow!("Missing 'max_mem_allowed' in metadata"))?
106            .parse()?,
107
108        min_free_memory_ratio: metadata
109            .get("min_free_memory_ratio")
110            .ok_or_else(|| anyhow::anyhow!("Missing 'min_free_memory_ratio' in metadata"))?
111            .parse()?,
112
113        file_split_block_size: metadata
114            .get("file_split_block_size")
115            .ok_or_else(|| anyhow::anyhow!("Missing 'file_split_block_size' in metadata"))?
116            .parse()?,
117
118        max_chunks: metadata
119            .get("max_chunks")
120            .ok_or_else(|| anyhow::anyhow!("Missing 'max_chunks' in metadata"))?
121            .parse()?,
122
123        compression_level: metadata
124            .get("compression_level")
125            .ok_or_else(|| anyhow::anyhow!("Missing 'compression_level' in metadata"))?
126            .parse()?,
127
128        zstd_output_buffer_size: metadata
129            .get("zstd_output_buffer_size")
130            .ok_or_else(|| anyhow::anyhow!("Missing 'zstd_output_buffer_size' in metadata"))?
131            .parse()?,
132    })
133}
134
/// Accessor for the process-wide index schema (see [`ZNIPPY_INDEX_SCHEMA`]).
pub fn znippy_index_schema() -> &'static Arc<Schema> {
    &ZNIPPY_INDEX_SCHEMA
}
138
/// Heuristic: does the file's extension indicate an already-compressed
/// (or otherwise incompressible) format?
///
/// Matching is case-insensitive. A path with no extension, or a non-UTF-8
/// extension, returns `false`.
pub fn is_probably_compressed(path: &Path) -> bool {
    // Archives, compressed containers, and lossy media that won't shrink further.
    const COMPRESSED_EXTENSIONS: &[&str] = &[
        "zip", "gz", "bz2", "xz", "lz", "lzma", "7z", "rar", "cab", "jar", "war", "ear",
        "zst", "sz", "lz4", "tgz", "txz", "tbz", "apk", "dmg", "deb", "rpm", "arrow",
        "mpeg", "mpg", "jpeg", "jpg", "gif", "bmp", "png", "znippy", "zdata", "parquet",
        "webp", "webm",
    ];

    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.to_ascii_lowercase())
        .map_or(false, |ext| COMPRESSED_EXTENSIONS.contains(&ext.as_str()))
}
184
/// Whether compression should be skipped for `path`.
///
/// Currently an alias for [`is_probably_compressed`]: data that is already
/// compressed gains nothing from another compression pass.
pub fn should_skip_compression(path: &Path) -> bool {
    is_probably_compressed(path)
}
188
189pub fn attach_metadata(
190    rb: RecordBatch,
191    metadata: HashMap<String, String>,
192) -> arrow::error::Result<RecordBatch> {
193    let old_schema = rb.schema();
194
195    // Clone the schema and apply new metadata
196    let new_schema = Schema::new_with_metadata(old_schema.fields().to_vec(), metadata);
197
198    // Construct a new RecordBatch with the same data but updated schema
199    RecordBatch::try_new(Arc::new(new_schema), rb.columns().to_vec())
200}
201pub fn build_arrow_batch_from_files(
202    files: &[FileMeta],
203    input_dir: &Path,
204) -> arrow::error::Result<RecordBatch> {
205    let schema = ZNIPPY_INDEX_SCHEMA.as_ref().clone();
206
207    let mut relative_path_builder = StringBuilder::new();
208    let mut compressed_builder = BooleanBuilder::new();
209    let mut uncompressed_size_builder = UInt64Builder::new();
210
211    // Create the StructBuilder for chunk data
212    let chunk_struct_builder = StructBuilder::new(
213        vec![
214            Field::new("zdata_offset", DataType::UInt64, false),
215            Field::new("fdata_offset", DataType::UInt64, false),
216            Field::new("length", DataType::UInt64, false),
217            Field::new("chunk_seq", DataType::UInt32, false),
218            Field::new("checksum_group", DataType::UInt8, false),
219        ],
220        vec![
221            Box::new(UInt64Builder::new()),
222            Box::new(UInt64Builder::new()),
223            Box::new(UInt64Builder::new()),
224            Box::new(UInt32Builder::new()),
225            Box::new(UInt8Builder::new()),
226        ],
227    );
228
229    let mut chunks_builder = ListBuilder::new(chunk_struct_builder);
230
231    for file in files {
232        // Append each file's path, compression status, and uncompressed size
233
234        let full_path = Path::new(&file.relative_path);
235        let rel_path = full_path
236            .strip_prefix(input_dir)
237            .expect("❌ build_arrow_batch_from_files: file path is not under input_dir")
238            .to_string_lossy();
239
240        relative_path_builder.append_value(&rel_path);
241
242        compressed_builder.append_value(file.compressed);
243        uncompressed_size_builder.append_value(file.uncompressed_size);
244
245        // Access the values for appending chunk data
246        let struct_builder = chunks_builder.values();
247
248        // If there are no chunks, append null for this file's chunk list
249        if file.chunks.is_empty() {
250            chunks_builder.append_null();
251            continue;
252        }
253        log::debug!(
254            "Batch schema stats → file: {} has  {} chunks",
255            file.relative_path,
256            file.chunks.len()
257        );
258
259        // Iterate over the chunks in the file and append the data to the struct
260        for chunk in &file.chunks {
261            // Check and append values to the struct builder
262            let zdata_offset_builder = struct_builder.field_builder::<UInt64Builder>(0).unwrap();
263            zdata_offset_builder.append_value(chunk.zdata_offset);
264
265            // Check and append values to the struct builder
266            let fdata_offset_builder = struct_builder.field_builder::<UInt64Builder>(1).unwrap();
267            fdata_offset_builder.append_value(chunk.fdata_offset);
268
269            let length_builder = struct_builder.field_builder::<UInt64Builder>(2).unwrap();
270            length_builder.append_value(chunk.compressed_size);
271
272            let chunk_seq_builder = struct_builder.field_builder::<UInt32Builder>(3).unwrap();
273            chunk_seq_builder.append_value(chunk.chunk_seq);
274
275            let checksum_group_builder = struct_builder.field_builder::<UInt8Builder>(4).unwrap();
276            checksum_group_builder.append_value(chunk.checksum_group);
277
278            struct_builder.append(true); // Append this chunk
279        }
280
281        chunks_builder.append(true); // Append this file's chunks to the list
282    }
283
284    RecordBatch::try_new(
285        Arc::new(schema),
286        vec![
287            Arc::new(relative_path_builder.finish()),
288            Arc::new(compressed_builder.finish()),
289            Arc::new(uncompressed_size_builder.finish()),
290            Arc::new(chunks_builder.finish()),
291        ],
292    )
293}
294
295pub fn read_znippy_index(path: &Path) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
296    let file = File::open(path)?;
297    let reader = FileReader::try_new(BufReader::new(file), None)?;
298    let schema = reader.schema();
299    let batches = reader.collect::<Result<Vec<_>, _>>()?;
300    eprintln!(
301        "Batch schema fields: {:?}",
302        schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
303    );
304
305    Ok((schema, batches))
306}
307
/// Summary of an archive verification pass.
#[derive(Debug, Default)]
pub struct VerifyReport {
    // Number of files listed in the index.
    pub total_files: usize,
    // Files whose contents verified successfully.
    pub verified_files: usize,
    // Files that failed verification.
    pub corrupt_files: usize,
    // Byte totals corresponding to the three file counters above.
    pub total_bytes: u64,
    pub verified_bytes: u64,
    pub corrupt_bytes: u64,
    // Total number of chunks processed.
    pub chunks: u64,
}
318
319pub fn list_archive_contents(path: &Path) -> Result<()> {
320    let (_schema, batches) = read_znippy_index(path)?;
321    for batch in batches {
322        println!("{:?}", batch);
323    }
324    Ok(())
325}
326
/// Verify the archive at `path` by running a decompression pass and returning
/// its [`VerifyReport`].
///
/// NOTE(review): the output directory is hard-coded to `/dev/null`, which only
/// exists on Unix — confirm intended behavior on other platforms. The `false`
/// flag is presumably "do not write output files"; verify against the
/// signature of `decompress_archive`.
pub fn verify_archive_integrity(path: &Path) -> Result<VerifyReport> {
    let out_dir = PathBuf::from("/dev/null");
    decompress_archive(path, false, &out_dir)
}