use alloc::string::String;
use alloc::vec::Vec;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Read, Seek};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use zip::ZipArchive;
/// A lazily-reading checkpoint data source, cloneable and shareable across
/// threads: each variant holds its backing source behind `Arc<Mutex<_>>`, so
/// clones of the enum refer to the same underlying state.
#[derive(Clone)]
pub enum LazyDataSource {
/// ZIP archive on disk; entries are indexed up front, read on demand.
Zip(Arc<Mutex<ZipSource>>),
/// In-memory "storages" payload of a legacy TAR checkpoint.
Tar(Arc<Mutex<TarSource>>),
/// Legacy multi-storage file whose layout is discovered incrementally.
LegacyMultiStorage(Arc<Mutex<LegacyMultiStorageSource>>),
}
/// Lazy view of a ZIP archive on disk: only an index of entries is kept in
/// memory; entry bytes are read (and the archive reopened) on demand.
pub struct ZipSource {
// Location of the archive file; reopened for every read.
path: PathBuf,
// One (entry name, data start offset, compressed size) per archive entry.
file_list: Vec<(String, u64, u64)>, }
/// Lazily-addressed view of the "storages" payload of a legacy TAR-style
/// checkpoint. The entire payload is held in memory.
pub struct TarSource {
// Storage key -> (byte offset into `storages_data`, length in bytes).
storage_map: HashMap<String, (usize, usize)>,
// Raw bytes of the storages section.
storages_data: Vec<u8>,
}
/// Lazy reader for the legacy multi-storage serialization format.
///
/// Storage boundaries are not known up front, so they are reconstructed
/// incrementally: callers register the ordered storage keys and report
/// observed (offset, size) usage per key; once every key has been seen,
/// a byte-offset map is built and reads become possible.
pub struct LegacyMultiStorageSource {
// Path of the checkpoint file on disk; reopened for each read.
path: PathBuf,
// Byte offset where the storage data region begins in the file.
data_offset: u64,
// Total size of the data region; currently unread (kept for completeness).
#[allow(dead_code)]
data_size: u64,
// Storage key -> (offset within data region, size in bytes);
// None until usage has been observed for every declared key.
storage_map: RwLock<Option<HashMap<String, (u64, u64)>>>,
// Ordered storage keys, supplied by the caller via `set_storage_keys`.
storage_keys: RwLock<Option<Vec<String>>>,
// Maximum observed extent (offset + size) per storage key.
storage_usage: RwLock<HashMap<String, usize>>, }
impl ZipSource {
    /// Opens the ZIP archive at `path` and builds an in-memory index of every
    /// entry (name, data start offset, compressed size) so subsequent reads
    /// can locate entries without rescanning.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened, is not a valid ZIP
    /// archive, or an entry is missing its data offset (malformed archive).
    pub fn new(path: PathBuf) -> std::io::Result<Self> {
        let file = File::open(&path)?;
        let reader = BufReader::new(file);
        let mut archive = ZipArchive::new(reader)?;
        // Pre-size the index: exactly one record per archive member.
        let mut file_list = Vec::with_capacity(archive.len());
        for i in 0..archive.len() {
            let file = archive.by_index(i)?;
            let name = file.name().to_string();
            // A missing data offset means the archive is malformed; surface it
            // as an I/O error instead of panicking (this is library code).
            let offset = file.data_start().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("ZIP entry '{}' has no data offset", name),
                )
            })?;
            let compressed_size = file.compressed_size();
            file_list.push((name, offset, compressed_size));
        }
        Ok(Self { path, file_list })
    }

    /// Returns `true` if the archive contains an entry named exactly `name`.
    pub fn contains(&self, name: &str) -> bool {
        self.file_list.iter().any(|(n, _, _)| n == name)
    }

    /// Lists entries that live under a `data/` directory and look like raw
    /// storage payloads (i.e. not `.pkl` metadata and not directories).
    pub fn data_files(&self) -> Vec<String> {
        self.file_list
            .iter()
            .filter(|(name, _, _)| name.starts_with("data/") || name.contains("/data/"))
            .filter(|(name, _, _)| !name.ends_with(".pkl") && !name.ends_with("/"))
            .map(|(name, _, _)| name.clone())
            .collect()
    }

    /// Reads and decompresses the entire entry `name`, reopening the archive
    /// from disk for this call.
    pub fn read_file(&self, name: &str) -> std::io::Result<Vec<u8>> {
        let file = File::open(&self.path)?;
        let reader = BufReader::new(file);
        let mut archive = ZipArchive::new(reader)?;
        let mut file = archive.by_name(name)?;
        let mut contents = Vec::with_capacity(file.size() as usize);
        file.read_to_end(&mut contents)?;
        Ok(contents)
    }

    /// Reads `length` bytes of the (decompressed) entry `name`, starting
    /// `offset` bytes into it.
    ///
    /// The decompression stream is not seekable, so the first `offset` bytes
    /// are decompressed and discarded before the requested range is read.
    ///
    /// # Errors
    /// Fails with `UnexpectedEof` if the entry is shorter than
    /// `offset + length`.
    pub fn read_file_range(
        &self,
        name: &str,
        offset: usize,
        length: usize,
    ) -> std::io::Result<Vec<u8>> {
        let file = File::open(&self.path)?;
        let reader = BufReader::new(file);
        let mut archive = ZipArchive::new(reader)?;
        let mut file = archive.by_name(name)?;
        // Discard the leading bytes without buffering them ourselves.
        let skipped = std::io::copy(&mut file.by_ref().take(offset as u64), &mut std::io::sink())?;
        if skipped < offset as u64 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "offset past end of ZIP entry",
            ));
        }
        let mut buffer = vec![0u8; length];
        file.read_exact(&mut buffer)?;
        Ok(buffer)
    }
}
impl LegacyMultiStorageSource {
/// Creates a lazy source over the data region of a legacy multi-storage
/// file. Storage boundaries are unknown at construction time; they are
/// learned later via `set_storage_keys` and `track_storage_usage`.
pub fn new(path: PathBuf, data_offset: u64, data_size: u64) -> Self {
Self {
path,
data_offset,
data_size,
storage_map: RwLock::new(None),
storage_keys: RwLock::new(None),
storage_usage: RwLock::new(HashMap::new()),
}
}
/// Records the ordered list of storage keys as they appear in the file.
/// The order determines each storage's cumulative byte offset once the
/// map is built in `try_build_storage_map`.
pub fn set_storage_keys(&self, keys: Vec<String>) {
// NOTE: throughout this impl a poisoned lock is recovered with
// `into_inner()` rather than propagated, so a panic in another thread
// does not permanently disable this source.
let mut storage_keys = self
.storage_keys
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*storage_keys = Some(keys);
}
/// Records that `size` bytes at `offset` of storage `storage_key` were
/// referenced, widening that storage's known extent. Afterwards, attempts
/// to build the storage map — this call may have supplied the last
/// missing key.
pub fn track_storage_usage(&self, storage_key: &str, offset: usize, size: usize) {
let mut usage = self
.storage_usage
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
// A storage must be at least as large as the furthest byte seen in use.
let max_extent = offset + size;
usage
.entry(storage_key.to_string())
.and_modify(|current| *current = (*current).max(max_extent))
.or_insert(max_extent);
// Release the write lock before try_build_storage_map re-locks it.
drop(usage);
self.try_build_storage_map();
}
/// Builds the storage-offset map once usage has been observed for every
/// declared key. No-op if the map already exists or keys/usage are still
/// incomplete.
fn try_build_storage_map(&self) {
// Already built — nothing to do.
if self
.storage_map
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.is_some()
{
return;
}
let keys_guard = self
.storage_keys
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref keys) = *keys_guard {
let usage = self
.storage_usage
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
// Only proceed once every declared key has a known extent; offsets
// are cumulative, so a single missing key would shift all later ones.
if keys.iter().all(|k| usage.contains_key(k)) {
let mut map = HashMap::new();
let mut current_offset = 0u64;
for key in keys {
if let Some(&size) = usage.get(key) {
// The +8 skips what appears to be an 8-byte per-storage header
// preceding each payload — NOTE(review): confirm against the
// on-disk format.
map.insert(key.clone(), (current_offset + 8, size as u64));
current_offset += 8 + size as u64;
}
}
// Drop the read guards before taking the write lock below, and to
// release storage_keys/storage_usage promptly.
drop(keys_guard);
drop(usage);
let mut storage_map = self
.storage_map
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*storage_map = Some(map);
}
}
}
/// Reads the full bytes of the storage named by the last path component of
/// `key`, seeking directly to its position within the file's data region.
///
/// # Errors
/// Fails with `InvalidData` while storage boundaries are still unknown, or
/// with an I/O error if the file cannot be opened/read.
pub fn read(&self, key: &str) -> std::io::Result<Vec<u8>> {
// Only the trailing path component identifies the storage.
let storage_key = key.split('/').next_back().unwrap_or(key);
let storage_map = self
.storage_map
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if let Some(ref map) = *storage_map
&& let Some(&(offset, size)) = map.get(storage_key)
{
let mut file = File::open(&self.path)?;
file.seek(std::io::SeekFrom::Start(self.data_offset + offset))?;
let mut buffer = vec![0u8; size as usize];
file.read_exact(&mut buffer)?;
return Ok(buffer);
}
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Storage boundaries not available for key '{}'. Cannot perform lazy loading.",
storage_key
),
))
}
}
impl TarSource {
    /// Parses the "storages" payload of a legacy TAR checkpoint and indexes
    /// each storage's byte range inside `storages_data`.
    ///
    /// Layout (as parsed here): a pickled integer storage count, then per
    /// storage a pickled `(key, _, storage_type)` tuple, an 8-byte
    /// little-endian element count, and the raw element bytes.
    ///
    /// # Errors
    /// Fails with `InvalidData` when a storage type is not a class object,
    /// the element size is unknown, or a computed storage size overflows.
    pub fn new(storages_data: Vec<u8>) -> std::io::Result<Self> {
        use super::pickle_reader::{read_pickle, storage_type_to_element_size};
        use std::io::Cursor;
        let mut storage_map = HashMap::new();
        let mut pos = 0usize;
        let mut cursor = Cursor::new(&storages_data[pos..]);
        // First pickle: the number of storages that follow. Anything else
        // (or a parse failure) is treated as "no storages".
        let storage_count =
            if let Ok(super::pickle_reader::Object::Int(count)) = read_pickle(&mut cursor) {
                pos += cursor.position() as usize;
                count as usize
            } else {
                0
            };
        for _i in 0..storage_count {
            if pos >= storages_data.len() {
                break;
            }
            let mut cursor = Cursor::new(&storages_data[pos..]);
            if let Ok(obj) = read_pickle(&mut cursor) {
                let pickle_size = cursor.position() as usize;
                pos += pickle_size;
                // Expect a tuple of at least (key, _, storage_type).
                let (storage_key, storage_type) = match obj {
                    super::pickle_reader::Object::Tuple(tuple) if tuple.len() >= 3 => {
                        let key = match &tuple[0] {
                            super::pickle_reader::Object::Int(i) => i.to_string(),
                            super::pickle_reader::Object::String(s) => s.clone(),
                            // Unrecognized key type: skip this record.
                            _ => continue,
                        };
                        let stype = match &tuple[2] {
                            super::pickle_reader::Object::Class { name, .. } => name.clone(),
                            other => {
                                return Err(std::io::Error::new(
                                    std::io::ErrorKind::InvalidData,
                                    format!("Expected Class for storage type, got {:?}", other),
                                ));
                            }
                        };
                        (key, stype)
                    }
                    _ => continue,
                };
                // An 8-byte little-endian element count follows the pickle.
                if pos + 8 > storages_data.len() {
                    break;
                }
                let num_elements = u64::from_le_bytes(
                    storages_data[pos..pos + 8]
                        .try_into()
                        .expect("slice is exactly 8 bytes"),
                ) as usize;
                pos += 8;
                let element_size = storage_type_to_element_size(&storage_type)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
                // Guard against multiplication overflow on corrupt inputs
                // instead of panicking (debug) or wrapping (release).
                let data_size = num_elements.checked_mul(element_size).ok_or_else(|| {
                    std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("storage '{}' size overflows usize", storage_key),
                    )
                })?;
                storage_map.insert(storage_key, (pos, data_size));
                pos = pos.saturating_add(data_size);
            } else {
                break;
            }
        }
        Ok(Self {
            storage_map,
            storages_data,
        })
    }

    /// Returns the full bytes of the storage identified by `key` (only the
    /// last path component is used as the storage key).
    pub fn read_file(&self, key: &str) -> std::io::Result<Vec<u8>> {
        let storage_key = key.split('/').next_back().unwrap_or(key);
        if let Some(&(offset, size)) = self.storage_map.get(storage_key)
            && offset + size <= self.storages_data.len()
        {
            return Ok(self.storages_data[offset..offset + size].to_vec());
        }
        Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("Storage key '{}' not found in TAR archive", storage_key),
        ))
    }

    /// Returns up to `length` bytes of the storage identified by `key`,
    /// starting `offset` bytes into it. The range is clamped to the end of
    /// the storage, so an out-of-range `offset` yields an empty vector
    /// rather than a panic.
    pub fn read_file_range(
        &self,
        key: &str,
        offset: usize,
        length: usize,
    ) -> std::io::Result<Vec<u8>> {
        let storage_key = key.split('/').next_back().unwrap_or(key);
        if let Some(&(storage_offset, storage_size)) = self.storage_map.get(storage_key)
            && storage_offset + storage_size <= self.storages_data.len()
        {
            let storage_end = storage_offset + storage_size;
            // Clamp both ends: a caller-supplied offset past the storage's
            // end previously produced start > end and a panicking slice.
            let start = storage_offset.saturating_add(offset).min(storage_end);
            let end = start.saturating_add(length).min(storage_end);
            return Ok(self.storages_data[start..end].to_vec());
        }
        Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            format!("Storage key '{}' not found in TAR archive", storage_key),
        ))
    }

    /// Returns whether a storage with `key`'s last path component exists.
    pub fn contains(&self, key: &str) -> bool {
        let storage_key = key.split('/').next_back().unwrap_or(key);
        self.storage_map.contains_key(storage_key)
    }

    /// Lists all storage keys (in arbitrary HashMap order).
    pub fn keys(&self) -> Vec<String> {
        self.storage_map.keys().cloned().collect()
    }
}
impl LazyDataSource {
    /// Wraps the ZIP archive at `path` as a lazy data source.
    pub fn from_zip(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let zip = ZipSource::new(path.as_ref().to_path_buf())?;
        Ok(Self::Zip(Arc::new(Mutex::new(zip))))
    }

    /// Wraps an in-memory TAR "storages" payload as a lazy data source.
    pub fn from_tar(storages_data: &[u8]) -> std::io::Result<Self> {
        let tar = TarSource::new(storages_data.to_vec())?;
        Ok(Self::Tar(Arc::new(Mutex::new(tar))))
    }

    /// Wraps a legacy multi-storage file as a lazy data source.
    pub fn from_legacy_multi_storage(
        path: impl AsRef<Path>,
        data_offset: u64,
        data_size: u64,
    ) -> Self {
        let legacy =
            LegacyMultiStorageSource::new(path.as_ref().to_path_buf(), data_offset, data_size);
        Self::LegacyMultiStorage(Arc::new(Mutex::new(legacy)))
    }

    /// Reads the full contents associated with `key` from the underlying
    /// source. Poisoned locks are recovered rather than propagated.
    pub fn read(&self, key: &str) -> std::io::Result<Vec<u8>> {
        match self {
            Self::Zip(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .read_file(key),
            Self::Tar(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .read_file(key),
            Self::LegacyMultiStorage(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .read(key),
        }
    }

    /// Reads `length` bytes starting `offset` bytes into the entry named
    /// `key`. For legacy sources this seeks directly into the file once the
    /// storage map has been built; otherwise it fails with `InvalidData`.
    pub fn read_range(&self, key: &str, offset: usize, length: usize) -> std::io::Result<Vec<u8>> {
        match self {
            Self::Zip(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .read_file_range(key, offset, length),
            Self::Tar(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .read_file_range(key, offset, length),
            Self::LegacyMultiStorage(src) => {
                // Only the trailing path component identifies the storage.
                let storage_key = key.split('/').next_back().unwrap_or(key);
                let legacy = src
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                let map_guard = legacy
                    .storage_map
                    .read()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if let Some(ref map) = *map_guard {
                    if let Some(&(storage_offset, storage_size)) = map.get(storage_key) {
                        let file_offset = legacy.data_offset + storage_offset + offset as u64;
                        // Never read past the end of this storage.
                        let read_length =
                            length.min((storage_size as usize).saturating_sub(offset));
                        let mut file = File::open(&legacy.path)?;
                        file.seek(std::io::SeekFrom::Start(file_offset))?;
                        let mut buffer = vec![0u8; read_length];
                        file.read_exact(&mut buffer)?;
                        return Ok(buffer);
                    }
                }
                Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!(
                        "Storage boundaries not available for key '{}'. Cannot perform lazy loading.",
                        storage_key
                    ),
                ))
            }
        }
    }

    /// Reports whether `key` is present. Legacy sources cannot enumerate
    /// their contents up front, so they always answer `true`.
    pub fn contains(&self, key: &str) -> bool {
        match self {
            Self::Zip(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .contains(key),
            Self::Tar(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .contains(key),
            Self::LegacyMultiStorage(_) => true,
        }
    }

    /// Lists the keys the source can enumerate (empty for legacy sources).
    pub fn keys(&self) -> Vec<String> {
        match self {
            Self::Zip(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .data_files(),
            Self::Tar(src) => src
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .keys(),
            Self::LegacyMultiStorage(_) => Vec::new(),
        }
    }
}