Skip to main content

hexz_cli/cmd/data/
convert.rs

1//! Convert external data formats into Hexz snapshots.
2//!
3//! Supports:
4//! - **tar**: Pure Rust via the `tar` crate (streaming, no extraction)
5//! - **hdf5**: Delegates to Python (`hexz.convert()`)
6//! - **webdataset**: Delegates to Python (`hexz.convert()`)
7
8use crate::ui::progress::create_progress_bar;
9use anyhow::{Context, Result, bail};
10use hexz_core::algo::compression::create_compressor_from_str;
11use hexz_core::ops::snapshot_writer::SnapshotWriter;
12use std::io::Read;
13use std::path::PathBuf;
14use std::sync::{Arc, Mutex};
15
16/// Execute the convert command.
17#[allow(clippy::too_many_arguments)]
18pub fn run(
19    format: String,
20    input: PathBuf,
21    output: PathBuf,
22    compression: String,
23    block_size: u32,
24    profile: Option<String>,
25    silent: bool,
26) -> Result<()> {
27    match format.to_lowercase().as_str() {
28        "tar" => convert_tar(input, output, compression, block_size, silent),
29        "hdf5" | "webdataset" => convert_via_python(
30            &format,
31            input,
32            output,
33            compression,
34            block_size,
35            profile,
36            silent,
37        ),
38        other => bail!("Unknown format: {other:?}. Supported formats: tar, hdf5, webdataset"),
39    }
40}
41
42/// Convert a tar archive to a Hexz snapshot using pure Rust.
43///
44/// Streams tar entries directly through the SnapshotWriter without
45/// extracting to disk. Stores a file manifest in snapshot metadata.
46fn convert_tar(
47    input: PathBuf,
48    output: PathBuf,
49    compression: String,
50    block_size: u32,
51    silent: bool,
52) -> Result<()> {
53    // Calculate total size for progress bar
54    let total_size = std::fs::metadata(&input)
55        .with_context(|| format!("Cannot read input file: {}", input.display()))?
56        .len();
57
58    // Set up progress bar
59    let pb = if !silent {
60        let pb = create_progress_bar(total_size);
61        Some(Arc::new(Mutex::new(pb)))
62    } else {
63        None
64    };
65
66    // Create compressor and snapshot writer
67    let (compressor, compression_type) =
68        create_compressor_from_str(&compression, None, None).map_err(|e| anyhow::anyhow!("{e}"))?;
69
70    let mut writer = SnapshotWriter::builder(&output, compressor, compression_type)
71        .block_size(block_size)
72        .build()
73        .map_err(|e| anyhow::anyhow!("{e}"))?;
74
75    // Open tar archive (supports .tar, .tar.gz, .tar.bz2, .tar.xz)
76    let file = std::fs::File::open(&input)
77        .with_context(|| format!("Cannot open tar file: {}", input.display()))?;
78
79    let mut archive = tar::Archive::new(file);
80
81    // Track file manifest for metadata
82    let mut source_files: Vec<serde_json::Value> = Vec::new();
83    let mut total_bytes: u64 = 0;
84    let mut bytes_from_archive: u64 = 0;
85
86    // Begin a disk stream for the tar data
87    // We'll set total_size after reading all entries by using a two-pass approach,
88    // but for streaming we start with the tar file size as an estimate.
89    writer.begin_stream(true, total_size);
90
91    for entry_result in archive.entries()? {
92        let mut entry = entry_result?;
93        let header = entry.header();
94
95        // Skip non-file entries (directories, symlinks, etc.)
96        if !header.entry_type().is_file() {
97            continue;
98        }
99
100        let name = entry.path()?.to_string_lossy().to_string();
101        let size = header.size()?;
102
103        // Read the entry data and write in blocks
104        let mut remaining = size;
105        let mut buf = vec![0u8; block_size as usize];
106
107        while remaining > 0 {
108            let to_read = std::cmp::min(remaining as usize, buf.len());
109            entry.read_exact(&mut buf[..to_read])?;
110
111            writer
112                .write_data_block(&buf[..to_read])
113                .map_err(|e| anyhow::anyhow!("{e}"))?;
114
115            remaining -= to_read as u64;
116            bytes_from_archive += to_read as u64;
117
118            if let Some(ref pb) = pb {
119                if let Ok(pb) = pb.lock() {
120                    // Approximate progress based on bytes read from archive
121                    pb.set_position(std::cmp::min(bytes_from_archive, total_size));
122                }
123            }
124        }
125
126        source_files.push(serde_json::json!({
127            "name": name,
128            "size": size,
129            "offset": total_bytes,
130        }));
131        total_bytes += size;
132    }
133
134    writer.end_stream().map_err(|e| anyhow::anyhow!("{e}"))?;
135
136    // Build metadata JSON
137    let metadata = serde_json::json!({
138        "source": {
139            "format": "tar",
140            "original_path": input.file_name().unwrap_or_default().to_string_lossy(),
141            "total_files": source_files.len(),
142            "total_bytes": total_bytes,
143            "source_files": source_files,
144        }
145    });
146    let meta_bytes = serde_json::to_vec(&metadata)?;
147
148    writer
149        .finalize(None, Some(&meta_bytes))
150        .map_err(|e| anyhow::anyhow!("{e}"))?;
151
152    if let Some(ref pb) = pb {
153        if let Ok(pb) = pb.lock() {
154            pb.finish_with_message("Done");
155        }
156    }
157
158    if !silent {
159        println!(
160            "Converted {} files ({} bytes) from tar archive",
161            source_files.len(),
162            total_bytes
163        );
164        println!("Snapshot created: {}", output.display());
165    }
166
167    Ok(())
168}
169
170/// Convert hdf5/webdataset by delegating to Python hexz.convert().
171fn convert_via_python(
172    format: &str,
173    input: PathBuf,
174    output: PathBuf,
175    compression: String,
176    block_size: u32,
177    profile: Option<String>,
178    silent: bool,
179) -> Result<()> {
180    if !silent {
181        println!("Converting {format} via Python...");
182    }
183
184    let profile_arg = match profile {
185        Some(ref p) => format!(", profile={p:?}"),
186        None => String::new(),
187    };
188
189    let python_code = format!(
190        r#"import hexz; hexz.convert({input:?}, {output:?}, format={format:?}, compression={compression:?}, block_size={block_size}{profile_arg})"#,
191        input = input.display().to_string(),
192        output = output.display().to_string(),
193    );
194
195    let status = std::process::Command::new("python3")
196        .arg("-c")
197        .arg(&python_code)
198        .status()
199        .context(
200            "Failed to run Python. Ensure Python 3 and the hexz package are installed.\n\
201             Install with: pip install hexz[hdf5]",
202        )?;
203
204    if !status.success() {
205        bail!(
206            "Python conversion failed (exit code: {:?}). \
207             Ensure the hexz Python package is installed: pip install hexz",
208            status.code()
209        );
210    }
211
212    if !silent {
213        println!("Snapshot created: {}", output.display());
214    }
215
216    Ok(())
217}