archiver_core/storage/plainpb/mod.rs
1pub mod codec;
2pub mod reader;
3pub mod search;
4pub mod writer;
5
6use std::collections::{HashMap, HashSet};
7use std::io::{BufRead, BufWriter, Read, Seek, SeekFrom, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::SystemTime;
12
13/// Hash a PV name to a shard index in `0..n`. Pub so the engine's
14/// dispatcher AND PlainPB's `flush_ingest_writes_for_shard` agree
15/// on which shard owns which PV — without a shared definition the
16/// engine could route PV-X to shard A while PlainPB filters PV-X
17/// into shard B's flush set, re-introducing the misattribution
18/// bug we're trying to close.
19///
20/// `DefaultHasher` is stable enough for in-process partitioning;
21/// hash quality across process restarts doesn't matter because
22/// the on-disk file layout is keyed by PV name, not shard.
23pub fn shard_for_pv(pv: &str, n: usize) -> usize {
24 use std::collections::hash_map::DefaultHasher;
25 use std::hash::{Hash, Hasher};
26 debug_assert!(n > 0);
27 if n <= 1 {
28 return 0;
29 }
30 let mut h = DefaultHasher::new();
31 pv.hash(&mut h);
32 (h.finish() % n as u64) as usize
33}
34
35/// Default cap on simultaneously-open `BufWriter` file handles across
36/// all PVs in one PlainPB tier. Sized to stay well clear of the
37/// typical Linux process fd ulimit (1024) so the storage plugin can
38/// run on an out-of-the-box host without `ulimit -n` tuning. Sites
39/// with raised ulimits and tens of thousands of active PVs should
40/// override via [`PlainPbStoragePlugin::with_max_open_writers`].
41pub const DEFAULT_MAX_OPEN_WRITERS: usize = 512;
42
43/// Shared fd permit pool used by [`PlainPbStoragePlugin`]. Cloning
44/// the budget hands the SAME counter to multiple plugins, so a
45/// process-wide cap can be enforced across STS/MTS/LTS instead of
46/// each tier privately keeping its own 512-fd ceiling and silently
47/// summing past the process ulimit.
48///
49/// Internally a `Arc<AtomicUsize>` plus a `max`. CAS-based
50/// reservation in `try_reserve` makes the cap a hard ceiling under
51/// concurrency (no check-then-fetch_add race).
52#[derive(Clone)]
53pub struct FdBudget {
54 counter: Arc<AtomicUsize>,
55 max: usize,
56}
57
58impl FdBudget {
59 /// New budget with the given cap. `0` is sentinel for
60 /// "unbounded" (lifts the cap to `usize::MAX`); any other value
61 /// is the hard ceiling.
62 pub fn new(max: usize) -> Self {
63 Self {
64 counter: Arc::new(AtomicUsize::new(0)),
65 max: if max == 0 { usize::MAX } else { max },
66 }
67 }
68
69 /// Convenience constructor for sites that want to disable the
70 /// internal cap entirely (e.g. when the OS ulimit alone is the
71 /// only ceiling that matters).
72 pub fn unbounded() -> Self {
73 Self::new(usize::MAX)
74 }
75
76 /// Snapshot of the current open-writer count (across every
77 /// plugin sharing this budget). Lock-free; can drift between
78 /// the read and any subsequent action.
79 pub fn count(&self) -> usize {
80 self.counter.load(Ordering::Relaxed)
81 }
82
83 /// Configured cap.
84 pub fn max(&self) -> usize {
85 self.max
86 }
87
88 /// Atomically reserve one permit. Returns `Some(WriterFdGuard)`
89 /// on success; the guard's Drop releases the permit. CAS loop
90 /// closes the check-then-act race on contended paths.
91 fn try_reserve(&self) -> Option<WriterFdGuard> {
92 loop {
93 let cur = self.counter.load(Ordering::Acquire);
94 if cur >= self.max {
95 return None;
96 }
97 match self.counter.compare_exchange_weak(
98 cur,
99 cur + 1,
100 Ordering::AcqRel,
101 Ordering::Acquire,
102 ) {
103 Ok(_) => {
104 return Some(WriterFdGuard {
105 counter: self.counter.clone(),
106 });
107 }
108 Err(_) => continue,
109 }
110 }
111 }
112}
113
114use async_trait::async_trait;
115use prost::Message;
116use tracing::debug;
117
118use crate::storage::partition::PartitionGranularity;
119use crate::storage::traits::{
120 AppendMeta, EventStream, IngestFlushResult, StoragePlugin, StoreSummary,
121};
122use crate::types::{ArchDbType, ArchiverSample};
123
124use self::reader::PbFileReader;
125
126/// RAII handle that decrements the plugin's `open_writers` counter
127/// when the owning [`CachedWriter`] is dropped. Tying the decrement
128/// to Drop guarantees the count stays in sync with reality even if
129/// a future code path takes the writer without going through
130/// `flush_dirty_writers` / `evict_lru_writer` — every drop site
131/// already exercises this guard.
132struct WriterFdGuard {
133 counter: Arc<AtomicUsize>,
134}
135
136impl Drop for WriterFdGuard {
137 fn drop(&mut self) {
138 self.counter.fetch_sub(1, Ordering::Relaxed);
139 }
140}
141
142/// Cached file handle for writing the current partition of a PV.
143struct CachedWriter {
144 path: PathBuf,
145 writer: BufWriter<std::fs::File>,
146 /// `true` between writes and the next successful flush. Lets
147 /// `flush_writes` skip writers that have nothing pending so a
148 /// reader-side `get_data` doesn't pay an O(N) syscall storm
149 /// across every cached writer (Java parity is similar — the
150 /// `dirty` bit short-circuits the iteration).
151 dirty: bool,
152 /// Last access timestamp — the LRU key used by the always-on
153 /// fd-cap eviction path: when [`PlainPbStoragePlugin::open_writers`]
154 /// reaches `max_open_writers`, the writer with the smallest
155 /// `last_used` is evicted to make room for the next open.
156 last_used: SystemTime,
157 /// Decrements `open_writers` on drop. Field name starts with
158 /// `_` because it's never read directly — it exists purely for
159 /// its Drop side-effect.
160 _fd_guard: WriterFdGuard,
161}
162
163/// Per-PV serialization slot. Holds the cached writer (if any) and
164/// a tombstone bit so a concurrent `append` doesn't resurrect a PV
165/// whose `delete_pv_data`/`rename_pv` is already in flight.
166///
167/// Each slot is wrapped in its own `Mutex` so I/O for one PV cannot
168/// stall I/O for any other. The outer `write_cache` map is locked
169/// only while inserting/looking up/removing slots — never while
170/// holding a filesystem syscall.
171struct PvWriterSlot {
172 writer: Option<CachedWriter>,
173 /// Set under the slot lock by `delete_pv_data` / `rename_pv`.
174 /// Once true, every subsequent `append` for this PV bails with
175 /// an error so the deleted PV doesn't reappear from a racing
176 /// late writer. The slot stays in `write_cache` only until the
177 /// caller that set the flag clears the entry from the map; new
178 /// `append`s that look up the PV after the cache eviction get a
179 /// fresh slot.
180 dead: bool,
181}
182
183/// RAII cleanup for the tombstoned-slot pattern used by
184/// `delete_pv_data` and `rename_pv`. Ensures `cache.remove(pv)`
185/// runs on every return path — including `?` short-circuits and
186/// panics — so a PV cannot be left permanently undead because an
187/// intermediate `tokio::fs::remove_file` errored. Without this
188/// guard the slot stays in `write_cache` with `dead == true` and
189/// every future append for this PV name bails forever.
190struct TombstoneCleanupGuard<'a> {
191 plugin: &'a PlainPbStoragePlugin,
192 pv: String,
193}
194
195impl<'a> Drop for TombstoneCleanupGuard<'a> {
196 fn drop(&mut self) {
197 let mut cache = self
198 .plugin
199 .write_cache
200 .lock()
201 .unwrap_or_else(|e| e.into_inner());
202 cache.remove(&self.pv);
203 }
204}
205
206/// Aggregated outcome of one `flush_dirty_writers` pass — used to
207/// give the read-side and the write-side flush surfaces different
208/// semantics over the same underlying iteration.
209struct FlushOutcome {
210 /// PVs whose `flush()` syscall errored. Their cached writers
211 /// have been evicted (buffered bytes discarded via `into_parts`)
212 /// and their entries removed from the map. Surfaced to the
213 /// write_loop so it drops their `last_event` from the registry
214 /// commit batch.
215 failed: Vec<String>,
216 /// PVs whose slot was already locked (an `append` or another
217 /// flush is in flight). Their dirty bytes remain buffered and
218 /// will be picked up on the next flush cycle. Surfaced to the
219 /// write_loop alongside `failed` so the registry doesn't claim
220 /// `last_event` for samples whose bytes are still in BufWriter
221 /// memory — under-commit, never over-commit.
222 deferred: Vec<String>,
223}
224
225/// Wraps a `PbFileReader` and clamps emitted samples to `[start, end]`.
226///
227/// Java parity (e3b4471 + 88c7601): `binary_search_pb_file` returns
228/// `None` when every sample in the file is older than `start`, leaving
229/// the reader at the data section start. Without a lower-bound filter
230/// the wrapper would leak the entire file's stale contents into the
231/// retrieval merge. The upper bound covers files included by partition
232/// name whose actual sample timestamps spill past `end`.
233struct BoundedReader {
234 inner: PbFileReader,
235 start: SystemTime,
236 end: SystemTime,
237 done: bool,
238}
239
240impl BoundedReader {
241 fn new(inner: PbFileReader, start: SystemTime, end: SystemTime) -> Self {
242 Self {
243 inner,
244 start,
245 end,
246 done: false,
247 }
248 }
249}
250
251impl crate::storage::traits::EventStream for BoundedReader {
252 fn description(&self) -> &crate::types::EventStreamDesc {
253 self.inner.description()
254 }
255
256 fn next_event(&mut self) -> anyhow::Result<Option<crate::types::ArchiverSample>> {
257 if self.done {
258 return Ok(None);
259 }
260 loop {
261 match self.inner.next_event()? {
262 None => {
263 self.done = true;
264 return Ok(None);
265 }
266 // Below `start` or above `end`: drop and continue. PB
267 // partition files are append-ordered but timestamps
268 // within one file aren't strictly monotonic — clock
269 // backsteps and late backfills exist — so a single
270 // out-of-window sample must NOT terminate the stream
271 // (Java's reader keeps consuming until EOF).
272 Some(s) if s.timestamp < self.start => continue,
273 Some(s) if s.timestamp > self.end => continue,
274 Some(s) => return Ok(Some(s)),
275 }
276 }
277 }
278}
279
280use crate::retrieval::query::SingleSampleStream;
281
282/// PlainPB storage plugin — binary-compatible with Java EPICS Archiver Appliance.
283pub struct PlainPbStoragePlugin {
284 plugin_name: String,
285 root_folder: PathBuf,
286 granularity: PartitionGranularity,
287 /// One per-PV slot, each holding (at most) one `BufWriter` pointed
288 /// at that PV's current partition file. The outer `Mutex<HashMap>`
289 /// is held only briefly to look up / insert / remove a slot; all
290 /// I/O happens under the per-slot mutex so a stuck syscall on one
291 /// PV does NOT block any other PV's appends or flushes.
292 write_cache: Mutex<HashMap<String, Arc<Mutex<PvWriterSlot>>>>,
293 /// Directories known to exist. Avoids redundant create_dir_all syscalls.
294 known_dirs: Mutex<HashSet<PathBuf>>,
295 /// Shared fd permit pool. Cloning the [`FdBudget`] across
296 /// multiple plugins ties them to the SAME counter and ceiling,
297 /// so STS+MTS+LTS can enforce a single process-wide cap rather
298 /// than each tier silently keeping its own 512-fd ceiling.
299 fd_budget: FdBudget,
300 /// PVs whose dirty bytes were evicted by the LRU path AND
301 /// whose flush failed, so the bytes are LOST. Drained by the
302 /// next `flush_dirty_writers` and merged into `failed` so
303 /// write_loop drops these PVs from `ts_updates` instead of
304 /// silently committing a stale timestamp.
305 evicted_with_loss: Mutex<Vec<String>>,
306}
307
308impl PlainPbStoragePlugin {
309 pub fn new(name: &str, root_folder: PathBuf, granularity: PartitionGranularity) -> Self {
310 Self::with_max_open_writers(name, root_folder, granularity, DEFAULT_MAX_OPEN_WRITERS)
311 }
312
313 /// Construct with an explicit cap on simultaneously-open writers.
314 /// Equivalent to `with_fd_budget(name, root, granularity,
315 /// FdBudget::new(max_open_writers))` — sites that want to
316 /// SHARE the budget across multiple tiers should call
317 /// `with_fd_budget` directly with a clone of the same `FdBudget`.
318 pub fn with_max_open_writers(
319 name: &str,
320 root_folder: PathBuf,
321 granularity: PartitionGranularity,
322 max_open_writers: usize,
323 ) -> Self {
324 Self::with_fd_budget(
325 name,
326 root_folder,
327 granularity,
328 FdBudget::new(max_open_writers),
329 )
330 }
331
332 /// Construct using a (possibly-shared) fd permit pool.
333 ///
334 /// Pass the SAME [`FdBudget`] (via `.clone()`) to multiple
335 /// plugins to enforce a process-wide cap; pass a fresh
336 /// `FdBudget::new(N)` per plugin for the legacy per-tier cap.
337 pub fn with_fd_budget(
338 name: &str,
339 root_folder: PathBuf,
340 granularity: PartitionGranularity,
341 fd_budget: FdBudget,
342 ) -> Self {
343 Self {
344 plugin_name: name.to_string(),
345 root_folder,
346 granularity,
347 write_cache: Mutex::new(HashMap::new()),
348 known_dirs: Mutex::new(HashSet::new()),
349 fd_budget,
350 evicted_with_loss: Mutex::new(Vec::new()),
351 }
352 }
353
354 /// Snapshot of the current open-writer count for the budget
355 /// this plugin draws from. When the budget is shared across
356 /// tiers, this reflects the GLOBAL count, not just this tier's
357 /// share. Lock-free; instantaneous, can drift.
358 pub fn open_writer_count(&self) -> usize {
359 self.fd_budget.count()
360 }
361
362 /// Build the file path for a PV at a given timestamp.
363 /// Format: {root}/{pv_key}:{partition_name}.pb
364 /// where pv_key replaces `:` with `/` in the PV name.
365 pub fn file_path_for(&self, pv: &str, ts: SystemTime) -> PathBuf {
366 let pv_key = pv_name_to_key(pv);
367 let partition_name = crate::storage::partition::partition_name(ts, self.granularity);
368 let filename = format!("{pv_key}:{partition_name}.pb");
369 self.root_folder.join(filename)
370 }
371
372 /// List all PB files for a PV in a time range.
373 fn list_files_for_range(&self, pv: &str, start: SystemTime, end: SystemTime) -> Vec<PathBuf> {
374 let partitions =
375 crate::storage::partition::partitions_in_range(start, end, self.granularity);
376 let pv_key = pv_name_to_key(pv);
377 partitions
378 .into_iter()
379 .map(|pname| {
380 let filename = format!("{pv_key}:{pname}.pb");
381 self.root_folder.join(filename)
382 })
383 .filter(|p| p.exists())
384 .collect()
385 }
386
387 pub fn root_folder(&self) -> &Path {
388 &self.root_folder
389 }
390
391 /// Flush every dirty cached writer (across all PVs) that we
392 /// can lock without blocking. Per-slot `try_lock` keeps a
393 /// stuck PV from blocking the flush of every other PV — those
394 /// are reported in `deferred` and retried next cycle.
395 ///
396 /// Errored flushes evict the writer (buffered bytes dropped
397 /// via `into_parts`) and are recorded BOTH in the returned
398 /// `failed` list (for the immediate caller) AND in
399 /// `evicted_with_loss` (so the next ingest-side flush_owner
400 /// pass can pick them up even if the immediate caller was the
401 /// read-side `flush_writes`). Without the second record, a
402 /// retrieval-triggered flush could swallow a failure before
403 /// the global flush owner ever sees it.
404 ///
405 /// **Does not drain `evicted_with_loss`.** That drain belongs
406 /// exclusively to `flush_ingest_writes` so the global owner
407 /// is the single consumer of loss markers. Read-side callers
408 /// (retrieval, ETL) must NOT consume the queue or owner-side
409 /// ts_updates would silently advance past PVs whose bytes
410 /// never reached disk.
411 fn flush_dirty_writers(&self) -> FlushOutcome {
412 // Snapshot slot Arcs under a brief outer lock so we never
413 // hold the outer cache mutex while attempting an inner-slot
414 // lock — that would deadlock with an `append` that holds
415 // its slot lock and is waiting on the outer lock to record
416 // a partition rollover.
417 let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
418 let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
419 cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
420 };
421
422 let mut failed = Vec::new();
423 let mut deferred = Vec::new();
424 let mut to_remove = Vec::new();
425
426 for (pv, slot_arc) in snapshot {
427 let mut slot_guard = match slot_arc.try_lock() {
428 Ok(g) => g,
429 Err(std::sync::TryLockError::WouldBlock) => {
430 deferred.push(pv);
431 continue;
432 }
433 Err(std::sync::TryLockError::Poisoned(p)) => p.into_inner(),
434 };
435 let Some(cached) = slot_guard.writer.as_mut() else {
436 continue;
437 };
438 if !cached.dirty {
439 continue;
440 }
441 match cached.writer.flush() {
442 Ok(()) => {
443 // Principle 4 (flush truth): flush succeeded
444 // at the syscall level, but if the underlying
445 // file is gone (ETL deleted it; an `rm -f` ran;
446 // an NFS race), the bytes went into the page
447 // cache for an unlinked inode — not reader-
448 // visible. Treat as loss to keep the registry's
449 // `last_event` honest.
450 if !cached.path.exists() {
451 tracing::warn!(
452 pv,
453 path = ?cached.path,
454 "Flush succeeded but file is gone; bytes are not \
455 reader-visible — surfacing PV to loss queue"
456 );
457 metrics::counter!(
458 "archiver_pb_flush_failures_total",
459 "tier" => self.plugin_name.clone(),
460 )
461 .increment(1);
462 if let Some(removed) = slot_guard.writer.take() {
463 let (_file, _buffered) = removed.writer.into_parts();
464 }
465 self.record_dirty_loss(&pv);
466 failed.push(pv.clone());
467 to_remove.push(pv);
468 } else {
469 cached.dirty = false;
470 }
471 }
472 Err(e) => {
473 tracing::warn!(pv, path = ?cached.path, "Failed to flush cached writer: {e}");
474 metrics::counter!(
475 "archiver_pb_flush_failures_total",
476 "tier" => self.plugin_name.clone(),
477 )
478 .increment(1);
479 if let Some(removed) = slot_guard.writer.take() {
480 let (_file, _buffered) = removed.writer.into_parts();
481 }
482 // Persist the loss so the next flush_ingest_writes
483 // surfaces it to the global owner — even if THIS
484 // call was triggered by retrieval/ETL, the owner
485 // must still learn that PV's bytes were lost so
486 // it doesn't commit a stale `last_event`.
487 self.record_dirty_loss(&pv);
488 failed.push(pv.clone());
489 to_remove.push(pv);
490 }
491 }
492 }
493
494 if !to_remove.is_empty() {
495 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
496 for pv in &to_remove {
497 cache.remove(pv);
498 }
499 }
500
501 FlushOutcome { failed, deferred }
502 }
503
504 /// Push `pv` onto the persistent loss queue so the next
505 /// `flush_ingest_writes` call surfaces it to the global flush
506 /// owner. Idempotent at the caller site — duplicate entries
507 /// are harmless because the owner's `ts_updates.remove` is
508 /// idempotent.
509 fn record_dirty_loss(&self, pv: &str) {
510 let mut ev = self
511 .evicted_with_loss
512 .lock()
513 .unwrap_or_else(|e| e.into_inner());
514 ev.push(pv.to_string());
515 }
516
517 /// Helper used by every code path that drops a dirty writer
518 /// outside the regular flush iteration (partition rollover,
519 /// ghost-file reopen, evict_writer_for_path, write_cached
520 /// write-error path, LRU eviction). Tries to flush; on flush
521 /// failure pushes the PV to `evicted_with_loss`. Returns
522 /// `Ok(())` when the bytes are reader-visible on disk,
523 /// `Err(io::Error)` when the bytes were lost.
524 ///
525 /// **Always uses `into_parts` to drop the BufWriter**, on
526 /// every path. Without this, the loss branch's `Err` return
527 /// would leave `cached.writer` to be dropped normally —
528 /// `BufWriter::drop` would re-issue the failing flush syscall
529 /// behind our back, possibly writing a partial frame to disk
530 /// after we've already classified the bytes as lost. The
531 /// remaining fields (path, last_used, _fd_guard) drop at
532 /// scope exit; `_fd_guard` releases the fd permit.
533 ///
534 /// Also checks `path.exists()` after a successful flush —
535 /// flushing to a deleted inode returns Ok at the syscall
536 /// level but the bytes are not reader-visible (principle 4:
537 /// flush success must mean reader-visible).
538 fn drop_dirty_writer(&self, pv: &str, cached: CachedWriter) -> std::io::Result<()> {
539 let CachedWriter {
540 path,
541 mut writer,
542 dirty,
543 last_used: _,
544 _fd_guard,
545 } = cached;
546 let flush_res = if dirty { writer.flush() } else { Ok(()) };
547 // Discard the BufWriter without invoking its Drop — keeps
548 // a failed flush from re-firing as a drop-time auto-flush.
549 let (_file, _buffered) = writer.into_parts();
550 // _fd_guard drops at end of this function, releasing the
551 // fd permit.
552
553 match flush_res {
554 Ok(()) => {
555 if dirty && !path.exists() {
556 // Flushed to a deleted inode. Bytes went into
557 // the OS page cache for an unlinked file —
558 // not reader-visible. Treat as loss.
559 tracing::warn!(
560 pv,
561 ?path,
562 "Dirty-writer flush succeeded but file is gone; \
563 bytes are not reader-visible — surfacing PV to loss queue"
564 );
565 metrics::counter!(
566 "archiver_pb_dirty_drop_loss_total",
567 "tier" => self.plugin_name.clone(),
568 )
569 .increment(1);
570 self.record_dirty_loss(pv);
571 Err(std::io::Error::new(
572 std::io::ErrorKind::NotFound,
573 "flushed to deleted inode",
574 ))
575 } else {
576 Ok(())
577 }
578 }
579 Err(e) => {
580 tracing::warn!(
581 pv,
582 ?path,
583 "Dirty-writer drop flush failed; dirty bytes lost — \
584 surfacing PV to loss queue: {e}"
585 );
586 metrics::counter!(
587 "archiver_pb_dirty_drop_loss_total",
588 "tier" => self.plugin_name.clone(),
589 )
590 .increment(1);
591 self.record_dirty_loss(pv);
592 Err(e)
593 }
594 }
595 }
596
597 /// Variant of [`drop_dirty_writer`] for paths where the
598 /// underlying file is GONE (ghost-file disappearance,
599 /// `evict_writer_for_path` after `remove_file`). Skips the
600 /// flush attempt — flushing to a deleted inode either fails
601 /// or vanishes silently — and unconditionally records loss
602 /// for any dirty bytes.
603 fn drop_writer_file_gone(&self, pv: &str, cached: CachedWriter) {
604 if cached.dirty {
605 tracing::warn!(
606 pv,
607 path = ?cached.path,
608 "Dirty bytes lost — file disappeared while writer was \
609 still buffering; surfacing PV to loss queue"
610 );
611 metrics::counter!(
612 "archiver_pb_dirty_drop_loss_total",
613 "tier" => self.plugin_name.clone(),
614 )
615 .increment(1);
616 self.record_dirty_loss(pv);
617 }
618 // Drop the CachedWriter explicitly: into_parts avoids
619 // BufWriter's drop-time auto-flush so we don't try to
620 // write to a deleted file.
621 let (_file, _buffered) = cached.writer.into_parts();
622 }
623
624 /// Drop any cached BufWriter whose target path matches `path`.
625 /// Call this after `remove_file` on a `.pb` file the engine may have
626 /// open — without it, subsequent `append_event` writes go to the
627 /// deleted-but-still-open inode (a "ghost" file invisible to readers
628 /// because `list_files_for_range` walks the directory). Safe no-op
629 /// when nothing matches. Returns true if a writer was evicted.
630 ///
631 /// **Definitive (principle 3 / eviction ownership):** the slot
632 /// pointing at `path` (if any) is evicted before this returns.
633 /// Fast path uses the file path's deterministic encoding to
634 /// derive the PV name and look up exactly one slot, avoiding
635 /// the previous full-scan blocking-lock cost (where one stuck
636 /// unrelated slot could delay every ETL eviction). Falls back
637 /// to a scan only if the path's filename can't be parsed.
638 ///
639 /// Sync method, blocking on the target slot's mutex — the
640 /// ETL caller wraps it in `tokio::task::spawn_blocking` so
641 /// the runtime worker isn't held during the wait.
642 pub fn evict_writer_for_path(&self, path: &Path) -> bool {
643 // Fast path: the file path encodes the PV name (see
644 // `pv_name_to_key`), so we can derive it and look up
645 // exactly one slot. Avoids the previous full-scan
646 // behaviour where one stuck unrelated slot could delay
647 // every ETL eviction.
648 if let Some(pv) = self.pv_name_from_path(path) {
649 return self.evict_writer_for_pv_at_path(&pv, path);
650 }
651 // Fallback: file path didn't decode (caller passed a
652 // non-storage path, or the encoding scheme has drifted).
653 // Run the legacy scan so we don't miss a writer just
654 // because the filename was unusual.
655 self.evict_writer_for_path_scan(path)
656 }
657
658 /// Reverse the deterministic `pv_name_to_key` → file-path
659 /// encoding. Returns `Some(pv_name)` for any path under the
660 /// storage root whose filename matches the
661 /// `{pv_key}:{partition}.pb` layout.
662 fn pv_name_from_path(&self, path: &Path) -> Option<String> {
663 let rel = path.strip_prefix(&self.root_folder).ok()?;
664 let s = rel.to_str()?;
665 // Strip the trailing partition+extension (everything from
666 // the LAST `:` onward — the last colon is the separator
667 // because `pv_name_to_key` has already replaced any colon
668 // in the PV name with `/`).
669 let colon = s.rfind(':')?;
670 let pv_key = &s[..colon];
671 if pv_key.is_empty() {
672 return None;
673 }
674 Some(pv_key.replace('/', ":"))
675 }
676
677 /// Direct-lookup eviction: lock just one slot (the one keyed
678 /// by `pv`) and evict if its writer's path matches. Returns
679 /// `true` when a writer was evicted, `false` otherwise.
680 fn evict_writer_for_pv_at_path(&self, pv: &str, path: &Path) -> bool {
681 let slot_arc = {
682 let cache = self.write_cache.lock().unwrap_or_else(|e| {
683 tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
684 e.into_inner()
685 });
686 cache.get(pv).cloned()
687 };
688 let Some(arc) = slot_arc else {
689 return false;
690 };
691 let mut slot_guard = arc.lock().unwrap_or_else(|e| e.into_inner());
692 let matches = slot_guard
693 .writer
694 .as_ref()
695 .map(|cw| cw.path == path)
696 .unwrap_or(false);
697 if !matches {
698 return false;
699 }
700 let Some(cached) = slot_guard.writer.take() else {
701 return false;
702 };
703 self.drop_writer_file_gone(pv, cached);
704 drop(slot_guard);
705 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
706 cache.remove(pv);
707 true
708 }
709
710 /// Fallback full-scan eviction for paths that don't decode
711 /// to a known PV name. Snapshots every slot Arc and blocking-
712 /// locks each — same correctness as the fast path, but with
713 /// the O(N) cost the fast path is designed to avoid.
714 fn evict_writer_for_path_scan(&self, path: &Path) -> bool {
715 let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
716 let cache = self.write_cache.lock().unwrap_or_else(|e| {
717 tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
718 e.into_inner()
719 });
720 cache
721 .iter()
722 .map(|(pv, slot)| (pv.clone(), slot.clone()))
723 .collect()
724 };
725
726 let mut removed = false;
727 let mut to_remove = Vec::new();
728 for (pv, slot_arc) in snapshot {
729 let mut slot_guard = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
730 let matches = slot_guard
731 .writer
732 .as_ref()
733 .map(|cw| cw.path == path)
734 .unwrap_or(false);
735 if !matches {
736 continue;
737 }
738 if let Some(cached) = slot_guard.writer.take() {
739 self.drop_writer_file_gone(&pv, cached);
740 removed = true;
741 }
742 drop(slot_guard);
743 to_remove.push(pv);
744 }
745 if !to_remove.is_empty() {
746 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
747 for pv in &to_remove {
748 cache.remove(pv);
749 }
750 }
751 removed
752 }
753
754 /// Ensure a parent directory exists, using a cached set to skip repeated syscalls.
755 fn ensure_parent_dir(&self, path: &Path) -> anyhow::Result<()> {
756 if let Some(parent) = path.parent() {
757 let needs_create = {
758 // Recover from poison: known_dirs mutations are
759 // single-statement HashSet inserts, so a panicking
760 // thread can't leave half-modified state. Better to
761 // proceed than fail every future write_cached call.
762 let dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
763 !dirs.contains(parent)
764 };
765 if needs_create {
766 std::fs::create_dir_all(parent)?;
767 let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
768 dirs.insert(parent.to_path_buf());
769 }
770 }
771 Ok(())
772 }
773
774 /// Look up the slot Arc for `pv`, creating an empty one under a
775 /// brief outer-cache lock if absent. The returned Arc lets the
776 /// caller serialise on the per-PV mutex without holding the
777 /// outer cache lock during file I/O.
778 fn slot_for(&self, pv: &str) -> Arc<Mutex<PvWriterSlot>> {
779 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
780 cache
781 .entry(pv.to_string())
782 .or_insert_with(|| {
783 Arc::new(Mutex::new(PvWriterSlot {
784 writer: None,
785 dead: false,
786 }))
787 })
788 .clone()
789 }
790
791 /// Write a sample using the cached BufWriter, creating the file + header if needed.
792 fn write_cached(
793 &self,
794 path: &Path,
795 pv: &str,
796 dbr_type: ArchDbType,
797 sample: &ArchiverSample,
798 meta: &AppendMeta,
799 ) -> anyhow::Result<()> {
800 let sample_bytes = writer::encode_sample(dbr_type, sample)?;
801 let escaped_sample = codec::escape(&sample_bytes);
802
803 let path_buf = path.to_path_buf();
804 let slot_arc = self.slot_for(pv);
805 // Recover from poison: per-PV slot mutations are confined to
806 // this method and `flush_dirty_writers` / `delete_pv_data` /
807 // `rename_pv`, all of which leave internally-consistent state
808 // on early return. Better to proceed than fail every future
809 // append for a PV whose writer once panicked.
810 let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
811
812 // Tombstone check: a concurrent `delete_pv_data` /
813 // `rename_pv` may have grabbed this Arc and set `dead`.
814 // Bail rather than recreate the file we just deleted.
815 if slot.dead {
816 anyhow::bail!("PV `{pv}` was deleted/renamed concurrently; refusing to recreate file");
817 }
818
819 // If the cached writer points at a different path, the partition has
820 // rolled over — flush and drop the old writer before opening the new
821 // one. Use `drop_dirty_writer` so a flush failure here records loss
822 // (without it, partition rollover at the wrong moment silently lost
823 // the old partition's last buffered samples).
824 if let Some(existing) = slot.writer.as_ref()
825 && existing.path != path_buf
826 && let Some(cached) = slot.writer.take()
827 {
828 let _ = self.drop_dirty_writer(pv, cached);
829 }
830
831 // Defense-in-depth for ghost-file writes: if the cached writer's
832 // target path no longer exists on disk (deleted by ETL while we
833 // missed the eviction, by manual `rm`, by a flaky NFS mount, …)
834 // its bytes are going into an orphaned inode invisible to readers.
835 // Drop via `drop_writer_file_gone` so dirty bytes get a loss marker
836 // (flushing to a deleted inode is meaningless; bytes are lost
837 // regardless).
838 if let Some(existing) = slot.writer.as_ref()
839 && !existing.path.exists()
840 {
841 tracing::warn!(
842 pv,
843 path = ?existing.path,
844 "Cached writer's file disappeared from filesystem; reopening"
845 );
846 if let Some(cached) = slot.writer.take() {
847 self.drop_writer_file_gone(pv, cached);
848 }
849 }
850
851 if slot.writer.is_none() {
852 let needs_header = file_needs_header(path);
853
854 // Atomic fd-cap reservation: loop trying to reserve a
855 // permit; each failed reservation triggers one LRU
856 // eviction (which drops a CachedWriter, decrementing
857 // `open_writers` via WriterFdGuard's Drop). Bail with a
858 // clear error if no evictable candidate exists — better
859 // than silently letting open() blow past the cap.
860 //
861 // CAS reservation closes the old check-then-fetch_add
862 // race where N concurrent appends would all see "below
863 // cap", all increment, and all open files past the cap.
864 let fd_guard = loop {
865 if let Some(guard) = self.fd_budget.try_reserve() {
866 break guard;
867 }
868 if !self.evict_lru_writer(pv) {
869 return Err(anyhow::anyhow!(
870 "PlainPB tier `{}` at fd cap ({}) and no evictable \
871 writer (all slots busy); refusing to open another \
872 to protect the process fd budget",
873 self.plugin_name,
874 self.fd_budget.max()
875 ));
876 }
877 };
878
879 // Open with EMFILE/ENFILE recovery: even with our
880 // internal reservation honoured, the OS-wide fd table
881 // can still be exhausted (other processes, other tiers
882 // sharing the same ulimit). Evict and retry once.
883 let file = match std::fs::OpenOptions::new()
884 .create(true)
885 .append(true)
886 .open(path)
887 {
888 Ok(f) => f,
889 Err(e) if is_too_many_open_files(&e) && self.evict_lru_writer(pv) => {
890 tracing::warn!(
891 ?path,
892 "Hit OS file-handle limit; evicted LRU writer and \
893 retrying open"
894 );
895 std::fs::OpenOptions::new()
896 .create(true)
897 .append(true)
898 .open(path)?
899 }
900 Err(e) => return Err(e.into()),
901 };
902 let mut bw = BufWriter::with_capacity(64 * 1024, file);
903
904 if needs_header {
905 let (year, _, _) = sample.decompose_timestamp();
906 let header = writer::build_payload_info(
907 pv,
908 dbr_type,
909 year,
910 meta.element_count,
911 &meta.headers,
912 );
913 let header_bytes = header.encode_to_vec();
914 let escaped_header = codec::escape(&header_bytes);
915 // Single write_all so the header+newline never split
916 // across BufWriter flushes — same atomicity rationale
917 // as the sample frame below.
918 let mut header_frame = Vec::with_capacity(escaped_header.len() + 1);
919 header_frame.extend_from_slice(&escaped_header);
920 header_frame.push(codec::NEWLINE);
921 if let Err(e) = bw.write_all(&header_frame) {
922 // Header write failed. Discard the BufWriter
923 // via `into_parts` so its `Drop` can't re-issue
924 // the failing syscall — without this, the
925 // drop-time auto-flush could write a partial
926 // header to disk after we've already classified
927 // the bytes as lost. (Principle: failure-
928 // classified resource never goes through the
929 // normal destructor path.) The created file
930 // exists on disk; `file_needs_header` on the
931 // next attempt sees the unreadable header and
932 // truncates.
933 let (_file, _buffered) = bw.into_parts();
934 // fd_guard's Drop releases the fd permit at
935 // function exit.
936 return Err(e.into());
937 }
938 }
939
940 slot.writer = Some(CachedWriter {
941 path: path_buf,
942 writer: bw,
943 // Header bytes (if any) are buffered but not yet
944 // flushed. Mark dirty so the periodic flush picks
945 // them up — without this, a PV that gets created
946 // and then receives no further samples within a
947 // flush_period would never persist its header.
948 dirty: true,
949 last_used: SystemTime::now(),
950 // Move the reservation we obtained at the top of
951 // this branch into the writer; its Drop will
952 // release the fd permit when the writer is taken /
953 // partition-rolled / evicted / dropped.
954 _fd_guard: fd_guard,
955 });
956 }
957
958 let cached = slot.writer.as_mut().expect("just inserted");
959 cached.last_used = SystemTime::now();
960 // Atomic-at-buffer-layer sample frame: a single `write_all`
961 // means the BufWriter never splits the sample/newline pair
962 // across two internal flushes. OS-level write atomicity is
963 // still bounded by the kernel page boundary, but this removes
964 // our contribution to the partial-record risk.
965 let mut frame = Vec::with_capacity(escaped_sample.len() + 1);
966 frame.extend_from_slice(&escaped_sample);
967 frame.push(codec::NEWLINE);
968 if let Err(e) = cached.writer.write_all(&frame) {
969 // After a partial write (ENOSPC, NFS hiccup, …) the
970 // BufWriter's internal state is suspect — reusing it
971 // would compound the corruption (tail garbage, repeated
972 // failures). Evict so the next call goes through the
973 // create+append+header path which validates the file
974 // and reopens fresh. Tail-trim runs in `file_needs_header`
975 // on that next open and removes any partial record.
976 //
977 // Use `into_parts` to discard buffered bytes WITHOUT
978 // letting BufWriter::drop attempt a final flush; that
979 // flush would re-issue the same failing syscall (worst
980 // case: another partial write) and the buffered bytes
981 // are already suspect.
982 tracing::warn!(
983 pv,
984 path = ?cached.path,
985 "Write failed; evicting cached writer to force \
986 reopen on next sample: {e}"
987 );
988 // Record loss IF the writer had previously-buffered
989 // dirty bytes (other samples queued in BufWriter that
990 // never reached disk). This sample's own ts won't be
991 // reported because we return Err below — but past
992 // samples whose ts WERE reported would otherwise be
993 // false-committed by the global owner.
994 let was_dirty = cached.dirty;
995 if let Some(removed) = slot.writer.take() {
996 let (_file, _buffered) = removed.writer.into_parts();
997 // _file dropped → fd closed without flush.
998 if was_dirty {
999 self.record_dirty_loss(pv);
1000 metrics::counter!(
1001 "archiver_pb_dirty_drop_loss_total",
1002 "tier" => self.plugin_name.clone(),
1003 )
1004 .increment(1);
1005 }
1006 }
1007 // Drop the slot from the outer map so the next append
1008 // gets a clean lookup (no stale empty slot lingering).
1009 drop(slot);
1010 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
1011 cache.remove(pv);
1012 return Err(e.into());
1013 }
1014 cached.dirty = true;
1015 Ok(())
1016 }
1017
1018 /// Evict one cached writer to free a file-descriptor permit.
1019 ///
1020 /// Two-pass policy:
1021 /// 1. **Clean writer LRU.** Pick the oldest slot whose
1022 /// writer has `dirty == false`. Drop it; no flush needed,
1023 /// no risk of data loss.
1024 /// 2. **Dirty writer LRU (last resort).** If no clean
1025 /// candidate, pick the oldest dirty slot. Flush it. On
1026 /// flush *failure*, push the PV onto `evicted_with_loss`
1027 /// so the next `flush_dirty_writers` reports it in
1028 /// `failed` — without this, write_loop would silently
1029 /// commit a stale `last_event` for bytes that never
1030 /// reached disk.
1031 ///
1032 /// Slots whose mutex is held by another thread are skipped —
1033 /// `try_lock` returning `WouldBlock` means an in-flight append
1034 /// is already using that fd, so taking it wouldn't free
1035 /// anything anyway.
1036 ///
1037 /// Returns `true` when something was evicted (caller can
1038 /// retry), `false` when no candidate could be freed.
1039 fn evict_lru_writer(&self, current_pv: &str) -> bool {
1040 let candidates: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
1041 let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
1042 cache
1043 .iter()
1044 .filter(|(pv, _)| pv.as_str() != current_pv)
1045 .map(|(pv, slot)| (pv.clone(), slot.clone()))
1046 .collect()
1047 };
1048
1049 // Pass 1: prefer CLEAN writers. No flush, no risk of loss.
1050 if self.try_evict_with_filter(&candidates, /* want_dirty = */ false) {
1051 return true;
1052 }
1053 // Pass 2: forced fallback — dirty writers. Flush attempted;
1054 // failures surfaced via `evicted_with_loss`.
1055 self.try_evict_with_filter(&candidates, /* want_dirty = */ true)
1056 }
1057
1058 /// Inner half of [`evict_lru_writer`] — find the oldest
1059 /// candidate matching `want_dirty` and evict it. Split out so
1060 /// the two passes share the snapshot + lock-retry logic.
1061 fn try_evict_with_filter(
1062 &self,
1063 candidates: &[(String, Arc<Mutex<PvWriterSlot>>)],
1064 want_dirty: bool,
1065 ) -> bool {
1066 let mut oldest: Option<(String, Arc<Mutex<PvWriterSlot>>, SystemTime)> = None;
1067 for (pv, slot_arc) in candidates {
1068 let Ok(guard) = slot_arc.try_lock() else {
1069 continue;
1070 };
1071 let Some(cw) = guard.writer.as_ref() else {
1072 drop(guard);
1073 continue;
1074 };
1075 if cw.dirty != want_dirty {
1076 drop(guard);
1077 continue;
1078 }
1079 let last_used = cw.last_used;
1080 drop(guard);
1081 match &oldest {
1082 Some((_, _, ts)) if *ts <= last_used => {}
1083 _ => oldest = Some((pv.clone(), slot_arc.clone(), last_used)),
1084 }
1085 }
1086
1087 let Some((pv, slot_arc, _)) = oldest else {
1088 return false;
1089 };
1090 let Ok(mut guard) = slot_arc.try_lock() else {
1091 // Lost the race — another thread grabbed the slot
1092 // between our scan and the eviction.
1093 return false;
1094 };
1095 let Some(cached) = guard.writer.take() else {
1096 return false;
1097 };
1098 // Single dirty-drop helper does the flush, the
1099 // loss-marker bookkeeping, and the metric — kept
1100 // consistent with partition-rollover and write-error
1101 // paths so a future code path can't accidentally bypass
1102 // the loss surface.
1103 let _ = self.drop_dirty_writer(&pv, cached);
1104 // CachedWriter's WriterFdGuard already released the fd
1105 // permit when `drop_dirty_writer` consumed it.
1106 drop(guard);
1107 let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
1108 cache.remove(&pv);
1109 true
1110 }
1111}
1112
1113/// True iff `e` corresponds to a POSIX EMFILE (per-process fd limit)
1114/// or ENFILE (system-wide fd limit). Used by write_cached's open
1115/// retry to distinguish recoverable resource exhaustion from real
1116/// filesystem errors. Codes are POSIX-standard (Linux + macOS:
1117/// EMFILE=24, ENFILE=23).
1118fn is_too_many_open_files(e: &std::io::Error) -> bool {
1119 matches!(e.raw_os_error(), Some(23) | Some(24))
1120}
1121
1122/// Decide whether a file at `path` needs a fresh PayloadInfo header
1123/// written before sample data is appended. Returns `true` when:
1124/// 1. the file doesn't exist,
1125/// 2. the file exists but is 0 bytes (Java parity 651c3a6b: a crash
1126/// mid-create would otherwise leave a header-less file), OR
1127/// 3. the file exists with bytes but `PbFileReader::open` cannot
1128/// parse the header — in that case we **truncate** the file so
1129/// the caller's `create+append` opens cleanly. Without this
1130/// third branch, a partial-header crash makes the file forever
1131/// unreadable AND every subsequent append silently piles garbage
1132/// onto a corrupt prefix.
1133fn file_needs_header(path: &Path) -> bool {
1134 if !path.exists() {
1135 return true;
1136 }
1137 let size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
1138 if size == 0 {
1139 return true;
1140 }
1141 if PbFileReader::open(path).is_err() {
1142 tracing::warn!(
1143 ?path,
1144 "PB file has unreadable header; truncating so a fresh \
1145 header gets written"
1146 );
1147 if let Err(e) = std::fs::OpenOptions::new()
1148 .write(true)
1149 .truncate(true)
1150 .open(path)
1151 {
1152 tracing::warn!(?path, "Failed to truncate corrupt PB file: {e}");
1153 }
1154 return true;
1155 }
1156 // Header is valid; defend against a tail with a partial sample
1157 // (writer killed mid-flush). Truncating to the last NEWLINE
1158 // boundary loses at most one record but keeps the file readable
1159 // — without this, a reader hits the partial record and stops
1160 // returning every sample after that point.
1161 if let Err(e) = trim_to_last_newline(path) {
1162 tracing::warn!(?path, "Failed to trim partial trailing record: {e}");
1163 }
1164 false
1165}
1166
1167/// Truncate `path` to end at its last NEWLINE byte (inclusive). Used
1168/// to drop a partial sample frame at file tail after a crash.
1169fn trim_to_last_newline(path: &Path) -> std::io::Result<()> {
1170 use std::io::{Read, Seek, SeekFrom};
1171 let mut file = std::fs::OpenOptions::new()
1172 .read(true)
1173 .write(true)
1174 .open(path)?;
1175 let len = file.metadata()?.len();
1176 if len == 0 {
1177 return Ok(());
1178 }
1179 // If the very last byte is already a NEWLINE, nothing to trim.
1180 file.seek(SeekFrom::End(-1))?;
1181 let mut tail = [0u8; 1];
1182 file.read_exact(&mut tail)?;
1183 if tail[0] == codec::NEWLINE {
1184 return Ok(());
1185 }
1186 // Scan backwards in chunks for the last NEWLINE.
1187 const CHUNK: usize = 4096;
1188 let mut buf = vec![0u8; CHUNK];
1189 let mut window_end = len;
1190 while window_end > 0 {
1191 let read_len = (window_end as usize).min(CHUNK);
1192 let read_start = window_end - read_len as u64;
1193 file.seek(SeekFrom::Start(read_start))?;
1194 file.read_exact(&mut buf[..read_len])?;
1195 if let Some(idx) = buf[..read_len].iter().rposition(|&b| b == codec::NEWLINE) {
1196 let new_len = read_start + idx as u64 + 1;
1197 tracing::warn!(
1198 ?path,
1199 old_len = len,
1200 new_len,
1201 "Trimming partial trailing PB record"
1202 );
1203 file.set_len(new_len)?;
1204 return Ok(());
1205 }
1206 window_end = read_start;
1207 }
1208 // No NEWLINE anywhere — file is just one giant un-terminated
1209 // record (or single header line that didn't get its newline).
1210 // Leave as-is; truncation here would be more destructive than
1211 // the corruption we're trying to fix.
1212 Ok(())
1213}
1214
1215/// Convert PV name to file path key.
1216/// `SIM:Sine` → `SIM/Sine`
1217///
1218/// Defensive: an attacker-supplied PV name like `../../etc/passwd` would
1219/// otherwise pass straight through and let `Path::join` escape the
1220/// storage root. Registry-side validation already rejects these at
1221/// register_pv / import_pv / add_alias time, but we re-validate here so
1222/// any code path that bypasses the registry (e.g. retrieval of a PV
1223/// name read directly from a PB file's PayloadInfo) still fails closed.
1224/// Returns a sanitized fallback rather than panicking so retrieval
1225/// errors stay diagnosable.
1226pub(crate) fn pv_name_to_key(pv: &str) -> String {
1227 if !crate::registry::is_valid_pv_name(pv) {
1228 // Strip every disallowed character so path joins stay anchored
1229 // at the storage root. Use a marker prefix so an audit can spot
1230 // these: we never write to such paths in normal operation.
1231 let mut sanitized = String::with_capacity(pv.len() + 16);
1232 sanitized.push_str("__invalid__/");
1233 for c in pv.chars() {
1234 if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
1235 sanitized.push(c);
1236 } else {
1237 sanitized.push('_');
1238 }
1239 }
1240 tracing::warn!(
1241 pv,
1242 "PV name rejected by validator; sanitized to {sanitized}"
1243 );
1244 return sanitized;
1245 }
1246 pv.replace(':', "/")
1247}
1248
1249/// Read the last sample from a PB file by seeking near the end.
1250/// Falls back to full sequential read for edge cases (e.g., very large single sample).
1251fn read_last_sample_from_file(path: &Path) -> anyhow::Result<Option<ArchiverSample>> {
1252 let file = std::fs::File::open(path)?;
1253 let file_len = file.metadata()?.len();
1254 if file_len == 0 {
1255 return Ok(None);
1256 }
1257
1258 let mut rdr = std::io::BufReader::new(file);
1259
1260 // Read header to get year and dbr_type.
1261 let mut header_line = Vec::new();
1262 rdr.read_until(codec::NEWLINE, &mut header_line)?;
1263 if header_line.last() == Some(&codec::NEWLINE) {
1264 header_line.pop();
1265 }
1266 let header_bytes = codec::unescape(&header_line);
1267 let payload_info = archiver_proto::epics_event::PayloadInfo::decode(header_bytes.as_slice())?;
1268 let year = payload_info.year;
1269 let dbr_type = ArchDbType::from_i32(payload_info.r#type).unwrap_or(ArchDbType::ScalarDouble);
1270
1271 let header_end = rdr.stream_position()?;
1272 if header_end >= file_len {
1273 return Ok(None);
1274 }
1275
1276 // Read the last 64KB (or less) to find the final sample line.
1277 let data_len = file_len - header_end;
1278 let chunk_size = (64 * 1024u64).min(data_len);
1279 let seek_pos = file_len - chunk_size;
1280 rdr.seek(SeekFrom::Start(seek_pos))?;
1281
1282 let mut tail = Vec::with_capacity(chunk_size as usize);
1283 rdr.read_to_end(&mut tail)?;
1284
1285 // Trim trailing newline.
1286 if tail.last() == Some(&codec::NEWLINE) {
1287 tail.pop();
1288 }
1289
1290 if tail.is_empty() {
1291 return Ok(None);
1292 }
1293
1294 // Find the last complete line (after the last newline byte in the chunk).
1295 let last_line_data = if let Some(pos) = tail.iter().rposition(|&b| b == codec::NEWLINE) {
1296 &tail[pos + 1..]
1297 } else if seek_pos <= header_end {
1298 // Entire data section is in the chunk — this IS the (only) line.
1299 &tail
1300 } else {
1301 // Very large single line that exceeds 64KB — fall back to sequential read.
1302 let mut reader = PbFileReader::open(path)?;
1303 let mut last = None;
1304 while let Some(sample) = reader.next_event()? {
1305 last = Some(sample);
1306 }
1307 return Ok(last);
1308 };
1309
1310 if last_line_data.is_empty() {
1311 return Ok(None);
1312 }
1313
1314 let raw = codec::unescape(last_line_data);
1315 if let Ok(sample) = reader::decode_sample(dbr_type, year, &raw) {
1316 return Ok(Some(sample));
1317 }
1318
1319 // Java parity (20ec1a02): a crash mid-write leaves a torn last line.
1320 // Walk forward from the start tracking the last good sample so the
1321 // tail-corruption case still surfaces a usable answer instead of an
1322 // I/O-style error to the caller. Bounded by the file size (we'll
1323 // stop at end-of-stream); the cost only matters for the rare
1324 // corrupt-tail case.
1325 tracing::warn!(
1326 ?path,
1327 "PB tail decode failed; falling back to forward scan for last good sample"
1328 );
1329 let mut reader = PbFileReader::open(path)?;
1330 let mut last = None;
1331 while let Ok(Some(sample)) = reader.next_event() {
1332 last = Some(sample);
1333 }
1334 Ok(last)
1335}
1336
1337/// Build PV file prefix info for matching files in a directory.
1338fn pv_file_parts(pv: &str) -> (PathBuf, String) {
1339 let pv_key = pv_name_to_key(pv);
1340 let dir_part = pv_key.rsplit_once('/').map(|(dir, _)| dir).unwrap_or("");
1341 let file_prefix = pv_key
1342 .rsplit_once('/')
1343 .map(|(_, name)| name)
1344 .unwrap_or(&pv_key)
1345 .to_string();
1346 (PathBuf::from(dir_part), file_prefix)
1347}
1348
1349/// List PB files for a PV in a directory, matching the PV file prefix.
1350/// Crate-public re-export of [`list_pv_pb_files`] — used by the ETL
1351/// executor to consolidate one PV's files without duplicating the
1352/// directory-walking logic.
1353pub fn list_pv_pb_files_pub(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
1354 list_pv_pb_files(root, pv)
1355}
1356
1357fn list_pv_pb_files(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
1358 let (dir_part, file_prefix) = pv_file_parts(pv);
1359 let pv_dir = root.join(&dir_part);
1360
1361 if !pv_dir.exists() {
1362 return Ok(Vec::new());
1363 }
1364
1365 let mut files: Vec<PathBuf> = std::fs::read_dir(&pv_dir)?
1366 .filter_map(|e| e.ok())
1367 .map(|e| e.path())
1368 .filter(|p| {
1369 p.extension().and_then(|e| e.to_str()) == Some("pb")
1370 && p.file_name().and_then(|n| n.to_str()).is_some_and(|n| {
1371 n.starts_with(&file_prefix) && n[file_prefix.len()..].starts_with(':')
1372 })
1373 })
1374 .collect();
1375
1376 files.sort();
1377 Ok(files)
1378}
1379
1380#[async_trait]
1381impl StoragePlugin for PlainPbStoragePlugin {
1382 fn name(&self) -> &str {
1383 &self.plugin_name
1384 }
1385
1386 fn partition_granularity(&self) -> PartitionGranularity {
1387 self.granularity
1388 }
1389
1390 async fn append_event(
1391 &self,
1392 pv: &str,
1393 dbr_type: ArchDbType,
1394 sample: &ArchiverSample,
1395 ) -> anyhow::Result<()> {
1396 let meta = AppendMeta::default();
1397 self.append_event_with_meta(pv, dbr_type, sample, &meta)
1398 .await
1399 }
1400
1401 async fn append_event_with_meta(
1402 &self,
1403 pv: &str,
1404 dbr_type: ArchDbType,
1405 sample: &ArchiverSample,
1406 meta: &AppendMeta,
1407 ) -> anyhow::Result<()> {
1408 let path = self.file_path_for(pv, sample.timestamp);
1409 debug!(?path, pv, "appending event");
1410
1411 self.ensure_parent_dir(&path)?;
1412 self.write_cached(&path, pv, dbr_type, sample, meta)
1413 }
1414
1415 async fn get_data(
1416 &self,
1417 pv: &str,
1418 start: SystemTime,
1419 end: SystemTime,
1420 ) -> anyhow::Result<Vec<Box<dyn EventStream>>> {
1421 // Flush cached writes so readers see the latest data.
1422 self.flush_writes().await?;
1423
1424 let files = self.list_files_for_range(pv, start, end);
1425
1426 // Java parity (88c7601): single-file short-circuit. When the only
1427 // matching file's last sample is older than `start`, return that
1428 // single event in a tiny stream rather than opening a full reader
1429 // that the lower-bound filter would just discard. Equivalent to
1430 // Java's `CallableEventStream.makeOneEventCallable(...)` branch.
1431 // Java parity (88c7601): Java compares `lastEventEpochSeconds <= startTime`,
1432 // so a file whose final sample lands exactly on `start` still
1433 // short-circuits. `<` would force a full reader open at the
1434 // boundary and emit the same single sample after a wasted seek.
1435 if files.len() == 1
1436 && let Some(last) = read_last_sample_from_file(&files[0])?
1437 && last.timestamp <= start
1438 {
1439 let reader = PbFileReader::open(&files[0])?;
1440 let desc = reader.description().clone();
1441 return Ok(vec![Box::new(SingleSampleStream {
1442 desc,
1443 sample: Some(last),
1444 })]);
1445 }
1446
1447 let mut streams: Vec<Box<dyn EventStream>> = Vec::new();
1448 for file in files {
1449 let reader = PbFileReader::open_seeked(&file, start)?;
1450 // Java parity (e3b4471 + 88c7601): clamp output at both
1451 // ends. Without the upper bound, files whose partition name
1452 // overlaps the query but whose late-arriving samples spill
1453 // past `end` leak stale tail data. Without the lower bound,
1454 // a binary-search miss leaves the reader at data-start and
1455 // emits every pre-`start` sample in the file.
1456 streams.push(Box::new(BoundedReader::new(reader, start, end)));
1457 }
1458 Ok(streams)
1459 }
1460
1461 async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>> {
1462 // Flush cached writes so readers can see the latest data.
1463 self.flush_writes().await?;
1464
1465 let pb_files = list_pv_pb_files(&self.root_folder, pv)?;
1466
1467 // Read from the last (most recent) file, using optimized tail read.
1468 for path in pb_files.into_iter().rev() {
1469 if let Some(sample) = read_last_sample_from_file(&path)? {
1470 return Ok(Some(sample));
1471 }
1472 }
1473 Ok(None)
1474 }
1475
1476 async fn get_last_event_before(
1477 &self,
1478 pv: &str,
1479 target: SystemTime,
1480 ) -> anyhow::Result<Option<ArchiverSample>> {
1481 self.flush_writes().await?;
1482
1483 let pb_files = list_pv_pb_files(&self.root_folder, pv)?;
1484
1485 // Walk newest-to-oldest; first file whose final sample is before
1486 // `target` provides the answer (its last sample IS the answer).
1487 // For files whose final sample is at-or-after target, scan from
1488 // the start to find the last sample with ts < target.
1489 for path in pb_files.into_iter().rev() {
1490 let Some(last) = read_last_sample_from_file(&path)? else {
1491 continue;
1492 };
1493 if last.timestamp < target {
1494 return Ok(Some(last));
1495 }
1496 // Final sample is past target — scan the file forward and
1497 // track the latest sample with ts < target.
1498 let mut reader = PbFileReader::open(&path)?;
1499 let mut last_before: Option<ArchiverSample> = None;
1500 while let Some(sample) = reader.next_event()? {
1501 if sample.timestamp >= target {
1502 break;
1503 }
1504 last_before = Some(sample);
1505 }
1506 if last_before.is_some() {
1507 return Ok(last_before);
1508 }
1509 // Every sample in this file is at-or-after target; the answer,
1510 // if any, lives in an older file.
1511 }
1512 Ok(None)
1513 }
1514
1515 async fn delete_pv_data(&self, pv: &str) -> anyhow::Result<u64> {
1516 // Three-phase delete to close the concurrent-append race:
1517 //
1518 // Phase 1: KEEP the slot in the cache, set `dead = true`,
1519 // drop the writer (file-gone helper). Any
1520 // concurrent `append` that performs `slot_for`
1521 // during this window — whether before or after
1522 // we acquire the slot lock — gets the SAME
1523 // tombstoned slot back. write_cached's
1524 // dead-check then bails before opening any new
1525 // file under this PV's name.
1526 //
1527 // Phase 2: List + remove on-disk files (async).
1528 //
1529 // Phase 3 (RAII): `_cleanup` drops at function exit and
1530 // removes the slot from the cache. Runs on
1531 // EVERY return path — Ok, ?, panic. Without
1532 // this guard, an early-? from list_pv_pb_files
1533 // or remove_file would leave the slot
1534 // tombstoned forever with no operator-visible
1535 // way to recover the PV name.
1536 let _cleanup = TombstoneCleanupGuard {
1537 plugin: self,
1538 pv: pv.to_string(),
1539 };
1540 let slot_arc = self.slot_for(pv);
1541 {
1542 let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
1543 slot.dead = true;
1544 if let Some(cached) = slot.writer.take() {
1545 // Files are about to be deleted unconditionally —
1546 // flushing buffered bytes is wasted I/O. Use the
1547 // file-gone helper so dirty bytes get a loss
1548 // marker, satisfying principle 1 (every dirty
1549 // drop classified). The next ingest flush will
1550 // surface the PV to the global owner; the owner's
1551 // commit is a no-op (registry row also torn down
1552 // by the management API), but the classification
1553 // stays uniform.
1554 self.drop_writer_file_gone(pv, cached);
1555 }
1556 }
1557
1558 let entries = list_pv_pb_files(&self.root_folder, pv)?;
1559 let mut deleted = 0u64;
1560 for path in entries {
1561 tokio::fs::remove_file(&path).await?;
1562 deleted += 1;
1563 }
1564
1565 // Clean up empty directory + invalidate the known_dirs
1566 // cache for it. Without the cache invalidation, a later
1567 // re-archive of the same PV would skip create_dir_all
1568 // (cache says "exists") and then fail to open with
1569 // ENOENT — the directory was deleted out from under the
1570 // cache.
1571 let (dir_part, _) = pv_file_parts(pv);
1572 let pv_dir = self.root_folder.join(&dir_part);
1573 if pv_dir.exists() {
1574 let is_empty = std::fs::read_dir(&pv_dir)?.next().is_none();
1575 if is_empty {
1576 let _ = tokio::fs::remove_dir(&pv_dir).await;
1577 let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
1578 dirs.remove(&pv_dir);
1579 }
1580 }
1581
1582 debug!(pv, deleted, "Deleted PV data files");
1583 Ok(deleted)
1584 }
1585
1586 async fn flush_writes(&self) -> anyhow::Result<()> {
1587 // Read-side callers (`get_data` etc.) only care about real
1588 // I/O errors. Deferred writers (an `append` is in flight)
1589 // are not failures — those bytes will reach disk on the
1590 // next cycle, and the reader can still see everything
1591 // already flushed. Surfacing deferred as Err would make
1592 // every concurrent read+write race look like storage death.
1593 //
1594 // **Does NOT drain `evicted_with_loss`.** Loss markers
1595 // are reserved for the global flush owner — read-side
1596 // consumption would silently swallow a failure before
1597 // the owner can drop its `ts_updates` for the lost PV.
1598 // (Write failures during this read-side pass are still
1599 // recorded in `evicted_with_loss` by `flush_dirty_writers`
1600 // — they just aren't drained here.)
1601 let outcome = self.flush_dirty_writers();
1602 if !outcome.failed.is_empty() {
1603 anyhow::bail!(
1604 "{} writer flush(es) failed (first pv={})",
1605 outcome.failed.len(),
1606 outcome.failed[0],
1607 );
1608 }
1609 Ok(())
1610 }
1611
1612 async fn flush_ingest_writes(&self) -> anyhow::Result<IngestFlushResult> {
1613 // Surface failed/deferred separately so the engine can keep
1614 // deferred PVs in its ts_updates map (their bytes will reach
1615 // disk next cycle) while dropping failed PVs (their bytes
1616 // are lost). Lumping them together caused permanent
1617 // registry-timestamp loss for PVs that went briefly busy
1618 // then silent.
1619 let outcome = self.flush_dirty_writers();
1620 let mut failed = outcome.failed;
1621 // Drain the loss queue here — the global flush owner is
1622 // the SOLE consumer of these markers. Includes:
1623 // * LRU dirty-eviction losses (proactive cap pressure)
1624 // * Partition rollover / ghost-file / evict-by-path
1625 // dirty-drop losses
1626 // * Read-side flush failures (recorded by
1627 // `flush_dirty_writers` even when invoked by retrieval)
1628 // Dedupe so an entry that appears in both `failed` and the
1629 // queue (this-cycle flush failure) is reported once.
1630 {
1631 let mut ev = self
1632 .evicted_with_loss
1633 .lock()
1634 .unwrap_or_else(|e| e.into_inner());
1635 failed.append(&mut *ev);
1636 }
1637 failed.sort();
1638 failed.dedup();
1639 Ok(IngestFlushResult {
1640 failed,
1641 deferred: outcome.deferred,
1642 })
1643 }
1644
1645 fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>> {
1646 let files = list_pv_pb_files(&self.root_folder, pv).unwrap_or_default();
1647 let count = files.len() as u64;
1648 let bytes: u64 = files
1649 .iter()
1650 .filter_map(|p| std::fs::metadata(p).ok())
1651 .map(|m| m.len())
1652 .sum();
1653 Ok(vec![StoreSummary {
1654 name: self.plugin_name.clone(),
1655 root_folder: self.root_folder.clone(),
1656 granularity: self.granularity,
1657 pv_file_count: Some(count),
1658 pv_size_bytes: Some(bytes),
1659 total_size_bytes: None,
1660 total_files: None,
1661 }])
1662 }
1663
1664 fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>> {
1665 let (total_files, total_size) = total_pb_stats(&self.root_folder);
1666 Ok(vec![StoreSummary {
1667 name: self.plugin_name.clone(),
1668 root_folder: self.root_folder.clone(),
1669 granularity: self.granularity,
1670 pv_file_count: None,
1671 pv_size_bytes: None,
1672 total_size_bytes: Some(total_size),
1673 total_files: Some(total_files),
1674 }])
1675 }
1676
1677 async fn rename_pv(&self, from: &str, to: &str) -> anyhow::Result<u64> {
1678 // Phase 1: tombstone the SOURCE slot in cache (don't
1679 // remove it yet — see delete_pv_data for the same race
1680 // rationale). Concurrent appends to `from` during the
1681 // file-rename window will find this same dead slot and
1682 // bail at the dead-check.
1683 //
1684 // Phase 3 (RAII): the cleanup guard removes the source
1685 // slot from the cache on every return path so a
1686 // partway-failed rename can't leave `from` permanently
1687 // tombstoned.
1688 let _cleanup = TombstoneCleanupGuard {
1689 plugin: self,
1690 pv: from.to_string(),
1691 };
1692 let from_slot = self.slot_for(from);
1693 {
1694 let mut slot = from_slot.lock().unwrap_or_else(|e| e.into_inner());
1695 slot.dead = true;
1696 if let Some(cached) = slot.writer.take() {
1697 // Try to flush so the post-rename dest file
1698 // inherits the source's last buffered samples.
1699 // On flush failure, record loss for `from`; the
1700 // global owner will drop its pending entry. The
1701 // source name's registry row is being removed by
1702 // the rename anyway, so the commit-side effect is
1703 // a no-op — but principle 1 (every dirty drop
1704 // classified) holds.
1705 let _ = self.drop_dirty_writer(from, cached);
1706 }
1707 }
1708 // Defensive: clear any stale destination writer before
1709 // the rename moves source files into the dest's path
1710 // namespace. Dest is NOT tombstoned — `to` is the live
1711 // PV after this returns, and future appends to it are
1712 // legal. (If the operator was actively appending to `to`
1713 // at the moment of rename, they made a mistake; we don't
1714 // optimise for that case.)
1715 //
1716 // **Outer lock briefly, then release**: take the slot
1717 // Arc out under the outer lock, drop the outer guard,
1718 // THEN lock the slot and run drop_dirty_writer (which
1719 // does sync flush I/O). Holding the outer lock across
1720 // the flush would block every concurrent slot_for() —
1721 // i.e., every shard's append — for the duration of the
1722 // dest writer's flush, which on slow storage could stall
1723 // the entire ingest path.
1724 let dest_slot_arc = {
1725 let mut cache = self
1726 .write_cache
1727 .lock()
1728 .map_err(|e| anyhow::anyhow!("write cache poisoned: {e}"))?;
1729 cache.remove(to)
1730 };
1731 if let Some(arc) = dest_slot_arc {
1732 let mut slot = arc.lock().unwrap_or_else(|e| e.into_inner());
1733 if let Some(cached) = slot.writer.take() {
1734 let _ = self.drop_dirty_writer(to, cached);
1735 }
1736 }
1737
1738 let from_files = list_pv_pb_files(&self.root_folder, from)?;
1739 if from_files.is_empty() {
1740 return Ok(0);
1741 }
1742 let from_key = pv_name_to_key(from);
1743 let from_leaf = from_key.rsplit('/').next().unwrap_or(&from_key).to_string();
1744 let to_key = pv_name_to_key(to);
1745 let to_leaf = to_key.rsplit('/').next().unwrap_or(&to_key).to_string();
1746
1747 // Ensure destination parent directory exists so std::fs::rename can
1748 // place files across PV-name prefixes (e.g. SIM:Sine -> RING:Current
1749 // changes the parent dir from SIM/ to RING/).
1750 let (to_dir_part, _) = pv_file_parts(to);
1751 let to_dir = self.root_folder.join(&to_dir_part);
1752 if !to_dir.as_os_str().is_empty() && !to_dir.exists() {
1753 std::fs::create_dir_all(&to_dir)?;
1754 }
1755
1756 let mut moved = 0u64;
1757 for src in &from_files {
1758 let file_name = src
1759 .file_name()
1760 .and_then(|n| n.to_str())
1761 .ok_or_else(|| anyhow::anyhow!("non-utf8 filename: {src:?}"))?;
1762 // file is "{from_leaf}:{partition}.pb" — replace the leaf prefix.
1763 let suffix = file_name
1764 .strip_prefix(&from_leaf)
1765 .and_then(|s| s.strip_prefix(':'))
1766 .ok_or_else(|| {
1767 anyhow::anyhow!("filename {file_name} did not match expected PV leaf")
1768 })?;
1769 let new_name = format!("{to_leaf}:{suffix}");
1770 let dst = to_dir.join(new_name);
1771 std::fs::rename(src, &dst)?;
1772 moved += 1;
1773 }
1774
1775 // Clean up empty source directory + invalidate
1776 // known_dirs cache for it (same rationale as
1777 // delete_pv_data — a stale entry would make the next
1778 // append to a re-archived `from` skip create_dir_all and
1779 // hit ENOENT).
1780 let (from_dir_part, _) = pv_file_parts(from);
1781 let from_dir = self.root_folder.join(&from_dir_part);
1782 if !from_dir_part.as_os_str().is_empty()
1783 && from_dir.exists()
1784 && std::fs::read_dir(&from_dir)?.next().is_none()
1785 {
1786 let _ = std::fs::remove_dir(&from_dir);
1787 let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
1788 dirs.remove(&from_dir);
1789 }
1790
1791 // Phase 3 cleanup of the tombstoned source slot is done
1792 // by `_cleanup`'s Drop on function exit. Single source of
1793 // truth for the cleanup keeps it consistent across every
1794 // return path.
1795
1796 Ok(moved)
1797 }
1798}
1799
1800/// Sum sizes and counts of `.pb` files under `root` recursively. Errors are
1801/// logged and ignored so a single unreadable file doesn't poison the metric.
1802fn total_pb_stats(root: &Path) -> (u64, u64) {
1803 fn walk(p: &Path, files: &mut u64, bytes: &mut u64) {
1804 let entries = match std::fs::read_dir(p) {
1805 Ok(e) => e,
1806 Err(_) => return,
1807 };
1808 for entry in entries.flatten() {
1809 let path = entry.path();
1810 if path.is_dir() {
1811 walk(&path, files, bytes);
1812 } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
1813 *files += 1;
1814 if let Ok(meta) = entry.metadata() {
1815 *bytes += meta.len();
1816 }
1817 }
1818 }
1819 }
1820 let mut files = 0u64;
1821 let mut bytes = 0u64;
1822 if root.exists() {
1823 walk(root, &mut files, &mut bytes);
1824 }
1825 (files, bytes)
1826}