use std::{
fs::File,
io::{Error as IoError, ErrorKind as IoErrorKind},
result::Result as StdResult,
time::SystemTime,
};
use ::csv::{
Error as CsvError, ErrorKind, StringRecord, Writer as CsvWriter,
WriterBuilder as CsvWriterBuilder,
};
use serde::Serialize;
use thiserror::Error;
use crate::{
io::{BytesWritten, CountingWrite},
time::SystemInstant,
};
use super::{
policy::{
OpenRollingFile, RollingFileConfig, RollingFileInfo, RollingFileInfoWithSize,
RollingFileLimits, RollingFileStatus as PolicyRollingFileStatus,
},
WriteError, WriteResult,
};
type CountingFileWriter = CsvWriter<CountingWrite<File>>;
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Io(#[from] IoError),
#[error(transparent)]
Csv(#[from] CsvError),
}
pub type Result<T> = StdResult<T, Error>;
#[derive(Debug, Clone)]
struct RollingFileStatus {
created_at: SystemTime,
bytes_written: BytesWritten,
records_written: u64,
}
impl RollingFileStatus {
fn new(created_at: SystemTime, bytes_written: BytesWritten) -> Self {
debug_assert_eq!(0, bytes_written.value());
Self {
created_at,
bytes_written,
records_written: 0,
}
}
fn should_roll(
&self,
now: SystemTime,
now_nanoseconds_offset: u64,
limits: &RollingFileLimits,
) -> bool {
PolicyRollingFileStatus::from(self).should_roll(now, now_nanoseconds_offset, limits)
}
}
impl From<&RollingFileStatus> for PolicyRollingFileStatus {
fn from(from: &RollingFileStatus) -> Self {
let RollingFileStatus {
created_at,
bytes_written,
records_written,
} = from;
Self {
created_at: *created_at,
bytes_written: Some(bytes_written.value()),
records_written: Some(*records_written),
}
}
}
#[derive(Debug)]
struct RollingFile {
info: RollingFileInfo,
status: RollingFileStatus,
writer: CountingFileWriter,
last_os_error_code: Option<i32>,
}
impl RollingFile {
#[allow(clippy::panic_in_result_fn)] fn after_record_written(&mut self, res: StdResult<(), ::csv::Error>) -> Result<WriteResult> {
match res {
Ok(()) => {
self.status.records_written += 1;
self.last_os_error_code = None;
Ok(Ok(()))
}
Err(err) => {
if let ErrorKind::Io(err) = err.kind() {
let last_os_error_code = err.raw_os_error();
if let Some(last_os_error_code) = last_os_error_code {
if self.last_os_error_code == Some(last_os_error_code) {
log::debug!("Writing of CSV record failed repeatedly with I/O error (OS code = {}): {}", last_os_error_code, err);
return Ok(Err(WriteError::RepeatedOsError {
code: last_os_error_code,
}));
} else {
log::warn!(
"Writing of CSV record failed with I/O error (OS code = {}): {}",
last_os_error_code,
err
);
self.last_os_error_code = Some(last_os_error_code);
}
} else {
log::warn!("Writing of CSV record failed with I/O error: {}", err);
self.last_os_error_code = None;
}
} else {
log::warn!("Writing of CSV record failed: {}", err);
return Err(Error::Csv(err));
}
match err.into_kind() {
ErrorKind::Io(err) => Err(Error::Io(err)),
_ => {
unreachable!();
}
}
}
}
}
}
#[derive(Debug, Eq, PartialEq)]
pub struct ClosedFileInfo(RollingFileInfo);
impl ClosedFileInfo {
#[must_use]
pub fn into_inner(self) -> RollingFileInfo {
self.0
}
}
#[derive(Debug)]
pub struct RollingFileWriter {
config: RollingFileConfig,
custom_header: Option<StringRecord>,
current_file: Option<RollingFile>,
}
impl RollingFileWriter {
#[must_use]
pub fn new(config: RollingFileConfig, custom_header: Option<StringRecord>) -> Self {
Self {
config,
custom_header,
current_file: None,
}
}
fn start_new_file(&self, starting_at: SystemInstant) -> Result<Option<RollingFile>> {
let new_file = self
.config
.system
.open_new_file_for_writing(starting_at.system_time().into())?;
match new_file {
OpenRollingFile::Opened(file, info) => {
let (writer, bytes_written) = CountingWrite::from_writer(file);
let status = RollingFileStatus::new(info.created_at.into(), bytes_written);
let rolling_file = RollingFile {
info,
status,
writer: CsvWriterBuilder::new()
.has_headers(self.custom_header.is_none())
.from_writer(writer),
last_os_error_code: None,
};
Ok(Some(rolling_file))
}
OpenRollingFile::AlreadyExists(path) => {
log::info!("File already exists: {}", path.display());
Ok(None)
}
}
}
fn roll_file_now(&mut self, now: SystemInstant) -> Result<Option<ClosedFileInfo>> {
let new_file = self.start_new_file(now)?;
if let Some(new_file) = new_file {
log::info!("Opened new file: {}", new_file.info.path.display());
let old_file = self.current_file.replace(new_file);
let closed_file_info = old_file.map(|old_file| {
log::info!("Closing old file: {}", old_file.info.path.display());
ClosedFileInfo(old_file.info)
});
Ok(closed_file_info)
} else {
Ok(None)
}
}
#[must_use]
pub fn current_file_info(&self) -> Option<&RollingFileInfo> {
self.current_file
.as_ref()
.map(|current_file| ¤t_file.info)
}
#[must_use]
pub fn current_file_info_with_size(&self) -> Option<RollingFileInfoWithSize> {
self.current_file.as_ref().map(|current_file| {
let RollingFileInfo { path, created_at } = ¤t_file.info;
RollingFileInfoWithSize {
path: path.clone(),
created_at: *created_at,
size_in_bytes: current_file.status.bytes_written.value(),
}
})
}
pub fn recent_files(&self) -> Result<Vec<RollingFileInfoWithSize>> {
let mut files = self
.config
.system
.read_all_dir_entries_filtered_chronologically(&Default::default())?;
files.reverse();
Ok(files)
}
fn before_writing(
&mut self,
now: &SystemInstant,
now_nanoseconds_offset: u64,
) -> Result<Option<ClosedFileInfo>> {
let (closed_file_info, created_new_file) = if let Some(current_file) = &self.current_file {
if current_file.status.should_roll(
now.system_time(),
now_nanoseconds_offset,
&self.config.limits,
) {
self.flush()?;
let closed_file_info = self.roll_file_now(now.clone())?;
let created_new_file = closed_file_info.is_some();
(closed_file_info, created_new_file)
} else {
(None, false)
}
} else {
(self.roll_file_now(now.clone())?, true)
};
if let Some(current_file) = self.current_file.as_mut() {
if created_new_file {
if let Some(custom_header) = &self.custom_header {
current_file.writer.write_record(custom_header)?;
}
}
Ok(closed_file_info)
} else {
Err(IoError::new(IoErrorKind::NotFound, "no open file").into())
}
}
pub fn write_record<I, T>(
&mut self,
now: &SystemInstant,
now_nanoseconds_offset: u64,
record: I,
) -> Result<(WriteResult, Option<ClosedFileInfo>)>
where
I: IntoIterator<Item = T>,
T: AsRef<[u8]>,
{
let closed_file_info = self.before_writing(now, now_nanoseconds_offset)?;
let record_written = if let Some(current_file) = self.current_file.as_mut() {
let res = current_file.writer.write_record(record);
current_file.after_record_written(res)?
} else {
Err(WriteError::NoFile)
};
Ok((record_written, closed_file_info))
}
pub fn serialize<S: Serialize>(
&mut self,
now: &SystemInstant,
now_nanoseconds_offset: u64,
record: S,
) -> Result<(WriteResult, Option<ClosedFileInfo>)> {
let closed_file_info = self.before_writing(now, now_nanoseconds_offset)?;
let record_written = if let Some(current_file) = self.current_file.as_mut() {
let res = current_file.writer.serialize(record);
current_file.after_record_written(res)?
} else {
Err(WriteError::NoFile)
};
Ok((record_written, closed_file_info))
}
pub fn flush(&mut self) -> Result<()> {
if let Some(current_file) = self.current_file.as_mut() {
current_file.writer.flush()?;
}
Ok(())
}
}
#[cfg(test)]
mod tests;