use crate::error::Error;
#[cfg(feature = "jsonl")]
use crate::error::MtJsonlError;
#[cfg(feature = "file-bz2")]
use bzip2::{self, bufread::BzDecoder, write::BzEncoder};
#[cfg(feature = "file-gz")]
use flate2::{self, bufread::MultiGzDecoder, write::GzEncoder};
use log::debug;
#[cfg(feature = "jsonl")]
use log::{info, warn};
#[cfg(feature = "jsonl")]
use serde::de::DeserializeOwned;
#[cfg(feature = "jsonl")]
use serde_json::Deserializer;
use std::{
ffi::OsStr,
fs::OpenOptions,
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
#[cfg(feature = "jsonl")]
use std::{io::BufRead, sync::mpsc, thread};
#[cfg(feature = "file-xz")]
use xz2::{
bufread::XzDecoder,
stream::{Check, MtStreamBuilder},
write::XzEncoder,
};
/// Open a file for buffered reading, transparently decompressing `xz`, `gz`,
/// and `bz2` content based on the file's magic number.
///
/// Use [`file_open_read_with_capacity`] to control the read-buffer size.
///
/// # Errors
/// Fails if the path is not a readable file or if the detected compression
/// format's cargo feature is not enabled.
pub fn file_open_read<P>(file: P) -> Result<Box<dyn Read>, Error>
where
    P: AsRef<Path>,
{
    let path = file.as_ref();
    do_file_open_read(path, None)
}
/// Like [`file_open_read`], but with an explicit read-buffer capacity in bytes.
///
/// # Errors
/// Fails if the path is not a readable file or if the detected compression
/// format's cargo feature is not enabled.
pub fn file_open_read_with_capacity<P>(
    file: P,
    buffer_capacity: usize,
) -> Result<Box<dyn Read>, Error>
where
    P: AsRef<Path>,
{
    let path = file.as_ref();
    do_file_open_read(path, Some(buffer_capacity))
}
/// Shared implementation of [`file_open_read`] and
/// [`file_open_read_with_capacity`].
///
/// Opens `file` read-only, sniffs the first six bytes for a known
/// compression magic number, rewinds, and returns a reader that
/// transparently decompresses `xz`, `gz`, or `bz2` content. Anything else
/// is returned as plaintext.
fn do_file_open_read(file: &Path, buffer_capacity: Option<usize>) -> Result<Box<dyn Read>, Error> {
    // On non-unix platforms only regular files are accepted.
    #[cfg(not(unix))]
    if !file.is_file() {
        return Err(Error::NotAFileError {
            path: file.to_path_buf(),
        });
    }
    // On unix, additionally allow character devices and FIFOs (e.g. /dev/stdin).
    #[cfg(unix)]
    {
        use std::os::unix::prelude::FileTypeExt;
        let ft = std::fs::metadata(file)
            .map_err(|err| Error::FileIo {
                file: file.to_path_buf(),
                msg: "Accessing file metadata failed.",
                source: err,
            })?
            .file_type();
        if !(ft.is_file() || ft.is_char_device() || ft.is_fifo()) {
            return Err(Error::NotAFileError {
                path: file.to_path_buf(),
            });
        }
    }
    let f = OpenOptions::new()
        .create(false)
        .read(true)
        .write(false)
        .open(file)
        .map_err(|err| Error::FileIo {
            file: file.to_path_buf(),
            msg: "Could not open file.",
            source: err,
        })?;
    let mut bufread = if let Some(size) = buffer_capacity {
        BufReader::with_capacity(size, f)
    } else {
        BufReader::new(f)
    };
    // Read the first 6 bytes to detect a compression magic number. If the
    // file is shorter than 6 bytes, treat the buffer as all zeros, which
    // matches none of the signatures below and falls through to plaintext.
    let mut buffer = [0; 6];
    if bufread.read_exact(&mut buffer).is_err() {
        buffer = [0; 6];
    };
    // Rewind so the returned reader starts at the beginning of the file.
    // NOTE(review): seeking fails for FIFOs/character devices, which the
    // unix branch above explicitly admits — confirm whether such inputs
    // are expected to work through this path.
    bufread
        .seek(SeekFrom::Start(0))
        .map_err(|err| Error::FileIo {
            file: file.to_path_buf(),
            msg: "Failed to seek to start of file.",
            source: err,
        })?;
    // xz magic number: FD '7' 'z' 'X' 'Z' 00
    if buffer[..6] == [0xfd, b'7', b'z', b'X', b'Z', 0x00] {
        debug!("File {} is detected to have type `xz`", file.display());
        #[cfg(feature = "file-xz")]
        return Ok(Box::new(XzDecoder::new(bufread)));
        #[cfg(not(feature = "file-xz"))]
        return Err(Error::CompressionNotEnabled {
            file: file.to_path_buf(),
            technique: "xz",
        });
    }
    // gzip magic number: 1F 8B
    if buffer[..2] == [0x1f, 0x8b] {
        debug!("File {} is detected to have type `gz`", file.display());
        #[cfg(feature = "file-gz")]
        return Ok(Box::new(MultiGzDecoder::new(bufread)));
        #[cfg(not(feature = "file-gz"))]
        return Err(Error::CompressionNotEnabled {
            file: file.to_path_buf(),
            technique: "gz",
        });
    }
    // bzip2 magic number: "BZh"
    if buffer[..3] == [b'B', b'Z', b'h'] {
        debug!("File {} is detected to have type `bz2`", file.display());
        #[cfg(feature = "file-bz2")]
        return Ok(Box::new(BzDecoder::new(bufread)));
        #[cfg(not(feature = "file-bz2"))]
        return Err(Error::CompressionNotEnabled {
            file: file.to_path_buf(),
            technique: "bz2",
        });
    }
    // No known magic number: hand back the plain buffered reader.
    debug!("Open file {} as plaintext", file.display());
    Ok(Box::new(bufread))
}
/// Output format for files created via [`WriteBuilder`].
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub enum FileType {
    /// bzip2-compressed output (requires the `file-bz2` feature).
    #[cfg(feature = "file-bz2")]
    Bz2,
    /// gzip-compressed output (requires the `file-gz` feature).
    #[cfg(feature = "file-gz")]
    Gz,
    /// Uncompressed output; always available.
    PlainText,
    /// xz-compressed output (requires the `file-xz` feature).
    #[cfg(feature = "file-xz")]
    Xz,
}
impl Default for FileType {
fn default() -> Self {
FileType::PlainText
}
}
/// Generic compression-level selector, mapped onto each format's native scale.
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
pub enum Compression {
    /// Fastest compression, largest output.
    Fastest,
    /// The format's default level.
    Default,
    /// Best compression ratio, slowest.
    Best,
    /// Explicit numeric level; clamped to the target format's valid range.
    Numeric(u8),
}
impl Default for Compression {
fn default() -> Self {
Compression::Default
}
}
#[cfg(feature = "file-bz2")]
impl From<Compression> for bzip2::Compression {
    /// Map the generic [`Compression`] level onto bzip2's 1-9 scale.
    fn from(compression: Compression) -> Self {
        match compression {
            Compression::Fastest => bzip2::Compression::fast(),
            Compression::Default => bzip2::Compression::default(),
            Compression::Best => bzip2::Compression::best(),
            // bzip2 only supports levels 1-9 (fast() == 1, best() == 9);
            // libbzip2 rejects a block size of 0 at runtime, so clamp to
            // 1-9 here instead of the 0-9 range used by the other formats.
            Compression::Numeric(n) => bzip2::Compression::new(clamp(u32::from(n), 1, 9)),
        }
    }
}
#[cfg(feature = "file-gz")]
impl From<Compression> for flate2::Compression {
    /// Map the generic [`Compression`] level onto flate2's 0-9 scale.
    fn from(compression: Compression) -> Self {
        use flate2::Compression as GzLevel;
        match compression {
            Compression::Numeric(level) => GzLevel::new(clamp(u32::from(level), 0, 9)),
            Compression::Fastest => GzLevel::fast(),
            Compression::Best => GzLevel::best(),
            Compression::Default => GzLevel::default(),
        }
    }
}
/// Newtype wrapping an xz preset level in the range 0-9.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
struct XzCompression(u32);
impl From<Compression> for XzCompression {
    /// Map the generic [`Compression`] level onto xz presets 0-9.
    fn from(compression: Compression) -> Self {
        let preset = match compression {
            Compression::Fastest => 0,
            Compression::Default => 6,
            Compression::Best => 9,
            Compression::Numeric(level) => clamp(u32::from(level), 0, 9),
        };
        XzCompression(preset)
    }
}
/// Restrict `input` to the inclusive range [`min`, `max`].
///
/// The below-`min` check runs first, matching the original evaluation order.
/// Only `PartialOrd` is required, so this also works for float-like types.
fn clamp<T: PartialOrd>(input: T, min: T, max: T) -> T {
    match input {
        i if i < min => min,
        i if i > max => max,
        i => i,
    }
}
/// Builder configuring how a file is opened for writing.
///
/// Construct via [`file_write`] or [`WriteBuilder::new`], adjust options,
/// then call [`WriteBuilder::append`] or [`WriteBuilder::truncate`] to
/// obtain the actual writer.
#[derive(Debug)]
pub struct WriteBuilder {
    // Capacity for the BufWriter; None uses the default capacity.
    buffer_capacity: Option<usize>,
    // Level used by compressed output formats.
    compression_level: Compression,
    // Output format; guessed from the file extension while None.
    filetype: Option<FileType>,
    // Destination path.
    path: PathBuf,
    // OS-level open options accumulated by the builder methods.
    open_options: OpenOptions,
    // Number of compression threads (xz only); kept >= 1.
    threads: u8,
}
impl WriteBuilder {
    /// Create a builder that writes to `path`.
    ///
    /// The file is opened write-only and created when missing. The output
    /// format is guessed from the path's extension unless overridden via
    /// [`WriteBuilder::filetype`].
    pub fn new(path: PathBuf) -> Self {
        let mut open_options = OpenOptions::new();
        open_options.read(false).write(true).create(true);
        WriteBuilder {
            path,
            filetype: None,
            open_options,
            buffer_capacity: Default::default(),
            compression_level: Default::default(),
            threads: 1,
        }
    }
    /// Open the file for appending and return the writer.
    pub fn append(&mut self) -> Result<Box<dyn Write>, Error> {
        self.open_options.append(true);
        self.open()
    }
    /// Open the file, truncating existing content, and return the writer.
    pub fn truncate(&mut self) -> Result<Box<dyn Write>, Error> {
        self.open_options.truncate(true);
        self.open()
    }
    /// Open the file and wrap it in a buffered — and, depending on the
    /// resolved [`FileType`], compressing — writer.
    fn open(&mut self) -> Result<Box<dyn Write>, Error> {
        use self::FileType::*;
        // Resolve the output format from the extension if not set explicitly.
        if self.filetype.is_none() {
            self.filetype = Some(guess_file_type(&self.path)?);
        }
        let file = self
            .open_options
            .open(&self.path)
            .map_err(|err| Error::FileIo {
                file: self.path.to_path_buf(),
                msg: "Could not open file.",
                source: err,
            })?;
        let bufwrite = if let Some(size) = self.buffer_capacity {
            BufWriter::with_capacity(size, file)
        } else {
            BufWriter::new(file)
        };
        match self
            .filetype
            .expect("FileType is set based on extension if it was None")
        {
            #[cfg(feature = "file-bz2")]
            Bz2 => {
                let level = self.compression_level.into();
                Ok(Box::new(BzEncoder::new(bufwrite, level)))
            }
            #[cfg(feature = "file-gz")]
            Gz => {
                let level = self.compression_level.into();
                Ok(Box::new(GzEncoder::new(bufwrite, level)))
            }
            PlainText => Ok(Box::new(bufwrite)),
            #[cfg(feature = "file-xz")]
            Xz => {
                let level: XzCompression = self.compression_level.into();
                // The threads() setter already coerces 0 to 1; clamp defensively.
                let threads = clamp(self.threads, 1, u8::max_value());
                if threads == 1 {
                    Ok(Box::new(XzEncoder::new(bufwrite, level.0)))
                } else {
                    // Multi-threaded encoder. NOTE(review): block_size(0) and
                    // timeout_ms(300) presumably delegate block sizing to
                    // liblzma and bound buffering latency — confirm against
                    // the xz2 MtStreamBuilder documentation.
                    let stream = MtStreamBuilder::new()
                        .preset(level.0)
                        .threads(u32::from(threads))
                        .block_size(0)
                        .timeout_ms(300)
                        .check(Check::Crc64)
                        .encoder()
                        .map_err(|err| Error::XzError {
                            file: self.path.to_path_buf(),
                            source: err,
                        })?;
                    Ok(Box::new(XzEncoder::new_stream(bufwrite, stream)))
                }
            }
        }
    }
    /// Set the capacity of the underlying [`BufWriter`] in bytes.
    pub fn buffer_capacity(&mut self, buffer_capacity: usize) -> &mut Self {
        self.buffer_capacity = Some(buffer_capacity);
        self
    }
    /// Whether to create the file if it does not exist.
    pub fn create(&mut self, create: bool) -> &mut Self {
        self.open_options.create(create);
        self
    }
    /// Require that the open creates the file, failing if it already exists.
    pub fn create_new(&mut self, create_new: bool) -> &mut Self {
        self.open_options.create_new(create_new);
        self
    }
    /// Set the compression level used by compressed output formats.
    pub fn compression_level(&mut self, compression_level: Compression) -> &mut Self {
        self.compression_level = compression_level;
        self
    }
    /// Force a specific output format instead of guessing from the extension.
    pub fn filetype(&mut self, filetype: FileType) -> &mut Self {
        self.filetype = Some(filetype);
        self
    }
    /// Number of compression threads (used by xz only); 0 is coerced to 1.
    pub fn threads(&mut self, threads: u8) -> &mut Self {
        self.threads = if threads == 0 { 1 } else { threads };
        self
    }
}
/// Start building a writer for `path`.
///
/// Returns a [`WriteBuilder`]; finish with [`WriteBuilder::append`] or
/// [`WriteBuilder::truncate`].
pub fn file_write<P>(path: P) -> WriteBuilder
where
    P: AsRef<Path>,
{
    let path = path.as_ref().to_path_buf();
    WriteBuilder::new(path)
}
/// Message exchanged between the pipeline stages of
/// [`parse_jsonl_multi_threaded`].
#[cfg(feature = "jsonl")]
#[derive(Debug)]
enum ProcessingStatus<T>
where
    T: 'static + Send,
{
    /// The producer finished cleanly; no more data follows.
    Completed,
    /// A batch of payload data.
    Data(T),
    /// The producer hit an error and aborted the stream.
    Error(MtJsonlError),
}
/// Iterator over values parsed from a JSON-lines file on background threads.
///
/// Created by [`parse_jsonl_multi_threaded`]. Yields `Ok(T)` per parsed
/// value, per-value parse errors, and a final error if the background
/// pipeline terminated without signalling completion.
#[cfg(feature = "jsonl")]
#[derive(Debug)]
pub struct MtJsonl<T>
where
    T: 'static + DeserializeOwned + Send,
{
    // Receiver delivering batches of parse results from the worker threads.
    iter: mpsc::IntoIter<ProcessingStatus<Vec<Result<T, MtJsonlError>>>>,
    // Batch currently being drained.
    tmp_state: ::std::vec::IntoIter<Result<T, MtJsonlError>>,
    // Set once the parsing thread signalled a clean end of stream.
    did_complete: bool,
}
#[cfg(feature = "jsonl")]
impl<T> MtJsonl<T>
where
    T: 'static + DeserializeOwned + Send,
{
    /// Wrap the receiving end of the background parsing pipeline.
    ///
    /// Starts with an empty batch and the completion flag cleared.
    fn new(iter: mpsc::IntoIter<ProcessingStatus<Vec<Result<T, MtJsonlError>>>>) -> Self {
        MtJsonl {
            did_complete: false,
            tmp_state: Vec::new().into_iter(),
            iter,
        }
    }
}
#[cfg(feature = "jsonl")]
impl<T> Iterator for MtJsonl<T>
where
    T: 'static + DeserializeOwned + Send,
{
    type Item = Result<T, MtJsonlError>;
    /// Drain the current batch first, then pull the next message from the
    /// channel. When the channel closes without a `Completed` marker, yield
    /// [`MtJsonlError::NotCompleted`].
    ///
    /// NOTE(review): after the channel disconnects without completion, every
    /// subsequent call yields `NotCompleted` again — the iterator never fuses.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Yield items from the current batch, logging parse errors.
            if let Some(res) = self.tmp_state.next() {
                return Some(match res {
                    Ok(x) => Ok(x),
                    Err(err) => {
                        info!("{:?}", err);
                        Err(err)
                    }
                });
            } else if let Some(state) = self.iter.next() {
                // Batch exhausted: block for the next channel message.
                match state {
                    ProcessingStatus::Data(data) => self.tmp_state = data.into_iter(),
                    ProcessingStatus::Completed => self.did_complete = true,
                    ProcessingStatus::Error(err) => return Some(Err(err)),
                }
                continue;
            }
            // Channel closed: distinguish clean completion from an aborted producer.
            return if self.did_complete {
                None
            } else {
                Some(Err(MtJsonlError::NotCompleted))
            };
        }
    }
}
/// Parse a JSON-lines file into `T` values using two background threads.
///
/// One thread reads the (possibly compressed) file in batches of `batchsize`
/// lines; a second thread deserializes each batch. The returned [`MtJsonl`]
/// iterator yields the parsed values — or per-value parse errors — on the
/// calling thread.
#[cfg(feature = "jsonl")]
pub fn parse_jsonl_multi_threaded<P, T>(path: P, batchsize: u32) -> MtJsonl<T>
where
    P: AsRef<Path>,
    T: 'static + DeserializeOwned + Send,
{
    let path = path.as_ref().to_path_buf();
    // Bounded channels provide backpressure between the pipeline stages.
    const CHAN_BUFSIZE: usize = 2;
    let (lines_sender, lines_receiver) = mpsc::sync_channel(CHAN_BUFSIZE);
    #[allow(clippy::type_complexity)]
    let (struct_sender, struct_receiver): (
        _,
        mpsc::Receiver<ProcessingStatus<Vec<Result<T, MtJsonlError>>>>,
    ) = mpsc::sync_channel(CHAN_BUFSIZE);
    // Stage 1: reader thread. Sends batches of up to `batchsize` raw lines.
    thread::spawn(move || {
        info!(
            "Start background reading thread: {:?}",
            thread::current().id()
        );
        let mut rdr = match file_open_read(&path) {
            Ok(rdr) => BufReader::new(rdr),
            Err(err) => {
                warn!(
                    "Background reading thread cannot open file {} {:?}",
                    path.display(),
                    thread::current().id()
                );
                // Forward the error; a send failure means the consumer is gone.
                let _ = lines_sender.send(ProcessingStatus::Error(err.into()));
                return;
            }
        };
        let mut is_eof = false;
        while !is_eof {
            // Accumulate up to `batchsize` lines into a single String.
            let mut batch = String::new();
            for _ in 0..batchsize {
                match rdr.read_line(&mut batch) {
                    // read_line returns Ok(0) at end of file.
                    Ok(0) => {
                        is_eof = true;
                        break;
                    }
                    Ok(_) => {}
                    Err(err) => {
                        warn!(
                            "Background reading thread cannot read line {:?}",
                            thread::current().id()
                        );
                        let _ = lines_sender.send(ProcessingStatus::Error(
                            Error::FileIo {
                                file: path.to_path_buf(),
                                msg: "Background reading thread cannot read line.",
                                source: err,
                            }
                            .into(),
                        ));
                        return;
                    }
                }
            }
            // A send error means the receiver hung up; stop reading.
            if lines_sender.send(ProcessingStatus::Data(batch)).is_err() {
                return;
            }
            info!(
                "Background reading thread: sent batch {:?}",
                thread::current().id()
            );
        }
        // Signal clean end of file.
        let _ = lines_sender.send(ProcessingStatus::Completed);
        info!(
            "Background reading thread: successful processed file {:?} {:?}",
            path,
            thread::current().id()
        );
    });
    // Stage 2: parser thread. Deserializes each raw batch into parse results.
    thread::spawn(move || {
        info!(
            "Start background parsing thread {:?}",
            thread::current().id()
        );
        let mut channel_successful_completed = false;
        lines_receiver.iter().for_each(|batch| {
            match batch {
                ProcessingStatus::Error(e) => {
                    info!(
                        "Background parsing thread: pass through error {:?}",
                        thread::current().id()
                    );
                    let _ = struct_sender.send(ProcessingStatus::Error(e));
                }
                ProcessingStatus::Completed => channel_successful_completed = true,
                ProcessingStatus::Data(batch) => {
                    // Parse the concatenated JSON documents in this batch,
                    // keeping per-value errors instead of aborting the batch.
                    let batch: Vec<Result<T, MtJsonlError>> = Deserializer::from_str(&*batch)
                        .into_iter()
                        .map(|v| v.map_err(|err| MtJsonlError::ParsingError { source: err }))
                        .collect();
                    info!(
                        "Background parsing thread: batch parsed {:?}",
                        thread::current().id()
                    );
                    if struct_sender.send(ProcessingStatus::Data(batch)).is_err() {
                        warn!(
                            "Background parsing thread: sent channel error {:?}",
                            thread::current().id()
                        );
                    }
                }
            }
        });
        // Only forward Completed when the reader signalled clean EOF;
        // otherwise the consumer observes `MtJsonlError::NotCompleted`.
        if channel_successful_completed {
            info!(
                "Background parsing thread: successfully completed {:?}",
                thread::current().id()
            );
            if struct_sender.send(ProcessingStatus::Completed).is_err() {
                warn!(
                    "Background parsing thread: sent channel error {:?}",
                    thread::current().id()
                );
            }
        } else {
            warn!(
                "Background parsing thread: did not receive complete message from underlying reader {:?}",
                thread::current().id()
            );
        }
    });
    MtJsonl::new(struct_receiver.into_iter())
}
pub fn read<P: AsRef<Path>>(path: P) -> Result<Vec<u8>, Error> {
let mut buffer = Vec::new();
let mut reader = file_open_read(path.as_ref())?;
reader
.read_to_end(&mut buffer)
.map_err(|err| Error::FileIo {
file: path.as_ref().to_path_buf(),
msg: "Could not read file.",
source: err,
})?;
Ok(buffer)
}
pub fn read_to_string<P: AsRef<Path>>(path: P) -> Result<String, Error> {
let path = path.as_ref();
let mut buffer = String::new();
let mut reader = file_open_read(path)?;
reader
.read_to_string(&mut buffer)
.map_err(|err| Error::FileIo {
file: path.to_path_buf(),
msg: "Could not read file.",
source: err,
})?;
Ok(buffer)
}
#[allow(clippy::match_single_binding)]
pub fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> Result<(), Error> {
let path = path.as_ref();
let mut writer = file_write(path).truncate()?;
writer
.write_all(contents.as_ref())
.map_err(|err| Error::FileIo {
file: path.to_path_buf(),
msg: "Could not write content to file.",
source: err,
})?;
writer.flush().map_err(|err| Error::FileIo {
file: path.to_path_buf(),
msg: "Could not write content to file.",
source: err,
})?;
drop(writer);
Ok(())
}
#[allow(clippy::match_single_binding)]
pub fn append<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> Result<(), Error> {
let path = path.as_ref();
let mut writer = file_write(path).append()?;
writer
.write_all(contents.as_ref())
.map_err(|err| Error::FileIo {
file: path.to_path_buf(),
msg: "Could not append to file.",
source: err,
})?;
writer.flush().map_err(|err| Error::FileIo {
file: path.to_path_buf(),
msg: "Could not append to file.",
source: err,
})?;
drop(writer);
Ok(())
}
/// Guess the [`FileType`] from the file extension of `path`.
///
/// Recognized extensions: `xz`, `gz`/`gzip`, and `bz2`/`bzip`; everything
/// else (including a missing or non-UTF-8 extension) is treated as
/// plaintext.
///
/// # Errors
/// Returns [`Error::CompressionNotEnabled`] when the extension matches a
/// compression format whose cargo feature is disabled.
#[allow(clippy::unnecessary_wraps)]
fn guess_file_type(path: &Path) -> Result<FileType, Error> {
    match path.extension().and_then(OsStr::to_str) {
        Some("xz") => {
            #[cfg(feature = "file-xz")]
            {
                Ok(FileType::Xz)
            }
            #[cfg(not(feature = "file-xz"))]
            {
                Err(Error::CompressionNotEnabled {
                    file: path.to_path_buf(),
                    technique: "xz",
                })
            }
        }
        Some("gzip") | Some("gz") => {
            #[cfg(feature = "file-gz")]
            {
                Ok(FileType::Gz)
            }
            #[cfg(not(feature = "file-gz"))]
            {
                Err(Error::CompressionNotEnabled {
                    file: path.to_path_buf(),
                    technique: "gz",
                })
            }
        }
        Some("bzip") | Some("bz2") => {
            #[cfg(feature = "file-bz2")]
            {
                Ok(FileType::Bz2)
            }
            #[cfg(not(feature = "file-bz2"))]
            {
                Err(Error::CompressionNotEnabled {
                    file: path.to_path_buf(),
                    technique: "bz2",
                })
            }
        }
        _ => Ok(FileType::PlainText),
    }
}