#![deny(unsafe_code)]
#![deny(unused_imports)]
#![deny(missing_docs)]
use core::debug_assert;
use std::{
fs::{self, Metadata},
io::{self, BufRead, BufReader, BufWriter, Write},
path::{Path, PathBuf},
thread,
time::{Duration, SystemTime},
};
use flate2::{read::GzDecoder, write::GzEncoder};
use huby::ByteSize;
#[cfg(windows)]
use std::os::windows::fs::MetadataExt;
#[cfg(target_family = "unix")]
use std::{
fs::Permissions,
os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt},
};
use tempfile::{NamedTempFile, PersistError};
use thiserror::Error;
#[cfg(target_family = "unix")]
mod unix;
/// Returns `path` with `.ext` appended (e.g. `log` + `gz` -> `log.gz`).
///
/// Trailing dots on `path` and leading dots on `ext` are stripped first, so
/// `("log.", ".gz")` also yields `log.gz`. UTF-8 paths keep that behavior;
/// non-UTF-8 paths are extended losslessly on the raw `OsString` instead of
/// going through `to_string_lossy`, which would otherwise produce a
/// *different* path (invalid bytes replaced by U+FFFD).
#[inline]
fn add_extension<P: AsRef<Path>, S: AsRef<str>>(path: P, ext: S) -> PathBuf {
    let path = path.as_ref();
    let ext = ext.as_ref().trim_start_matches('.');
    match path.to_str() {
        Some(utf8) => format!("{}.{ext}", utf8.trim_end_matches('.')).into(),
        None => {
            let mut raw = path.as_os_str().to_os_string();
            raw.push(".");
            raw.push(ext);
            raw.into()
        }
    }
}
/// Reports whether the final extension of `p` is exactly `ext`
/// (case-sensitive, without the leading dot). A path with no extension
/// never matches.
#[inline]
fn match_ext<P: AsRef<Path>, S: AsRef<str>>(p: P, ext: S) -> bool {
    p.as_ref()
        .extension()
        .is_some_and(|found| ext.as_ref() == found)
}
/// Size in bytes of the file described by `meta`.
///
/// Uses the platform `MetadataExt` accessor on unix/windows (as before) and
/// falls back to the portable `Metadata::len` elsewhere. Previously the
/// function had no body at all on other targets and failed to compile.
#[inline]
fn file_size(meta: &Metadata) -> u64 {
    #[cfg(unix)]
    return meta.size();
    #[cfg(windows)]
    return meta.file_size();
    #[cfg(not(any(unix, windows)))]
    return meta.len();
}
/// Errors returned by [`File`] and [`OpenOptions`].
#[derive(Error, Debug)]
pub enum Error {
    /// A read was attempted on a [`File`] that has no reader, i.e. one that
    /// was opened for appending rather than for reading.
    #[error("not open for read")]
    WrongModeRead,
    /// A write was attempted on a [`File`] that has no writer, i.e. one that
    /// was opened for reading rather than for appending.
    #[error("not open for write")]
    WrongModeWrite,
    /// Opening for reading found neither the live file nor any archive for
    /// the given path.
    #[error("no such file: {0}")]
    NoSuchFile(PathBuf),
    /// The handle is closed.
    ///
    /// NOTE(review): not constructed by the code visible in this module;
    /// confirm whether it is still used (e.g. by the `unix` submodule).
    #[error("file is closed")]
    FileClosed,
    /// The given path has no file-name component to use as the log prefix.
    #[error("file prefix not found: {0}")]
    PrefixNotFound(PathBuf),
    /// The given path has no parent component (e.g. a filesystem root).
    #[error("root directory not found: {0}")]
    RootNotFound(PathBuf),
    /// An underlying I/O operation failed.
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// Background compression of a rotated archive failed.
    #[error("compression: {0}")]
    Compression(#[from] CompressionError),
}
impl From<Error> for io::Error {
fn from(value: Error) -> Self {
match value {
Error::Io(io) => io,
_ => io::Error::other(value),
}
}
}
/// Condition that makes a [`File`] rotate its live file into an archive.
///
/// The condition is checked before every write.
#[derive(Debug, Clone, Copy)]
pub enum Trigger {
    /// Rotate once the live file holds at least this many bytes.
    Size(ByteSize),
    /// Rotate once the live file is at least this old. Age is measured from
    /// the file's creation time, falling back to its modification time and
    /// then to the time it was opened.
    Time(Duration),
    /// Rotate when either the size or the age threshold is reached.
    SizeOrTime(ByteSize, Duration),
}
impl From<Duration> for Trigger {
fn from(value: Duration) -> Self {
Self::Time(value)
}
}
impl From<ByteSize> for Trigger {
fn from(value: ByteSize) -> Self {
Self::Size(value)
}
}
impl Trigger {
    /// Builds a trigger from optional age (`t`) and size (`s`) thresholds.
    ///
    /// Returns `None` when both are absent, a single-condition trigger when
    /// only one is given, and [`Trigger::SizeOrTime`] when both are.
    ///
    /// Note the argument order (time, size) differs from the order of the
    /// fields in [`Trigger::SizeOrTime`] (size, time).
    pub fn from_options(t: Option<Duration>, s: Option<ByteSize>) -> Option<Self> {
        match (s, t) {
            (Some(size), Some(age)) => Some(Self::SizeOrTime(size, age)),
            (Some(size), None) => Some(Self::Size(size)),
            (None, Some(age)) => Some(Self::Time(age)),
            (None, None) => None,
        }
    }
}
/// Compression applied to archives after rotation.
///
/// Compression runs on a background thread. The compressed file gets the
/// algorithm's extension appended (e.g. `log.3` -> `log.3.gz`), and the
/// uncompressed archive is then removed.
#[derive(Debug, Clone, Copy)]
pub enum Compression {
    /// gzip at the best compression level; files get a `.gz` extension.
    Gzip,
}
/// Failure while compressing a rotated archive.
#[derive(Debug, Error)]
pub enum CompressionError {
    /// Moving the finished temporary file to its final `.gz` name failed.
    #[error("persist: {0}")]
    Persist(#[from] PersistError),
    /// Reading the source, writing the compressed data, setting permissions
    /// or removing the source failed.
    #[error("io: {0}")]
    Io(#[from] io::Error),
}
impl Compression {
    /// File-name extension (without the leading dot) for compressed archives.
    const fn extension(&self) -> &'static str {
        match self {
            Compression::Gzip => "gz",
        }
    }

    /// Compresses the archive at `path` into `<path>.<extension>` and then
    /// deletes `path`.
    ///
    /// `mode` (unix only) sets explicit permission bits on the result.
    /// Without it, the source file's permissions are copied.
    fn compress<P: AsRef<Path>>(&self, path: P, mode: Option<u32>) -> Result<(), CompressionError> {
        match self {
            Self::Gzip => Self::compress_gzip(path, mode),
        }
    }

    /// gzip implementation of [`Compression::compress`].
    ///
    /// Data is written to a temporary file in the same directory. That file
    /// is fully flushed and given its final permissions before being
    /// atomically renamed into place. The source is deleted only after all of
    /// this has succeeded.
    #[allow(unused_variables)] // `mode` is only consulted on unix targets
    fn compress_gzip<P: AsRef<Path>>(path: P, mode: Option<u32>) -> Result<(), CompressionError> {
        let path = path.as_ref();
        let dir = path.parent().ok_or_else(|| {
            io::Error::new(io::ErrorKind::NotFound, "cannot create temporary file")
        })?;
        // Read the source permissions up front so the archive can inherit them.
        let source_perms = fs::metadata(path)?.permissions();
        let tmp = NamedTempFile::new_in(dir)?;
        let mut reader = BufReader::new(fs::File::open(path)?);
        let mut enc = GzEncoder::new(BufWriter::new(&tmp), flate2::Compression::best());
        loop {
            let chunk = reader.fill_buf()?;
            if chunk.is_empty() {
                break;
            }
            enc.write_all(chunk)?;
            let consumed = chunk.len();
            reader.consume(consumed);
        }
        // `finish` writes the gzip trailer into the BufWriter but does not
        // flush it. Flush explicitly so a failed final write is reported here
        // instead of being swallowed by BufWriter's Drop, right before the
        // uncompressed source gets deleted.
        enc.finish()?.flush()?;

        // Without an explicit mode, keep the source's permissions rather than
        // the temp file's restrictive default.
        #[cfg(unix)]
        let perms = match mode {
            Some(mode) => Permissions::from_mode(mode),
            None => source_perms,
        };
        #[cfg(not(unix))]
        let perms = source_perms;
        // Set before persisting so the final name never appears with the
        // wrong permissions.
        tmp.as_file().set_permissions(perms)?;

        let new = add_extension(path, Compression::Gzip.extension());
        tmp.persist(&new)?;
        fs::remove_file(path)?;
        Ok(())
    }
}
/// Builder for opening a rotating [`File`].
///
/// All options are unset by default: no rotation trigger, no size cap and no
/// compression.
#[derive(Default, Debug, Clone, Copy)]
pub struct OpenOptions {
    // Cap on the total size of the live file plus archives, enforced at rotation.
    max_size: Option<ByteSize>,
    // Condition checked before each write to decide whether to rotate.
    trigger: Option<Trigger>,
    // Compression applied to each archive after rotation.
    compression: Option<Compression>,
    // Unix-specific open mode / flags.
    #[cfg(target_family = "unix")]
    ext: UnixExt,
}
/// Unix-only settings used when creating the live file.
///
/// NOTE(review): no setter is visible in this module; presumably these are
/// populated by builder methods in the `unix` submodule — confirm.
#[cfg(target_family = "unix")]
#[derive(Default, Debug, Clone, Copy)]
struct UnixExt {
    // Permission bits passed to `OpenOptionsExt::mode` when creating the live
    // file; also applied to compressed archives.
    mode: Option<u32>,
    // Extra `open(2)` flags passed to `OpenOptionsExt::custom_flags`.
    flags: Option<i32>,
}
impl OpenOptions {
    /// Creates a builder with every option unset (same as `Default`).
    pub fn new() -> Self {
        Default::default()
    }

    /// Caps the combined size of the live file and its archives.
    ///
    /// The cap is enforced at each rotation by deleting the lowest-numbered
    /// archives.
    pub fn max_size(&mut self, m: ByteSize) -> &mut Self {
        self.max_size = Some(m);
        self
    }

    /// Sets the condition that triggers a rotation (checked before each write).
    pub fn trigger(&mut self, t: Trigger) -> &mut Self {
        self.opt_trigger(Some(t))
    }

    /// Sets or clears the rotation trigger; `None` disables automatic rotation.
    pub fn opt_trigger(&mut self, t: Option<Trigger>) -> &mut Self {
        self.trigger = t;
        self
    }

    /// Compresses every archive with `c` on a background thread after rotation.
    pub fn compression(&mut self, c: Compression) -> &mut Self {
        self.compression = Some(c);
        self
    }

    /// Opens `path` for appending with these options, creating the live file
    /// if it does not exist.
    ///
    /// # Errors
    /// See [`File::create_append_with_options`].
    pub fn create_append<P: AsRef<Path>>(self, path: P) -> Result<File, Error> {
        File::create_append_with_options(path, self)
    }

    /// Opens `path` and its archives for reading as a single stream.
    ///
    /// # Errors
    /// See [`File::open_with_options`].
    pub fn open<P: AsRef<Path>>(self, path: P) -> Result<File, Error> {
        File::open_with_options(path, self)
    }
}
/// Reader over a single file of the chain: either plain or gzip-decoded.
/// The variant is chosen by [`R::open`] from the `.gz` extension.
#[derive(Debug)]
enum R {
    // Uncompressed archive or live file.
    File(BufReader<fs::File>),
    // `.gz` archive, decompressed on the fly.
    Gzip(GzDecoder<BufReader<fs::File>>),
}
impl R {
    /// Opens `p` for reading. Paths ending in `.gz` are transparently
    /// decompressed; anything else is read as-is.
    fn open<P: AsRef<Path>>(p: P) -> Result<Self, Error> {
        let path = p.as_ref();
        let buffered = BufReader::new(fs::File::open(path)?);
        let is_gzip = match_ext(path, Compression::Gzip.extension());
        Ok(if is_gzip {
            Self::Gzip(GzDecoder::new(buffered))
        } else {
            Self::File(buffered)
        })
    }
}
impl io::Read for R {
    /// Forwards to whichever underlying reader this variant wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let source: &mut dyn io::Read = match self {
            Self::File(plain) => plain,
            Self::Gzip(decoder) => decoder,
        };
        source.read(buf)
    }
}
/// Reads a sequence of files (archives, then the live file) as one stream.
#[derive(Debug)]
struct Reader {
    // Files still to be read, stored in reverse so `pop` yields the next one.
    files: Vec<PathBuf>,
    // Reader for the file currently being consumed.
    reader: R,
}
impl io::Read for Reader {
    /// Reads from the current file and moves on to the next file in the
    /// chain whenever the current one is exhausted.
    ///
    /// Files that yield no data are skipped, such as an archive rotated
    /// before anything was written to it or a freshly re-created live file.
    /// `Ok(0)` is therefore returned only once every file has been drained,
    /// or when `buf` is empty. Previously a single empty file ended the whole
    /// stream early.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            let n = self.reader.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            match self.files.pop() {
                Some(next) => self.reader = R::open(next)?,
                None => return Ok(0),
            }
        }
    }
}
impl Reader {
    /// Builds a reader over every file belonging to `f`, in ascending index
    /// order.
    ///
    /// # Errors
    /// [`Error::NoSuchFile`] when there is neither a live file nor an archive.
    fn from_file(f: &File) -> Result<Self, Error> {
        let mut files = f.files_sorted_by_index()?;
        // Reverse so that `Vec::pop` hands out the lowest index first.
        files.reverse();
        let Some(first) = files.pop() else {
            return Err(Error::NoSuchFile(f.file_path()));
        };
        Ok(Self {
            reader: R::open(first)?,
            files,
        })
    }
}
/// A rotating log file: one live file (`<dir>/<prefix>`) plus numbered
/// archives (`<prefix>.<N>`, or `<prefix>.<N>.gz` once compressed) in the
/// same directory.
///
/// A `File` is used in one of two modes:
/// * opened for appending ([`File::create_append`],
///   [`OpenOptions::create_append`]), where it implements [`io::Write`] and
///   rotates according to its [`Trigger`];
/// * opened for reading ([`File::open_with_options`], [`OpenOptions::open`]),
///   where it implements [`io::Read`] and yields all files in ascending index
///   order as one stream.
#[derive(Debug, Default)]
pub struct File {
    // Unix-specific mode/flags used when (re)creating the live file.
    #[cfg(target_family = "unix")]
    ext: UnixExt,
    // Canonicalized directory holding the live file and its archives.
    dir: PathBuf,
    // File name of the live file; archives are named `<prefix>.<N>[.gz]`.
    prefix: String,
    // Bytes currently in the live file (from metadata on open, then counted per write).
    size: u64,
    // Creation time of the live file (or mtime / open time fallback), used by time triggers.
    created: Option<SystemTime>,
    // Present when opened for appending.
    writer: Option<BufWriter<fs::File>>,
    // Present when opened for reading.
    reader: Option<Reader>,
    // Total size cap enforced at rotation.
    max_size: Option<ByteSize>,
    // Rotation condition checked before each write.
    trigger: Option<Trigger>,
    // Compression applied to archives after rotation.
    compression: Option<Compression>,
    // Handle of the background thread compressing the most recent archive.
    compress_job: Option<thread::JoinHandle<Result<(), CompressionError>>>,
}
impl io::Write for File {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if self.should_rotate() {
self.rotate()?;
}
if let Some(writer) = self.writer.as_mut() {
let len = writer.write(buf)?;
self.size = self.size.wrapping_add(len as u64);
Ok(len)
} else {
Err(io::Error::other(Error::WrongModeWrite))
}
}
fn flush(&mut self) -> io::Result<()> {
if let Some(f) = self.writer.as_mut() {
f.flush()?;
}
Ok(())
}
}
impl io::Read for File {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Some(r) = self.reader.as_mut() {
r.read(buf)
} else {
Err(io::Error::other(Error::WrongModeRead))
}
}
}
impl File {
fn new<P: AsRef<Path>>(file: P, opts: OpenOptions) -> Result<Self, Error> {
let file = file.as_ref();
let prefix = file
.file_name()
.ok_or(Error::PrefixNotFound(file.to_path_buf()))?;
let dir = file
.parent()
.ok_or(Error::RootNotFound(file.to_path_buf()))?
.canonicalize()?
.to_path_buf();
Ok(Self {
#[cfg(target_family = "unix")]
ext: opts.ext,
dir,
prefix: prefix.to_string_lossy().into(),
size: 0,
created: None,
writer: None,
reader: None,
max_size: opts.max_size,
trigger: opts.trigger,
compression: opts.compression,
compress_job: None,
})
}
pub fn open_options(&self) -> OpenOptions {
OpenOptions {
max_size: self.max_size,
trigger: self.trigger,
compression: self.compression,
#[cfg(target_family = "unix")]
ext: self.ext,
}
}
pub fn create_append<P: AsRef<Path>>(file: P) -> Result<Self, Error> {
Self::create_append_with_options(file, OpenOptions::default())
}
pub fn open_with_options<P: AsRef<Path>>(file: P, opts: OpenOptions) -> Result<Self, Error> {
let mut f = Self::new(file, opts)?;
f.reader = Some(Reader::from_file(&f)?);
Ok(f)
}
pub fn create_append_with_options<P: AsRef<Path>>(
file: P,
opts: OpenOptions,
) -> Result<Self, Error> {
let mut f = Self::new(file, opts)?;
f.init_create_append()?;
Ok(f)
}
#[inline(always)]
pub fn file_path(&self) -> PathBuf {
self.dir.join(&self.prefix)
}
#[inline(always)]
fn file_index<P: AsRef<Path>>(&self, p: P) -> Option<u64> {
let p = p.as_ref();
if let Some(file_name) = p.file_name().map(PathBuf::from) {
if !file_name.to_string_lossy().starts_with(&self.prefix) {
return None;
}
if file_name == self.prefix {
return Some(0);
}
macro_rules! idx {
($p:expr) => {{
if let Some(ext) = $p.extension().map(|e| e.to_string_lossy()) {
if let Ok(i) = ext.parse::<u64>() {
Some(i)
} else {
None
}
} else {
None
}
}};
}
return match idx!(file_name) {
Some(i) => Some(i),
None => idx!(file_name.file_stem().map(PathBuf::from)?),
};
}
None
}
#[inline(always)]
fn list_files(&self) -> Result<Vec<(u64, Metadata, PathBuf)>, Error> {
let mut out = vec![];
for d in self.dir.read_dir()? {
let de = d?;
let p = de.path().canonicalize()?;
if p == self.file_path() {
continue;
}
if let Some(i) = self.file_index(&p) {
out.push((i, p.metadata()?, p))
}
}
let last = self.file_path();
if last.exists() {
out.push((out.len() as u64, last.metadata()?, last));
}
Ok(out)
}
#[inline]
pub fn files_sorted_by_index(&self) -> Result<Vec<PathBuf>, Error> {
let mut files = self.list_files()?;
files.sort_by_key(|(i, _, _)| *i);
Ok(files.into_iter().map(|(_, _, p)| p).collect())
}
#[inline]
pub fn size(&self) -> Result<u64, Error> {
Ok(self
.list_files()?
.iter()
.map(|(_, m, _)| file_size(m))
.sum())
}
#[inline]
pub fn rotate(&mut self) -> Result<(), Error> {
if let Some(f) = self.writer.as_mut() {
f.flush()?;
self.wait_compress()?;
let mut files = self.list_files()?;
files.sort_by_key(|(i, _, _)| *i);
let i = files.last().map(|(i, _, _)| *i).unwrap_or(0);
let archive_path = add_extension(self.file_path(), (i + 1).to_string());
fs::rename(self.file_path(), &archive_path)?;
let size = self.size()?;
if let Some(max_size) = self.max_size.map(|m| m.in_bytes()) {
if size >= max_size {
let mut files = self.list_files()?;
let def = SystemTime::now();
files.sort_by_key(|(i, m, _)| (*i, m.modified().unwrap_or(def)));
files.reverse();
let mut free = size - max_size;
while let Some((_, meta, path)) = files.pop() {
fs::remove_file(path)?;
free = free.saturating_sub(file_size(&meta));
if free == 0 {
break;
}
}
}
}
if let Some(compression) = self.compression {
if archive_path.is_file() {
let mode = {
let f = || {
#[cfg(not(unix))]
return None;
#[cfg(unix)]
return self.ext.mode;
};
f()
};
self.compress_job = Some(std::thread::spawn(move || {
let r = compression.compress(archive_path, mode);
debug_assert!(r.is_ok(), "compress job failed: {:?}", r);
r
}));
}
}
return self.init_create_append();
}
Ok(())
}
#[inline(always)]
fn should_rotate(&self) -> bool {
if let Some(t) = self.trigger.as_ref() {
return match t {
Trigger::Size(s) => ByteSize::from_bytes(self.size) >= *s,
Trigger::Time(d) => self
.created
.map(|c| SystemTime::now().duration_since(c).unwrap() >= *d)
.unwrap_or_default(),
Trigger::SizeOrTime(s, d) => {
ByteSize::from_bytes(self.size) >= *s
|| self
.created
.map(|c| SystemTime::now().duration_since(c).unwrap() >= *d)
.unwrap_or_default()
}
};
}
false
}
#[inline(always)]
fn init_create_append(&mut self) -> Result<(), Error> {
let opts = {
let mut opts = fs::File::options();
#[cfg(target_family = "unix")]
{
if let Some(mode) = self.ext.mode {
opts.mode(mode);
}
if let Some(flags) = self.ext.flags {
opts.custom_flags(flags);
}
}
opts.append(true).create(true);
opts
};
let fd = opts.open(self.file_path())?;
let m = fd.metadata()?;
self.size = file_size(&m);
self.created = Some(
m.created()
.unwrap_or(
m.modified()
.unwrap_or(SystemTime::now()),
),
);
self.writer = Some(io::BufWriter::new(fd));
Ok(())
}
#[inline]
fn wait_compress(&mut self) -> Result<(), Error> {
if self.compression.is_none() || self.compress_job.is_none() {
return Ok(());
}
if let Some(h) = self.compress_job.take() {
return h
.join()
.expect("cannot join compression thread")
.map_err(Error::from);
}
Ok(())
}
#[inline]
pub fn sync(&mut self) -> Result<(), Error> {
self.flush()?;
self.wait_compress()
}
}
impl Drop for File {
fn drop(&mut self) {
let _ = self.sync();
}
}
// Integration-style tests: each one works in a fresh temporary directory with
// a live file named "log". Every line written is "test\n" (5 bytes) unless
// noted, which is what the file-count and byte-size expectations rely on.
#[cfg(test)]
mod test {
    use std::{
        io::{BufRead, BufReader, Write},
        time::{Duration, Instant},
    };

    use huby::ByteSize;

    use crate::OpenOptions;

    use super::Compression;
    use super::File;
    use super::Trigger;

    // Manual rotation: 199 explicit `rotate()` calls give 199 archives plus
    // the live file (200 files); reading back yields all 200 lines.
    #[test]
    fn test() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = File::create_append(&p).unwrap();
        for _ in 0..199 {
            writeln!(f, "test").unwrap();
            f.rotate().unwrap();
        }
        writeln!(f, "test").unwrap();
        f.flush().unwrap();
        assert_eq!(f.files_sorted_by_index().unwrap().len(), 200);
        let r = BufReader::new(f.open_options().open(p).unwrap());
        let mut c = 0;
        for l in r.lines() {
            let l = l.unwrap();
            assert_eq!(l, "test");
            c += 1;
        }
        assert_eq!(c, 200)
    }

    // Time trigger: writing for ~2s with a 500ms trigger should produce at
    // least 4 files, and no written line may be lost. Timing-dependent.
    #[test]
    fn test_time_rotate() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(Duration::from_millis(500).into())
            .create_append(&p)
            .unwrap();
        let start = Instant::now();
        let mut c = 0usize;
        while Instant::now().checked_duration_since(start).unwrap() < Duration::from_secs(2) {
            writeln!(f, "test").unwrap();
            c = c.saturating_add(1);
        }
        f.sync().unwrap();
        assert!(f.files_sorted_by_index().unwrap().len() >= 4);
        let r = BufReader::new(f.open_options().open(p).unwrap());
        assert_eq!(r.lines().count(), c)
    }

    // SizeOrTime where only the time condition can fire (50 MB is never reached).
    #[test]
    fn test_size_or_time_rotate_time() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(Trigger::SizeOrTime(
                ByteSize::from_mb(50),
                Duration::from_millis(500),
            ))
            .create_append(&p)
            .unwrap();
        let start = Instant::now();
        let mut c = 0usize;
        while Instant::now().checked_duration_since(start).unwrap() < Duration::from_secs(2) {
            writeln!(f, "test").unwrap();
            c = c.saturating_add(1);
        }
        f.sync().unwrap();
        assert!(f.files_sorted_by_index().unwrap().len() >= 4);
        let r = BufReader::new(f.open_options().open(p).unwrap());
        assert_eq!(r.lines().count(), c)
    }

    // Size trigger of 50 bytes rotates every 10 lines (checked before each
    // write): 100 lines -> 9 archives + live file = 10 files, 500 bytes.
    #[test]
    fn test_size_rotate() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(ByteSize::from_bytes(50).into())
            .create_append(&p)
            .unwrap();
        for _ in 0..100 {
            writeln!(f, "test").unwrap();
        }
        f.flush().unwrap();
        assert_eq!(f.files_sorted_by_index().unwrap().len(), 10);
        assert_eq!(f.size().unwrap(), 500);
        let r = BufReader::new(f.open_options().open(p).unwrap());
        assert_eq!(r.lines().count(), 100);
    }

    // Same layout as `test_size_rotate`, but via SizeOrTime where only the
    // size condition can fire within the test (600s age limit).
    #[test]
    fn test_size_or_time_rotate_size() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(Trigger::SizeOrTime(
                ByteSize::from_bytes(50),
                Duration::from_secs(600),
            ))
            .create_append(&p)
            .unwrap();
        for _ in 0..100 {
            writeln!(f, "test").unwrap();
        }
        f.flush().unwrap();
        assert_eq!(f.files_sorted_by_index().unwrap().len(), 10);
        assert_eq!(f.size().unwrap(), 500);
        let r = BufReader::new(f.open_options().open(p).unwrap());
        assert_eq!(r.lines().count(), 100);
    }

    // Same layout as `test_size_rotate` with gzip archives. The 401-byte total
    // (50-byte live file + 9 compressed archives) is pinned to the current
    // gzip output, presumably 39 bytes per archive.
    #[test]
    fn test_compression() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(Trigger::Size(ByteSize::from_bytes(50)))
            .compression(Compression::Gzip)
            .create_append(&p)
            .unwrap();
        for _ in 0..100 {
            writeln!(f, "test").unwrap();
        }
        f.flush().unwrap();
        f.wait_compress().unwrap();
        assert_eq!(f.files_sorted_by_index().unwrap().len(), 10);
        assert_eq!(f.size().unwrap(), 401);
        let lines = BufReader::new(f.open_options().open(p).unwrap()).lines();
        assert_eq!(lines.count(), 100)
    }

    // Size cap of 200 bytes with 50-byte files: pruning leaves 3 archives plus
    // the live file, i.e. 4 files, 200 bytes and 40 lines.
    #[test]
    fn test_max_size() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut f = OpenOptions::new()
            .trigger(ByteSize::from_bytes(50).into())
            .max_size(ByteSize::from_bytes(200))
            .create_append(&p)
            .unwrap();
        for _ in 0..10000 {
            writeln!(f, "test").unwrap();
        }
        f.flush().unwrap();
        let files = f.files_sorted_by_index().unwrap();
        assert_eq!(files.len(), 4);
        assert_eq!(f.size().unwrap(), 200);
        for f in files {
            println!("{f:?}");
        }
        let lines = BufReader::new(f.open_options().open(p).unwrap()).lines();
        assert_eq!(lines.count(), 40)
    }

    // Size cap combined with compression. The expected file count (26) depends
    // on the gzip size of each 1 kB archive and is pinned empirically.
    #[test]
    fn test_max_size_with_compression() {
        let td = tempfile::tempdir().unwrap();
        let mut f = OpenOptions::new()
            .trigger(ByteSize::from_kb(1).into())
            .max_size(ByteSize::from_kb(2))
            .compression(Compression::Gzip)
            .create_append(td.path().join("log"))
            .unwrap();
        for _ in 0..20000 {
            writeln!(f, "test").unwrap();
        }
        f.flush().unwrap();
        f.wait_compress().unwrap();
        let files = f.files_sorted_by_index().unwrap();
        assert_eq!(files.len(), 26);
        assert!(f.size().unwrap() <= ByteSize::from_kb(2).in_bytes());
    }

    // Reading across many compressed archives must preserve write order:
    // sequential integers have to come back strictly increasing.
    #[test]
    fn test_read_order() {
        let td = tempfile::tempdir().unwrap();
        let p = td.path().join("log");
        let mut opts = OpenOptions::new();
        opts.trigger(Trigger::Size(ByteSize::from_kb(1)))
            .compression(Compression::Gzip);
        let mut f = opts.create_append(&p).unwrap();
        for i in 0..20_000 {
            writeln!(f, "{i}").unwrap();
        }
        f.flush().unwrap();
        f.wait_compress().unwrap();
        assert_eq!(f.list_files().unwrap().len(), 107);
        let lines = BufReader::new(opts.open(p).unwrap())
            .lines()
            .map_while(Result::ok)
            .flat_map(|l| l.parse::<usize>())
            .collect::<Vec<usize>>();
        assert_eq!(lines.len(), 20_000);
        let mut prev = 0;
        for i in lines {
            if i == 0 {
                prev = i;
                continue;
            }
            assert!(i > prev);
            prev = i
        }
    }
}