pub mod codec;
pub mod reader;
pub mod search;
pub mod writer;
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
/// Maps a PV name onto one of `n` shards by hashing the name.
///
/// Returns `0` when `n <= 1`; otherwise the result is in `0..n`. A zero `n`
/// trips a debug assertion.
pub fn shard_for_pv(pv: &str, n: usize) -> usize {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    debug_assert!(n > 0);
    match n {
        0 | 1 => 0,
        shards => {
            let mut hasher = DefaultHasher::new();
            pv.hash(&mut hasher);
            let digest = hasher.finish();
            (digest % shards as u64) as usize
        }
    }
}
/// Writer cap that `PlainPbStoragePlugin::new` passes to
/// `with_max_open_writers`.
pub const DEFAULT_MAX_OPEN_WRITERS: usize = 512;
/// A bounded counter of reserved writer slots.
///
/// The counter lives behind an `Arc`, so clones of a budget share one count.
#[derive(Clone)]
pub struct FdBudget {
/// Number of reservations currently outstanding.
counter: Arc<AtomicUsize>,
/// Upper bound on outstanding reservations.
max: usize,
}
impl FdBudget {
    /// Creates a budget allowing at most `max` reservations.
    ///
    /// A `max` of `0` is treated as "no limit" (`usize::MAX`).
    pub fn new(max: usize) -> Self {
        let max = match max {
            0 => usize::MAX,
            limit => limit,
        };
        Self {
            counter: Arc::new(AtomicUsize::new(0)),
            max,
        }
    }

    /// Creates a budget whose limit is `usize::MAX`.
    pub fn unbounded() -> Self {
        Self::new(usize::MAX)
    }

    /// Current number of outstanding reservations.
    pub fn count(&self) -> usize {
        self.counter.load(Ordering::Relaxed)
    }

    /// The configured limit.
    pub fn max(&self) -> usize {
        self.max
    }

    /// Atomically claims one slot, or returns `None` when the counter has
    /// already reached the limit. The returned guard gives the slot back
    /// when dropped.
    fn try_reserve(&self) -> Option<WriterFdGuard> {
        let limit = self.max;
        self.counter
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |held| {
                // Only bump the count while it is still below the limit.
                (held < limit).then(|| held + 1)
            })
            .ok()
            .map(|_| WriterFdGuard {
                counter: self.counter.clone(),
            })
    }
}
use async_trait::async_trait;
use prost::Message;
use tracing::debug;
use crate::storage::partition::PartitionGranularity;
use crate::storage::traits::{
AppendMeta, EventStream, IngestFlushResult, StoragePlugin, StoreSummary,
};
use crate::types::{ArchDbType, ArchiverSample};
use self::reader::PbFileReader;
/// Token for one reserved slot in an `FdBudget`; dropping it decrements the
/// shared counter.
struct WriterFdGuard {
/// The budget counter this reservation was taken from.
counter: Arc<AtomicUsize>,
}
/// Gives the reserved slot back to the shared counter.
impl Drop for WriterFdGuard {
    fn drop(&mut self) {
        let released = 1;
        self.counter.fetch_sub(released, Ordering::Relaxed);
    }
}
/// An open, buffered append handle kept in the write cache for one PV.
struct CachedWriter {
/// File this writer appends to.
path: PathBuf,
/// Buffered handle on `path`.
writer: BufWriter<std::fs::File>,
/// Set after a write, cleared after a successful flush.
dirty: bool,
/// Time of the most recent write; used to pick the eviction victim.
last_used: SystemTime,
/// Slot held in the fd budget for as long as this writer exists.
_fd_guard: WriterFdGuard,
}
/// Per-PV entry in the write cache.
struct PvWriterSlot {
/// The open writer, if one is currently cached.
writer: Option<CachedWriter>,
/// Set by delete/rename; `write_cached` refuses to write once it is set.
dead: bool,
}
/// Scope guard that removes a PV's entry from the plugin's write cache when
/// it is dropped.
struct TombstoneCleanupGuard<'a> {
/// Plugin whose write cache is cleaned up.
plugin: &'a PlainPbStoragePlugin,
/// Cache key (PV name) to remove.
pv: String,
}
/// Removes the PV's cache entry; a poisoned cache lock is recovered rather
/// than propagated.
impl<'a> Drop for TombstoneCleanupGuard<'a> {
    fn drop(&mut self) {
        let mut cache = match self.plugin.write_cache.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        cache.remove(&self.pv);
    }
}
/// Result of one pass of `flush_dirty_writers`.
struct FlushOutcome {
/// PVs whose flush failed, or whose file was missing after the flush.
failed: Vec<String>,
/// PVs skipped because their slot lock was held elsewhere.
deferred: Vec<String>,
}
/// Wraps a `PbFileReader` and only yields samples whose timestamp lies in
/// `start..=end`.
struct BoundedReader {
/// Underlying file reader.
inner: PbFileReader,
/// Inclusive lower bound on sample timestamps.
start: SystemTime,
/// Inclusive upper bound on sample timestamps.
end: SystemTime,
/// Once set, `next_event` returns `None` without reading `inner` again.
done: bool,
}
impl BoundedReader {
    /// Wraps `inner` so that only samples within `start..=end` are yielded.
    fn new(inner: PbFileReader, start: SystemTime, end: SystemTime) -> Self {
        let done = false;
        Self {
            inner,
            start,
            end,
            done,
        }
    }
}
impl crate::storage::traits::EventStream for BoundedReader {
    /// Returns the wrapped reader's description unchanged.
    fn description(&self) -> &crate::types::EventStreamDesc {
        self.inner.description()
    }

    /// Yields the next sample whose timestamp is within `start..=end`.
    ///
    /// Samples before `start` are skipped. The first sample past `end`
    /// finishes the stream: samples in a PB file are assumed to be in
    /// timestamp order (the seeked open and the early `break` in
    /// `get_last_event_before` rely on the same ordering), so nothing later
    /// in the file can be in range and scanning on to end-of-file would be
    /// wasted work.
    ///
    /// # Errors
    /// Propagates any error from the wrapped reader.
    fn next_event(&mut self) -> anyhow::Result<Option<crate::types::ArchiverSample>> {
        if self.done {
            return Ok(None);
        }
        loop {
            match self.inner.next_event()? {
                None => {
                    self.done = true;
                    return Ok(None);
                }
                Some(s) if s.timestamp < self.start => continue,
                Some(s) if s.timestamp > self.end => {
                    // Past the window: stop instead of draining the file.
                    self.done = true;
                    return Ok(None);
                }
                Some(s) => return Ok(Some(s)),
            }
        }
    }
}
use crate::retrieval::query::SingleSampleStream;
/// Storage plugin that keeps PV samples in `.pb` files under a root folder,
/// with a cache of open buffered writers keyed by PV name.
pub struct PlainPbStoragePlugin {
/// Name reported by `StoragePlugin::name` and used as the `tier` metric label.
plugin_name: String,
/// Directory under which all PB files are stored.
root_folder: PathBuf,
/// Partition granularity used to derive file names from timestamps.
granularity: PartitionGranularity,
/// PV name -> writer slot. Each slot has its own lock.
write_cache: Mutex<HashMap<String, Arc<Mutex<PvWriterSlot>>>>,
/// Parent directories already created, to skip repeated `create_dir_all`.
known_dirs: Mutex<HashSet<PathBuf>>,
/// Limit on concurrently open cached writers.
fd_budget: FdBudget,
/// PVs whose buffered bytes were lost; drained by `flush_ingest_writes`.
evicted_with_loss: Mutex<Vec<String>>,
}
impl PlainPbStoragePlugin {
/// Creates a plugin limited to `DEFAULT_MAX_OPEN_WRITERS` open writers.
pub fn new(name: &str, root_folder: PathBuf, granularity: PartitionGranularity) -> Self {
    let writer_cap = DEFAULT_MAX_OPEN_WRITERS;
    Self::with_max_open_writers(name, root_folder, granularity, writer_cap)
}
/// Creates a plugin with its own fd budget of `max_open_writers`
/// (`0` means unlimited, per `FdBudget::new`).
pub fn with_max_open_writers(
    name: &str,
    root_folder: PathBuf,
    granularity: PartitionGranularity,
    max_open_writers: usize,
) -> Self {
    let fd_budget = FdBudget::new(max_open_writers);
    Self::with_fd_budget(name, root_folder, granularity, fd_budget)
}
/// Creates a plugin that draws writer slots from the supplied `fd_budget`.
/// The write cache, known-directory set and loss queue start empty.
pub fn with_fd_budget(
    name: &str,
    root_folder: PathBuf,
    granularity: PartitionGranularity,
    fd_budget: FdBudget,
) -> Self {
    let plugin_name = name.to_string();
    let write_cache = Mutex::new(HashMap::new());
    let known_dirs = Mutex::new(HashSet::new());
    let evicted_with_loss = Mutex::new(Vec::new());
    Self {
        plugin_name,
        root_folder,
        granularity,
        write_cache,
        known_dirs,
        fd_budget,
        evicted_with_loss,
    }
}
/// Number of slots currently reserved in this plugin's fd budget.
pub fn open_writer_count(&self) -> usize {
    let budget = &self.fd_budget;
    budget.count()
}
/// Builds the path of the PB file for `pv` at time `ts`:
/// `<root>/<pv key>:<partition name>.pb`.
pub fn file_path_for(&self, pv: &str, ts: SystemTime) -> PathBuf {
    let key = pv_name_to_key(pv);
    let partition = crate::storage::partition::partition_name(ts, self.granularity);
    self.root_folder.join(format!("{key}:{partition}.pb"))
}
/// Lists the existing PB files of `pv` for every partition that
/// `partitions_in_range` reports for `start..end`, in that order.
fn list_files_for_range(&self, pv: &str, start: SystemTime, end: SystemTime) -> Vec<PathBuf> {
    let partitions =
        crate::storage::partition::partitions_in_range(start, end, self.granularity);
    let pv_key = pv_name_to_key(pv);
    let mut present = Vec::new();
    for pname in partitions {
        let candidate = self.root_folder.join(format!("{pv_key}:{pname}.pb"));
        // Partitions with no file on disk are simply skipped.
        if candidate.exists() {
            present.push(candidate);
        }
    }
    present
}
/// The directory under which this plugin stores its files.
pub fn root_folder(&self) -> &Path {
    self.root_folder.as_path()
}
/// Flushes every cached writer whose `dirty` flag is set.
///
/// Slots whose lock is currently held elsewhere are not waited for; their PV
/// names are returned in `deferred`. A writer whose flush fails, or whose
/// file no longer exists after a successful flush, is discarded, recorded in
/// the loss queue, reported in `failed`, and its cache entry is removed.
fn flush_dirty_writers(&self) -> FlushOutcome {
// Snapshot the slots so the cache lock is not held while flushing.
let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let mut failed = Vec::new();
let mut deferred = Vec::new();
let mut to_remove = Vec::new();
for (pv, slot_arc) in snapshot {
// Never block on a busy slot; report it as deferred instead.
let mut slot_guard = match slot_arc.try_lock() {
Ok(g) => g,
Err(std::sync::TryLockError::WouldBlock) => {
deferred.push(pv);
continue;
}
Err(std::sync::TryLockError::Poisoned(p)) => p.into_inner(),
};
let Some(cached) = slot_guard.writer.as_mut() else {
continue;
};
if !cached.dirty {
continue;
}
match cached.writer.flush() {
Ok(()) => {
// A successful flush to a path that no longer exists is treated as loss.
if !cached.path.exists() {
tracing::warn!(
pv,
path = ?cached.path,
"Flush succeeded but file is gone; bytes are not \
reader-visible — surfacing PV to loss queue"
);
metrics::counter!(
"archiver_pb_flush_failures_total",
"tier" => self.plugin_name.clone(),
)
.increment(1);
// Disassemble the BufWriter so dropping it does not attempt another write.
if let Some(removed) = slot_guard.writer.take() {
let (_file, _buffered) = removed.writer.into_parts();
}
self.record_dirty_loss(&pv);
failed.push(pv.clone());
to_remove.push(pv);
} else {
cached.dirty = false;
}
}
Err(e) => {
tracing::warn!(pv, path = ?cached.path, "Failed to flush cached writer: {e}");
metrics::counter!(
"archiver_pb_flush_failures_total",
"tier" => self.plugin_name.clone(),
)
.increment(1);
// Disassemble the BufWriter so dropping it does not attempt another write.
if let Some(removed) = slot_guard.writer.take() {
let (_file, _buffered) = removed.writer.into_parts();
}
self.record_dirty_loss(&pv);
failed.push(pv.clone());
to_remove.push(pv);
}
}
}
// Cache entries of discarded writers are removed after all slot locks are released.
if !to_remove.is_empty() {
let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
for pv in &to_remove {
cache.remove(pv);
}
}
FlushOutcome { failed, deferred }
}
/// Appends `pv` to the queue of PVs that lost buffered bytes. A poisoned
/// lock is recovered rather than propagated.
fn record_dirty_loss(&self, pv: &str) {
    let mut lost = match self.evicted_with_loss.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    lost.push(pv.to_string());
}
/// Closes a cached writer, flushing it first when it is dirty.
///
/// Returns an error, and records `pv` in the loss queue, when the flush
/// fails or when a dirty writer's file is missing after a successful flush.
/// A clean writer is closed without flushing and always yields `Ok(())`.
fn drop_dirty_writer(&self, pv: &str, cached: CachedWriter) -> std::io::Result<()> {
let CachedWriter {
path,
mut writer,
dirty,
last_used: _,
_fd_guard,
} = cached;
let flush_res = if dirty { writer.flush() } else { Ok(()) };
// Disassemble the BufWriter so dropping it does not attempt another write.
let (_file, _buffered) = writer.into_parts();
match flush_res {
Ok(()) => {
// Flushed, but the path is gone: the bytes are treated as lost.
if dirty && !path.exists() {
tracing::warn!(
pv,
?path,
"Dirty-writer flush succeeded but file is gone; \
bytes are not reader-visible — surfacing PV to loss queue"
);
metrics::counter!(
"archiver_pb_dirty_drop_loss_total",
"tier" => self.plugin_name.clone(),
)
.increment(1);
self.record_dirty_loss(pv);
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"flushed to deleted inode",
))
} else {
Ok(())
}
}
Err(e) => {
tracing::warn!(
pv,
?path,
"Dirty-writer drop flush failed; dirty bytes lost — \
surfacing PV to loss queue: {e}"
);
metrics::counter!(
"archiver_pb_dirty_drop_loss_total",
"tier" => self.plugin_name.clone(),
)
.increment(1);
self.record_dirty_loss(pv);
Err(e)
}
}
}
/// Discards a cached writer without flushing it.
///
/// When the writer was dirty, the loss is logged, counted, and `pv` is added
/// to the loss queue.
fn drop_writer_file_gone(&self, pv: &str, cached: CachedWriter) {
    let had_unflushed_bytes = cached.dirty;
    if had_unflushed_bytes {
        tracing::warn!(
            pv,
            path = ?cached.path,
            "Dirty bytes lost — file disappeared while writer was \
             still buffering; surfacing PV to loss queue"
        );
        metrics::counter!(
            "archiver_pb_dirty_drop_loss_total",
            "tier" => self.plugin_name.clone(),
        )
        .increment(1);
        self.record_dirty_loss(pv);
    }
    // Disassemble the BufWriter so dropping it does not attempt a write.
    let (_file, _buffered) = cached.writer.into_parts();
}
/// Discards the cached writer that targets `path`, if there is one.
///
/// When a PV name can be derived from `path` only that PV's slot is
/// checked; otherwise every slot is scanned. Returns `true` when a writer
/// was removed.
pub fn evict_writer_for_path(&self, path: &Path) -> bool {
    match self.pv_name_from_path(path) {
        Some(pv) => self.evict_writer_for_pv_at_path(&pv, path),
        None => self.evict_writer_for_path_scan(path),
    }
}
/// Derives a PV name from a file path under the root folder.
///
/// Takes the part of the root-relative path before its last `:` and
/// replaces `/` with `:`. Returns `None` when `path` is not under the root,
/// is not valid UTF-8, has no `:`, or has nothing before it.
fn pv_name_from_path(&self, path: &Path) -> Option<String> {
    let relative = path.strip_prefix(&self.root_folder).ok()?.to_str()?;
    let (pv_key, _partition) = relative.rsplit_once(':')?;
    if pv_key.is_empty() {
        None
    } else {
        Some(pv_key.replace('/', ":"))
    }
}
/// Discards `pv`'s cached writer when it targets `path`, then removes the
/// PV's cache entry. Returns `false` when the PV has no slot, no writer, or
/// a writer on a different path.
fn evict_writer_for_pv_at_path(&self, pv: &str, path: &Path) -> bool {
    let looked_up = {
        let cache = self.write_cache.lock().unwrap_or_else(|e| {
            tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
            e.into_inner()
        });
        cache.get(pv).cloned()
    };
    let Some(arc) = looked_up else {
        return false;
    };
    {
        let mut slot_guard = arc.lock().unwrap_or_else(|e| e.into_inner());
        let holds_path = matches!(slot_guard.writer.as_ref(), Some(cw) if cw.path == path);
        if !holds_path {
            return false;
        }
        let Some(cached) = slot_guard.writer.take() else {
            return false;
        };
        self.drop_writer_file_gone(pv, cached);
        // The slot lock is released here, before the cache lock is taken.
    }
    let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
    cache.remove(pv);
    true
}
/// Checks every cached slot and discards any writer that targets `path`,
/// removing the matching cache entries afterwards. Returns `true` when at
/// least one writer was discarded.
fn evict_writer_for_path_scan(&self, path: &Path) -> bool {
    let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
        let cache = self.write_cache.lock().unwrap_or_else(|e| {
            tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
            e.into_inner()
        });
        let mut entries = Vec::new();
        for (pv, slot) in cache.iter() {
            entries.push((pv.clone(), slot.clone()));
        }
        entries
    };
    let mut evicted_any = false;
    let mut stale_keys = Vec::new();
    for (pv, slot_arc) in snapshot {
        {
            let mut slot_guard = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
            let holds_path =
                matches!(slot_guard.writer.as_ref(), Some(cw) if cw.path == path);
            if !holds_path {
                continue;
            }
            if let Some(cached) = slot_guard.writer.take() {
                self.drop_writer_file_gone(&pv, cached);
                evicted_any = true;
            }
            // Slot lock released at the end of this scope.
        }
        stale_keys.push(pv);
    }
    if !stale_keys.is_empty() {
        let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
        for pv in &stale_keys {
            cache.remove(pv);
        }
    }
    evicted_any
}
/// Creates the parent directory of `path` unless it is already recorded in
/// `known_dirs`, and records it after creating it.
///
/// # Errors
/// Returns the `create_dir_all` error, if any.
fn ensure_parent_dir(&self, path: &Path) -> anyhow::Result<()> {
    let Some(parent) = path.parent() else {
        return Ok(());
    };
    // The lock is only held for the lookup, not during directory creation.
    let already_known = self
        .known_dirs
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .contains(parent);
    if already_known {
        return Ok(());
    }
    std::fs::create_dir_all(parent)?;
    self.known_dirs
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .insert(parent.to_path_buf());
    Ok(())
}
/// Returns the writer slot for `pv`, inserting an empty, non-dead slot when
/// the cache has none.
fn slot_for(&self, pv: &str) -> Arc<Mutex<PvWriterSlot>> {
    let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
    let slot = cache.entry(pv.to_string()).or_insert_with(|| {
        let empty = PvWriterSlot {
            writer: None,
            dead: false,
        };
        Arc::new(Mutex::new(empty))
    });
    slot.clone()
}
/// Appends one encoded sample for `pv` to `path` through the PV's cached
/// buffered writer, opening (and, for a new or empty file, writing the
/// header to) the file when no usable writer is cached.
///
/// The sample is only buffered here; `dirty` is set so a later flush pass
/// writes it out.
///
/// # Errors
/// Fails when the sample cannot be encoded, the PV's slot is marked dead,
/// the fd budget is exhausted and nothing can be evicted, the file cannot be
/// opened, or a buffered write fails.
fn write_cached(
&self,
path: &Path,
pv: &str,
dbr_type: ArchDbType,
sample: &ArchiverSample,
meta: &AppendMeta,
) -> anyhow::Result<()> {
// Encode before taking any lock.
let sample_bytes = writer::encode_sample(dbr_type, sample)?;
let escaped_sample = codec::escape(&sample_bytes);
let path_buf = path.to_path_buf();
let slot_arc = self.slot_for(pv);
let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
if slot.dead {
anyhow::bail!("PV `{pv}` was deleted/renamed concurrently; refusing to recreate file");
}
// The cached writer targets a different file: close it (flushing if dirty).
if let Some(existing) = slot.writer.as_ref()
&& existing.path != path_buf
&& let Some(cached) = slot.writer.take()
{
let _ = self.drop_dirty_writer(pv, cached);
}
// The cached writer's file vanished: discard the writer so the file is reopened below.
if let Some(existing) = slot.writer.as_ref()
&& !existing.path.exists()
{
tracing::warn!(
pv,
path = ?existing.path,
"Cached writer's file disappeared from filesystem; reopening"
);
if let Some(cached) = slot.writer.take() {
self.drop_writer_file_gone(pv, cached);
}
}
if slot.writer.is_none() {
let needs_header = file_needs_header(path);
// Reserve a budget slot, evicting other PVs' writers until one is free.
let fd_guard = loop {
if let Some(guard) = self.fd_budget.try_reserve() {
break guard;
}
if !self.evict_lru_writer(pv) {
return Err(anyhow::anyhow!(
"PlainPB tier `{}` at fd cap ({}) and no evictable \
writer (all slots busy); refusing to open another \
to protect the process fd budget",
self.plugin_name,
self.fd_budget.max()
));
}
};
// On an OS-level handle-limit error, evict one writer and retry the open once.
let file = match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
Ok(f) => f,
Err(e) if is_too_many_open_files(&e) && self.evict_lru_writer(pv) => {
tracing::warn!(
?path,
"Hit OS file-handle limit; evicted LRU writer and \
retrying open"
);
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?
}
Err(e) => return Err(e.into()),
};
let mut bw = BufWriter::with_capacity(64 * 1024, file);
if needs_header {
let (year, _, _) = sample.decompose_timestamp();
let header = writer::build_payload_info(
pv,
dbr_type,
year,
meta.element_count,
&meta.headers,
);
let header_bytes = header.encode_to_vec();
let escaped_header = codec::escape(&header_bytes);
// The header is written as one escaped line terminated by NEWLINE.
let mut header_frame = Vec::with_capacity(escaped_header.len() + 1);
header_frame.extend_from_slice(&escaped_header);
header_frame.push(codec::NEWLINE);
if let Err(e) = bw.write_all(&header_frame) {
let (_file, _buffered) = bw.into_parts();
return Err(e.into());
}
}
slot.writer = Some(CachedWriter {
path: path_buf,
writer: bw,
dirty: true,
last_used: SystemTime::now(),
_fd_guard: fd_guard,
});
}
let cached = slot.writer.as_mut().expect("just inserted");
cached.last_used = SystemTime::now();
// Each sample is one escaped line terminated by NEWLINE.
let mut frame = Vec::with_capacity(escaped_sample.len() + 1);
frame.extend_from_slice(&escaped_sample);
frame.push(codec::NEWLINE);
if let Err(e) = cached.writer.write_all(&frame) {
tracing::warn!(
pv,
path = ?cached.path,
"Write failed; evicting cached writer to force \
reopen on next sample: {e}"
);
let was_dirty = cached.dirty;
if let Some(removed) = slot.writer.take() {
let (_file, _buffered) = removed.writer.into_parts();
if was_dirty {
self.record_dirty_loss(pv);
metrics::counter!(
"archiver_pb_dirty_drop_loss_total",
"tier" => self.plugin_name.clone(),
)
.increment(1);
}
}
// Release the slot lock before taking the cache lock.
drop(slot);
let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
cache.remove(pv);
return Err(e.into());
}
cached.dirty = true;
Ok(())
}
/// Evicts the least recently used cached writer of some PV other than
/// `current_pv`, preferring a clean writer over a dirty one.
///
/// Returns `true` when a writer was evicted.
fn evict_lru_writer(&self, current_pv: &str) -> bool {
    let candidates: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
        let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
        let mut others = Vec::new();
        for (pv, slot) in cache.iter() {
            if pv.as_str() != current_pv {
                others.push((pv.clone(), slot.clone()));
            }
        }
        others
    };
    // Clean writers first; dirty ones only when no clean writer was evicted.
    self.try_evict_with_filter(&candidates, false)
        || self.try_evict_with_filter(&candidates, true)
}
/// Evicts the writer with the oldest `last_used` among `candidates` whose
/// `dirty` flag equals `want_dirty`.
///
/// Slots whose lock cannot be taken immediately are skipped. Returns `true`
/// when a writer was closed and its cache entry removed.
fn try_evict_with_filter(
&self,
candidates: &[(String, Arc<Mutex<PvWriterSlot>>)],
want_dirty: bool,
) -> bool {
let mut oldest: Option<(String, Arc<Mutex<PvWriterSlot>>, SystemTime)> = None;
// Pass 1: find the oldest matching writer without holding any slot lock for long.
for (pv, slot_arc) in candidates {
let Ok(guard) = slot_arc.try_lock() else {
continue;
};
let Some(cw) = guard.writer.as_ref() else {
drop(guard);
continue;
};
if cw.dirty != want_dirty {
drop(guard);
continue;
}
let last_used = cw.last_used;
drop(guard);
// On equal timestamps the earlier candidate is kept.
match &oldest {
Some((_, _, ts)) if *ts <= last_used => {}
_ => oldest = Some((pv.clone(), slot_arc.clone(), last_used)),
}
}
let Some((pv, slot_arc, _)) = oldest else {
return false;
};
// Pass 2: re-lock the chosen slot; give up if it became busy or empty meanwhile.
let Ok(mut guard) = slot_arc.try_lock() else {
return false;
};
let Some(cached) = guard.writer.take() else {
return false;
};
// A flush failure is recorded by drop_dirty_writer; the eviction still counts.
let _ = self.drop_dirty_writer(&pv, cached);
drop(guard);
let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
cache.remove(&pv);
true
}
}
/// Reports whether `e` carries raw OS error code 23 or 24 (ENFILE / EMFILE
/// on Linux, i.e. the system or process file-handle limit was hit).
fn is_too_many_open_files(e: &std::io::Error) -> bool {
    match e.raw_os_error() {
        Some(code) => code == 23 || code == 24,
        None => false,
    }
}
/// Decides whether a header must be written before appending to `path`.
///
/// Returns `true` for a missing or empty file. A file that `PbFileReader`
/// cannot open is truncated (best effort) and also reported as needing a
/// header. Otherwise any partial trailing record is trimmed (best effort)
/// and `false` is returned.
fn file_needs_header(path: &Path) -> bool {
    let existing_len = if path.exists() {
        std::fs::metadata(path).map(|m| m.len()).unwrap_or(0)
    } else {
        0
    };
    if existing_len == 0 {
        return true;
    }

    // Probe the header; the reader is dropped again before the file is modified.
    let header_readable = PbFileReader::open(path).is_ok();
    if header_readable {
        if let Err(e) = trim_to_last_newline(path) {
            tracing::warn!(?path, "Failed to trim partial trailing record: {e}");
        }
        return false;
    }

    tracing::warn!(
        ?path,
        "PB file has unreadable header; truncating so a fresh \
         header gets written"
    );
    let truncated = std::fs::OpenOptions::new()
        .write(true)
        .truncate(true)
        .open(path);
    if let Err(e) = truncated {
        tracing::warn!(?path, "Failed to truncate corrupt PB file: {e}");
    }
    true
}
/// Truncates `path` just after its last `codec::NEWLINE` byte when the file
/// does not already end with one.
///
/// The file is searched backwards in 4096-byte windows. An empty file, or a
/// file containing no newline at all, is left untouched.
///
/// # Errors
/// Returns any I/O error from opening, seeking, reading or truncating.
fn trim_to_last_newline(path: &Path) -> std::io::Result<()> {
    use std::io::{Read, Seek, SeekFrom};
    const CHUNK: usize = 4096;

    let mut file = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open(path)?;
    let len = file.metadata()?.len();
    if len == 0 {
        return Ok(());
    }

    // Fast path: the file already ends on a record boundary.
    let mut last_byte = [0u8; 1];
    file.seek(SeekFrom::End(-1))?;
    file.read_exact(&mut last_byte)?;
    if last_byte[0] == codec::NEWLINE {
        return Ok(());
    }

    let mut scratch = vec![0u8; CHUNK];
    let mut remaining = len;
    loop {
        if remaining == 0 {
            return Ok(());
        }
        let take = (remaining as usize).min(CHUNK);
        let offset = remaining - take as u64;
        file.seek(SeekFrom::Start(offset))?;
        let window = &mut scratch[..take];
        file.read_exact(window)?;
        if let Some(idx) = window.iter().rposition(|&b| b == codec::NEWLINE) {
            // Keep everything up to and including the newline.
            let new_len = offset + idx as u64 + 1;
            tracing::warn!(
                ?path,
                old_len = len,
                new_len,
                "Trimming partial trailing PB record"
            );
            file.set_len(new_len)?;
            return Ok(());
        }
        remaining = offset;
    }
}
/// Converts a PV name into the relative path key used for its files.
///
/// A name accepted by `crate::registry::is_valid_pv_name` has every `:`
/// replaced by `/`. A rejected name becomes `__invalid__/` followed by the
/// name with every character other than ASCII alphanumerics, `_` and `-`
/// replaced by `_`; a warning is logged.
pub(crate) fn pv_name_to_key(pv: &str) -> String {
    if crate::registry::is_valid_pv_name(pv) {
        return pv.replace(':', "/");
    }

    let mut sanitized = String::with_capacity(pv.len() + 16);
    sanitized.push_str("__invalid__/");
    sanitized.extend(pv.chars().map(|c| {
        if c.is_ascii_alphanumeric() || matches!(c, '_' | '-') {
            c
        } else {
            '_'
        }
    }));
    tracing::warn!(
        pv,
        "PV name rejected by validator; sanitized to {sanitized}"
    );
    sanitized
}
/// Reads the last sample stored in the PB file at `path`.
///
/// The header line is decoded to obtain the year and sample type, then at
/// most the final 64 KiB of the file is read and the bytes after the last
/// newline in that tail are decoded. When the tail holds no line boundary,
/// or the tail record fails to decode, the file is scanned from the start
/// instead.
///
/// Returns `Ok(None)` for an empty file, a file with only a header, or when
/// no sample could be found.
///
/// # Errors
/// Returns I/O errors, header decode errors, and reader errors from the
/// first fallback scan.
fn read_last_sample_from_file(path: &Path) -> anyhow::Result<Option<ArchiverSample>> {
let file = std::fs::File::open(path)?;
let file_len = file.metadata()?.len();
if file_len == 0 {
return Ok(None);
}
let mut rdr = std::io::BufReader::new(file);
// The first line is the escaped PayloadInfo header.
let mut header_line = Vec::new();
rdr.read_until(codec::NEWLINE, &mut header_line)?;
if header_line.last() == Some(&codec::NEWLINE) {
header_line.pop();
}
let header_bytes = codec::unescape(&header_line);
let payload_info = archiver_proto::epics_event::PayloadInfo::decode(header_bytes.as_slice())?;
let year = payload_info.year;
// An unrecognised type code falls back to ScalarDouble.
let dbr_type = ArchDbType::from_i32(payload_info.r#type).unwrap_or(ArchDbType::ScalarDouble);
let header_end = rdr.stream_position()?;
if header_end >= file_len {
return Ok(None);
}
// Read at most the last 64 KiB of the data region (never into the header).
let data_len = file_len - header_end;
let chunk_size = (64 * 1024u64).min(data_len);
let seek_pos = file_len - chunk_size;
rdr.seek(SeekFrom::Start(seek_pos))?;
let mut tail = Vec::with_capacity(chunk_size as usize);
rdr.read_to_end(&mut tail)?;
// Drop the terminating newline so the last record is what follows the previous one.
if tail.last() == Some(&codec::NEWLINE) {
tail.pop();
}
if tail.is_empty() {
return Ok(None);
}
let last_line_data = if let Some(pos) = tail.iter().rposition(|&b| b == codec::NEWLINE) {
&tail[pos + 1..]
} else if seek_pos <= header_end {
// The tail starts right after the header, so it is exactly one record.
&tail
} else {
// The last record is longer than the tail window: scan the whole file.
let mut reader = PbFileReader::open(path)?;
let mut last = None;
while let Some(sample) = reader.next_event()? {
last = Some(sample);
}
return Ok(last);
};
if last_line_data.is_empty() {
return Ok(None);
}
let raw = codec::unescape(last_line_data);
if let Ok(sample) = reader::decode_sample(dbr_type, year, &raw) {
return Ok(Some(sample));
}
tracing::warn!(
?path,
"PB tail decode failed; falling back to forward scan for last good sample"
);
// This scan stops at the first reader error and keeps the last sample read before it.
let mut reader = PbFileReader::open(path)?;
let mut last = None;
while let Ok(Some(sample)) = reader.next_event() {
last = Some(sample);
}
Ok(last)
}
/// Splits a PV's key at its last `/` into (relative directory, file-name
/// prefix). A key without `/` yields an empty directory and the whole key as
/// prefix.
fn pv_file_parts(pv: &str) -> (PathBuf, String) {
    let pv_key = pv_name_to_key(pv);
    match pv_key.rsplit_once('/') {
        Some((dir, leaf)) => (PathBuf::from(dir), leaf.to_string()),
        None => (PathBuf::from(""), pv_key.clone()),
    }
}
/// Public entry point that forwards to `list_pv_pb_files`.
pub fn list_pv_pb_files_pub(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
    let files = list_pv_pb_files(root, pv)?;
    Ok(files)
}
/// Lists, in sorted order, the `.pb` files of `pv` under `root`: entries in
/// the PV's directory whose name is the PV's file prefix followed by `:`.
///
/// Returns an empty list when the directory does not exist. Directory
/// entries that cannot be read are skipped.
///
/// # Errors
/// Returns the error from opening the directory for reading.
fn list_pv_pb_files(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
    let (dir_part, file_prefix) = pv_file_parts(pv);
    let pv_dir = root.join(&dir_part);
    if !pv_dir.exists() {
        return Ok(Vec::new());
    }

    let mut files: Vec<PathBuf> = Vec::new();
    for entry in std::fs::read_dir(&pv_dir)? {
        let Ok(entry) = entry else {
            continue;
        };
        let candidate = entry.path();
        if candidate.extension().and_then(|ext| ext.to_str()) != Some("pb") {
            continue;
        }
        // The `:` right after the prefix keeps "FOO" from matching "FOOBAR:…".
        let belongs_to_pv = candidate
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.strip_prefix(file_prefix.as_str()))
            .is_some_and(|rest| rest.starts_with(':'));
        if belongs_to_pv {
            files.push(candidate);
        }
    }
    files.sort();
    Ok(files)
}
#[async_trait]
impl StoragePlugin for PlainPbStoragePlugin {
/// The name this plugin was constructed with.
fn name(&self) -> &str {
    self.plugin_name.as_str()
}
/// The partition granularity this plugin was constructed with.
fn partition_granularity(&self) -> PartitionGranularity {
    let granularity = self.granularity;
    granularity
}
/// Appends `sample` for `pv` using default append metadata.
async fn append_event(
    &self,
    pv: &str,
    dbr_type: ArchDbType,
    sample: &ArchiverSample,
) -> anyhow::Result<()> {
    let default_meta = AppendMeta::default();
    let outcome = self
        .append_event_with_meta(pv, dbr_type, sample, &default_meta)
        .await;
    outcome
}
/// Appends `sample` for `pv` to the file selected by the sample's
/// timestamp, creating the parent directory first when needed.
///
/// # Errors
/// Propagates directory-creation and `write_cached` errors.
async fn append_event_with_meta(
    &self,
    pv: &str,
    dbr_type: ArchDbType,
    sample: &ArchiverSample,
    meta: &AppendMeta,
) -> anyhow::Result<()> {
    let path = self.file_path_for(pv, sample.timestamp);
    debug!(?path, pv, "appending event");
    self.ensure_parent_dir(&path)?;
    let written = self.write_cached(&path, pv, dbr_type, sample, meta);
    written
}
/// Returns one event stream per existing partition file of `pv` in
/// `start..end`, each bounded to that time range.
///
/// Buffered writes are flushed first. When exactly one file matches and its
/// last sample is at or before `start`, a single-sample stream holding that
/// last sample is returned instead.
///
/// # Errors
/// Propagates flush, file-read and reader-open errors.
async fn get_data(
    &self,
    pv: &str,
    start: SystemTime,
    end: SystemTime,
) -> anyhow::Result<Vec<Box<dyn EventStream>>> {
    self.flush_writes().await?;
    let files = self.list_files_for_range(pv, start, end);

    if let [only] = files.as_slice() {
        if let Some(last) = read_last_sample_from_file(only)? {
            if last.timestamp <= start {
                let reader = PbFileReader::open(only)?;
                let desc = reader.description().clone();
                let held: Box<dyn EventStream> = Box::new(SingleSampleStream {
                    desc,
                    sample: Some(last),
                });
                return Ok(vec![held]);
            }
        }
    }

    let mut streams: Vec<Box<dyn EventStream>> = Vec::new();
    for file in files {
        let seeked = PbFileReader::open_seeked(&file, start)?;
        let bounded = BoundedReader::new(seeked, start, end);
        streams.push(Box::new(bounded));
    }
    Ok(streams)
}
/// Returns the last sample of the last (in sorted order) PB file of `pv`
/// that contains one, after flushing buffered writes.
///
/// # Errors
/// Propagates flush, listing and file-read errors.
async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>> {
    self.flush_writes().await?;
    let newest_first = list_pv_pb_files(&self.root_folder, pv)?.into_iter().rev();
    for path in newest_first {
        match read_last_sample_from_file(&path)? {
            Some(sample) => return Ok(Some(sample)),
            None => continue,
        }
    }
    Ok(None)
}
/// Returns the latest sample of `pv` with a timestamp strictly before
/// `target`, walking the PV's files from last to first in sorted order.
///
/// A file whose last sample is already before `target` answers directly;
/// otherwise the file is read from the start up to the first sample at or
/// after `target`.
///
/// # Errors
/// Propagates flush, listing, file-read and reader errors.
async fn get_last_event_before(
    &self,
    pv: &str,
    target: SystemTime,
) -> anyhow::Result<Option<ArchiverSample>> {
    self.flush_writes().await?;
    let newest_first = list_pv_pb_files(&self.root_folder, pv)?.into_iter().rev();
    for path in newest_first {
        match read_last_sample_from_file(&path)? {
            None => continue,
            Some(tail) if tail.timestamp < target => return Ok(Some(tail)),
            Some(_) => {}
        }

        let mut reader = PbFileReader::open(&path)?;
        let mut candidate: Option<ArchiverSample> = None;
        while let Some(sample) = reader.next_event()? {
            if sample.timestamp >= target {
                break;
            }
            candidate = Some(sample);
        }
        if candidate.is_some() {
            return Ok(candidate);
        }
    }
    Ok(None)
}
/// Deletes every PB file of `pv` and returns how many were removed.
///
/// The PV's slot is marked dead and its cached writer discarded without
/// flushing before any file is removed; the cache entry itself is removed
/// when this function returns. If the PV's own sub-directory is empty
/// afterwards it is removed too. The storage root is never removed: a PV
/// whose key has no directory component lives directly in the root, and
/// that directory is left in place (matching the guard in `rename_pv`).
///
/// # Errors
/// Propagates listing, file-removal and directory-read errors.
async fn delete_pv_data(&self, pv: &str) -> anyhow::Result<u64> {
    let _cleanup = TombstoneCleanupGuard {
        plugin: self,
        pv: pv.to_string(),
    };
    let slot_arc = self.slot_for(pv);
    {
        let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
        slot.dead = true;
        if let Some(cached) = slot.writer.take() {
            self.drop_writer_file_gone(pv, cached);
        }
    }
    let entries = list_pv_pb_files(&self.root_folder, pv)?;
    let mut deleted = 0u64;
    for path in entries {
        tokio::fs::remove_file(&path).await?;
        deleted += 1;
    }
    let (dir_part, _) = pv_file_parts(pv);
    let pv_dir = self.root_folder.join(&dir_part);
    // An empty `dir_part` makes `pv_dir` the root folder itself; skip cleanup then.
    if !dir_part.as_os_str().is_empty() && pv_dir.exists() {
        let is_empty = std::fs::read_dir(&pv_dir)?.next().is_none();
        if is_empty {
            let _ = tokio::fs::remove_dir(&pv_dir).await;
            let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
            dirs.remove(&pv_dir);
        }
    }
    debug!(pv, deleted, "Deleted PV data files");
    Ok(deleted)
}
async fn flush_writes(&self) -> anyhow::Result<()> {
let outcome = self.flush_dirty_writers();
if !outcome.failed.is_empty() {
anyhow::bail!(
"{} writer flush(es) failed (first pv={})",
outcome.failed.len(),
outcome.failed[0],
);
}
Ok(())
}
async fn flush_ingest_writes(&self) -> anyhow::Result<IngestFlushResult> {
let outcome = self.flush_dirty_writers();
let mut failed = outcome.failed;
{
let mut ev = self
.evicted_with_loss
.lock()
.unwrap_or_else(|e| e.into_inner());
failed.append(&mut *ev);
}
failed.sort();
failed.dedup();
Ok(IngestFlushResult {
failed,
deferred: outcome.deferred,
})
}
/// Summarises this store for one PV: number of PB files and their combined
/// size. A listing failure counts as no files; files whose metadata cannot
/// be read contribute no bytes.
fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>> {
    let files = list_pv_pb_files(&self.root_folder, pv).unwrap_or_default();
    let mut bytes = 0u64;
    for file in &files {
        if let Ok(meta) = std::fs::metadata(file) {
            bytes += meta.len();
        }
    }
    let summary = StoreSummary {
        name: self.plugin_name.clone(),
        root_folder: self.root_folder.clone(),
        granularity: self.granularity,
        pv_file_count: Some(files.len() as u64),
        pv_size_bytes: Some(bytes),
        total_size_bytes: None,
        total_files: None,
    };
    Ok(vec![summary])
}
/// Summarises the whole store: total number of `.pb` files under the root
/// folder and their combined size.
fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>> {
    let (total_files, total_size) = total_pb_stats(&self.root_folder);
    let summary = StoreSummary {
        name: self.plugin_name.clone(),
        root_folder: self.root_folder.clone(),
        granularity: self.granularity,
        pv_file_count: None,
        pv_size_bytes: None,
        total_size_bytes: Some(total_size),
        total_files: Some(total_files),
    };
    Ok(vec![summary])
}
/// Renames every PB file of PV `from` so that it belongs to PV `to`, and
/// returns the number of files moved.
///
/// The source slot is marked dead and its writer closed (flushing if
/// dirty); any cached writer for the destination is closed and its cache
/// entry removed. The source's cache entry is removed when this function
/// returns.
///
/// # Errors
/// Fails when the write cache lock is poisoned, on listing or filesystem
/// errors, or when a source file name does not start with the source PV's
/// leaf name followed by `:`. Files already moved before an error are not
/// moved back.
async fn rename_pv(&self, from: &str, to: &str) -> anyhow::Result<u64> {
let _cleanup = TombstoneCleanupGuard {
plugin: self,
pv: from.to_string(),
};
let from_slot = self.slot_for(from);
{
let mut slot = from_slot.lock().unwrap_or_else(|e| e.into_inner());
slot.dead = true;
if let Some(cached) = slot.writer.take() {
let _ = self.drop_dirty_writer(from, cached);
}
}
// Detach the destination's slot so a later write to `to` opens a fresh writer.
let dest_slot_arc = {
let mut cache = self
.write_cache
.lock()
.map_err(|e| anyhow::anyhow!("write cache poisoned: {e}"))?;
cache.remove(to)
};
if let Some(arc) = dest_slot_arc {
let mut slot = arc.lock().unwrap_or_else(|e| e.into_inner());
if let Some(cached) = slot.writer.take() {
let _ = self.drop_dirty_writer(to, cached);
}
}
let from_files = list_pv_pb_files(&self.root_folder, from)?;
if from_files.is_empty() {
return Ok(0);
}
// The leaf is the last `/`-separated component of the key, i.e. the file-name prefix.
let from_key = pv_name_to_key(from);
let from_leaf = from_key.rsplit('/').next().unwrap_or(&from_key).to_string();
let to_key = pv_name_to_key(to);
let to_leaf = to_key.rsplit('/').next().unwrap_or(&to_key).to_string();
let (to_dir_part, _) = pv_file_parts(to);
let to_dir = self.root_folder.join(&to_dir_part);
if !to_dir.as_os_str().is_empty() && !to_dir.exists() {
std::fs::create_dir_all(&to_dir)?;
}
let mut moved = 0u64;
for src in &from_files {
let file_name = src
.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| anyhow::anyhow!("non-utf8 filename: {src:?}"))?;
// Keep everything after "<leaf>:" (the partition name and extension).
let suffix = file_name
.strip_prefix(&from_leaf)
.and_then(|s| s.strip_prefix(':'))
.ok_or_else(|| {
anyhow::anyhow!("filename {file_name} did not match expected PV leaf")
})?;
let new_name = format!("{to_leaf}:{suffix}");
let dst = to_dir.join(new_name);
std::fs::rename(src, &dst)?;
moved += 1;
}
// Remove the source directory when it is now empty, but never the root folder itself.
let (from_dir_part, _) = pv_file_parts(from);
let from_dir = self.root_folder.join(&from_dir_part);
if !from_dir_part.as_os_str().is_empty()
&& from_dir.exists()
&& std::fs::read_dir(&from_dir)?.next().is_none()
{
let _ = std::fs::remove_dir(&from_dir);
let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
dirs.remove(&from_dir);
}
Ok(moved)
}
}
/// Walks `root` recursively and returns `(file count, total bytes)` for all
/// files with the `pb` extension.
///
/// A missing root yields `(0, 0)`. Directories that cannot be read are
/// skipped; a file whose metadata cannot be read is counted but adds no
/// bytes.
fn total_pb_stats(root: &Path) -> (u64, u64) {
    let mut file_count = 0u64;
    let mut byte_count = 0u64;
    if !root.exists() {
        return (file_count, byte_count);
    }

    // Explicit work list instead of recursion.
    let mut pending: Vec<PathBuf> = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(listing) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in listing.flatten() {
            let path = entry.path();
            if path.is_dir() {
                pending.push(path);
                continue;
            }
            if path.extension().and_then(|ext| ext.to_str()) != Some("pb") {
                continue;
            }
            file_count += 1;
            if let Ok(meta) = entry.metadata() {
                byte_count += meta.len();
            }
        }
    }
    (file_count, byte_count)
}