use std::{num::NonZeroUsize, path::PathBuf, time::SystemTime};
use ::csv::Reader as CsvReader;
use crate::{
fs::{
policy::{RollingFileInfo, RollingFileNameTemplate},
WriteResult,
},
storage::{
self, csv, BinaryDataFormat, CreatedAtOffset, RecordStorageBase, RecordStorageRead,
RecordStorageWrite, StorageConfig, StorageDescriptor, StorageStatistics,
MAX_PREALLOCATED_CAPACITY_LIMIT,
},
time::SystemInstant,
};
use super::{Error, Record, RecordFilter, RecordStorage, Result, StorageRecord, StoredRecord};
#[allow(missing_debug_implementations)]
pub struct FileRecordStorage {
inner: csv::FileRecordStorage<StorageRecord, StorageRecord>,
}
impl FileRecordStorage {
pub fn try_new(
base_path: PathBuf,
file_name_prefix: String,
binary_data_format: BinaryDataFormat,
initial_config: StorageConfig,
) -> Result<Self> {
let file_name_template = RollingFileNameTemplate {
prefix: file_name_prefix,
suffix: ".csv".to_string(),
};
let inner = csv::FileRecordStorage::try_new(
binary_data_format,
initial_config,
base_path,
file_name_template,
None,
)?;
Ok(Self { inner })
}
}
fn filter_map_storage_record(
created_at_origin: SystemTime,
record: StorageRecord,
binary_data_format: BinaryDataFormat,
) -> Option<StoredRecord> {
match StoredRecord::try_restore(created_at_origin, record, binary_data_format) {
Ok(record) => Some(record),
Err(err) => {
log::error!("Failed to convert record: {}", err);
None
}
}
}
impl RecordStorageBase for FileRecordStorage {
fn descriptor(&self) -> &StorageDescriptor {
self.inner.descriptor()
}
fn config(&self) -> &StorageConfig {
self.inner.config()
}
fn replace_config(&mut self, new_config: StorageConfig) -> StorageConfig {
self.inner.replace_config(new_config)
}
fn perform_housekeeping(&mut self) -> storage::Result<()> {
self.inner.perform_housekeeping()
}
fn retain_all_records_created_since(
&mut self,
created_since: SystemTime,
) -> storage::Result<()> {
self.inner.retain_all_records_created_since(created_since)
}
fn report_statistics(&mut self) -> storage::Result<StorageStatistics> {
self.inner.report_statistics()
}
}
impl RecordStorageWrite<Record> for FileRecordStorage {
fn append_record(
&mut self,
created_at: &SystemInstant,
record: Record,
) -> storage::Result<(WriteResult, CreatedAtOffset)> {
let storage_record = StorageRecord::try_new(record, self.descriptor().binary_data_format)?;
self.inner.append_record(created_at, storage_record)
}
}
impl RecordStorage for FileRecordStorage {
fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<StoredRecord>> {
self.inner
.recent_records(limit)
.map(|v| {
v.into_iter()
.filter_map(|(create_at_origin, record)| {
filter_map_storage_record(
create_at_origin,
record,
self.descriptor().binary_data_format,
)
})
.collect()
})
.map_err(Error::Storage)
}
fn filter_records(
&mut self,
limit: NonZeroUsize,
filter: RecordFilter,
) -> Result<Vec<StoredRecord>> {
self.inner.flush_before_reading()?;
let limit = limit.get().min(MAX_PREALLOCATED_CAPACITY_LIMIT);
let mut records = Vec::with_capacity(limit);
for file_info in self
.inner
.read_all_dir_entries_filtered_chronologically(
&csv::file_info_filter_from_record_prelude_filter(&filter.prelude),
)?
.into_iter()
.map(RollingFileInfo::from)
{
if limit <= records.len() {
break;
}
let remaining_limit = limit - records.len();
let reader = csv::create_file_reader(&file_info.path)?;
records.extend(
reader_into_filtered_record_iter(
reader,
file_info.created_at.into(),
filter.clone(),
self.descriptor().binary_data_format,
)
.take(remaining_limit),
);
}
Ok(records)
}
}
fn reader_into_filtered_record_iter<R>(
reader: CsvReader<R>,
created_at_origin: SystemTime,
filter: RecordFilter,
binary_data_format: BinaryDataFormat,
) -> impl Iterator<Item = StoredRecord>
where
R: std::io::Read,
{
let RecordFilter {
prelude: prelude_filter,
any_codes,
any_scopes,
min_severity,
} = filter;
csv::reader_into_filtered_record_iter(reader, created_at_origin, prelude_filter)
.filter_map(move |record| {
filter_map_storage_record(created_at_origin, record, binary_data_format)
})
.filter(move |StoredRecord { prelude: _, entry }| {
if let Some(min_severity) = min_severity {
if entry.severity < min_severity {
return false;
}
}
if let Some(any_codes) = &any_codes {
if any_codes.iter().all(|code| *code != entry.code) {
return false;
}
}
if let Some(any_scopes) = &any_scopes {
if any_scopes.iter().all(|scope| scope != &entry.scope) {
return false;
}
}
true
})
}