Skip to main content

archiver_core/storage/plainpb/
mod.rs

1pub mod codec;
2pub mod reader;
3pub mod search;
4pub mod writer;
5
6use std::collections::{HashMap, HashSet};
7use std::io::{BufRead, BufWriter, Read, Seek, SeekFrom, Write};
8use std::path::{Path, PathBuf};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::time::SystemTime;
12
/// Map a PV name onto a shard index in `0..n`.
///
/// Public so the engine's dispatcher AND PlainPB's
/// `flush_ingest_writes_for_shard` share ONE definition of which shard
/// owns which PV. If the two sides hashed independently, the engine could
/// route PV-X to shard A while PlainPB filtered PV-X into shard B's flush
/// set — re-introducing the misattribution bug this function closes.
///
/// `n` is expected to be non-zero (checked by `debug_assert!`). In release
/// builds `n == 0` is treated like `n == 1` and yields `0` rather than
/// dividing by zero.
///
/// `DefaultHasher::new()` always starts from the same keys, so results are
/// stable within a process — all that in-process partitioning needs. Hash
/// stability across restarts doesn't matter because the on-disk file
/// layout is keyed by PV name, not by shard.
pub fn shard_for_pv(pv: &str, n: usize) -> usize {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    debug_assert!(n > 0);
    match n {
        // With zero or one shard everything belongs to shard 0.
        0 | 1 => 0,
        shards => {
            let mut hasher = DefaultHasher::new();
            pv.hash(&mut hasher);
            (hasher.finish() % shards as u64) as usize
        }
    }
}
34
/// Default cap on simultaneously open `BufWriter` file handles, used when
/// a plugin is built with [`PlainPbStoragePlugin::new`] (each such plugin
/// gets its own private [`FdBudget`] with this ceiling).
///
/// Chosen to stay well below the common Linux soft fd limit of 1024, so
/// the storage plugin runs on an out-of-the-box host without `ulimit -n`
/// tuning. Sites with raised limits and tens of thousands of active PVs
/// should pass a larger value through
/// [`PlainPbStoragePlugin::with_max_open_writers`], or share one budget
/// across tiers with `with_fd_budget`.
pub const DEFAULT_MAX_OPEN_WRITERS: usize = 512;
42
/// Shared fd permit pool used by [`PlainPbStoragePlugin`].
///
/// Cloning the budget hands the SAME counter to every clone, so a
/// process-wide cap can be enforced across STS/MTS/LTS. Otherwise each
/// tier would privately keep its own ceiling and silently sum past the
/// process ulimit.
///
/// Internally an `Arc<AtomicUsize>` plus a `max`. The CAS-based
/// reservation in `try_reserve` makes the cap a hard ceiling under
/// concurrency (no check-then-`fetch_add` race).
///
/// `Debug` prints the current permit count and the cap, so a budget can be
/// logged or embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct FdBudget {
    /// Permits currently held; shared by every clone of this budget.
    counter: Arc<AtomicUsize>,
    /// Hard ceiling for `counter`; `usize::MAX` means "unbounded".
    max: usize,
}
57
58impl FdBudget {
59    /// New budget with the given cap. `0` is sentinel for
60    /// "unbounded" (lifts the cap to `usize::MAX`); any other value
61    /// is the hard ceiling.
62    pub fn new(max: usize) -> Self {
63        Self {
64            counter: Arc::new(AtomicUsize::new(0)),
65            max: if max == 0 { usize::MAX } else { max },
66        }
67    }
68
69    /// Convenience constructor for sites that want to disable the
70    /// internal cap entirely (e.g. when the OS ulimit alone is the
71    /// only ceiling that matters).
72    pub fn unbounded() -> Self {
73        Self::new(usize::MAX)
74    }
75
76    /// Snapshot of the current open-writer count (across every
77    /// plugin sharing this budget). Lock-free; can drift between
78    /// the read and any subsequent action.
79    pub fn count(&self) -> usize {
80        self.counter.load(Ordering::Relaxed)
81    }
82
83    /// Configured cap.
84    pub fn max(&self) -> usize {
85        self.max
86    }
87
88    /// Atomically reserve one permit. Returns `Some(WriterFdGuard)`
89    /// on success; the guard's Drop releases the permit. CAS loop
90    /// closes the check-then-act race on contended paths.
91    fn try_reserve(&self) -> Option<WriterFdGuard> {
92        loop {
93            let cur = self.counter.load(Ordering::Acquire);
94            if cur >= self.max {
95                return None;
96            }
97            match self.counter.compare_exchange_weak(
98                cur,
99                cur + 1,
100                Ordering::AcqRel,
101                Ordering::Acquire,
102            ) {
103                Ok(_) => {
104                    return Some(WriterFdGuard {
105                        counter: self.counter.clone(),
106                    });
107                }
108                Err(_) => continue,
109            }
110        }
111    }
112}
113
114use async_trait::async_trait;
115use prost::Message;
116use tracing::debug;
117
118use crate::storage::partition::PartitionGranularity;
119use crate::storage::traits::{
120    AppendMeta, EventStream, IngestFlushResult, StoragePlugin, StoreSummary,
121};
122use crate::types::{ArchDbType, ArchiverSample};
123
124use self::reader::PbFileReader;
125
/// RAII permit from an [`FdBudget`]: gives one unit back to the shared
/// counter when dropped.
///
/// Each [`CachedWriter`] owns one of these. Tying the decrement to `Drop`
/// keeps the budget count in sync with the real number of cached writers,
/// even if a code path drops a writer without going through
/// `flush_dirty_writers` or the eviction helpers.
///
/// Only `FdBudget::try_reserve` should construct one, and only after it has
/// successfully incremented the counter.
struct WriterFdGuard {
    counter: Arc<AtomicUsize>,
}

impl Drop for WriterFdGuard {
    fn drop(&mut self) {
        // `Release` pairs with the `Acquire` loads in `FdBudget::try_reserve`.
        // Everything this thread did before returning the permit therefore
        // happens-before the reservation that observes the freed slot. That
        // includes closing the writer's file: `CachedWriter` declares `writer`
        // before this guard, so a plain drop closes the file first.
        let previous = self.counter.fetch_sub(1, Ordering::Release);
        // A guard only exists after a successful reservation, so the counter
        // can never be zero here. Wrapping to `usize::MAX` would make every
        // later reservation fail.
        debug_assert!(
            previous > 0,
            "WriterFdGuard released more permits than were reserved"
        );
    }
}
141
142/// Cached file handle for writing the current partition of a PV.
143struct CachedWriter {
144    path: PathBuf,
145    writer: BufWriter<std::fs::File>,
146    /// `true` between writes and the next successful flush. Lets
147    /// `flush_writes` skip writers that have nothing pending so a
148    /// reader-side `get_data` doesn't pay an O(N) syscall storm
149    /// across every cached writer (Java parity is similar — the
150    /// `dirty` bit short-circuits the iteration).
151    dirty: bool,
152    /// Last access timestamp — the LRU key used by the always-on
153    /// fd-cap eviction path: when [`PlainPbStoragePlugin::open_writers`]
154    /// reaches `max_open_writers`, the writer with the smallest
155    /// `last_used` is evicted to make room for the next open.
156    last_used: SystemTime,
157    /// Decrements `open_writers` on drop. Field name starts with
158    /// `_` because it's never read directly — it exists purely for
159    /// its Drop side-effect.
160    _fd_guard: WriterFdGuard,
161}
162
163/// Per-PV serialization slot. Holds the cached writer (if any) and
164/// a tombstone bit so a concurrent `append` doesn't resurrect a PV
165/// whose `delete_pv_data`/`rename_pv` is already in flight.
166///
167/// Each slot is wrapped in its own `Mutex` so I/O for one PV cannot
168/// stall I/O for any other. The outer `write_cache` map is locked
169/// only while inserting/looking up/removing slots — never while
170/// holding a filesystem syscall.
171struct PvWriterSlot {
172    writer: Option<CachedWriter>,
173    /// Set under the slot lock by `delete_pv_data` / `rename_pv`.
174    /// Once true, every subsequent `append` for this PV bails with
175    /// an error so the deleted PV doesn't reappear from a racing
176    /// late writer. The slot stays in `write_cache` only until the
177    /// caller that set the flag clears the entry from the map; new
178    /// `append`s that look up the PV after the cache eviction get a
179    /// fresh slot.
180    dead: bool,
181}
182
183/// RAII cleanup for the tombstoned-slot pattern used by
184/// `delete_pv_data` and `rename_pv`. Ensures `cache.remove(pv)`
185/// runs on every return path — including `?` short-circuits and
186/// panics — so a PV cannot be left permanently undead because an
187/// intermediate `tokio::fs::remove_file` errored. Without this
188/// guard the slot stays in `write_cache` with `dead == true` and
189/// every future append for this PV name bails forever.
190struct TombstoneCleanupGuard<'a> {
191    plugin: &'a PlainPbStoragePlugin,
192    pv: String,
193}
194
195impl<'a> Drop for TombstoneCleanupGuard<'a> {
196    fn drop(&mut self) {
197        let mut cache = self
198            .plugin
199            .write_cache
200            .lock()
201            .unwrap_or_else(|e| e.into_inner());
202        cache.remove(&self.pv);
203    }
204}
205
/// Result of one `flush_dirty_writers` pass.
///
/// The read-side and write-side flush entry points share that iteration
/// but interpret this outcome differently.
struct FlushOutcome {
    /// PVs whose flush failed, or whose flush "succeeded" into a file that
    /// no longer exists.
    ///
    /// Their cached writers were evicted, with buffered bytes discarded via
    /// `into_parts`. Surfaced to the write_loop so it drops their
    /// `last_event` from the registry commit batch.
    failed: Vec<String>,
    /// PVs whose slot was already locked (an `append` or another flush was
    /// in flight).
    ///
    /// Their dirty bytes stay buffered and are picked up on a later flush
    /// cycle. Surfaced alongside `failed` so the registry never claims a
    /// `last_event` whose bytes still live only in `BufWriter` memory —
    /// under-commit, never over-commit.
    deferred: Vec<String>,
}
224
225/// Wraps a `PbFileReader` and clamps emitted samples to `[start, end]`.
226///
227/// Java parity (e3b4471 + 88c7601): `binary_search_pb_file` returns
228/// `None` when every sample in the file is older than `start`, leaving
229/// the reader at the data section start. Without a lower-bound filter
230/// the wrapper would leak the entire file's stale contents into the
231/// retrieval merge. The upper bound covers files included by partition
232/// name whose actual sample timestamps spill past `end`.
233struct BoundedReader {
234    inner: PbFileReader,
235    start: SystemTime,
236    end: SystemTime,
237    done: bool,
238}
239
240impl BoundedReader {
241    fn new(inner: PbFileReader, start: SystemTime, end: SystemTime) -> Self {
242        Self {
243            inner,
244            start,
245            end,
246            done: false,
247        }
248    }
249}
250
251impl crate::storage::traits::EventStream for BoundedReader {
252    fn description(&self) -> &crate::types::EventStreamDesc {
253        self.inner.description()
254    }
255
256    fn next_event(&mut self) -> anyhow::Result<Option<crate::types::ArchiverSample>> {
257        if self.done {
258            return Ok(None);
259        }
260        loop {
261            match self.inner.next_event()? {
262                None => {
263                    self.done = true;
264                    return Ok(None);
265                }
266                // Below `start` or above `end`: drop and continue. PB
267                // partition files are append-ordered but timestamps
268                // within one file aren't strictly monotonic — clock
269                // backsteps and late backfills exist — so a single
270                // out-of-window sample must NOT terminate the stream
271                // (Java's reader keeps consuming until EOF).
272                Some(s) if s.timestamp < self.start => continue,
273                Some(s) if s.timestamp > self.end => continue,
274                Some(s) => return Ok(Some(s)),
275            }
276        }
277    }
278}
279
280use crate::retrieval::query::SingleSampleStream;
281
282/// PlainPB storage plugin — binary-compatible with Java EPICS Archiver Appliance.
283pub struct PlainPbStoragePlugin {
284    plugin_name: String,
285    root_folder: PathBuf,
286    granularity: PartitionGranularity,
287    /// One per-PV slot, each holding (at most) one `BufWriter` pointed
288    /// at that PV's current partition file. The outer `Mutex<HashMap>`
289    /// is held only briefly to look up / insert / remove a slot; all
290    /// I/O happens under the per-slot mutex so a stuck syscall on one
291    /// PV does NOT block any other PV's appends or flushes.
292    write_cache: Mutex<HashMap<String, Arc<Mutex<PvWriterSlot>>>>,
293    /// Directories known to exist. Avoids redundant create_dir_all syscalls.
294    known_dirs: Mutex<HashSet<PathBuf>>,
295    /// Shared fd permit pool. Cloning the [`FdBudget`] across
296    /// multiple plugins ties them to the SAME counter and ceiling,
297    /// so STS+MTS+LTS can enforce a single process-wide cap rather
298    /// than each tier silently keeping its own 512-fd ceiling.
299    fd_budget: FdBudget,
300    /// PVs whose dirty bytes were evicted by the LRU path AND
301    /// whose flush failed, so the bytes are LOST. Drained by the
302    /// next `flush_dirty_writers` and merged into `failed` so
303    /// write_loop drops these PVs from `ts_updates` instead of
304    /// silently committing a stale timestamp.
305    evicted_with_loss: Mutex<Vec<String>>,
306}
307
308impl PlainPbStoragePlugin {
309    pub fn new(name: &str, root_folder: PathBuf, granularity: PartitionGranularity) -> Self {
310        Self::with_max_open_writers(name, root_folder, granularity, DEFAULT_MAX_OPEN_WRITERS)
311    }
312
313    /// Construct with an explicit cap on simultaneously-open writers.
314    /// Equivalent to `with_fd_budget(name, root, granularity,
315    /// FdBudget::new(max_open_writers))` — sites that want to
316    /// SHARE the budget across multiple tiers should call
317    /// `with_fd_budget` directly with a clone of the same `FdBudget`.
318    pub fn with_max_open_writers(
319        name: &str,
320        root_folder: PathBuf,
321        granularity: PartitionGranularity,
322        max_open_writers: usize,
323    ) -> Self {
324        Self::with_fd_budget(
325            name,
326            root_folder,
327            granularity,
328            FdBudget::new(max_open_writers),
329        )
330    }
331
332    /// Construct using a (possibly-shared) fd permit pool.
333    ///
334    /// Pass the SAME [`FdBudget`] (via `.clone()`) to multiple
335    /// plugins to enforce a process-wide cap; pass a fresh
336    /// `FdBudget::new(N)` per plugin for the legacy per-tier cap.
337    pub fn with_fd_budget(
338        name: &str,
339        root_folder: PathBuf,
340        granularity: PartitionGranularity,
341        fd_budget: FdBudget,
342    ) -> Self {
343        Self {
344            plugin_name: name.to_string(),
345            root_folder,
346            granularity,
347            write_cache: Mutex::new(HashMap::new()),
348            known_dirs: Mutex::new(HashSet::new()),
349            fd_budget,
350            evicted_with_loss: Mutex::new(Vec::new()),
351        }
352    }
353
354    /// Snapshot of the current open-writer count for the budget
355    /// this plugin draws from. When the budget is shared across
356    /// tiers, this reflects the GLOBAL count, not just this tier's
357    /// share. Lock-free; instantaneous, can drift.
358    pub fn open_writer_count(&self) -> usize {
359        self.fd_budget.count()
360    }
361
362    /// Build the file path for a PV at a given timestamp.
363    /// Format: {root}/{pv_key}:{partition_name}.pb
364    /// where pv_key replaces `:` with `/` in the PV name.
365    pub fn file_path_for(&self, pv: &str, ts: SystemTime) -> PathBuf {
366        let pv_key = pv_name_to_key(pv);
367        let partition_name = crate::storage::partition::partition_name(ts, self.granularity);
368        let filename = format!("{pv_key}:{partition_name}.pb");
369        self.root_folder.join(filename)
370    }
371
372    /// List all PB files for a PV in a time range.
373    fn list_files_for_range(&self, pv: &str, start: SystemTime, end: SystemTime) -> Vec<PathBuf> {
374        let partitions =
375            crate::storage::partition::partitions_in_range(start, end, self.granularity);
376        let pv_key = pv_name_to_key(pv);
377        partitions
378            .into_iter()
379            .map(|pname| {
380                let filename = format!("{pv_key}:{pname}.pb");
381                self.root_folder.join(filename)
382            })
383            .filter(|p| p.exists())
384            .collect()
385    }
386
387    pub fn root_folder(&self) -> &Path {
388        &self.root_folder
389    }
390
391    /// Flush every dirty cached writer (across all PVs) that we
392    /// can lock without blocking. Per-slot `try_lock` keeps a
393    /// stuck PV from blocking the flush of every other PV — those
394    /// are reported in `deferred` and retried next cycle.
395    ///
396    /// Errored flushes evict the writer (buffered bytes dropped
397    /// via `into_parts`) and are recorded BOTH in the returned
398    /// `failed` list (for the immediate caller) AND in
399    /// `evicted_with_loss` (so the next ingest-side flush_owner
400    /// pass can pick them up even if the immediate caller was the
401    /// read-side `flush_writes`). Without the second record, a
402    /// retrieval-triggered flush could swallow a failure before
403    /// the global flush owner ever sees it.
404    ///
405    /// **Does not drain `evicted_with_loss`.** That drain belongs
406    /// exclusively to `flush_ingest_writes` so the global owner
407    /// is the single consumer of loss markers. Read-side callers
408    /// (retrieval, ETL) must NOT consume the queue or owner-side
409    /// ts_updates would silently advance past PVs whose bytes
410    /// never reached disk.
411    fn flush_dirty_writers(&self) -> FlushOutcome {
412        // Snapshot slot Arcs under a brief outer lock so we never
413        // hold the outer cache mutex while attempting an inner-slot
414        // lock — that would deadlock with an `append` that holds
415        // its slot lock and is waiting on the outer lock to record
416        // a partition rollover.
417        let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
418            let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
419            cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
420        };
421
422        let mut failed = Vec::new();
423        let mut deferred = Vec::new();
424        let mut to_remove = Vec::new();
425
426        for (pv, slot_arc) in snapshot {
427            let mut slot_guard = match slot_arc.try_lock() {
428                Ok(g) => g,
429                Err(std::sync::TryLockError::WouldBlock) => {
430                    deferred.push(pv);
431                    continue;
432                }
433                Err(std::sync::TryLockError::Poisoned(p)) => p.into_inner(),
434            };
435            let Some(cached) = slot_guard.writer.as_mut() else {
436                continue;
437            };
438            if !cached.dirty {
439                continue;
440            }
441            match cached.writer.flush() {
442                Ok(()) => {
443                    // Principle 4 (flush truth): flush succeeded
444                    // at the syscall level, but if the underlying
445                    // file is gone (ETL deleted it; an `rm -f` ran;
446                    // an NFS race), the bytes went into the page
447                    // cache for an unlinked inode — not reader-
448                    // visible. Treat as loss to keep the registry's
449                    // `last_event` honest.
450                    if !cached.path.exists() {
451                        tracing::warn!(
452                            pv,
453                            path = ?cached.path,
454                            "Flush succeeded but file is gone; bytes are not \
455                             reader-visible — surfacing PV to loss queue"
456                        );
457                        metrics::counter!(
458                            "archiver_pb_flush_failures_total",
459                            "tier" => self.plugin_name.clone(),
460                        )
461                        .increment(1);
462                        if let Some(removed) = slot_guard.writer.take() {
463                            let (_file, _buffered) = removed.writer.into_parts();
464                        }
465                        self.record_dirty_loss(&pv);
466                        failed.push(pv.clone());
467                        to_remove.push(pv);
468                    } else {
469                        cached.dirty = false;
470                    }
471                }
472                Err(e) => {
473                    tracing::warn!(pv, path = ?cached.path, "Failed to flush cached writer: {e}");
474                    metrics::counter!(
475                        "archiver_pb_flush_failures_total",
476                        "tier" => self.plugin_name.clone(),
477                    )
478                    .increment(1);
479                    if let Some(removed) = slot_guard.writer.take() {
480                        let (_file, _buffered) = removed.writer.into_parts();
481                    }
482                    // Persist the loss so the next flush_ingest_writes
483                    // surfaces it to the global owner — even if THIS
484                    // call was triggered by retrieval/ETL, the owner
485                    // must still learn that PV's bytes were lost so
486                    // it doesn't commit a stale `last_event`.
487                    self.record_dirty_loss(&pv);
488                    failed.push(pv.clone());
489                    to_remove.push(pv);
490                }
491            }
492        }
493
494        if !to_remove.is_empty() {
495            let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
496            for pv in &to_remove {
497                cache.remove(pv);
498            }
499        }
500
501        FlushOutcome { failed, deferred }
502    }
503
504    /// Push `pv` onto the persistent loss queue so the next
505    /// `flush_ingest_writes` call surfaces it to the global flush
506    /// owner. Idempotent at the caller site — duplicate entries
507    /// are harmless because the owner's `ts_updates.remove` is
508    /// idempotent.
509    fn record_dirty_loss(&self, pv: &str) {
510        let mut ev = self
511            .evicted_with_loss
512            .lock()
513            .unwrap_or_else(|e| e.into_inner());
514        ev.push(pv.to_string());
515    }
516
    /// Retire a writer that is being dropped outside the regular flush
    /// iteration, flushing any pending bytes first.
    ///
    /// Intended for paths that discard a possibly-dirty writer. The design
    /// notes list partition rollover, ghost-file reopen, the `write_cached`
    /// write-error path and LRU eviction; those callers are outside this
    /// section. `evict_writer_for_path` does NOT come here: its file is
    /// already gone, so it uses `drop_writer_file_gone`.
    ///
    /// # Returns
    /// `Ok(())` when the writer was clean, or when its bytes were flushed to
    /// a file that still exists (i.e. they are reader-visible).
    ///
    /// # Errors
    /// - The `flush()` error, when flushing the pending bytes failed.
    /// - `ErrorKind::NotFound` ("flushed to deleted inode"), when the flush
    ///   succeeded but `path` no longer exists. The bytes landed in an
    ///   unlinked inode and are not reader-visible (principle 4: flush
    ///   success must mean reader-visible).
    ///
    /// In both error cases the PV is pushed onto `evicted_with_loss` and
    /// `archiver_pb_dirty_drop_loss_total` is incremented.
    ///
    /// **The BufWriter is always discarded via `into_parts`**, on every path.
    /// Dropping it normally would make `BufWriter::drop` re-issue a failing
    /// flush behind our back. That could write a partial frame after the
    /// bytes were already classified as lost.
    fn drop_dirty_writer(&self, pv: &str, cached: CachedWriter) -> std::io::Result<()> {
        let CachedWriter {
            path,
            mut writer,
            dirty,
            last_used: _,
            _fd_guard,
        } = cached;
        // Clean writers have nothing buffered, so skip the syscall.
        let flush_res = if dirty { writer.flush() } else { Ok(()) };
        // Discard the BufWriter without invoking its Drop — keeps
        // a failed flush from re-firing as a drop-time auto-flush.
        let (_file, _buffered) = writer.into_parts();
        // _fd_guard drops at end of this function, releasing the
        // fd permit. `_file` is declared later, so it is dropped (and the
        // fd closed) before the permit is returned.

        match flush_res {
            Ok(()) => {
                if dirty && !path.exists() {
                    // Flushed to a deleted inode. The bytes went to an
                    // unlinked file — not reader-visible. Treat as loss.
                    tracing::warn!(
                        pv,
                        ?path,
                        "Dirty-writer flush succeeded but file is gone; \
                         bytes are not reader-visible — surfacing PV to loss queue"
                    );
                    metrics::counter!(
                        "archiver_pb_dirty_drop_loss_total",
                        "tier" => self.plugin_name.clone(),
                    )
                    .increment(1);
                    self.record_dirty_loss(pv);
                    Err(std::io::Error::new(
                        std::io::ErrorKind::NotFound,
                        "flushed to deleted inode",
                    ))
                } else {
                    Ok(())
                }
            }
            Err(e) => {
                // The buffered bytes were discarded above; record the loss.
                tracing::warn!(
                    pv,
                    ?path,
                    "Dirty-writer drop flush failed; dirty bytes lost — \
                     surfacing PV to loss queue: {e}"
                );
                metrics::counter!(
                    "archiver_pb_dirty_drop_loss_total",
                    "tier" => self.plugin_name.clone(),
                )
                .increment(1);
                self.record_dirty_loss(pv);
                Err(e)
            }
        }
    }
596
597    /// Variant of [`drop_dirty_writer`] for paths where the
598    /// underlying file is GONE (ghost-file disappearance,
599    /// `evict_writer_for_path` after `remove_file`). Skips the
600    /// flush attempt — flushing to a deleted inode either fails
601    /// or vanishes silently — and unconditionally records loss
602    /// for any dirty bytes.
603    fn drop_writer_file_gone(&self, pv: &str, cached: CachedWriter) {
604        if cached.dirty {
605            tracing::warn!(
606                pv,
607                path = ?cached.path,
608                "Dirty bytes lost — file disappeared while writer was \
609                 still buffering; surfacing PV to loss queue"
610            );
611            metrics::counter!(
612                "archiver_pb_dirty_drop_loss_total",
613                "tier" => self.plugin_name.clone(),
614            )
615            .increment(1);
616            self.record_dirty_loss(pv);
617        }
618        // Drop the CachedWriter explicitly: into_parts avoids
619        // BufWriter's drop-time auto-flush so we don't try to
620        // write to a deleted file.
621        let (_file, _buffered) = cached.writer.into_parts();
622    }
623
624    /// Drop any cached BufWriter whose target path matches `path`.
625    /// Call this after `remove_file` on a `.pb` file the engine may have
626    /// open — without it, subsequent `append_event` writes go to the
627    /// deleted-but-still-open inode (a "ghost" file invisible to readers
628    /// because `list_files_for_range` walks the directory). Safe no-op
629    /// when nothing matches. Returns true if a writer was evicted.
630    ///
631    /// **Definitive (principle 3 / eviction ownership):** the slot
632    /// pointing at `path` (if any) is evicted before this returns.
633    /// Fast path uses the file path's deterministic encoding to
634    /// derive the PV name and look up exactly one slot, avoiding
635    /// the previous full-scan blocking-lock cost (where one stuck
636    /// unrelated slot could delay every ETL eviction). Falls back
637    /// to a scan only if the path's filename can't be parsed.
638    ///
639    /// Sync method, blocking on the target slot's mutex — the
640    /// ETL caller wraps it in `tokio::task::spawn_blocking` so
641    /// the runtime worker isn't held during the wait.
642    pub fn evict_writer_for_path(&self, path: &Path) -> bool {
643        // Fast path: the file path encodes the PV name (see
644        // `pv_name_to_key`), so we can derive it and look up
645        // exactly one slot. Avoids the previous full-scan
646        // behaviour where one stuck unrelated slot could delay
647        // every ETL eviction.
648        if let Some(pv) = self.pv_name_from_path(path) {
649            return self.evict_writer_for_pv_at_path(&pv, path);
650        }
651        // Fallback: file path didn't decode (caller passed a
652        // non-storage path, or the encoding scheme has drifted).
653        // Run the legacy scan so we don't miss a writer just
654        // because the filename was unusual.
655        self.evict_writer_for_path_scan(path)
656    }
657
658    /// Reverse the deterministic `pv_name_to_key` → file-path
659    /// encoding. Returns `Some(pv_name)` for any path under the
660    /// storage root whose filename matches the
661    /// `{pv_key}:{partition}.pb` layout.
662    fn pv_name_from_path(&self, path: &Path) -> Option<String> {
663        let rel = path.strip_prefix(&self.root_folder).ok()?;
664        let s = rel.to_str()?;
665        // Strip the trailing partition+extension (everything from
666        // the LAST `:` onward — the last colon is the separator
667        // because `pv_name_to_key` has already replaced any colon
668        // in the PV name with `/`).
669        let colon = s.rfind(':')?;
670        let pv_key = &s[..colon];
671        if pv_key.is_empty() {
672            return None;
673        }
674        Some(pv_key.replace('/', ":"))
675    }
676
677    /// Direct-lookup eviction: lock just one slot (the one keyed
678    /// by `pv`) and evict if its writer's path matches. Returns
679    /// `true` when a writer was evicted, `false` otherwise.
680    fn evict_writer_for_pv_at_path(&self, pv: &str, path: &Path) -> bool {
681        let slot_arc = {
682            let cache = self.write_cache.lock().unwrap_or_else(|e| {
683                tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
684                e.into_inner()
685            });
686            cache.get(pv).cloned()
687        };
688        let Some(arc) = slot_arc else {
689            return false;
690        };
691        let mut slot_guard = arc.lock().unwrap_or_else(|e| e.into_inner());
692        let matches = slot_guard
693            .writer
694            .as_ref()
695            .map(|cw| cw.path == path)
696            .unwrap_or(false);
697        if !matches {
698            return false;
699        }
700        let Some(cached) = slot_guard.writer.take() else {
701            return false;
702        };
703        self.drop_writer_file_gone(pv, cached);
704        drop(slot_guard);
705        let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
706        cache.remove(pv);
707        true
708    }
709
710    /// Fallback full-scan eviction for paths that don't decode
711    /// to a known PV name. Snapshots every slot Arc and blocking-
712    /// locks each — same correctness as the fast path, but with
713    /// the O(N) cost the fast path is designed to avoid.
714    fn evict_writer_for_path_scan(&self, path: &Path) -> bool {
715        let snapshot: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
716            let cache = self.write_cache.lock().unwrap_or_else(|e| {
717                tracing::warn!(?path, "write cache poisoned at evict_writer_for_path: {e}");
718                e.into_inner()
719            });
720            cache
721                .iter()
722                .map(|(pv, slot)| (pv.clone(), slot.clone()))
723                .collect()
724        };
725
726        let mut removed = false;
727        let mut to_remove = Vec::new();
728        for (pv, slot_arc) in snapshot {
729            let mut slot_guard = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
730            let matches = slot_guard
731                .writer
732                .as_ref()
733                .map(|cw| cw.path == path)
734                .unwrap_or(false);
735            if !matches {
736                continue;
737            }
738            if let Some(cached) = slot_guard.writer.take() {
739                self.drop_writer_file_gone(&pv, cached);
740                removed = true;
741            }
742            drop(slot_guard);
743            to_remove.push(pv);
744        }
745        if !to_remove.is_empty() {
746            let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
747            for pv in &to_remove {
748                cache.remove(pv);
749            }
750        }
751        removed
752    }
753
754    /// Ensure a parent directory exists, using a cached set to skip repeated syscalls.
755    fn ensure_parent_dir(&self, path: &Path) -> anyhow::Result<()> {
756        if let Some(parent) = path.parent() {
757            let needs_create = {
758                // Recover from poison: known_dirs mutations are
759                // single-statement HashSet inserts, so a panicking
760                // thread can't leave half-modified state. Better to
761                // proceed than fail every future write_cached call.
762                let dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
763                !dirs.contains(parent)
764            };
765            if needs_create {
766                std::fs::create_dir_all(parent)?;
767                let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
768                dirs.insert(parent.to_path_buf());
769            }
770        }
771        Ok(())
772    }
773
774    /// Look up the slot Arc for `pv`, creating an empty one under a
775    /// brief outer-cache lock if absent. The returned Arc lets the
776    /// caller serialise on the per-PV mutex without holding the
777    /// outer cache lock during file I/O.
778    fn slot_for(&self, pv: &str) -> Arc<Mutex<PvWriterSlot>> {
779        let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
780        cache
781            .entry(pv.to_string())
782            .or_insert_with(|| {
783                Arc::new(Mutex::new(PvWriterSlot {
784                    writer: None,
785                    dead: false,
786                }))
787            })
788            .clone()
789    }
790
    /// Write a sample using the cached BufWriter, creating the file + header if needed.
    ///
    /// All of the following runs while holding the per-PV slot mutex
    /// obtained via `slot_for`:
    /// 1. refuse if the slot is tombstoned (`dead`) by a concurrent
    ///    delete/rename;
    /// 2. drop the cached writer if it targets a different path (partition
    ///    rollover, via `drop_dirty_writer`) or if its file no longer
    ///    exists (via `drop_writer_file_gone`);
    /// 3. if no writer is cached: reserve an fd permit (evicting LRU
    ///    writers as needed), open `path` with create+append, and buffer a
    ///    PayloadInfo header when `file_needs_header` reports one is needed;
    /// 4. buffer the escaped sample followed by `codec::NEWLINE` and mark
    ///    the writer dirty.
    ///
    /// The sample goes into the `BufWriter`; it may stay buffered until a
    /// later flush.
    ///
    /// # Errors
    /// - encoding the sample fails (`writer::encode_sample`);
    /// - the slot was marked `dead`;
    /// - the fd budget is exhausted and `evict_lru_writer` found nothing to
    ///   evict;
    /// - opening `path` fails (retried once after an LRU eviction when the
    ///   error is EMFILE/ENFILE);
    /// - buffering the header or the sample fails. On a sample-write
    ///   failure the cached writer is discarded without a flush, a loss is
    ///   recorded if it was dirty, and the PV's slot is removed from the
    ///   write cache.
    fn write_cached(
        &self,
        path: &Path,
        pv: &str,
        dbr_type: ArchDbType,
        sample: &ArchiverSample,
        meta: &AppendMeta,
    ) -> anyhow::Result<()> {
        // Encode before taking any lock so a bad sample fails fast.
        let sample_bytes = writer::encode_sample(dbr_type, sample)?;
        let escaped_sample = codec::escape(&sample_bytes);

        let path_buf = path.to_path_buf();
        let slot_arc = self.slot_for(pv);
        // Recover from poison: per-PV slot mutations are confined to
        // this method and `flush_dirty_writers` / `delete_pv_data` /
        // `rename_pv`, all of which leave internally-consistent state
        // on early return. Better to proceed than fail every future
        // append for a PV whose writer once panicked.
        let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());

        // Tombstone check: a concurrent `delete_pv_data` /
        // `rename_pv` may have grabbed this Arc and set `dead`.
        // Bail rather than recreate the file we just deleted.
        if slot.dead {
            anyhow::bail!("PV `{pv}` was deleted/renamed concurrently; refusing to recreate file");
        }

        // If the cached writer points at a different path, the partition has
        // rolled over — flush and drop the old writer before opening the new
        // one. Use `drop_dirty_writer` so a flush failure here records loss
        // (without it, partition rollover at the wrong moment silently lost
        // the old partition's last buffered samples).
        if let Some(existing) = slot.writer.as_ref()
            && existing.path != path_buf
            && let Some(cached) = slot.writer.take()
        {
            let _ = self.drop_dirty_writer(pv, cached);
        }

        // Defense-in-depth for ghost-file writes: if the cached writer's
        // target path no longer exists on disk (deleted by ETL while we
        // missed the eviction, by manual `rm`, by a flaky NFS mount, …)
        // its bytes are going into an orphaned inode invisible to readers.
        // Drop via `drop_writer_file_gone` so dirty bytes get a loss marker
        // (flushing to a deleted inode is meaningless; bytes are lost
        // regardless).
        // Note: this is a `Path::exists` stat on every append that has a
        // cached writer.
        if let Some(existing) = slot.writer.as_ref()
            && !existing.path.exists()
        {
            tracing::warn!(
                pv,
                path = ?existing.path,
                "Cached writer's file disappeared from filesystem; reopening"
            );
            if let Some(cached) = slot.writer.take() {
                self.drop_writer_file_gone(pv, cached);
            }
        }

        // No usable cached writer: open (or create) the partition file.
        if slot.writer.is_none() {
            // Decided before opening: `file_needs_header` may truncate a
            // corrupt file or trim a partial tail record.
            let needs_header = file_needs_header(path);

            // Atomic fd-cap reservation: loop trying to reserve a
            // permit; each failed reservation triggers one LRU
            // eviction (which drops a CachedWriter, decrementing
            // `open_writers` via WriterFdGuard's Drop). Bail with a
            // clear error if no evictable candidate exists — better
            // than silently letting open() blow past the cap.
            //
            // CAS reservation closes the old check-then-fetch_add
            // race where N concurrent appends would all see "below
            // cap", all increment, and all open files past the cap.
            let fd_guard = loop {
                if let Some(guard) = self.fd_budget.try_reserve() {
                    break guard;
                }
                if !self.evict_lru_writer(pv) {
                    return Err(anyhow::anyhow!(
                        "PlainPB tier `{}` at fd cap ({}) and no evictable \
                         writer (all slots busy); refusing to open another \
                         to protect the process fd budget",
                        self.plugin_name,
                        self.fd_budget.max()
                    ));
                }
            };

            // Open with EMFILE/ENFILE recovery: even with our
            // internal reservation honoured, the OS-wide fd table
            // can still be exhausted (other processes, other tiers
            // sharing the same ulimit). Evict and retry once.
            let file = match std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(path)
            {
                Ok(f) => f,
                Err(e) if is_too_many_open_files(&e) && self.evict_lru_writer(pv) => {
                    tracing::warn!(
                        ?path,
                        "Hit OS file-handle limit; evicted LRU writer and \
                         retrying open"
                    );
                    std::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(path)?
                }
                Err(e) => return Err(e.into()),
            };
            let mut bw = BufWriter::with_capacity(64 * 1024, file);

            if needs_header {
                // The header's year comes from this (first) sample's
                // timestamp.
                let (year, _, _) = sample.decompose_timestamp();
                let header = writer::build_payload_info(
                    pv,
                    dbr_type,
                    year,
                    meta.element_count,
                    &meta.headers,
                );
                let header_bytes = header.encode_to_vec();
                let escaped_header = codec::escape(&header_bytes);
                // Single write_all so the header+newline never split
                // across BufWriter flushes — same atomicity rationale
                // as the sample frame below.
                let mut header_frame = Vec::with_capacity(escaped_header.len() + 1);
                header_frame.extend_from_slice(&escaped_header);
                header_frame.push(codec::NEWLINE);
                if let Err(e) = bw.write_all(&header_frame) {
                    // Header write failed. Discard the BufWriter
                    // via `into_parts` so its `Drop` can't re-issue
                    // the failing syscall — without this, the
                    // drop-time auto-flush could write a partial
                    // header to disk after we've already classified
                    // the bytes as lost. (Principle: failure-
                    // classified resource never goes through the
                    // normal destructor path.) The created file
                    // exists on disk; `file_needs_header` on the
                    // next attempt sees the unreadable header and
                    // truncates.
                    let (_file, _buffered) = bw.into_parts();
                    // fd_guard's Drop releases the fd permit at
                    // function exit.
                    return Err(e.into());
                }
            }

            slot.writer = Some(CachedWriter {
                path: path_buf,
                writer: bw,
                // Header bytes (if any) are buffered but not yet
                // flushed. Mark dirty so the periodic flush picks
                // them up — without this, a PV that gets created
                // and then receives no further samples within a
                // flush_period would never persist its header.
                dirty: true,
                last_used: SystemTime::now(),
                // Move the reservation we obtained at the top of
                // this branch into the writer; its Drop will
                // release the fd permit when the writer is taken /
                // partition-rolled / evicted / dropped.
                _fd_guard: fd_guard,
            });
        }

        let cached = slot.writer.as_mut().expect("just inserted");
        // `last_used` drives LRU ordering in `try_evict_with_filter`.
        cached.last_used = SystemTime::now();
        // Atomic-at-buffer-layer sample frame: a single `write_all`
        // means the BufWriter never splits the sample/newline pair
        // across two internal flushes. OS-level write atomicity is
        // still bounded by the kernel page boundary, but this removes
        // our contribution to the partial-record risk.
        let mut frame = Vec::with_capacity(escaped_sample.len() + 1);
        frame.extend_from_slice(&escaped_sample);
        frame.push(codec::NEWLINE);
        if let Err(e) = cached.writer.write_all(&frame) {
            // After a partial write (ENOSPC, NFS hiccup, …) the
            // BufWriter's internal state is suspect — reusing it
            // would compound the corruption (tail garbage, repeated
            // failures). Evict so the next call goes through the
            // create+append+header path which validates the file
            // and reopens fresh. Tail-trim runs in `file_needs_header`
            // on that next open and removes any partial record.
            //
            // Use `into_parts` to discard buffered bytes WITHOUT
            // letting BufWriter::drop attempt a final flush; that
            // flush would re-issue the same failing syscall (worst
            // case: another partial write) and the buffered bytes
            // are already suspect.
            tracing::warn!(
                pv,
                path = ?cached.path,
                "Write failed; evicting cached writer to force \
                 reopen on next sample: {e}"
            );
            // Record loss IF the writer had previously-buffered
            // dirty bytes (other samples queued in BufWriter that
            // never reached disk). This sample's own ts won't be
            // reported because we return Err below — but past
            // samples whose ts WERE reported would otherwise be
            // false-committed by the global owner.
            let was_dirty = cached.dirty;
            if let Some(removed) = slot.writer.take() {
                let (_file, _buffered) = removed.writer.into_parts();
                // _file dropped → fd closed without flush.
                if was_dirty {
                    self.record_dirty_loss(pv);
                    metrics::counter!(
                        "archiver_pb_dirty_drop_loss_total",
                        "tier" => self.plugin_name.clone(),
                    )
                    .increment(1);
                }
            }
            // Drop the slot from the outer map so the next append
            // gets a clean lookup (no stale empty slot lingering).
            // The per-PV guard is released first, before the outer
            // cache lock is taken.
            drop(slot);
            let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
            cache.remove(pv);
            return Err(e.into());
        }
        cached.dirty = true;
        Ok(())
    }
1017
1018    /// Evict one cached writer to free a file-descriptor permit.
1019    ///
1020    /// Two-pass policy:
1021    ///   1. **Clean writer LRU.** Pick the oldest slot whose
1022    ///      writer has `dirty == false`. Drop it; no flush needed,
1023    ///      no risk of data loss.
1024    ///   2. **Dirty writer LRU (last resort).** If no clean
1025    ///      candidate, pick the oldest dirty slot. Flush it. On
1026    ///      flush *failure*, push the PV onto `evicted_with_loss`
1027    ///      so the next `flush_dirty_writers` reports it in
1028    ///      `failed` — without this, write_loop would silently
1029    ///      commit a stale `last_event` for bytes that never
1030    ///      reached disk.
1031    ///
1032    /// Slots whose mutex is held by another thread are skipped —
1033    /// `try_lock` returning `WouldBlock` means an in-flight append
1034    /// is already using that fd, so taking it wouldn't free
1035    /// anything anyway.
1036    ///
1037    /// Returns `true` when something was evicted (caller can
1038    /// retry), `false` when no candidate could be freed.
1039    fn evict_lru_writer(&self, current_pv: &str) -> bool {
1040        let candidates: Vec<(String, Arc<Mutex<PvWriterSlot>>)> = {
1041            let cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
1042            cache
1043                .iter()
1044                .filter(|(pv, _)| pv.as_str() != current_pv)
1045                .map(|(pv, slot)| (pv.clone(), slot.clone()))
1046                .collect()
1047        };
1048
1049        // Pass 1: prefer CLEAN writers. No flush, no risk of loss.
1050        if self.try_evict_with_filter(&candidates, /* want_dirty = */ false) {
1051            return true;
1052        }
1053        // Pass 2: forced fallback — dirty writers. Flush attempted;
1054        // failures surfaced via `evicted_with_loss`.
1055        self.try_evict_with_filter(&candidates, /* want_dirty = */ true)
1056    }
1057
1058    /// Inner half of [`evict_lru_writer`] — find the oldest
1059    /// candidate matching `want_dirty` and evict it. Split out so
1060    /// the two passes share the snapshot + lock-retry logic.
1061    fn try_evict_with_filter(
1062        &self,
1063        candidates: &[(String, Arc<Mutex<PvWriterSlot>>)],
1064        want_dirty: bool,
1065    ) -> bool {
1066        let mut oldest: Option<(String, Arc<Mutex<PvWriterSlot>>, SystemTime)> = None;
1067        for (pv, slot_arc) in candidates {
1068            let Ok(guard) = slot_arc.try_lock() else {
1069                continue;
1070            };
1071            let Some(cw) = guard.writer.as_ref() else {
1072                drop(guard);
1073                continue;
1074            };
1075            if cw.dirty != want_dirty {
1076                drop(guard);
1077                continue;
1078            }
1079            let last_used = cw.last_used;
1080            drop(guard);
1081            match &oldest {
1082                Some((_, _, ts)) if *ts <= last_used => {}
1083                _ => oldest = Some((pv.clone(), slot_arc.clone(), last_used)),
1084            }
1085        }
1086
1087        let Some((pv, slot_arc, _)) = oldest else {
1088            return false;
1089        };
1090        let Ok(mut guard) = slot_arc.try_lock() else {
1091            // Lost the race — another thread grabbed the slot
1092            // between our scan and the eviction.
1093            return false;
1094        };
1095        let Some(cached) = guard.writer.take() else {
1096            return false;
1097        };
1098        // Single dirty-drop helper does the flush, the
1099        // loss-marker bookkeeping, and the metric — kept
1100        // consistent with partition-rollover and write-error
1101        // paths so a future code path can't accidentally bypass
1102        // the loss surface.
1103        let _ = self.drop_dirty_writer(&pv, cached);
1104        // CachedWriter's WriterFdGuard already released the fd
1105        // permit when `drop_dirty_writer` consumed it.
1106        drop(guard);
1107        let mut cache = self.write_cache.lock().unwrap_or_else(|e| e.into_inner());
1108        cache.remove(&pv);
1109        true
1110    }
1111}
1112
/// True iff `e` reports exhaustion of the file-descriptor / handle table.
///
/// Used by write_cached's open retry to tell recoverable resource
/// exhaustion apart from real filesystem errors.
///
/// - Unix: `ENFILE` (23, system-wide limit) or `EMFILE` (24, per-process
///   limit). POSIX does not fix these numbers, but Linux, macOS and the
///   BSDs all use these values.
/// - Windows: `ERROR_TOO_MANY_OPEN_FILES` (4). The raw values 23/24 mean
///   unrelated things there (`ERROR_CRC`, `ERROR_BAD_LENGTH`), so they must
///   not be matched.
/// - Other targets, or errors without an OS code: `false`.
fn is_too_many_open_files(e: &std::io::Error) -> bool {
    const ENFILE: i32 = 23;
    const EMFILE: i32 = 24;
    const ERROR_TOO_MANY_OPEN_FILES: i32 = 4;

    let Some(code) = e.raw_os_error() else {
        return false;
    };
    if cfg!(unix) {
        code == ENFILE || code == EMFILE
    } else if cfg!(windows) {
        code == ERROR_TOO_MANY_OPEN_FILES
    } else {
        false
    }
}
1121
1122/// Decide whether a file at `path` needs a fresh PayloadInfo header
1123/// written before sample data is appended. Returns `true` when:
1124/// 1. the file doesn't exist,
1125/// 2. the file exists but is 0 bytes (Java parity 651c3a6b: a crash
1126///    mid-create would otherwise leave a header-less file), OR
1127/// 3. the file exists with bytes but `PbFileReader::open` cannot
1128///    parse the header — in that case we **truncate** the file so
1129///    the caller's `create+append` opens cleanly. Without this
1130///    third branch, a partial-header crash makes the file forever
1131///    unreadable AND every subsequent append silently piles garbage
1132///    onto a corrupt prefix.
1133fn file_needs_header(path: &Path) -> bool {
1134    if !path.exists() {
1135        return true;
1136    }
1137    let size = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
1138    if size == 0 {
1139        return true;
1140    }
1141    if PbFileReader::open(path).is_err() {
1142        tracing::warn!(
1143            ?path,
1144            "PB file has unreadable header; truncating so a fresh \
1145             header gets written"
1146        );
1147        if let Err(e) = std::fs::OpenOptions::new()
1148            .write(true)
1149            .truncate(true)
1150            .open(path)
1151        {
1152            tracing::warn!(?path, "Failed to truncate corrupt PB file: {e}");
1153        }
1154        return true;
1155    }
1156    // Header is valid; defend against a tail with a partial sample
1157    // (writer killed mid-flush). Truncating to the last NEWLINE
1158    // boundary loses at most one record but keeps the file readable
1159    // — without this, a reader hits the partial record and stops
1160    // returning every sample after that point.
1161    if let Err(e) = trim_to_last_newline(path) {
1162        tracing::warn!(?path, "Failed to trim partial trailing record: {e}");
1163    }
1164    false
1165}
1166
1167/// Truncate `path` to end at its last NEWLINE byte (inclusive). Used
1168/// to drop a partial sample frame at file tail after a crash.
1169fn trim_to_last_newline(path: &Path) -> std::io::Result<()> {
1170    use std::io::{Read, Seek, SeekFrom};
1171    let mut file = std::fs::OpenOptions::new()
1172        .read(true)
1173        .write(true)
1174        .open(path)?;
1175    let len = file.metadata()?.len();
1176    if len == 0 {
1177        return Ok(());
1178    }
1179    // If the very last byte is already a NEWLINE, nothing to trim.
1180    file.seek(SeekFrom::End(-1))?;
1181    let mut tail = [0u8; 1];
1182    file.read_exact(&mut tail)?;
1183    if tail[0] == codec::NEWLINE {
1184        return Ok(());
1185    }
1186    // Scan backwards in chunks for the last NEWLINE.
1187    const CHUNK: usize = 4096;
1188    let mut buf = vec![0u8; CHUNK];
1189    let mut window_end = len;
1190    while window_end > 0 {
1191        let read_len = (window_end as usize).min(CHUNK);
1192        let read_start = window_end - read_len as u64;
1193        file.seek(SeekFrom::Start(read_start))?;
1194        file.read_exact(&mut buf[..read_len])?;
1195        if let Some(idx) = buf[..read_len].iter().rposition(|&b| b == codec::NEWLINE) {
1196            let new_len = read_start + idx as u64 + 1;
1197            tracing::warn!(
1198                ?path,
1199                old_len = len,
1200                new_len,
1201                "Trimming partial trailing PB record"
1202            );
1203            file.set_len(new_len)?;
1204            return Ok(());
1205        }
1206        window_end = read_start;
1207    }
1208    // No NEWLINE anywhere — file is just one giant un-terminated
1209    // record (or single header line that didn't get its newline).
1210    // Leave as-is; truncation here would be more destructive than
1211    // the corruption we're trying to fix.
1212    Ok(())
1213}
1214
1215/// Convert PV name to file path key.
1216/// `SIM:Sine` → `SIM/Sine`
1217///
1218/// Defensive: an attacker-supplied PV name like `../../etc/passwd` would
1219/// otherwise pass straight through and let `Path::join` escape the
1220/// storage root. Registry-side validation already rejects these at
1221/// register_pv / import_pv / add_alias time, but we re-validate here so
1222/// any code path that bypasses the registry (e.g. retrieval of a PV
1223/// name read directly from a PB file's PayloadInfo) still fails closed.
1224/// Returns a sanitized fallback rather than panicking so retrieval
1225/// errors stay diagnosable.
1226pub(crate) fn pv_name_to_key(pv: &str) -> String {
1227    if !crate::registry::is_valid_pv_name(pv) {
1228        // Strip every disallowed character so path joins stay anchored
1229        // at the storage root. Use a marker prefix so an audit can spot
1230        // these: we never write to such paths in normal operation.
1231        let mut sanitized = String::with_capacity(pv.len() + 16);
1232        sanitized.push_str("__invalid__/");
1233        for c in pv.chars() {
1234            if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
1235                sanitized.push(c);
1236            } else {
1237                sanitized.push('_');
1238            }
1239        }
1240        tracing::warn!(
1241            pv,
1242            "PV name rejected by validator; sanitized to {sanitized}"
1243        );
1244        return sanitized;
1245    }
1246    pv.replace(':', "/")
1247}
1248
1249/// Read the last sample from a PB file by seeking near the end.
1250/// Falls back to full sequential read for edge cases (e.g., very large single sample).
1251fn read_last_sample_from_file(path: &Path) -> anyhow::Result<Option<ArchiverSample>> {
1252    let file = std::fs::File::open(path)?;
1253    let file_len = file.metadata()?.len();
1254    if file_len == 0 {
1255        return Ok(None);
1256    }
1257
1258    let mut rdr = std::io::BufReader::new(file);
1259
1260    // Read header to get year and dbr_type.
1261    let mut header_line = Vec::new();
1262    rdr.read_until(codec::NEWLINE, &mut header_line)?;
1263    if header_line.last() == Some(&codec::NEWLINE) {
1264        header_line.pop();
1265    }
1266    let header_bytes = codec::unescape(&header_line);
1267    let payload_info = archiver_proto::epics_event::PayloadInfo::decode(header_bytes.as_slice())?;
1268    let year = payload_info.year;
1269    let dbr_type = ArchDbType::from_i32(payload_info.r#type).unwrap_or(ArchDbType::ScalarDouble);
1270
1271    let header_end = rdr.stream_position()?;
1272    if header_end >= file_len {
1273        return Ok(None);
1274    }
1275
1276    // Read the last 64KB (or less) to find the final sample line.
1277    let data_len = file_len - header_end;
1278    let chunk_size = (64 * 1024u64).min(data_len);
1279    let seek_pos = file_len - chunk_size;
1280    rdr.seek(SeekFrom::Start(seek_pos))?;
1281
1282    let mut tail = Vec::with_capacity(chunk_size as usize);
1283    rdr.read_to_end(&mut tail)?;
1284
1285    // Trim trailing newline.
1286    if tail.last() == Some(&codec::NEWLINE) {
1287        tail.pop();
1288    }
1289
1290    if tail.is_empty() {
1291        return Ok(None);
1292    }
1293
1294    // Find the last complete line (after the last newline byte in the chunk).
1295    let last_line_data = if let Some(pos) = tail.iter().rposition(|&b| b == codec::NEWLINE) {
1296        &tail[pos + 1..]
1297    } else if seek_pos <= header_end {
1298        // Entire data section is in the chunk — this IS the (only) line.
1299        &tail
1300    } else {
1301        // Very large single line that exceeds 64KB — fall back to sequential read.
1302        let mut reader = PbFileReader::open(path)?;
1303        let mut last = None;
1304        while let Some(sample) = reader.next_event()? {
1305            last = Some(sample);
1306        }
1307        return Ok(last);
1308    };
1309
1310    if last_line_data.is_empty() {
1311        return Ok(None);
1312    }
1313
1314    let raw = codec::unescape(last_line_data);
1315    if let Ok(sample) = reader::decode_sample(dbr_type, year, &raw) {
1316        return Ok(Some(sample));
1317    }
1318
1319    // Java parity (20ec1a02): a crash mid-write leaves a torn last line.
1320    // Walk forward from the start tracking the last good sample so the
1321    // tail-corruption case still surfaces a usable answer instead of an
1322    // I/O-style error to the caller. Bounded by the file size (we'll
1323    // stop at end-of-stream); the cost only matters for the rare
1324    // corrupt-tail case.
1325    tracing::warn!(
1326        ?path,
1327        "PB tail decode failed; falling back to forward scan for last good sample"
1328    );
1329    let mut reader = PbFileReader::open(path)?;
1330    let mut last = None;
1331    while let Ok(Some(sample)) = reader.next_event() {
1332        last = Some(sample);
1333    }
1334    Ok(last)
1335}
1336
1337/// Build PV file prefix info for matching files in a directory.
1338fn pv_file_parts(pv: &str) -> (PathBuf, String) {
1339    let pv_key = pv_name_to_key(pv);
1340    let dir_part = pv_key.rsplit_once('/').map(|(dir, _)| dir).unwrap_or("");
1341    let file_prefix = pv_key
1342        .rsplit_once('/')
1343        .map(|(_, name)| name)
1344        .unwrap_or(&pv_key)
1345        .to_string();
1346    (PathBuf::from(dir_part), file_prefix)
1347}
1348
1349/// List PB files for a PV in a directory, matching the PV file prefix.
1350/// Crate-public re-export of [`list_pv_pb_files`] — used by the ETL
1351/// executor to consolidate one PV's files without duplicating the
1352/// directory-walking logic.
1353pub fn list_pv_pb_files_pub(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
1354    list_pv_pb_files(root, pv)
1355}
1356
1357fn list_pv_pb_files(root: &Path, pv: &str) -> anyhow::Result<Vec<PathBuf>> {
1358    let (dir_part, file_prefix) = pv_file_parts(pv);
1359    let pv_dir = root.join(&dir_part);
1360
1361    if !pv_dir.exists() {
1362        return Ok(Vec::new());
1363    }
1364
1365    let mut files: Vec<PathBuf> = std::fs::read_dir(&pv_dir)?
1366        .filter_map(|e| e.ok())
1367        .map(|e| e.path())
1368        .filter(|p| {
1369            p.extension().and_then(|e| e.to_str()) == Some("pb")
1370                && p.file_name().and_then(|n| n.to_str()).is_some_and(|n| {
1371                    n.starts_with(&file_prefix) && n[file_prefix.len()..].starts_with(':')
1372                })
1373        })
1374        .collect();
1375
1376    files.sort();
1377    Ok(files)
1378}
1379
1380#[async_trait]
1381impl StoragePlugin for PlainPbStoragePlugin {
1382    fn name(&self) -> &str {
1383        &self.plugin_name
1384    }
1385
1386    fn partition_granularity(&self) -> PartitionGranularity {
1387        self.granularity
1388    }
1389
1390    async fn append_event(
1391        &self,
1392        pv: &str,
1393        dbr_type: ArchDbType,
1394        sample: &ArchiverSample,
1395    ) -> anyhow::Result<()> {
1396        let meta = AppendMeta::default();
1397        self.append_event_with_meta(pv, dbr_type, sample, &meta)
1398            .await
1399    }
1400
1401    async fn append_event_with_meta(
1402        &self,
1403        pv: &str,
1404        dbr_type: ArchDbType,
1405        sample: &ArchiverSample,
1406        meta: &AppendMeta,
1407    ) -> anyhow::Result<()> {
1408        let path = self.file_path_for(pv, sample.timestamp);
1409        debug!(?path, pv, "appending event");
1410
1411        self.ensure_parent_dir(&path)?;
1412        self.write_cached(&path, pv, dbr_type, sample, meta)
1413    }
1414
1415    async fn get_data(
1416        &self,
1417        pv: &str,
1418        start: SystemTime,
1419        end: SystemTime,
1420    ) -> anyhow::Result<Vec<Box<dyn EventStream>>> {
1421        // Flush cached writes so readers see the latest data.
1422        self.flush_writes().await?;
1423
1424        let files = self.list_files_for_range(pv, start, end);
1425
1426        // Java parity (88c7601): single-file short-circuit. When the only
1427        // matching file's last sample is older than `start`, return that
1428        // single event in a tiny stream rather than opening a full reader
1429        // that the lower-bound filter would just discard. Equivalent to
1430        // Java's `CallableEventStream.makeOneEventCallable(...)` branch.
1431        // Java parity (88c7601): Java compares `lastEventEpochSeconds <= startTime`,
1432        // so a file whose final sample lands exactly on `start` still
1433        // short-circuits. `<` would force a full reader open at the
1434        // boundary and emit the same single sample after a wasted seek.
1435        if files.len() == 1
1436            && let Some(last) = read_last_sample_from_file(&files[0])?
1437            && last.timestamp <= start
1438        {
1439            let reader = PbFileReader::open(&files[0])?;
1440            let desc = reader.description().clone();
1441            return Ok(vec![Box::new(SingleSampleStream {
1442                desc,
1443                sample: Some(last),
1444            })]);
1445        }
1446
1447        let mut streams: Vec<Box<dyn EventStream>> = Vec::new();
1448        for file in files {
1449            let reader = PbFileReader::open_seeked(&file, start)?;
1450            // Java parity (e3b4471 + 88c7601): clamp output at both
1451            // ends. Without the upper bound, files whose partition name
1452            // overlaps the query but whose late-arriving samples spill
1453            // past `end` leak stale tail data. Without the lower bound,
1454            // a binary-search miss leaves the reader at data-start and
1455            // emits every pre-`start` sample in the file.
1456            streams.push(Box::new(BoundedReader::new(reader, start, end)));
1457        }
1458        Ok(streams)
1459    }
1460
1461    async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>> {
1462        // Flush cached writes so readers can see the latest data.
1463        self.flush_writes().await?;
1464
1465        let pb_files = list_pv_pb_files(&self.root_folder, pv)?;
1466
1467        // Read from the last (most recent) file, using optimized tail read.
1468        for path in pb_files.into_iter().rev() {
1469            if let Some(sample) = read_last_sample_from_file(&path)? {
1470                return Ok(Some(sample));
1471            }
1472        }
1473        Ok(None)
1474    }
1475
1476    async fn get_last_event_before(
1477        &self,
1478        pv: &str,
1479        target: SystemTime,
1480    ) -> anyhow::Result<Option<ArchiverSample>> {
1481        self.flush_writes().await?;
1482
1483        let pb_files = list_pv_pb_files(&self.root_folder, pv)?;
1484
1485        // Walk newest-to-oldest; first file whose final sample is before
1486        // `target` provides the answer (its last sample IS the answer).
1487        // For files whose final sample is at-or-after target, scan from
1488        // the start to find the last sample with ts < target.
1489        for path in pb_files.into_iter().rev() {
1490            let Some(last) = read_last_sample_from_file(&path)? else {
1491                continue;
1492            };
1493            if last.timestamp < target {
1494                return Ok(Some(last));
1495            }
1496            // Final sample is past target — scan the file forward and
1497            // track the latest sample with ts < target.
1498            let mut reader = PbFileReader::open(&path)?;
1499            let mut last_before: Option<ArchiverSample> = None;
1500            while let Some(sample) = reader.next_event()? {
1501                if sample.timestamp >= target {
1502                    break;
1503                }
1504                last_before = Some(sample);
1505            }
1506            if last_before.is_some() {
1507                return Ok(last_before);
1508            }
1509            // Every sample in this file is at-or-after target; the answer,
1510            // if any, lives in an older file.
1511        }
1512        Ok(None)
1513    }
1514
1515    async fn delete_pv_data(&self, pv: &str) -> anyhow::Result<u64> {
1516        // Three-phase delete to close the concurrent-append race:
1517        //
1518        //   Phase 1: KEEP the slot in the cache, set `dead = true`,
1519        //            drop the writer (file-gone helper). Any
1520        //            concurrent `append` that performs `slot_for`
1521        //            during this window — whether before or after
1522        //            we acquire the slot lock — gets the SAME
1523        //            tombstoned slot back. write_cached's
1524        //            dead-check then bails before opening any new
1525        //            file under this PV's name.
1526        //
1527        //   Phase 2: List + remove on-disk files (async).
1528        //
1529        //   Phase 3 (RAII): `_cleanup` drops at function exit and
1530        //            removes the slot from the cache. Runs on
1531        //            EVERY return path — Ok, ?, panic. Without
1532        //            this guard, an early-? from list_pv_pb_files
1533        //            or remove_file would leave the slot
1534        //            tombstoned forever with no operator-visible
1535        //            way to recover the PV name.
1536        let _cleanup = TombstoneCleanupGuard {
1537            plugin: self,
1538            pv: pv.to_string(),
1539        };
1540        let slot_arc = self.slot_for(pv);
1541        {
1542            let mut slot = slot_arc.lock().unwrap_or_else(|e| e.into_inner());
1543            slot.dead = true;
1544            if let Some(cached) = slot.writer.take() {
1545                // Files are about to be deleted unconditionally —
1546                // flushing buffered bytes is wasted I/O. Use the
1547                // file-gone helper so dirty bytes get a loss
1548                // marker, satisfying principle 1 (every dirty
1549                // drop classified). The next ingest flush will
1550                // surface the PV to the global owner; the owner's
1551                // commit is a no-op (registry row also torn down
1552                // by the management API), but the classification
1553                // stays uniform.
1554                self.drop_writer_file_gone(pv, cached);
1555            }
1556        }
1557
1558        let entries = list_pv_pb_files(&self.root_folder, pv)?;
1559        let mut deleted = 0u64;
1560        for path in entries {
1561            tokio::fs::remove_file(&path).await?;
1562            deleted += 1;
1563        }
1564
1565        // Clean up empty directory + invalidate the known_dirs
1566        // cache for it. Without the cache invalidation, a later
1567        // re-archive of the same PV would skip create_dir_all
1568        // (cache says "exists") and then fail to open with
1569        // ENOENT — the directory was deleted out from under the
1570        // cache.
1571        let (dir_part, _) = pv_file_parts(pv);
1572        let pv_dir = self.root_folder.join(&dir_part);
1573        if pv_dir.exists() {
1574            let is_empty = std::fs::read_dir(&pv_dir)?.next().is_none();
1575            if is_empty {
1576                let _ = tokio::fs::remove_dir(&pv_dir).await;
1577                let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
1578                dirs.remove(&pv_dir);
1579            }
1580        }
1581
1582        debug!(pv, deleted, "Deleted PV data files");
1583        Ok(deleted)
1584    }
1585
1586    async fn flush_writes(&self) -> anyhow::Result<()> {
1587        // Read-side callers (`get_data` etc.) only care about real
1588        // I/O errors. Deferred writers (an `append` is in flight)
1589        // are not failures — those bytes will reach disk on the
1590        // next cycle, and the reader can still see everything
1591        // already flushed. Surfacing deferred as Err would make
1592        // every concurrent read+write race look like storage death.
1593        //
1594        // **Does NOT drain `evicted_with_loss`.** Loss markers
1595        // are reserved for the global flush owner — read-side
1596        // consumption would silently swallow a failure before
1597        // the owner can drop its `ts_updates` for the lost PV.
1598        // (Write failures during this read-side pass are still
1599        // recorded in `evicted_with_loss` by `flush_dirty_writers`
1600        // — they just aren't drained here.)
1601        let outcome = self.flush_dirty_writers();
1602        if !outcome.failed.is_empty() {
1603            anyhow::bail!(
1604                "{} writer flush(es) failed (first pv={})",
1605                outcome.failed.len(),
1606                outcome.failed[0],
1607            );
1608        }
1609        Ok(())
1610    }
1611
1612    async fn flush_ingest_writes(&self) -> anyhow::Result<IngestFlushResult> {
1613        // Surface failed/deferred separately so the engine can keep
1614        // deferred PVs in its ts_updates map (their bytes will reach
1615        // disk next cycle) while dropping failed PVs (their bytes
1616        // are lost). Lumping them together caused permanent
1617        // registry-timestamp loss for PVs that went briefly busy
1618        // then silent.
1619        let outcome = self.flush_dirty_writers();
1620        let mut failed = outcome.failed;
1621        // Drain the loss queue here — the global flush owner is
1622        // the SOLE consumer of these markers. Includes:
1623        //   * LRU dirty-eviction losses (proactive cap pressure)
1624        //   * Partition rollover / ghost-file / evict-by-path
1625        //     dirty-drop losses
1626        //   * Read-side flush failures (recorded by
1627        //     `flush_dirty_writers` even when invoked by retrieval)
1628        // Dedupe so an entry that appears in both `failed` and the
1629        // queue (this-cycle flush failure) is reported once.
1630        {
1631            let mut ev = self
1632                .evicted_with_loss
1633                .lock()
1634                .unwrap_or_else(|e| e.into_inner());
1635            failed.append(&mut *ev);
1636        }
1637        failed.sort();
1638        failed.dedup();
1639        Ok(IngestFlushResult {
1640            failed,
1641            deferred: outcome.deferred,
1642        })
1643    }
1644
1645    fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>> {
1646        let files = list_pv_pb_files(&self.root_folder, pv).unwrap_or_default();
1647        let count = files.len() as u64;
1648        let bytes: u64 = files
1649            .iter()
1650            .filter_map(|p| std::fs::metadata(p).ok())
1651            .map(|m| m.len())
1652            .sum();
1653        Ok(vec![StoreSummary {
1654            name: self.plugin_name.clone(),
1655            root_folder: self.root_folder.clone(),
1656            granularity: self.granularity,
1657            pv_file_count: Some(count),
1658            pv_size_bytes: Some(bytes),
1659            total_size_bytes: None,
1660            total_files: None,
1661        }])
1662    }
1663
1664    fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>> {
1665        let (total_files, total_size) = total_pb_stats(&self.root_folder);
1666        Ok(vec![StoreSummary {
1667            name: self.plugin_name.clone(),
1668            root_folder: self.root_folder.clone(),
1669            granularity: self.granularity,
1670            pv_file_count: None,
1671            pv_size_bytes: None,
1672            total_size_bytes: Some(total_size),
1673            total_files: Some(total_files),
1674        }])
1675    }
1676
1677    async fn rename_pv(&self, from: &str, to: &str) -> anyhow::Result<u64> {
1678        // Phase 1: tombstone the SOURCE slot in cache (don't
1679        // remove it yet — see delete_pv_data for the same race
1680        // rationale). Concurrent appends to `from` during the
1681        // file-rename window will find this same dead slot and
1682        // bail at the dead-check.
1683        //
1684        // Phase 3 (RAII): the cleanup guard removes the source
1685        // slot from the cache on every return path so a
1686        // partway-failed rename can't leave `from` permanently
1687        // tombstoned.
1688        let _cleanup = TombstoneCleanupGuard {
1689            plugin: self,
1690            pv: from.to_string(),
1691        };
1692        let from_slot = self.slot_for(from);
1693        {
1694            let mut slot = from_slot.lock().unwrap_or_else(|e| e.into_inner());
1695            slot.dead = true;
1696            if let Some(cached) = slot.writer.take() {
1697                // Try to flush so the post-rename dest file
1698                // inherits the source's last buffered samples.
1699                // On flush failure, record loss for `from`; the
1700                // global owner will drop its pending entry. The
1701                // source name's registry row is being removed by
1702                // the rename anyway, so the commit-side effect is
1703                // a no-op — but principle 1 (every dirty drop
1704                // classified) holds.
1705                let _ = self.drop_dirty_writer(from, cached);
1706            }
1707        }
1708        // Defensive: clear any stale destination writer before
1709        // the rename moves source files into the dest's path
1710        // namespace. Dest is NOT tombstoned — `to` is the live
1711        // PV after this returns, and future appends to it are
1712        // legal. (If the operator was actively appending to `to`
1713        // at the moment of rename, they made a mistake; we don't
1714        // optimise for that case.)
1715        //
1716        // **Outer lock briefly, then release**: take the slot
1717        // Arc out under the outer lock, drop the outer guard,
1718        // THEN lock the slot and run drop_dirty_writer (which
1719        // does sync flush I/O). Holding the outer lock across
1720        // the flush would block every concurrent slot_for() —
1721        // i.e., every shard's append — for the duration of the
1722        // dest writer's flush, which on slow storage could stall
1723        // the entire ingest path.
1724        let dest_slot_arc = {
1725            let mut cache = self
1726                .write_cache
1727                .lock()
1728                .map_err(|e| anyhow::anyhow!("write cache poisoned: {e}"))?;
1729            cache.remove(to)
1730        };
1731        if let Some(arc) = dest_slot_arc {
1732            let mut slot = arc.lock().unwrap_or_else(|e| e.into_inner());
1733            if let Some(cached) = slot.writer.take() {
1734                let _ = self.drop_dirty_writer(to, cached);
1735            }
1736        }
1737
1738        let from_files = list_pv_pb_files(&self.root_folder, from)?;
1739        if from_files.is_empty() {
1740            return Ok(0);
1741        }
1742        let from_key = pv_name_to_key(from);
1743        let from_leaf = from_key.rsplit('/').next().unwrap_or(&from_key).to_string();
1744        let to_key = pv_name_to_key(to);
1745        let to_leaf = to_key.rsplit('/').next().unwrap_or(&to_key).to_string();
1746
1747        // Ensure destination parent directory exists so std::fs::rename can
1748        // place files across PV-name prefixes (e.g. SIM:Sine -> RING:Current
1749        // changes the parent dir from SIM/ to RING/).
1750        let (to_dir_part, _) = pv_file_parts(to);
1751        let to_dir = self.root_folder.join(&to_dir_part);
1752        if !to_dir.as_os_str().is_empty() && !to_dir.exists() {
1753            std::fs::create_dir_all(&to_dir)?;
1754        }
1755
1756        let mut moved = 0u64;
1757        for src in &from_files {
1758            let file_name = src
1759                .file_name()
1760                .and_then(|n| n.to_str())
1761                .ok_or_else(|| anyhow::anyhow!("non-utf8 filename: {src:?}"))?;
1762            // file is "{from_leaf}:{partition}.pb" — replace the leaf prefix.
1763            let suffix = file_name
1764                .strip_prefix(&from_leaf)
1765                .and_then(|s| s.strip_prefix(':'))
1766                .ok_or_else(|| {
1767                    anyhow::anyhow!("filename {file_name} did not match expected PV leaf")
1768                })?;
1769            let new_name = format!("{to_leaf}:{suffix}");
1770            let dst = to_dir.join(new_name);
1771            std::fs::rename(src, &dst)?;
1772            moved += 1;
1773        }
1774
1775        // Clean up empty source directory + invalidate
1776        // known_dirs cache for it (same rationale as
1777        // delete_pv_data — a stale entry would make the next
1778        // append to a re-archived `from` skip create_dir_all and
1779        // hit ENOENT).
1780        let (from_dir_part, _) = pv_file_parts(from);
1781        let from_dir = self.root_folder.join(&from_dir_part);
1782        if !from_dir_part.as_os_str().is_empty()
1783            && from_dir.exists()
1784            && std::fs::read_dir(&from_dir)?.next().is_none()
1785        {
1786            let _ = std::fs::remove_dir(&from_dir);
1787            let mut dirs = self.known_dirs.lock().unwrap_or_else(|e| e.into_inner());
1788            dirs.remove(&from_dir);
1789        }
1790
1791        // Phase 3 cleanup of the tombstoned source slot is done
1792        // by `_cleanup`'s Drop on function exit. Single source of
1793        // truth for the cleanup keeps it consistent across every
1794        // return path.
1795
1796        Ok(moved)
1797    }
1798}
1799
/// Count `.pb` files under `root` (recursively) and sum their sizes.
///
/// Returns `(file_count, total_bytes)`. The walk is best-effort and
/// silent; nothing is logged, so problems only show up as an
/// under-reported total:
/// - a directory that can't be read is skipped;
/// - directory entries that fail to read are dropped;
/// - a `.pb` entry whose metadata can't be read (e.g. a dangling
///   symlink) still counts as a file but contributes 0 bytes.
///
/// A missing `root` yields `(0, 0)`.
///
/// Symlinks are followed consistently. A symlinked directory is
/// descended into, and a symlinked `.pb` file contributes its target's
/// size rather than the link's own size.
fn total_pb_stats(root: &Path) -> (u64, u64) {
    fn walk(dir: &Path, files: &mut u64, bytes: &mut u64) {
        let Ok(entries) = std::fs::read_dir(dir) else {
            return;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            // A single symlink-following stat answers both "is this a
            // directory?" and "how large is it?". The previous mix of
            // `Path::is_dir` (follows links) and `DirEntry::metadata`
            // (does not) measured symlinks inconsistently and cost two
            // syscalls per `.pb` file.
            let meta = std::fs::metadata(&path).ok();
            if meta.as_ref().is_some_and(|m| m.is_dir()) {
                walk(&path, files, bytes);
            } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
                *files += 1;
                *bytes += meta.map_or(0, |m| m.len());
            }
        }
    }

    let mut files = 0u64;
    let mut bytes = 0u64;
    // `walk` already returns early when `read_dir` fails, so a missing
    // root simply leaves both counters at zero.
    walk(root, &mut files, &mut bytes);
    (files, bytes)
}