use std::io::{BufRead, BufReader, Read, Seek};
use std::os::fd::OwnedFd;
use base64::prelude::*;
use cap_std::fs::{Dir, File};
use crc::{CRC_64_GO_ISO, Crc};
use flate2::read::GzDecoder;
use serde::Deserialize;
use crate::error::{Result, StorageError};
use crate::layer::Layer;
use crate::storage::Storage;
/// CRC-64 calculator built from the `CRC_64_GO_ISO` parameter set of the `crc`
/// crate; `verify_crc64` uses it to checksum file contents.
const CRC64_ISO: Crc<u64> = Crc::<u64>::new(&CRC_64_GO_ISO);
/// One item produced by `TarSplitFdStream::next`.
#[derive(Debug)]
pub enum TarSplitItem {
/// Bytes obtained by base64-decoding the `payload` of a segment entry.
Segment(Vec<u8>),
/// A file located in the layer chain, handed over as an owned descriptor.
FileContent {
/// Descriptor of the opened file. `next` rewinds the file after a CRC64
/// check, so the descriptor is positioned at the start of the file.
fd: OwnedFd,
/// Size taken from the tar-split entry (not measured from the file).
size: u64,
/// Entry name exactly as it appeared in the tar-split entry.
name: String,
},
}
/// One line of the tar-split JSON stream exactly as deserialized, before the
/// `type` discriminant has been validated by `TarSplitEntry::from_raw`.
///
/// Every field except `type` is optional and becomes `None` when absent.
#[derive(Debug, Deserialize)]
struct TarSplitEntryRaw {
/// Numeric entry kind, read from the JSON key `type`
/// (`from_raw` accepts 1 for files and 2 for segments).
#[serde(rename = "type")]
type_id: u8,
/// Entry name, if present.
#[serde(default)]
name: Option<String>,
/// Entry size in bytes, if present.
#[serde(default)]
size: Option<u64>,
/// Base64-encoded CRC64 of the file content, if present.
/// NOTE(review): upstream tar-split appears to store a file entry's checksum
/// under `payload` rather than a dedicated `crc64` key — confirm against the
/// metadata actually being read.
#[serde(default)]
crc64: Option<String>,
/// Base64-encoded payload, if present.
#[serde(default)]
payload: Option<String>,
}
/// A tar-split entry whose `type` discriminant has been validated.
///
/// Built from a `TarSplitEntryRaw` by `TarSplitEntry::from_raw`.
#[derive(Debug)]
enum TarSplitEntry {
/// Entry of type 1: refers to file content that lives outside the
/// tar-split stream.
File {
/// Entry name, if the record carried one.
name: Option<String>,
/// Content size in bytes, if the record carried one.
size: Option<u64>,
/// Base64-encoded CRC64 of the content, if the record carried one.
crc64: Option<String>,
},
/// Entry of type 2: carries raw bytes inline.
Segment {
/// Base64-encoded bytes, if the record carried any.
payload: Option<String>,
},
}
impl TarSplitEntry {
    /// Converts a freshly deserialized record into its validated, typed form.
    ///
    /// * `type == 1` becomes [`TarSplitEntry::File`].
    /// * `type == 2` becomes [`TarSplitEntry::Segment`].
    ///
    /// For file entries the checksum is taken from the `crc64` key when
    /// present and otherwise from `payload`: the tar-split format stores a
    /// file entry's CRC64 in `payload`, so looking only at `crc64` would leave
    /// the checksum empty (and verification silently skipped) for metadata
    /// written by the reference implementation.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::TarSplitError` for any other `type` value.
    fn from_raw(raw: TarSplitEntryRaw) -> Result<Self> {
        match raw.type_id {
            1 => Ok(TarSplitEntry::File {
                name: raw.name,
                size: raw.size,
                // An explicit `crc64` key wins; `payload` is the fallback.
                crc64: raw.crc64.or(raw.payload),
            }),
            2 => Ok(TarSplitEntry::Segment {
                payload: raw.payload,
            }),
            other => Err(StorageError::TarSplitError(format!(
                "Invalid tar-split entry type: {}",
                other
            ))),
        }
    }
}
/// Owned, decoded view of the fields of a single TAR header block.
///
/// Populated by `TarHeader::from_bytes` from the accessors of
/// `tar_core::Header`.
#[derive(Debug, Clone)]
pub struct TarHeader {
/// Entry path as stored in the header (UTF-8).
pub name: String,
/// Permission/mode bits.
pub mode: u32,
/// Owner user id.
pub uid: u32,
/// Owner group id.
pub gid: u32,
/// Entry size in bytes.
pub size: u64,
/// Modification time as reported by the header.
pub mtime: i64,
/// Raw type flag byte (for example `b'0'`, `b'5'`, `b'2'`, `b'1'`).
pub typeflag: u8,
/// Link target; empty when the header carries none.
pub linkname: String,
/// Owner user name; empty when the header carries none.
pub uname: String,
/// Owner group name; empty when the header carries none.
pub gname: String,
/// Device major number; 0 when the header carries none.
pub devmajor: u32,
/// Device minor number; 0 when the header carries none.
pub devminor: u32,
}
impl TarHeader {
    /// Decodes one raw TAR header block into an owned [`TarHeader`].
    ///
    /// `header_bytes` must be exactly `tar_core::HEADER_SIZE` bytes long.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::TarSplitError` when:
    /// * the slice has the wrong length,
    /// * the path, link name, user name or group name is not valid UTF-8,
    /// * a numeric field cannot be parsed, or
    /// * `uid`, `gid` or `mtime` does not fit the width of the corresponding
    ///   struct field. These used to be narrowed with `as`, which silently
    ///   truncates (an oversized uid could alias to 0); they are now rejected.
    pub fn from_bytes(header_bytes: &[u8]) -> Result<Self> {
        let header_array: &[u8; tar_core::HEADER_SIZE] =
            header_bytes.try_into().map_err(|_| {
                StorageError::TarSplitError(format!(
                    "TAR header wrong size: {} bytes (expected {})",
                    header_bytes.len(),
                    tar_core::HEADER_SIZE
                ))
            })?;
        let header = tar_core::Header::from_bytes(header_array);

        let name = Self::utf8_field(header.path_bytes().to_vec(), "path")?;

        let mode = header
            .mode()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid mode: {}", e)))?;

        let raw_uid = header
            .uid()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid uid: {}", e)))?;
        let uid = u32::try_from(raw_uid).map_err(|_| {
            StorageError::TarSplitError(format!("uid {} does not fit in 32 bits", raw_uid))
        })?;

        let raw_gid = header
            .gid()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid gid: {}", e)))?;
        let gid = u32::try_from(raw_gid).map_err(|_| {
            StorageError::TarSplitError(format!("gid {} does not fit in 32 bits", raw_gid))
        })?;

        let size = header
            .entry_size()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid size: {}", e)))?;

        let raw_mtime = header
            .mtime()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid mtime: {}", e)))?;
        let mtime = i64::try_from(raw_mtime).map_err(|_| {
            StorageError::TarSplitError(format!(
                "mtime {} does not fit in a signed 64-bit integer",
                raw_mtime
            ))
        })?;

        let typeflag = header.entry_type().as_byte();

        // An empty link field decodes to an empty string, so no special case
        // is needed for entries without a link target.
        let linkname = Self::utf8_field(header.link_name_bytes().to_vec(), "link name")?;

        let uname = header
            .username()
            .map(|b| Self::utf8_field(b.to_vec(), "username"))
            .transpose()?
            .unwrap_or_default();
        let gname = header
            .groupname()
            .map(|b| Self::utf8_field(b.to_vec(), "group name"))
            .transpose()?
            .unwrap_or_default();

        let devmajor = header
            .device_major()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid devmajor: {}", e)))?
            .unwrap_or(0);
        let devminor = header
            .device_minor()
            .map_err(|e| StorageError::TarSplitError(format!("Invalid devminor: {}", e)))?
            .unwrap_or(0);

        Ok(TarHeader {
            name,
            mode,
            uid,
            gid,
            size,
            mtime,
            typeflag,
            linkname,
            uname,
            gname,
            devmajor,
            devminor,
        })
    }

    /// Decodes a textual header field, naming the field (`what`) in the error
    /// when the bytes are not valid UTF-8.
    fn utf8_field(bytes: Vec<u8>, what: &str) -> Result<String> {
        String::from_utf8(bytes).map_err(|e| {
            StorageError::TarSplitError(format!("Non-UTF-8 {} in TAR header: {}", what, e))
        })
    }

    /// Returns `true` for regular files: typeflag `'0'` or a NUL byte.
    pub fn is_regular_file(&self) -> bool {
        matches!(self.typeflag, b'0' | b'\0')
    }

    /// Returns `true` when the typeflag is `'5'` (directory).
    pub fn is_directory(&self) -> bool {
        matches!(self.typeflag, b'5')
    }

    /// Returns `true` when the typeflag is `'2'` (symbolic link).
    pub fn is_symlink(&self) -> bool {
        matches!(self.typeflag, b'2')
    }

    /// Returns `true` when the typeflag is `'1'` (hard link).
    pub fn is_hardlink(&self) -> bool {
        matches!(self.typeflag, b'1')
    }

    /// Returns the entry name without a single leading `./`, if it has one.
    pub fn normalized_name(&self) -> &str {
        self.name.strip_prefix("./").unwrap_or(&self.name)
    }
}
/// Streaming reader over a layer's gzip-compressed tar-split metadata that
/// yields inline segments and opened file descriptors via `next`.
#[derive(Debug)]
pub struct TarSplitFdStream {
/// The layer whose tar-split metadata is being read; its diff directory is
/// searched first when resolving file entries.
layer: Layer,
/// Handle to the storage root, used to reach the `overlay` directory when
/// searching other layers.
storage_root: Dir,
/// Buffered, gzip-decoding reader over `<layer id>.tar-split.gz`.
reader: BufReader<GzDecoder<File>>,
/// Number of file-type entries seen so far by `next`.
entry_count: usize,
}
impl TarSplitFdStream {
/// Opens the tar-split metadata of `layer` and prepares a stream over it.
///
/// The metadata is read from `overlay-layers/<layer id>.tar-split.gz` under
/// the storage root and is decompressed on the fly.
///
/// # Errors
///
/// Returns `StorageError::TarSplitError` when the `overlay-layers` directory
/// or the tar-split file cannot be opened, and propagates failures from
/// re-opening the layer or cloning the storage root handle.
pub fn new(storage: &Storage, layer: &Layer) -> Result<Self> {
    let root = storage.root_dir();

    let metadata_dir = root.open_dir("overlay-layers").map_err(|e| {
        StorageError::TarSplitError(format!("Failed to open overlay-layers directory: {}", e))
    })?;

    let filename = format!("{}.tar-split.gz", layer.id());
    let compressed = match metadata_dir.open(&filename) {
        Ok(handle) => handle,
        Err(e) => {
            return Err(StorageError::TarSplitError(format!(
                "Failed to open tar-split file {}: {}",
                filename, e
            )));
        }
    };

    // The decoder is built before the layer is re-opened, keeping the same
    // order of I/O as the fields are listed below.
    let reader = BufReader::new(GzDecoder::new(compressed));

    Ok(Self {
        layer: Layer::open(storage, layer.id())?,
        storage_root: root.try_clone()?,
        reader,
        entry_count: 0,
    })
}
/// Opens `path` from this layer's diff directory, falling back to the parent
/// layers when this layer does not contain it.
///
/// A leading `./` is removed from `path` before the lookup.
///
/// # Errors
///
/// Any I/O error other than "not found" is returned as `StorageError::Io`;
/// failures from the parent-layer search are propagated unchanged.
fn open_file_in_chain(&self, path: &str) -> Result<cap_std::fs::File> {
    let relative = path.strip_prefix("./").unwrap_or(path);
    match self.layer.diff_dir().open(relative) {
        // Not in this layer: walk the parents.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            self.search_parent_layers(&self.layer, relative, 0)
        }
        outcome => outcome.map_err(StorageError::Io),
    }
}
/// Searches the parents of `current_layer` for `path`.
///
/// Each parent link is resolved to a layer id and handed to
/// `search_by_layer_id`, which probes that layer and then its own parents.
/// Previously the parent was probed here as well and then probed again inside
/// `search_by_layer_id`; the duplicate open has been removed.
///
/// `depth` is the current recursion depth; the search gives up once it
/// reaches `MAX_DEPTH`. Because the parent is now probed only inside
/// `search_by_layer_id` (at `depth + 1`), a call made at `MAX_DEPTH - 1` no
/// longer probes the direct parents; the only caller in this file passes 0.
///
/// # Errors
///
/// * `StorageError::TarSplitError` when the depth limit is reached or no
///   parent contains the file.
/// * Any error from resolving a link, and any non-`TarSplitError` failure
///   from the recursive search, is returned immediately.
fn search_parent_layers(
    &self,
    current_layer: &Layer,
    path: &str,
    depth: usize,
) -> Result<cap_std::fs::File> {
    const MAX_DEPTH: usize = 500;
    if depth >= MAX_DEPTH {
        return Err(StorageError::TarSplitError(format!(
            "Layer chain exceeds maximum depth of {} while searching for file: {}",
            MAX_DEPTH, path
        )));
    }

    for link_id in current_layer.parent_links() {
        let parent_id = self.resolve_link_direct(link_id)?;
        match self.search_by_layer_id(&parent_id, path, depth + 1) {
            // A TarSplitError from the recursion means "not found along this
            // branch" (or depth exhausted there): try the next parent.
            Err(StorageError::TarSplitError(_)) => continue,
            outcome => return outcome,
        }
    }

    Err(StorageError::TarSplitError(format!(
        "File not found in layer chain: {}",
        path
    )))
}
/// Looks for `path` in the layer `layer_id` and, failing that, recursively in
/// the layers named by its `lower` file.
///
/// `depth` is the current recursion depth; the search gives up once it
/// reaches `MAX_DEPTH`.
///
/// # Errors
///
/// * `StorageError::TarSplitError` when the depth limit is reached or the
///   file is found nowhere below this layer.
/// * Any other error (I/O other than "not found", link resolution) aborts the
///   search and is returned as is.
fn search_by_layer_id(
    &self,
    layer_id: &str,
    path: &str,
    depth: usize,
) -> Result<cap_std::fs::File> {
    const MAX_DEPTH: usize = 500;
    if depth >= MAX_DEPTH {
        return Err(StorageError::TarSplitError(format!(
            "Layer chain exceeds maximum depth of {} while searching for file: {}",
            MAX_DEPTH, path
        )));
    }

    // Probe this layer first; only "not found" lets the search continue.
    match self.open_file_in_layer(layer_id, path) {
        Err(StorageError::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {}
        found_or_fatal => return found_or_fatal,
    }

    for link in self.read_layer_parent_links(layer_id)? {
        let ancestor_id = self.resolve_link_direct(&link)?;
        match self.search_by_layer_id(&ancestor_id, path, depth + 1) {
            // This branch came up empty; move on to the next link.
            Err(StorageError::TarSplitError(_)) => {}
            found_or_fatal => return found_or_fatal,
        }
    }

    Err(StorageError::TarSplitError(format!(
        "File not found in layer chain: {}",
        path
    )))
}
/// Resolves a link name under `overlay/l` to the id of the layer it points at.
///
/// The layer id is taken to be the second-to-last `/`-separated component of
/// the symlink target.
///
/// # Errors
///
/// * I/O errors from opening `overlay` or `overlay/l` are propagated.
/// * `StorageError::LinkReadError` when the link cannot be read, its target
///   is not valid UTF-8, or the target has no usable second-to-last component
///   (missing, empty, or `..`).
fn resolve_link_direct(&self, link_id: &str) -> Result<String> {
    let links_dir = self.storage_root.open_dir("overlay")?.open_dir("l")?;

    let target = match links_dir.read_link(link_id) {
        Ok(target) => target,
        Err(e) => {
            return Err(StorageError::LinkReadError(format!(
                "Failed to read link {}: {}",
                link_id, e
            )));
        }
    };

    let Some(target_str) = target.to_str() else {
        return Err(StorageError::LinkReadError(
            "Invalid UTF-8 in link target".to_string(),
        ));
    };

    // Walking the components from the right, index 1 is the second-to-last.
    match target_str.rsplit('/').nth(1) {
        Some(layer_id) if !layer_id.is_empty() && layer_id != ".." => Ok(layer_id.to_string()),
        _ => Err(StorageError::LinkReadError(format!(
            "Invalid link target format: {}",
            target_str
        ))),
    }
}
/// Opens `path` inside `overlay/<layer_id>/diff` under the storage root.
///
/// # Errors
///
/// Propagates the I/O error from whichever directory or file open fails; a
/// failure to open the file itself is returned as `StorageError::Io`.
fn open_file_in_layer(&self, layer_id: &str, path: &str) -> Result<cap_std::fs::File> {
    let diff_dir = self
        .storage_root
        .open_dir("overlay")?
        .open_dir(layer_id)?
        .open_dir("diff")?;
    match diff_dir.open(path) {
        Ok(file) => Ok(file),
        Err(err) => Err(StorageError::Io(err)),
    }
}
/// Reads the link names listed in `overlay/<layer_id>/lower`.
///
/// The file content is trimmed and split on `:`; each element that starts
/// with `l/` contributes the text after that prefix, and other elements are
/// ignored. A missing `lower` file yields an empty list.
///
/// # Errors
///
/// Propagates I/O errors from opening the directories, and returns any read
/// error other than "not found" as `StorageError::Io`.
fn read_layer_parent_links(&self, layer_id: &str) -> Result<Vec<String>> {
    let layer_dir = self.storage_root.open_dir("overlay")?.open_dir(layer_id)?;

    let lower = match layer_dir.read_to_string("lower") {
        Ok(text) => text,
        // No `lower` file: this layer has no parents.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => return Err(StorageError::Io(err)),
    };

    let mut links = Vec::new();
    for element in lower.trim().split(':') {
        if let Some(link) = element.strip_prefix("l/") {
            links.push(link.to_string());
        }
    }
    Ok(links)
}
/// Checks that `file` holds exactly `size` bytes whose CRC64 equals the
/// base64-encoded, big-endian checksum `expected_b64`.
///
/// The file is read from its current position to end-of-file and is left
/// there; callers that need the content afterwards must rewind it.
///
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried
/// instead of being reported as failures.
///
/// # Errors
///
/// Returns `StorageError::TarSplitError` when the checksum is not valid
/// base64, does not decode to 8 bytes, the file cannot be read, the number of
/// bytes read differs from `size`, or the computed CRC64 differs.
fn verify_crc64(
    &self,
    file: &mut cap_std::fs::File,
    expected_b64: &str,
    size: u64,
) -> Result<()> {
    let decoded = BASE64_STANDARD.decode(expected_b64).map_err(|e| {
        StorageError::TarSplitError(format!("Failed to decode base64 CRC64: {}", e))
    })?;
    // The conversion succeeds only for exactly 8 bytes, so the length check
    // and the conversion are a single fallible step.
    let expected = match <[u8; 8]>::try_from(decoded.as_slice()) {
        Ok(bytes) => u64::from_be_bytes(bytes),
        Err(_) => {
            return Err(StorageError::TarSplitError(format!(
                "Invalid CRC64 length: {} bytes",
                decoded.len()
            )));
        }
    };

    let mut digest = CRC64_ISO.digest();
    let mut buffer = [0u8; 8192];
    let mut bytes_read = 0u64;
    loop {
        let n = match file.read(&mut buffer) {
            Ok(0) => break,
            Ok(n) => n,
            // A signal arrived before any data was transferred: try again.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => {
                return Err(StorageError::TarSplitError(format!(
                    "Failed to read file for CRC64 verification: {}",
                    e
                )));
            }
        };
        digest.update(&buffer[..n]);
        // `n` is at most the buffer length, so widening to u64 is lossless.
        bytes_read += n as u64;
    }

    if bytes_read != size {
        return Err(StorageError::TarSplitError(format!(
            "File size mismatch: expected {}, got {}",
            size, bytes_read
        )));
    }

    let computed = digest.finalize();
    if computed != expected {
        return Err(StorageError::TarSplitError(format!(
            "CRC64 mismatch: expected {:016x}, got {:016x}",
            expected, computed
        )));
    }
    Ok(())
}
/// Returns the next item of the stream, or `Ok(None)` once the tar-split
/// metadata is exhausted.
///
/// Entries that produce nothing are skipped: segments without a payload and
/// file entries whose size is absent or zero. Every file entry, skipped or
/// not, increments the entry counter.
///
/// For a file entry with content, the file is opened from the layer chain;
/// when the entry carries a checksum the content is verified and the file is
/// rewound before its descriptor is handed out.
///
/// # Errors
///
/// Returns `StorageError::TarSplitError` when a line cannot be read or
/// parsed, a payload is not valid base64, or a file entry with content has no
/// name; failures from opening, verifying or rewinding the file are
/// propagated.
// This is an inherent method rather than `Iterator::next` because it returns
// `Result<Option<_>>`, hence the lint exemption.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Result<Option<TarSplitItem>> {
    loop {
        let mut line = String::new();
        let consumed = self.reader.read_line(&mut line).map_err(|e| {
            StorageError::TarSplitError(format!("Failed to read tar-split line: {}", e))
        })?;
        if consumed == 0 {
            // End of the metadata stream.
            return Ok(None);
        }

        let raw: TarSplitEntryRaw = serde_json::from_str(&line).map_err(|e| {
            StorageError::TarSplitError(format!("Failed to parse tar-split entry: {}", e))
        })?;

        match TarSplitEntry::from_raw(raw)? {
            // Nothing to emit for a segment without bytes.
            TarSplitEntry::Segment { payload: None } => continue,
            TarSplitEntry::Segment {
                payload: Some(encoded),
            } => {
                let bytes = BASE64_STANDARD.decode(&encoded).map_err(|e| {
                    StorageError::TarSplitError(format!(
                        "Failed to decode base64 payload: {}",
                        e
                    ))
                })?;
                return Ok(Some(TarSplitItem::Segment(bytes)));
            }
            TarSplitEntry::File { name, size, crc64 } => {
                self.entry_count += 1;

                let file_size = size.unwrap_or(0);
                if file_size == 0 {
                    // No content to hand out for this entry.
                    continue;
                }

                let path = name.ok_or_else(|| {
                    StorageError::TarSplitError("FileType entry missing name".to_string())
                })?;
                let mut file = self.open_file_in_chain(&path)?;

                if let Some(checksum) = crc64.as_deref() {
                    self.verify_crc64(&mut file, checksum, file_size)?;
                    // Verification consumed the file; reset it for the caller.
                    file.rewind().map_err(StorageError::Io)?;
                }

                let fd = OwnedFd::from(file.into_std());
                return Ok(Some(TarSplitItem::FileContent {
                    fd,
                    size: file_size,
                    name: path,
                }));
            }
        }
    }
}
/// Returns how many file-type entries `next` has processed so far.
///
/// The counter is incremented for every file entry read, including entries
/// that `next` skips because their size is absent or zero.
pub fn entry_count(&self) -> usize {
self.entry_count
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header with the given name and type flag; every other field
    /// holds an arbitrary fixed value.
    fn header_with(name: &str, typeflag: u8) -> TarHeader {
        TarHeader {
            name: name.to_string(),
            mode: 0o644,
            uid: 1000,
            gid: 1000,
            size: 100,
            mtime: 0,
            typeflag,
            linkname: String::new(),
            uname: "user".to_string(),
            gname: "group".to_string(),
            devmajor: 0,
            devminor: 0,
        }
    }

    /// Parses one JSON line the same way `TarSplitFdStream::next` does.
    fn parse_entry(json: &str) -> Result<TarSplitEntry> {
        let raw: TarSplitEntryRaw = serde_json::from_str(json).unwrap();
        TarSplitEntry::from_raw(raw)
    }

    #[test]
    fn test_tar_header_type_checks() {
        let mut header = header_with("test.txt", b'0');
        assert!(header.is_regular_file());
        assert!(!header.is_directory());
        assert!(!header.is_symlink());
        assert!(!header.is_hardlink());

        // A NUL type flag is also treated as a regular file.
        header.typeflag = b'\0';
        assert!(header.is_regular_file());

        header.typeflag = b'5';
        assert!(!header.is_regular_file());
        assert!(header.is_directory());

        header.typeflag = b'2';
        assert!(header.is_symlink());
        assert!(!header.is_hardlink());

        header.typeflag = b'1';
        assert!(header.is_hardlink());
        assert!(!header.is_regular_file());
        assert!(!header.is_symlink());
    }

    #[test]
    fn test_tar_header_normalized_name() {
        assert_eq!(header_with("./etc/hosts", b'0').normalized_name(), "etc/hosts");
        assert_eq!(header_with("etc/hosts", b'0').normalized_name(), "etc/hosts");
        // Only a single leading "./" is removed.
        assert_eq!(header_with("././a", b'0').normalized_name(), "./a");
    }

    #[test]
    fn test_tar_split_entry_deserialization() {
        match parse_entry(r#"{"type":2,"payload":"dXN0YXIAMDA="}"#).unwrap() {
            TarSplitEntry::Segment { payload } => {
                assert_eq!(payload, Some("dXN0YXIAMDA=".to_string()));
            }
            _ => panic!("Expected Segment variant"),
        }

        let json_file = r#"{"type":1,"name":"./etc/hosts","size":123,"crc64":"AAAAAAAAAA=="}"#;
        match parse_entry(json_file).unwrap() {
            TarSplitEntry::File { name, size, crc64 } => {
                assert_eq!(name, Some("./etc/hosts".to_string()));
                assert_eq!(size, Some(123));
                assert_eq!(crc64, Some("AAAAAAAAAA==".to_string()));
            }
            _ => panic!("Expected File variant"),
        }

        assert!(parse_entry(r#"{"type":99}"#).is_err());
    }

    #[test]
    fn test_tar_split_entry_optional_fields_default_to_none() {
        match parse_entry(r#"{"type":2}"#).unwrap() {
            TarSplitEntry::Segment { payload } => assert_eq!(payload, None),
            _ => panic!("Expected Segment variant"),
        }

        match parse_entry(r#"{"type":1}"#).unwrap() {
            TarSplitEntry::File { name, size, crc64 } => {
                assert_eq!(name, None);
                assert_eq!(size, None);
                assert_eq!(crc64, None);
            }
            _ => panic!("Expected File variant"),
        }
    }
}