use std::io::{self, Read, Seek, SeekFrom};
use crate::tree::TreeNode;
const QCOW2_MAGIC: u32 = 0x5146_49fb;
const HEADER_SIZE: usize = 72;
const CLUSTER_BITS_MIN: u32 = 9;
const CLUSTER_BITS_MAX: u32 = 21;
#[derive(Debug)]
pub enum Error {
TooShort,
BadMagic,
UnsupportedVersion(u32),
Encrypted,
BadClusterBits(u32),
Io(io::Error),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::TooShort => {
write!(f, "QCOW2 file is shorter than the 72-byte header")
}
Error::BadMagic => {
write!(f, "QCOW2 magic 0x514649fb not found at offset 0")
}
Error::UnsupportedVersion(v) => {
write!(f, "QCOW2 version {v} is not supported (only 2 and 3)")
}
Error::Encrypted => {
write!(f, "QCOW2 image is encrypted; cannot read without key")
}
Error::BadClusterBits(b) => {
write!(f, "QCOW2 cluster_bits {b} is out of range (must be 9..=21)")
}
Error::Io(e) => write!(f, "QCOW2 I/O error: {e}"),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Io(e) => Some(e),
_ => None,
}
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
struct Header {
disk_size: u64,
}
fn read_header<R: Read + Seek>(r: &mut R) -> Result<Header, Error> {
let mut buf = [0u8; HEADER_SIZE];
r.read_exact(&mut buf).map_err(|e| {
if e.kind() == io::ErrorKind::UnexpectedEof {
Error::TooShort
} else {
Error::Io(e)
}
})?;
let magic = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
if magic != QCOW2_MAGIC {
return Err(Error::BadMagic);
}
let version = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
if version != 2 && version != 3 {
return Err(Error::UnsupportedVersion(version));
}
let cluster_bits = u32::from_be_bytes([buf[20], buf[21], buf[22], buf[23]]);
if !(CLUSTER_BITS_MIN..=CLUSTER_BITS_MAX).contains(&cluster_bits) {
return Err(Error::BadClusterBits(cluster_bits));
}
let disk_size = u64::from_be_bytes([
buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31],
]);
let encryption_method = u32::from_be_bytes([buf[32], buf[33], buf[34], buf[35]]);
if encryption_method != 0 {
return Err(Error::Encrypted);
}
Ok(Header { disk_size })
}
pub fn detect<R: Read + Seek>(r: &mut R) -> Result<(), Error> {
let pos = r.stream_position()?;
let result = detect_inner(r);
let _ = r.seek(SeekFrom::Start(pos));
result
}
fn detect_inner<R: Read + Seek>(r: &mut R) -> Result<(), Error> {
r.seek(SeekFrom::Start(0))?;
let mut magic_buf = [0u8; 4];
match r.read_exact(&mut magic_buf) {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Err(Error::TooShort),
Err(e) => return Err(Error::Io(e)),
}
let magic = u32::from_be_bytes(magic_buf);
if magic != QCOW2_MAGIC {
return Err(Error::BadMagic);
}
Ok(())
}
pub fn detect_and_parse<R: Read + Seek>(r: &mut R) -> Result<TreeNode, Error> {
r.seek(SeekFrom::Start(0))?;
let hdr = read_header(r)?;
let disk_size = hdr.disk_size;
let mut root = TreeNode::new_directory("/".to_string());
let mut disk_node = TreeNode::new_file("disk.qcow2".to_string(), disk_size);
disk_node.file_length = Some(disk_size);
root.add_child(disk_node);
root.calculate_directory_size();
Ok(root)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
const DEFAULT_VERSION: u32 = 2;
const DEFAULT_CLUSTER_BITS: u32 = 16;
const DEFAULT_DISK_SIZE: u64 = 10 * 1024 * 1024; const DEFAULT_ENCRYPTION: u32 = 0;
fn build_header(
version: u32,
cluster_bits: u32,
disk_size: u64,
encryption_method: u32,
) -> Vec<u8> {
let mut buf = vec![0u8; HEADER_SIZE];
buf[0..4].copy_from_slice(&QCOW2_MAGIC.to_be_bytes());
buf[4..8].copy_from_slice(&version.to_be_bytes());
buf[20..24].copy_from_slice(&cluster_bits.to_be_bytes());
buf[24..32].copy_from_slice(&disk_size.to_be_bytes());
buf[32..36].copy_from_slice(&encryption_method.to_be_bytes());
buf[36..40].copy_from_slice(&1u32.to_be_bytes());
buf[40..48].copy_from_slice(&196608u64.to_be_bytes());
buf[48..56].copy_from_slice(&65536u64.to_be_bytes());
buf[56..60].copy_from_slice(&1u32.to_be_bytes());
buf
}
fn minimal_image(
version: u32,
cluster_bits: u32,
disk_size: u64,
encryption_method: u32,
) -> Vec<u8> {
build_header(version, cluster_bits, disk_size, encryption_method)
}
#[test]
fn detect_v2_ok() {
let img = minimal_image(
DEFAULT_VERSION,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
assert!(detect(&mut c).is_ok(), "should detect valid QCOW2 v2");
}
#[test]
fn detect_v3_ok() {
let img = minimal_image(
3,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
assert!(detect(&mut c).is_ok(), "should detect valid QCOW2 v3");
}
#[test]
fn detect_restores_position() {
let img = minimal_image(
DEFAULT_VERSION,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
c.seek(SeekFrom::Start(7)).unwrap();
detect(&mut c).unwrap();
assert_eq!(
c.stream_position().unwrap(),
7,
"detect() must restore stream position"
);
}
#[test]
fn detect_rejects_bad_magic() {
let img = vec![0u8; HEADER_SIZE];
let mut c = Cursor::new(&img);
assert!(
matches!(detect(&mut c), Err(Error::BadMagic)),
"all-zeros should fail with BadMagic"
);
}
#[test]
fn detect_rejects_too_short() {
let img = vec![0u8; 3]; let mut c = Cursor::new(&img);
assert!(
matches!(detect(&mut c), Err(Error::TooShort)),
"3-byte image should fail with TooShort"
);
}
#[test]
fn parse_v2_tree_shape() {
let img = minimal_image(
DEFAULT_VERSION,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
let root = detect_and_parse(&mut c).expect("parse QCOW2 v2");
assert_eq!(root.name, "/");
assert!(root.is_directory);
assert_eq!(root.children.len(), 1);
let child = &root.children[0];
assert_eq!(child.name, "disk.qcow2");
assert!(!child.is_directory);
assert!(child.children.is_empty());
}
#[test]
fn parse_disk_size_reported() {
let disk_size: u64 = 20 * 1024 * 1024; let img = minimal_image(
DEFAULT_VERSION,
DEFAULT_CLUSTER_BITS,
disk_size,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
let root = detect_and_parse(&mut c).expect("parse QCOW2");
let disk = &root.children[0];
assert_eq!(
disk.size, disk_size,
"disk.qcow2 size should equal disk_size from header"
);
assert_eq!(
disk.file_length,
Some(disk_size),
"disk.qcow2 file_length should equal disk_size"
);
}
#[test]
fn parse_file_location_is_none() {
let img = minimal_image(
DEFAULT_VERSION,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
let root = detect_and_parse(&mut c).expect("parse QCOW2");
let disk = &root.children[0];
assert_eq!(
disk.file_location, None,
"QCOW2 disk.qcow2 must have file_location=None (L1/L2 indirection)"
);
}
#[test]
fn parse_v3_ok() {
let img = minimal_image(
3,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
let root = detect_and_parse(&mut c).expect("parse QCOW2 v3");
assert_eq!(root.children.len(), 1);
assert_eq!(root.children[0].name, "disk.qcow2");
}
#[test]
fn parse_rejects_unsupported_version() {
let img = minimal_image(
1,
DEFAULT_CLUSTER_BITS,
DEFAULT_DISK_SIZE,
DEFAULT_ENCRYPTION,
);
let mut c = Cursor::new(&img);
assert!(
matches!(detect_and_parse(&mut c), Err(Error::UnsupportedVersion(1))),
"version=1 should fail with UnsupportedVersion(1)"
);
}
#[test]
fn parse_rejects_encrypted() {
let img = minimal_image(DEFAULT_VERSION, DEFAULT_CLUSTER_BITS, DEFAULT_DISK_SIZE, 1);
let mut c = Cursor::new(&img);
assert!(
matches!(detect_and_parse(&mut c), Err(Error::Encrypted)),
"encryption_method=1 should fail with Encrypted"
);
}
#[test]
fn parse_rejects_bad_cluster_bits() {
let img = minimal_image(DEFAULT_VERSION, 8, DEFAULT_DISK_SIZE, DEFAULT_ENCRYPTION);
let mut c = Cursor::new(&img);
assert!(
matches!(detect_and_parse(&mut c), Err(Error::BadClusterBits(8))),
"cluster_bits=8 should fail with BadClusterBits(8)"
);
}
#[test]
fn error_display_too_short() {
assert!(
format!("{}", Error::TooShort).contains("72")
|| format!("{}", Error::TooShort).contains("short")
);
}
#[test]
fn error_display_bad_magic() {
assert!(
format!("{}", Error::BadMagic).contains("514649fb")
|| format!("{}", Error::BadMagic).contains("magic")
);
}
#[test]
fn error_display_unsupported_version() {
assert!(format!("{}", Error::UnsupportedVersion(5)).contains('5'));
}
#[test]
fn error_display_encrypted() {
assert!(format!("{}", Error::Encrypted).contains("encrypt"));
}
#[test]
fn error_display_bad_cluster_bits() {
assert!(format!("{}", Error::BadClusterBits(7)).contains('7'));
}
#[test]
fn error_display_io() {
let io = io::Error::other("disk");
assert!(format!("{}", Error::Io(io)).contains("disk"));
}
#[test]
fn error_source_io() {
use std::error::Error as StdError;
assert!(Error::Io(io::Error::other("s")).source().is_some());
}
#[test]
fn error_source_non_io() {
use std::error::Error as StdError;
assert!(Error::TooShort.source().is_none());
assert!(Error::BadMagic.source().is_none());
assert!(Error::Encrypted.source().is_none());
assert!(Error::UnsupportedVersion(2).source().is_none());
assert!(Error::BadClusterBits(5).source().is_none());
}
#[test]
fn error_from_io_error() {
let e = Error::from(io::Error::other("qcow2 test"));
assert!(matches!(e, Error::Io(_)));
}
#[test]
fn read_header_too_short_returns_error() {
let data: &[u8] = &[];
let mut c = Cursor::new(data);
assert!(matches!(read_header(&mut c), Err(Error::TooShort)));
}
#[test]
fn read_header_bad_magic_returns_error() {
let data = vec![0u8; HEADER_SIZE];
let mut c = Cursor::new(data);
assert!(matches!(read_header(&mut c), Err(Error::BadMagic)));
}
}