use flate2;
use flate2::read::ZlibDecoder;
use flate2::write::ZlibEncoder;
use ring::digest;
use rusqlite::{params, params_from_iter, Connection};
use std::fs::{self, File};
use std::io::{self, Read, Seek, SeekFrom, Take};
use std::path::{Path, PathBuf};
use zstd::stream::write::Encoder as ZstdEncoder;
use crate::container::Compression;
use crate::db::PackEntry;
use crate::io::{copy_by_chunk, ByteString, HashWriter, MaybeContentFormat, ReaderMaker};
use crate::{db, Container};
use crate::utils::Dir;
use crate::Error;
/// Handle to a single object stored inside a pack file.
///
/// Holding a `PObject` does not read any pack data; it only records where the
/// object's bytes live and how to interpret them.
pub struct PObject {
    /// Hash key the object is indexed under.
    pub id: String,
    /// Path of the pack file that holds the object's bytes.
    pub loc: PathBuf,
    /// Byte offset of the object's stored bytes within `loc`.
    pub offset: u64,
    /// Length of the object's content once decoded (what a full read must yield).
    pub raw_size: u64,
    /// Number of bytes the object occupies in the pack file.
    pub size: u64,
    /// Whether the stored bytes are compressed.
    pub compressed: bool,
}

impl PObject {
    /// Builds a handle from its parts, taking owned copies of `id` and `loc`.
    fn new<P: AsRef<Path>>(
        id: &str,
        loc: P,
        offset: u64,
        raw_size: u64,
        size: u64,
        compressed: bool,
    ) -> Self {
        let id = id.to_owned();
        let loc = PathBuf::from(loc.as_ref());
        Self {
            compressed,
            size,
            raw_size,
            offset,
            loc,
            id,
        }
    }
}
impl TryFrom<PObject> for ByteString {
type Error = Error;
fn try_from(obj: PObject) -> Result<Self, Self::Error> {
let mut rdr = obj.make_reader()?;
let mut buf = vec![];
let n = std::io::copy(&mut rdr, &mut buf)?;
if n == obj.raw_size {
Ok(buf)
} else {
Err(Error::UnexpectedCopySize {
expected: obj.raw_size,
got: n,
})
}
}
}
enum PReader {
Uncompressed(Take<File>),
Zlib(ZlibDecoder<Take<File>>),
}
impl Read for PReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self {
PReader::Zlib(inner) => inner.read(buf),
PReader::Uncompressed(inner) => inner.read(buf),
}
}
}
impl ReaderMaker for PObject {
fn make_reader(&self) -> Result<impl Read, crate::Error> {
let mut f = fs::OpenOptions::new().read(true).open(&self.loc)?;
f.seek(SeekFrom::Start(self.offset))?;
if self.compressed {
let rdr = PReader::Zlib(ZlibDecoder::new(f.take(self.size)));
Ok(rdr)
} else {
let rdr = PReader::Uncompressed(f.take(self.size));
Ok(rdr)
}
}
}
/// Looks up `hashkey` in the packs database and returns a handle to the stored object.
///
/// Returns `Ok(None)` when the key is not indexed. No pack data is read here; read the
/// content through the returned [`PObject`].
///
/// # Errors
/// Fails if the container is invalid, the database cannot be opened, or the lookup fails.
pub fn extract(hashkey: &str, cnt: &Container) -> Result<Option<PObject>, Error> {
    cnt.valid()?;
    let conn = Connection::open(cnt.packs_db())?;
    let entry = db::select(&conn, hashkey)?;
    Ok(entry.map(|pn| {
        let loc = cnt.packs().join(pn.pack_id.to_string());
        PObject::new(hashkey, loc, pn.offset, pn.raw_size, pn.size, pn.compressed)
    }))
}
/// Splits `iter` into consecutive `Vec`s of up to `chunk_size` items each.
///
/// Every chunk except possibly the last is full. Iteration stops at the first empty
/// chunk, so a `chunk_size` of 0 yields nothing.
fn _chunked<I>(mut iter: I, chunk_size: usize) -> impl Iterator<Item = Vec<I::Item>>
where
    I: Iterator,
{
    std::iter::from_fn(move || {
        let batch = iter.by_ref().take(chunk_size).collect::<Vec<_>>();
        (!batch.is_empty()).then_some(batch)
    })
}
pub fn extract_many<'a, I>(
hashkeys: I,
cnt: &'a Container,
) -> Result<impl Iterator<Item = PObject> + 'a, Error>
where
I: IntoIterator + 'a,
I::Item: ToString,
{
cnt.valid()?;
let _max_chunk_iterate_length = 9500;
let in_sql_max_length = 950;
let conn = Connection::open(cnt.packs_db())?;
let chunked_iter = _chunked(hashkeys.into_iter(), in_sql_max_length);
let iter_vec = chunked_iter.flat_map(move |chunk| {
let placeholders: Vec<&str> = (0..chunk.len()).map(|_| "?").collect();
let mut stmt = conn.prepare_cached(&format!("SELECT hashkey, compressed, size, offset, length, pack_id FROM db_object WHERE hashkey IN ({})", placeholders.join(","))).unwrap();
let chunk = chunk.into_iter().map(|x| x.to_string());
let rows = stmt
.query_map(params_from_iter(chunk), |row| {
let hashkey: String = row.get(0)?;
let compressed: bool = row.get(1)?;
let raw_size: u64 = row.get(2)?;
let offset: u64 = row.get(3)?;
let size: u64 = row.get(4)?;
let pack_id: u64 = row.get(5)?;
Ok(PackEntry {
hashkey,
compressed,
raw_size,
size,
offset,
pack_id,
})
}).expect("query failed");
let mut rows = rows
.filter_map(Result::ok)
.collect::<Vec<_>>();
std::iter::from_fn(move || {
if let Some(pn) = rows.pop() {
let pack_id = pn.pack_id;
let packs_path = cnt.packs();
let loc = packs_path.join(format!("{pack_id}"));
let obj = PObject::new(
&pn.hashkey,
loc,
pn.offset,
pn.raw_size,
pn.size,
pn.compressed,
);
Some(obj)
} else {
None
}
})
});
Ok(iter_vec)
}
pub fn insert<T>(source: T, cnt: &Container) -> Result<(u64, u64, String), Error>
where
T: ReaderMaker,
{
let dpath_string = format!("{}.tmp", uuid::Uuid::new_v4());
let dst = cnt.sandbox().join(&dpath_string);
let mut sandbox_tmp = fs::File::create(&dst)?;
let mut hwriter = HashWriter::new(&mut sandbox_tmp, &digest::SHA256);
let chunk_size = 524_288; let mut stream = source.make_reader()?;
let bytes_copied = copy_by_chunk(&mut stream, &mut hwriter, chunk_size)
.map_err(|err| Error::ChunkCopyError { source: err })?;
let hash = hwriter.ctx.finish();
let hash_hex = hex::encode(hash);
let conn = Connection::open(cnt.packs_db())?;
if (db::select(&conn, hash_hex.as_str())?).is_some() {
eprintln!("{hash_hex} exist");
fs::remove_file(&dst)?;
return Ok((bytes_copied, bytes_copied, hash_hex));
}
let (bytes_read, bytes_write, hash_hex) = insert_many(vec![dst], cnt)?
.first()
.map(|(n_r, n_w, hash)| (*n_r, *n_w, hash.clone()))
.expect("problem when insert many to pack");
fs::remove_file(cnt.sandbox().join(&dpath_string))?;
Ok((bytes_read, bytes_write, hash_hex))
}
/// Returns the id of the pack file that new objects should be appended to.
///
/// Pack files are named by their decimal id; any other file name in `packs` is an
/// error. An empty directory is first seeded with pack `0`. If the highest-numbered
/// pack is already `pack_size_target` bytes or larger, an empty pack with the next id
/// is created (truncating any file of that name) and its id is returned.
///
/// # Errors
/// Fails on directory/file I/O errors, or with `Error::ParsePackFilenameError` for a
/// non-numeric entry name.
fn find_current_pack_id(packs: &PathBuf, pack_size_target: u64) -> Result<u64, Error> {
    if Dir(packs).is_empty()? {
        fs::File::create(packs.join("0"))?;
    }

    let mut latest: u64 = 0;
    for dir_entry in packs.read_dir()? {
        let entry_path = dir_entry?.path();
        let Some(name) = entry_path.file_name() else {
            continue;
        };
        let name = name.to_string_lossy();
        let id = name
            .parse::<u64>()
            .map_err(|err| Error::ParsePackFilenameError {
                source: err,
                n: name.to_string(),
            })?;
        latest = latest.max(id);
    }

    let latest_path = Dir(packs).at_path(&format!("{latest}"));
    let latest_len = fs::OpenOptions::new()
        .read(true)
        .open(latest_path)?
        .metadata()?
        .len();
    if latest_len < pack_size_target {
        return Ok(latest);
    }

    // The newest pack is full: start the next one empty.
    let next = latest + 1;
    let next_path = Dir(packs).at_path(&format!("{next}"));
    fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&next_path)
        .map_err(|err| Error::IoOpen {
            source: err,
            path: next_path,
        })?;
    Ok(next)
}
/// Packs every item of `sources` using the container's configured compression.
///
/// Returns one `(bytes_read, bytes_written, sha256_hex)` tuple per source, as produced
/// by `_insert_many_internal`.
///
/// # Errors
/// Fails if the container's compression setting cannot be read, or if packing fails.
pub fn insert_many<I>(sources: I, cnt: &Container) -> Result<Vec<(u64, u64, String)>, Error>
where
    I: IntoIterator,
    I::Item: ReaderMaker,
{
    _insert_many_internal(sources, cnt, &cnt.compression()?)
}
/// Streams every source into the pack files and indexes it in the packs database.
///
/// Objects are appended to the current pack (see `find_current_pack_id`). Once the
/// pack reaches the container's `pack_size_target`, writing continues in a new pack
/// with the next id. All objects written to one pack in a batch are indexed in a single
/// SQLite transaction.
///
/// A source is compressed with `compression` only when
/// `maybe_content_format()` reports `MaybeLargeText`; otherwise it is stored raw.
///
/// Returns, in input order, `(bytes_read, bytes_written, sha256_hex)` per source:
/// - `bytes_read` is the number of bytes read from the source;
/// - `bytes_written` is the number of bytes the object occupies in the pack;
/// - `sha256_hex` is the hex-encoded SHA-256 computed by the `HashWriter`.
///
/// # Errors
/// Propagates container validation, I/O, compression and SQLite errors.
pub fn _insert_many_internal<I>(
    sources: I,
    cnt: &Container,
    compression: &Compression,
) -> Result<Vec<(u64, u64, String)>, Error>
where
    I: IntoIterator,
    I::Item: ReaderMaker,
{
    cnt.valid()?;
    let mut conn = Connection::open(cnt.packs_db())?;
    let packs = cnt.packs();
    let pack_size_target = cnt.config()?.pack_size_target;
    let mut cwp_id = find_current_pack_id(&packs, pack_size_target)?;
    let mut cwp = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .read(true)
        .open(packs.join(format!("{cwp_id}")))?;
    let mut offset = cwp.seek(io::SeekFrom::End(0))?;
    let dig_algo = &digest::SHA256;
    let chunk_size = 65_536;
    let mut nbytes_hash = Vec::new();
    let mut sources = sources.into_iter().peekable();
    loop {
        let tx = conn.transaction()?;
        // Roll over to a fresh, empty pack once the current one has reached the target.
        if offset >= pack_size_target {
            cwp_id += 1;
            offset = 0;
            let p = Dir(&packs).at_path(&format!("{cwp_id}"));
            cwp = fs::OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(p)?;
        }
        if sources.peek().is_none() {
            break;
        }
        for rmaker in sources.by_ref() {
            let mut stream = rmaker.make_reader()?;
            let (bytes_read, hash_hex, compressed) =
                match (compression, rmaker.maybe_content_format()) {
                    (Compression::Zlib(level), Ok(MaybeContentFormat::MaybeLargeText)) => {
                        let mut writer =
                            ZlibEncoder::new(&mut cwp, flate2::Compression::new(*level));
                        let (bytes_copied, hash_hex) = {
                            let mut hwriter = HashWriter::new(&mut writer, dig_algo);
                            let n = copy_by_chunk(&mut stream, &mut hwriter, chunk_size)?;
                            (n, hex::encode(hwriter.finish()))
                        };
                        // Finish explicitly: Drop would also write the trailer, but it
                        // silently discards any I/O error while doing so.
                        writer.finish()?;
                        (bytes_copied, hash_hex, true)
                    }
                    (Compression::Zstd(lv), Ok(MaybeContentFormat::MaybeLargeText)) => {
                        let mut writer = ZstdEncoder::new(&mut cwp, *lv)?;
                        let (bytes_copied, hash_hex) = {
                            let mut hwriter = HashWriter::new(&mut writer, dig_algo);
                            let n = copy_by_chunk(&mut stream, &mut hwriter, chunk_size)?;
                            (n, hex::encode(hwriter.finish()))
                        };
                        // zstd's encoder does not end the frame on drop; without this the
                        // stored object would be an incomplete frame.
                        writer.finish()?;
                        (bytes_copied, hash_hex, true)
                    }
                    _ => {
                        let mut hwriter = HashWriter::new(&mut cwp, dig_algo);
                        let bytes_copied = copy_by_chunk(&mut stream, &mut hwriter, chunk_size)?;
                        let hash = hwriter.ctx.finish();
                        let hash_hex = hex::encode(hash);
                        (bytes_copied, hash_hex, false)
                    }
                };
            // All encoders are finished by now, so the position reflects every stored byte.
            let bytes_write = cwp.stream_position()? - offset;
            // NOTE(review): with OR IGNORE, a hashkey that is already indexed (presumably
            // `hashkey` is unique in the schema) keeps its old entry, but the bytes just
            // written still occupy space in the pack.
            let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO db_object (hashkey, compressed, size, offset, length, pack_id) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")?;
            stmt.execute(params![
                &hash_hex,
                compressed,
                bytes_read,
                offset,
                bytes_write,
                cwp_id,
            ])
            .map_err(|err| Error::SQLiteInsertError { source: err })?;
            offset += bytes_write;
            nbytes_hash.push((bytes_read, bytes_write, hash_hex));
            // Pack is full: commit this batch and let the outer loop open the next pack.
            if offset >= pack_size_target {
                break;
            }
        }
        tx.commit()?;
    }
    Ok(nbytes_hash)
}
// Integration-style tests for pack insertion and extraction. Each test builds a fresh
// container via `new_container(pack_size_target, compression_spec)`.
// NOTE(review): the compression cases below cover "none" and zlib only; the zstd
// insert/extract path is not exercised here.
#[cfg(test)]
mod tests {
    use rstest::*;
    use std::{collections::HashMap, io::Write};
    use crate::{
        io::ByteString,
        stat,
        test_utils::{new_container, PACK_TARGET_SIZE},
    };
    use super::*;

    // Inserting into an empty packs directory creates pack file "0", and a second small
    // insert lands in the same pack file.
    // Presumably `packs_file` counts pack files and `packs` counts indexed objects —
    // confirm against `stat`.
    #[test]
    fn io_packs_insert_0_when_empty() {
        let (_tmp_dir, cnt) = new_container(PACK_TARGET_SIZE, "none");
        let bstr: ByteString = b"test 0".to_vec();
        let (_, _, hash) = insert(bstr, &cnt).unwrap();
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs_file, 1);
        assert_eq!(info.count.packs, 1);
        let obj = extract(&hash, &cnt).unwrap().unwrap();
        assert_eq!(
            String::from_utf8(obj.try_into().unwrap()).unwrap(),
            "test 0".to_string()
        );
        let bstr: ByteString = b"test 1".to_vec();
        let (_, _, hash) = insert(bstr, &cnt).unwrap();
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs_file, 1);
        assert_eq!(info.count.packs, 2);
        let obj = extract(&hash, &cnt).unwrap().unwrap();
        assert_eq!(
            String::from_utf8(obj.try_into().unwrap()).unwrap(),
            "test 1".to_string()
        );
    }

    // With empty packs "0" and "1" already present, the insert goes to the highest id
    // ("1") and no new pack file is created.
    #[test]
    fn io_packs_insert_1_when_1_exist() {
        let (_tmp_dir, cnt) = new_container(PACK_TARGET_SIZE, "none");
        let packs = cnt.packs();
        fs::File::create(packs.join("0")).unwrap();
        fs::File::create(packs.join("1")).unwrap();
        let bstr: ByteString = b"test 0".to_vec();
        let (_, _, hash) = insert(bstr, &cnt).unwrap();
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs_file, 2);
        assert_eq!(info.count.packs, 1);
        let obj = extract(&hash, &cnt).unwrap().unwrap();
        assert_eq!(
            String::from_utf8(obj.try_into().unwrap()).unwrap(),
            "test 0".to_string()
        );
    }

    // Pack "1" is pre-filled to exactly the size target, so every new object must go
    // into a third pack file ("2"); all 100 objects must round-trip.
    #[test]
    fn io_packs_insert_2_when_1_reach_limit() {
        let (_tmp_dir, cnt) = new_container(PACK_TARGET_SIZE, "none");
        let pack_target_size = cnt.config().unwrap().pack_size_target;
        let packs = cnt.packs();
        fs::File::create(packs.join("0")).unwrap();
        let mut p1 = fs::File::create(packs.join("1")).unwrap();
        let bytes_holder = vec![0u8; usize::try_from(pack_target_size).unwrap()];
        p1.write_all(&bytes_holder).unwrap();
        let mut hash_content_map: HashMap<String, String> = HashMap::new();
        for i in 0..100 {
            let content = format!("test {i}");
            let buf = content.clone().into_bytes();
            let (_, _, hash) = insert(buf, &cnt).unwrap();
            hash_content_map.insert(hash, content);
        }
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs_file, 3);
        assert_eq!(info.count.packs, 100);
        for (hash, content) in hash_content_map {
            let obj = extract(&hash, &cnt).unwrap().unwrap();
            assert_eq!(String::from_utf8(obj.try_into().unwrap()).unwrap(), content);
        }
    }

    // 100 small objects fit under a 6400-byte target, so they share one pack file and
    // each can be extracted individually.
    #[test]
    fn io_packs_extract_from_any_single() {
        let (_tmp_dir, cnt) = new_container(6400, "none");
        let n = 100;
        let mut hash_content_map: HashMap<String, String> = HashMap::new();
        for i in 0..n {
            let content = format!("test {i}");
            let buf = content.clone().into_bytes();
            let (_, _, hash) = insert(buf, &cnt).unwrap();
            hash_content_map.insert(hash, content);
        }
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs_file, 1);
        assert_eq!(info.count.packs, n);
        for (hash, content) in hash_content_map {
            let obj = extract(&hash, &cnt).unwrap().unwrap();
            assert_eq!(String::from_utf8(obj.try_into().unwrap()).unwrap(), content);
        }
    }

    // Many inserts with a tiny (64-byte) pack target force frequent pack rollover.
    // NOTE(review): despite its name this calls `insert`, not `insert_many`, and
    // `hash_content_map` is filled but never checked.
    #[test]
    fn io_packs_insert_many() {
        let (_tmp_dir, cnt) = new_container(64, "zlib:+1");
        let mut hash_content_map: HashMap<String, String> = HashMap::new();
        for i in 0..100 {
            let content = format!("test {i}").repeat(i);
            let buf = content.clone().into_bytes();
            let (_, _, hash) = insert(buf, &cnt).unwrap();
            hash_content_map.insert(hash, content);
        }
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs, 100);
    }

    // `extract_many` returns exactly the indexed keys: the two unknown hashes appended
    // at the end are absent from the output.
    // NOTE(review): "zlib+1" is spelled differently from "zlib:+9"/"zlib:+1" used
    // elsewhere; confirm the container parses it as zlib level 1 as intended.
    #[rstest]
    #[case("none")]
    #[case("zlib+1")]
    #[case("zlib:+9")]
    fn io_packs_extract_many(#[case] algo: &str) {
        let (_tmp_dir, cnt) = new_container(64, algo);
        let mut hash_content_map: HashMap<String, String> = HashMap::new();
        for i in 0..100 {
            let content = format!("test {i}").repeat(i);
            let buf = content.clone().into_bytes();
            let (_, _, hash) = insert(buf, &cnt).unwrap();
            hash_content_map.insert(hash, content);
        }
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs, 100);
        let mut hashkeys = hash_content_map
            .keys()
            .map(ToString::to_string)
            .collect::<Vec<_>>();
        hashkeys
            .push("68e2056a0496c469727fa5ab041e1778e39137643fd24db94dd7a532db17aaba".to_string());
        hashkeys
            .push("7e76df6ac7d08a837f7212e765edd07333c8159ffa0484bc26394e7ffd898817".to_string());
        let objs = extract_many(&hashkeys, &cnt).unwrap();
        let mut count = 0;
        for obj in objs {
            count += 1;
            let content = hash_content_map.get(&obj.id).unwrap();
            assert_eq!(
                String::from_utf8(obj.try_into().unwrap()).unwrap(),
                content.to_owned()
            );
        }
        assert_eq!(count + 2, hashkeys.len());
    }

    // Same as above with ~800 KiB objects and a 64 MiB pack target, so each object is
    // copied in multiple chunks.
    #[rstest]
    #[case("none")]
    #[case("zlib+1")]
    #[case("zlib:+9")]
    fn io_packs_extract_many_large_content(#[case] algo: &str) {
        let (_tmp_dir, cnt) = new_container(64 * 1024 * 1024, algo);
        let mut hash_content_map: HashMap<String, String> = HashMap::new();
        for i in 0..10 {
            let content = format!("8bytes{i}").repeat(100 * 1024);
            let buf = content.clone().into_bytes();
            let (_, _, hash) = insert(buf, &cnt).unwrap();
            hash_content_map.insert(hash, content);
        }
        let info = stat(&cnt).unwrap();
        assert_eq!(info.count.packs, 10);
        let mut hashkeys = hash_content_map
            .keys()
            .map(ToString::to_string)
            .collect::<Vec<_>>();
        hashkeys
            .push("68e2056a0496c469727fa5ab041e1778e39137643fd24db94dd7a532db17aaba".to_string());
        hashkeys
            .push("7e76df6ac7d08a837f7212e765edd07333c8159ffa0484bc26394e7ffd898817".to_string());
        let objs = extract_many(&hashkeys, &cnt).unwrap();
        let mut count = 0;
        for obj in objs {
            count += 1;
            let content = hash_content_map.get(&obj.id).unwrap();
            assert_eq!(
                String::from_utf8(obj.try_into().unwrap()).unwrap(),
                content.to_owned()
            );
        }
        assert_eq!(count + 2, hashkeys.len());
    }
}