use std::io::SeekFrom;
use std::path::Path;
use tokio::fs::File;
use tokio::io::{
AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter,
};
use crate::codec::CodecMethod;
use crate::format::{SIGNATURE, SIGNATURE_HEADER_SIZE, property_id};
use crate::write::{EntryMeta, WriteOptions, WriteResult};
use crate::{ArchivePath, Error, Result};
/// Lifecycle states of an [`AsyncWriter`]; entries may only be added while
/// the writer is in `AcceptingEntries`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncWriterState {
    /// `add_*` calls are permitted.
    AcceptingEntries,
    /// `finish_into_inner` is running; no further entries accepted.
    Building,
    /// The trailing header and signature header have been written.
    Finished,
}
/// An entry recorded by one of the `add_*` methods, held in memory until
/// `finish` serializes the archive header.
#[derive(Debug)]
struct PendingEntry {
    path: ArchivePath,      // archive-internal path (becomes a kName record)
    meta: EntryMeta,        // entry metadata (directory flag, mtime, ...)
    crc: u32,               // CRC-32 of the uncompressed data (0 for directories)
    uncompressed_size: u64, // original byte count (0 for directories)
}
/// Per-stream bookkeeping collected as pack data is written. The three
/// vectors are parallel: index `i` describes the i-th stream in write order.
#[derive(Debug, Default)]
struct StreamInfo {
    pack_sizes: Vec<u64>,   // compressed size of each pack stream
    unpack_sizes: Vec<u64>, // uncompressed size of each stream
    crcs: Vec<u32>,         // CRC-32 of each stream's uncompressed data
}
/// Asynchronous 7z archive writer.
///
/// Pack data is streamed to `sink` as entries are added; the archive header
/// is appended by `finish`/`finish_into_inner`, which then seeks back to
/// offset 0 to fill in the signature header.
pub struct AsyncWriter<W> {
    sink: W,                    // destination; must support seeking for the back-fill
    options: WriteOptions,      // codec method/level and writer flags
    state: AsyncWriterState,    // guards against use after finish
    entries: Vec<PendingEntry>, // metadata for every entry added so far
    stream_info: StreamInfo,    // parallel per-stream sizes/CRCs
    compressed_bytes: u64,      // total pack bytes written to `sink`
}
impl AsyncWriter<BufWriter<File>> {
    /// Convenience constructor: creates (or truncates) the file at `path`
    /// and wraps it in a `BufWriter` before delegating to [`Self::create`].
    pub async fn create_path(path: impl AsRef<Path>) -> Result<Self> {
        let file = File::create(path.as_ref()).await.map_err(Error::Io)?;
        let writer = BufWriter::new(file);
        Self::create(writer).await
    }
}
impl<W: AsyncWrite + AsyncSeek + Unpin + Send> AsyncWriter<W> {
    /// Begins a new archive on `sink`.
    ///
    /// Seeks past the fixed-size signature header area; that header is
    /// back-filled by `write_signature_header_async` during finish, once
    /// the trailing header's offset, size and CRC are known.
    pub async fn create(mut sink: W) -> Result<Self> {
        sink.seek(SeekFrom::Start(SIGNATURE_HEADER_SIZE))
            .await
            .map_err(Error::Io)?;
        Ok(Self {
            sink,
            options: WriteOptions::default(),
            state: AsyncWriterState::AcceptingEntries,
            entries: Vec::new(),
            stream_info: StreamInfo::default(),
            compressed_bytes: 0,
        })
    }

    /// Replaces the write options (builder style). Call this before adding
    /// entries so every stream is compressed with the same settings.
    pub fn options(mut self, options: WriteOptions) -> Self {
        self.options = options;
        self
    }

    /// Adds the file or directory at `disk_path` under `archive_path`.
    ///
    /// Regular files are read fully into memory before compression, so a
    /// large file costs its full size in RAM.
    pub async fn add_path(
        &mut self,
        disk_path: impl AsRef<Path>,
        archive_path: ArchivePath,
    ) -> Result<()> {
        self.ensure_accepting_entries()?;
        let disk_path = disk_path.as_ref();
        let meta = EntryMeta::from_path_async(disk_path).await?;
        if meta.is_directory {
            self.add_directory(archive_path, meta).await
        } else {
            let mut file = File::open(disk_path).await.map_err(Error::Io)?;
            let mut data = Vec::new();
            file.read_to_end(&mut data).await.map_err(Error::Io)?;
            self.add_bytes_internal(archive_path, &data, meta).await
        }
    }

    /// Records a directory entry. Directories carry no stream data, so only
    /// metadata is stored; `is_directory` is forced on regardless of `meta`.
    pub async fn add_directory(
        &mut self,
        archive_path: ArchivePath,
        meta: EntryMeta,
    ) -> Result<()> {
        self.ensure_accepting_entries()?;
        let entry = PendingEntry {
            path: archive_path,
            meta: EntryMeta {
                is_directory: true,
                ..meta
            },
            // No data stream: zero CRC and size.
            crc: 0,
            uncompressed_size: 0,
        };
        self.entries.push(entry);
        Ok(())
    }

    /// Adds an entry whose contents come from an async reader.
    ///
    /// The whole stream is buffered in memory before compression.
    pub async fn add_stream<R: AsyncRead + Unpin>(
        &mut self,
        archive_path: ArchivePath,
        mut source: R,
        meta: EntryMeta,
    ) -> Result<()> {
        self.ensure_accepting_entries()?;
        let mut data = Vec::new();
        source.read_to_end(&mut data).await.map_err(Error::Io)?;
        self.add_bytes_internal(archive_path, &data, meta).await
    }

    /// Adds an entry from an in-memory byte slice with default file metadata.
    pub async fn add_bytes(&mut self, archive_path: ArchivePath, data: &[u8]) -> Result<()> {
        let meta = EntryMeta::file(data.len() as u64);
        self.add_bytes_internal(archive_path, data, meta).await
    }

    /// Common path for all data-bearing entries: compress, write the pack
    /// stream to the sink, and record the bookkeeping for the header.
    async fn add_bytes_internal(
        &mut self,
        archive_path: ArchivePath,
        data: &[u8],
        meta: EntryMeta,
    ) -> Result<()> {
        let crc = crc32fast::hash(data);
        let uncompressed_size = data.len() as u64;
        let method = self.options.method;
        let level = self.options.level;
        // Compression is CPU-bound; run it off the async runtime. The input
        // is cloned because the closure must be 'static.
        let data_owned = data.to_vec();
        let compressed =
            tokio::task::spawn_blocking(move || compress_data_sync(&data_owned, method, level))
                .await
                // First `?`: task join failure; second `?`: compression error.
                .map_err(|e| Error::Io(std::io::Error::other(e)))??;
        let compressed_size = compressed.len() as u64;
        self.sink.write_all(&compressed).await.map_err(Error::Io)?;
        self.compressed_bytes += compressed_size;
        // Keep the three stream vectors parallel, in write order.
        self.stream_info.pack_sizes.push(compressed_size);
        self.stream_info.unpack_sizes.push(uncompressed_size);
        self.stream_info.crcs.push(crc);
        let entry = PendingEntry {
            path: archive_path,
            meta,
            crc,
            uncompressed_size,
        };
        self.entries.push(entry);
        Ok(())
    }

    /// Finalizes the archive and returns summary statistics, dropping the
    /// sink. Use `finish_into_inner` to keep the sink.
    pub async fn finish(self) -> Result<WriteResult> {
        let (result, _sink) = self.finish_into_inner().await?;
        Ok(result)
    }

    /// Finalizes the archive: encodes the trailing header on a blocking
    /// thread, appends it after the pack data, then back-fills the
    /// signature header at offset 0. Returns statistics plus the sink.
    pub async fn finish_into_inner(mut self) -> Result<(WriteResult, W)> {
        self.ensure_accepting_entries()?;
        self.state = AsyncWriterState::Building;
        if self.options.deterministic {
            // NOTE(review): this sorts only `entries`; `stream_info` and the
            // pack data already written to the sink keep insertion order. If
            // the sort actually reorders non-empty entries, the header will
            // pair names with the wrong streams/CRCs — confirm against the
            // reader, or enforce the ordering at add-time instead.
            self.entries
                .sort_by(|a, b| a.path.as_str().cmp(b.path.as_str()));
        }
        let header_pos = self.sink.stream_position().await.map_err(Error::Io)?;
        let method = self.options.method;
        let level = self.options.level;
        // Clone everything the blocking task needs (closure must be 'static).
        let entries_data: Vec<_> = self
            .entries
            .iter()
            .map(|e| {
                (
                    e.path.as_str().to_string(),
                    e.meta.clone(),
                    e.crc,
                    e.uncompressed_size,
                )
            })
            .collect();
        let stream_info_data = (
            self.stream_info.pack_sizes.clone(),
            self.stream_info.unpack_sizes.clone(),
            self.stream_info.crcs.clone(),
        );
        let header_data = tokio::task::spawn_blocking(move || {
            encode_header_sync(&entries_data, &stream_info_data, method, level)
        })
        .await
        // First `?`: task join failure; second `?`: encoding error.
        .map_err(|e| Error::Io(std::io::Error::other(e)))??;
        self.sink.write_all(&header_data).await.map_err(Error::Io)?;
        self.write_signature_header_async(header_pos, &header_data)
            .await?;
        self.state = AsyncWriterState::Finished;
        let result = WriteResult {
            entries_written: self.entries.iter().filter(|e| !e.meta.is_directory).count(),
            directories_written: self.entries.iter().filter(|e| e.meta.is_directory).count(),
            total_size: self.entries.iter().map(|e| e.uncompressed_size).sum(),
            compressed_size: self.compressed_bytes,
            volume_count: 1,
            volume_sizes: vec![],
        };
        Ok((result, self.sink))
    }

    /// Back-fills the signature header at offset 0: the magic bytes, a
    /// 2-byte format version (0.04), the CRC-32 of the 20-byte start
    /// header, then the start header itself (next-header offset, size and
    /// CRC, little-endian). The offset is relative to the end of the
    /// signature header area.
    async fn write_signature_header_async(
        &mut self,
        header_pos: u64,
        header_data: &[u8],
    ) -> Result<()> {
        let next_header_offset = header_pos - SIGNATURE_HEADER_SIZE;
        let next_header_size = header_data.len() as u64;
        let next_header_crc = crc32fast::hash(header_data);
        let mut start_header = Vec::with_capacity(20);
        start_header.extend_from_slice(&next_header_offset.to_le_bytes());
        start_header.extend_from_slice(&next_header_size.to_le_bytes());
        start_header.extend_from_slice(&next_header_crc.to_le_bytes());
        let start_header_crc = crc32fast::hash(&start_header);
        self.sink
            .seek(SeekFrom::Start(0))
            .await
            .map_err(Error::Io)?;
        self.sink.write_all(SIGNATURE).await.map_err(Error::Io)?;
        self.sink
            .write_all(&[0x00, 0x04])
            .await
            .map_err(Error::Io)?;
        self.sink
            .write_all(&start_header_crc.to_le_bytes())
            .await
            .map_err(Error::Io)?;
        self.sink
            .write_all(&start_header)
            .await
            .map_err(Error::Io)?;
        Ok(())
    }

    /// Rejects mutation once finishing has started or completed.
    fn ensure_accepting_entries(&self) -> Result<()> {
        if self.state != AsyncWriterState::AcceptingEntries {
            return Err(Error::InvalidFormat(
                "Writer is not accepting entries".into(),
            ));
        }
        Ok(())
    }
}
impl EntryMeta {
    /// Builds entry metadata from the filesystem using tokio's async
    /// `metadata` call, then converts via `from_metadata`.
    pub async fn from_path_async(path: impl AsRef<Path>) -> Result<Self> {
        let metadata = tokio::fs::metadata(path).await.map_err(Error::Io)?;
        Ok(Self::from_metadata(&metadata))
    }
}
/// Dispatches `data` to the compressor selected by `method`.
///
/// `Copy` returns the input unchanged. Each real codec is compiled in only
/// when its cargo feature is enabled; otherwise the catch-all arm reports
/// `Error::UnsupportedMethod`. Runs synchronously — callers invoke it via
/// `tokio::task::spawn_blocking`.
fn compress_data_sync(data: &[u8], method: CodecMethod, level: u32) -> Result<Vec<u8>> {
    match method {
        CodecMethod::Copy => Ok(data.to_vec()),
        #[cfg(feature = "lzma2")]
        CodecMethod::Lzma2 => compress_lzma2_sync(data, level),
        #[cfg(feature = "lzma")]
        CodecMethod::Lzma => compress_lzma_sync(data, level),
        #[cfg(feature = "deflate")]
        CodecMethod::Deflate => compress_deflate_sync(data, level),
        #[cfg(feature = "bzip2")]
        CodecMethod::BZip2 => compress_bzip2_sync(data, level),
        _ => Err(Error::UnsupportedMethod {
            method_id: method.method_id(),
        }),
    }
}
/// Compresses `data` with LZMA2.
///
/// The dictionary size scales with `level` as `1 << (16 + level)`, capped
/// at `1 << 23` (levels 7+ share the same dictionary).
#[cfg(feature = "lzma2")]
fn compress_lzma2_sync(data: &[u8], level: u32) -> Result<Vec<u8>> {
    use crate::codec::lzma::{Lzma2Encoder, Lzma2EncoderOptions};
    let options = Lzma2EncoderOptions {
        dict_size: Some(1 << (16 + level.min(7))),
        ..Default::default()
    };
    let mut compressed = Vec::new();
    let mut encoder = Lzma2Encoder::new(&mut compressed, &options);
    std::io::Write::write_all(&mut encoder, data).map_err(Error::Io)?;
    encoder.try_finish().map_err(Error::Io)?;
    // Release the encoder's borrow of `compressed` before returning it.
    drop(encoder);
    Ok(compressed)
}
/// Compresses `data` with LZMA1.
///
/// The dictionary size scales with `level` as `1 << (16 + level)`, capped
/// at `1 << 23` (levels 7+ share the same dictionary).
#[cfg(feature = "lzma")]
fn compress_lzma_sync(data: &[u8], level: u32) -> Result<Vec<u8>> {
    use crate::codec::lzma::{LzmaEncoder, LzmaEncoderOptions};
    let options = LzmaEncoderOptions {
        dict_size: Some(1 << (16 + level.min(7))),
        ..Default::default()
    };
    let mut compressed = Vec::new();
    let mut encoder = LzmaEncoder::new(&mut compressed, &options)?;
    std::io::Write::write_all(&mut encoder, data).map_err(Error::Io)?;
    encoder.try_finish().map_err(Error::Io)?;
    // Release the encoder's borrow of `compressed` before returning it.
    drop(encoder);
    Ok(compressed)
}
/// Compresses `data` with Deflate at the given compression `level`.
#[cfg(feature = "deflate")]
fn compress_deflate_sync(data: &[u8], level: u32) -> Result<Vec<u8>> {
    use crate::codec::deflate::{DeflateEncoder, DeflateEncoderOptions};
    let mut compressed = Vec::new();
    let mut encoder = DeflateEncoder::new(&mut compressed, &DeflateEncoderOptions { level });
    std::io::Write::write_all(&mut encoder, data).map_err(Error::Io)?;
    // try_finish's success value is not needed; only the error matters.
    let _ = encoder.try_finish().map_err(Error::Io)?;
    // Release the encoder's borrow of `compressed` before returning it.
    drop(encoder);
    Ok(compressed)
}
/// Compresses `data` with BZip2 at the given compression `level`.
#[cfg(feature = "bzip2")]
fn compress_bzip2_sync(data: &[u8], level: u32) -> Result<Vec<u8>> {
    use crate::codec::bzip2::{Bzip2Encoder, Bzip2EncoderOptions};
    let mut compressed = Vec::new();
    let mut encoder = Bzip2Encoder::new(&mut compressed, &Bzip2EncoderOptions { level });
    std::io::Write::write_all(&mut encoder, data).map_err(Error::Io)?;
    // try_finish's success value is not needed; only the error matters.
    let _ = encoder.try_finish().map_err(Error::Io)?;
    // Release the encoder's borrow of `compressed` before returning it.
    drop(encoder);
    Ok(compressed)
}
/// Serializes the trailing 7z header describing the pack streams, folders,
/// and file metadata collected during writing.
///
/// Layout follows the 7z property-id grammar: kHeader, then an optional
/// MainStreamsInfo (PackInfo + UnpackInfo) when at least one stream was
/// written, then an optional FilesInfo when there is at least one entry.
/// Each stream is modeled as its own single-coder folder, so the
/// pack/unpack/CRC vectors in `stream_info` are all parallel.
fn encode_header_sync(
    entries: &[(String, EntryMeta, u32, u64)],
    stream_info: &(Vec<u64>, Vec<u64>, Vec<u32>),
    method: CodecMethod,
    level: u32,
) -> Result<Vec<u8>> {
    use crate::format::reader::write_variable_u64;
    let (pack_sizes, unpack_sizes, crcs) = stream_info;
    let mut header = Vec::new();
    header.push(property_id::HEADER);
    if !pack_sizes.is_empty() {
        header.push(property_id::MAIN_STREAMS_INFO);
        // PackInfo: pack position (0 = data begins right after the
        // signature header), stream count, then each packed size.
        header.push(property_id::PACK_INFO);
        write_variable_u64(&mut header, 0)?;
        write_variable_u64(&mut header, pack_sizes.len() as u64)?;
        header.push(property_id::SIZE);
        for &size in pack_sizes {
            write_variable_u64(&mut header, size)?;
        }
        header.push(property_id::END);
        // UnpackInfo: one folder per stream, each with a single coder.
        header.push(property_id::UNPACK_INFO);
        header.push(property_id::FOLDER);
        write_variable_u64(&mut header, unpack_sizes.len() as u64)?;
        // External flag 0: folder definitions follow inline.
        header.push(0);
        for _i in 0..unpack_sizes.len() {
            // One coder in this folder.
            header.push(0x01);
            let method_id = method.method_id();
            let method_bytes = encode_method_id(method_id);
            let id_size = method_bytes.len() as u8;
            let has_props = method_has_properties(method);
            // Coder flags byte: low bits = method-id length,
            // 0x20 = a properties blob follows the id.
            let flags = id_size | if has_props { 0x20 } else { 0 };
            header.push(flags);
            header.extend_from_slice(&method_bytes);
            if has_props {
                let props = encode_method_properties(method, level);
                write_variable_u64(&mut header, props.len() as u64)?;
                header.extend_from_slice(&props);
            }
        }
        header.push(property_id::CODERS_UNPACK_SIZE);
        for &size in unpack_sizes {
            write_variable_u64(&mut header, size)?;
        }
        // Folder CRCs: a leading 1 byte means "all defined", then one
        // little-endian CRC-32 per folder.
        header.push(property_id::CRC);
        header.push(1); for &crc in crcs {
            header.extend_from_slice(&crc.to_le_bytes());
        }
        header.push(property_id::END); header.push(property_id::END); }
    if !entries.is_empty() {
        header.push(property_id::FILES_INFO);
        write_variable_u64(&mut header, entries.len() as u64)?;
        // Entries without stream data (directories and zero-byte files) are
        // marked via kEmptyStream; among those, the actual files (not
        // directories) additionally get kEmptyFile.
        let empty_entries: Vec<_> = entries
            .iter()
            .map(|(_, meta, _, size)| meta.is_directory || *size == 0)
            .collect();
        if empty_entries.iter().any(|&x| x) {
            header.push(property_id::EMPTY_STREAM);
            let bool_vec = encode_bool_vector(&empty_entries);
            write_variable_u64(&mut header, bool_vec.len() as u64)?;
            header.extend_from_slice(&bool_vec);
            let empty_files: Vec<_> = entries
                .iter()
                .filter(|(_, meta, _, size)| meta.is_directory || *size == 0)
                .map(|(_, meta, _, _)| !meta.is_directory)
                .collect();
            if empty_files.iter().any(|&x| x) {
                header.push(property_id::EMPTY_FILE);
                let bool_vec = encode_bool_vector(&empty_files);
                write_variable_u64(&mut header, bool_vec.len() as u64)?;
                header.extend_from_slice(&bool_vec);
            }
        }
        // kName: the declared size includes the 1-byte external flag (0 =
        // names stored inline) that precedes the UTF-16LE name data.
        header.push(property_id::NAME);
        let names_data = encode_names(entries);
        write_variable_u64(&mut header, names_data.len() as u64 + 1)?;
        header.push(0); header.extend_from_slice(&names_data);
        // kMTime is emitted only if at least one entry carries a timestamp.
        let has_mtime: Vec<_> = entries
            .iter()
            .map(|(_, meta, _, _)| meta.modification_time.is_some())
            .collect();
        if has_mtime.iter().any(|&x| x) {
            header.push(property_id::MTIME);
            let mtime_data = encode_times(entries, &has_mtime);
            write_variable_u64(&mut header, mtime_data.len() as u64)?;
            header.extend_from_slice(&mtime_data);
        }
        header.push(property_id::END); }
    header.push(property_id::END);
    Ok(header)
}
/// Encodes a 7z coder method id as its minimal big-endian byte sequence.
///
/// Leading zero bytes are stripped, except that a zero id still encodes as
/// a single `0x00` byte.
fn encode_method_id(id: u64) -> Vec<u8> {
    // Significant byte count, forced to at least 1 so id == 0 yields [0].
    let len = ((64 - id.leading_zeros() as usize).div_ceil(8)).max(1);
    id.to_be_bytes()[8 - len..].to_vec()
}
/// Packs booleans into the MSB-first bit vector used by the 7z header:
/// bit 7 of byte 0 holds the first flag, and any unused trailing bits of
/// the final byte are left zero.
fn encode_bool_vector(bits: &[bool]) -> Vec<u8> {
    bits.chunks(8)
        .map(|chunk| {
            chunk
                .iter()
                .enumerate()
                .fold(0u8, |byte, (i, &bit)| byte | ((bit as u8) << (7 - i)))
        })
        .collect()
}
/// Whether this coder stores a properties blob alongside its id in the
/// folder definition. Only the LZMA family does (see
/// `encode_method_properties`); the other supported methods need none.
fn method_has_properties(method: CodecMethod) -> bool {
    matches!(method, CodecMethod::Lzma | CodecMethod::Lzma2)
}
/// Encodes the codec-properties blob stored with each coder in the header.
///
/// The declared dictionary size mirrors what the compressors actually use
/// (see `compress_lzma2_sync`/`compress_lzma_sync`): `1 << (16 + min(level, 7))`.
/// Without the `min(7)` cap the header could disagree with the encoder and
/// the shift would overflow `u32` for large levels.
///
/// For LZMA1 the blob is 5 bytes: a packed properties byte (0x5D) followed
/// by the little-endian dictionary size. For LZMA2 it is the single
/// encoded dict-size byte. Methods without properties return an empty Vec.
fn encode_method_properties(method: CodecMethod, level: u32) -> Vec<u8> {
    match method {
        #[cfg(feature = "lzma2")]
        CodecMethod::Lzma2 => {
            vec![crate::codec::lzma::encode_lzma2_dict_size(
                // Must match the dict size the encoder was configured with.
                1 << (16 + level.min(7)),
            )]
        }
        #[cfg(feature = "lzma")]
        CodecMethod::Lzma => {
            let dict_size: u32 = 1 << (16 + level.min(7));
            let mut props = vec![0x5D];
            props.extend_from_slice(&dict_size.to_le_bytes());
            props
        }
        _ => Vec::new(),
    }
}
/// Serializes entry names for the kName property: each path is encoded as
/// null-terminated UTF-16LE, concatenated in entry order.
fn encode_names(entries: &[(String, EntryMeta, u32, u64)]) -> Vec<u8> {
    entries
        .iter()
        .flat_map(|(path, _, _, _)| path.encode_utf16().chain(std::iter::once(0u16)))
        .flat_map(|unit| unit.to_le_bytes())
        .collect()
}
/// Serializes the kMTime property payload: an "all defined" marker byte
/// (or 0 followed by an explicit definition bit vector), the external flag
/// (0 = stored inline), then a little-endian timestamp for every entry
/// that has one, in entry order.
fn encode_times(entries: &[(String, EntryMeta, u32, u64)], defined: &[bool]) -> Vec<u8> {
    let mut data = Vec::new();
    if defined.iter().all(|&d| d) {
        // Every entry has a time: a single 1 byte replaces the bit vector.
        data.push(1);
    } else {
        data.push(0);
        data.extend_from_slice(&encode_bool_vector(defined));
    }
    // External flag: timestamps follow inline.
    data.push(0);
    // `defined` was computed from modification_time.is_some(), so the
    // filter below emits exactly the entries flagged in the bit vector.
    let times = entries
        .iter()
        .filter_map(|(_, meta, _, _)| meta.modification_time)
        .flat_map(|time| time.to_le_bytes());
    data.extend(times);
    data
}
#[cfg(test)]
mod tests {
    use super::*;
    // A fresh writer starts in the entry-accepting state.
    #[tokio::test]
    async fn test_async_writer_create() {
        let buffer = std::io::Cursor::new(Vec::new());
        let writer = AsyncWriter::create(buffer).await.unwrap();
        assert_eq!(writer.state, AsyncWriterState::AcceptingEntries);
    }
    // The builder-style `options` call replaces the default options.
    #[tokio::test]
    async fn test_async_writer_options() {
        let buffer = std::io::Cursor::new(Vec::new());
        let writer = AsyncWriter::create(buffer)
            .await
            .unwrap()
            .options(WriteOptions::new().level(9).unwrap());
        assert_eq!(writer.options.level, 9);
    }
    // An in-memory entry round-trips through finish with correct counts.
    #[tokio::test]
    async fn test_async_writer_add_bytes_and_finish() {
        let buffer = std::io::Cursor::new(Vec::new());
        let mut writer = AsyncWriter::create(buffer).await.unwrap();
        let path = ArchivePath::new("test.txt").unwrap();
        writer.add_bytes(path, b"Hello, World!").await.unwrap();
        let result = writer.finish().await.unwrap();
        assert_eq!(result.entries_written, 1);
        assert_eq!(result.total_size, 13);
    }
    // Finishing with no entries still succeeds (empty archive).
    #[tokio::test]
    async fn test_async_writer_empty_archive() {
        let buffer = std::io::Cursor::new(Vec::new());
        let writer = AsyncWriter::create(buffer).await.unwrap();
        let result = writer.finish().await.unwrap();
        assert_eq!(result.entries_written, 0);
    }
    // Directories are counted separately from file entries in the result.
    #[tokio::test]
    async fn test_async_writer_with_directory() {
        let buffer = std::io::Cursor::new(Vec::new());
        let mut writer = AsyncWriter::create(buffer).await.unwrap();
        let dir_path = ArchivePath::new("mydir").unwrap();
        writer
            .add_directory(dir_path, EntryMeta::directory())
            .await
            .unwrap();
        let result = writer.finish().await.unwrap();
        assert_eq!(result.entries_written, 0);
        assert_eq!(result.directories_written, 1);
    }
    // Method ids serialize big-endian with leading zero bytes stripped.
    #[test]
    fn test_encode_method_id() {
        assert_eq!(encode_method_id(0), vec![0]);
        assert_eq!(encode_method_id(0x21), vec![0x21]);
        assert_eq!(encode_method_id(0x030101), vec![0x03, 0x01, 0x01]);
    }
    // Bool vectors pack MSB-first with zero padding in the last byte.
    #[test]
    fn test_encode_bool_vector() {
        assert_eq!(encode_bool_vector(&[true, false, true]), vec![0b10100000]);
        assert_eq!(encode_bool_vector(&[true; 8]), vec![0b11111111]);
    }
}