Skip to main content

archiver_core/storage/
traits.rs

1use std::path::PathBuf;
2use std::time::SystemTime;
3
4use async_trait::async_trait;
5
6use crate::storage::partition::PartitionGranularity;
7use crate::types::{ArchDbType, ArchiverSample, EventStreamDesc};
8
9/// Per-tier description of a storage stage. Surfaced through the
10/// `getStoresForPV` and `getApplianceMetrics` BPL endpoints so operators
11/// can see tier layout and per-PV file counts without poking the disk.
12#[derive(Debug, Clone)]
13pub struct StoreSummary {
14    pub name: String,
15    pub root_folder: PathBuf,
16    pub granularity: PartitionGranularity,
17    /// Number of `.pb` partition files this tier holds for the given PV.
18    /// `None` when the summary was requested without a PV scope.
19    pub pv_file_count: Option<u64>,
20    /// Sum of `.pb` file sizes (bytes) for the given PV in this tier.
21    /// `None` when the summary was requested without a PV scope.
22    pub pv_size_bytes: Option<u64>,
23    /// Total size on disk of all `.pb` files in this tier (bytes), summed across PVs.
24    /// `None` when the summary is PV-scoped.
25    pub total_size_bytes: Option<u64>,
26    /// Total number of `.pb` files in this tier across all PVs.
27    /// `None` when the summary is PV-scoped.
28    pub total_files: Option<u64>,
29}
30
31/// A stream of archived events (read side).
32pub trait EventStream: Send {
33    fn description(&self) -> &EventStreamDesc;
34    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>>;
35}
36
/// Optional metadata to include in PlainPB `PayloadInfo` headers.
///
/// The `Default` value carries no metadata at all (`element_count` is `None`
/// and `headers` is empty). Equality is field-wise, so two values compare
/// equal only when they hold the same element count and the same headers in
/// the same order.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AppendMeta {
    /// Element count to record, if known.
    /// NOTE(review): presumably the number of elements in a waveform
    /// sample — confirm against the PlainPB writer.
    pub element_count: Option<i32>,
    /// Extra header entries as `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
}
43
/// Outcome of a single `flush_ingest_writes` pass.
///
/// The two lists carry **different** semantics for callers such as the
/// engine's write_loop, and must not be treated alike:
///
/// * **failed** — the flush errored for this PV and its buffered bytes
///   have been discarded (writer evicted via `into_parts`). The caller
///   MUST drop these PVs from any pending timestamp commit AND from its
///   `ts_updates` map, otherwise the registry would claim `last_event`
///   for samples that never reached disk.
///
/// * **deferred** — the writer's per-PV slot was already locked by an
///   in-flight append. The bytes are still buffered and will be flushed
///   on the next cycle. The caller MUST skip these PVs in THIS cycle's
///   commit but MUST keep them in `ts_updates` so the timestamp commits
///   on a later cycle. Treating deferred as a permanent failure
///   permanently loses the registry timestamp for any PV that gets busy
///   and then goes silent.
///
/// The `Default` value has both lists empty. Equality is field-wise and
/// order-sensitive, which lets callers and tests compare a flush outcome
/// against an expected value directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestFlushResult {
    /// PVs whose flush errored; their buffered bytes are gone.
    pub failed: Vec<String>,
    /// PVs skipped this cycle because their writer slot was busy; their
    /// bytes remain buffered.
    pub deferred: Vec<String>,
}
67
68/// Storage plugin trait — the primary interface for reading/writing archived data.
69#[async_trait]
70pub trait StoragePlugin: Send + Sync {
71    fn name(&self) -> &str;
72    fn partition_granularity(&self) -> PartitionGranularity;
73
74    /// Append a single sample to storage.
75    async fn append_event(
76        &self,
77        pv: &str,
78        dbr_type: ArchDbType,
79        sample: &ArchiverSample,
80    ) -> anyhow::Result<()>;
81
82    /// Append a single sample with optional metadata for PlainPB headers.
83    async fn append_event_with_meta(
84        &self,
85        pv: &str,
86        dbr_type: ArchDbType,
87        sample: &ArchiverSample,
88        _meta: &AppendMeta,
89    ) -> anyhow::Result<()> {
90        // Default implementation ignores metadata.
91        self.append_event(pv, dbr_type, sample).await
92    }
93
94    /// Read data for a PV within a time range. Returns multiple streams
95    /// (one per partition file).
96    async fn get_data(
97        &self,
98        pv: &str,
99        start: SystemTime,
100        end: SystemTime,
101    ) -> anyhow::Result<Vec<Box<dyn EventStream>>>;
102
103    /// Get the most recent known event for a PV.
104    async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>>;
105
106    /// Get the last sample whose timestamp is strictly before `target`.
107    /// Used by retrieval to prepend a continuity sample when the user's
108    /// query window starts in a gap between samples (Java's
109    /// `getLastEventOfPreviousPartitionBeforeTimeAsStream`). Returns None
110    /// if no such sample exists.
111    ///
112    /// Default implementation: walks `get_last_known_event` and returns
113    /// it iff its timestamp is < target. Plugins with cheaper backward
114    /// scans should override.
115    async fn get_last_event_before(
116        &self,
117        pv: &str,
118        target: SystemTime,
119    ) -> anyhow::Result<Option<ArchiverSample>> {
120        match self.get_last_known_event(pv).await? {
121            Some(sample) if sample.timestamp < target => Ok(Some(sample)),
122            _ => Ok(None),
123        }
124    }
125
126    /// Delete all stored data for a PV. Returns the number of files deleted.
127    /// Default implementation returns 0 (no-op for backward compatibility).
128    async fn delete_pv_data(&self, _pv: &str) -> anyhow::Result<u64> {
129        Ok(0)
130    }
131
132    /// Flush any buffered writes to disk. Default is no-op.
133    async fn flush_writes(&self) -> anyhow::Result<()> {
134        Ok(())
135    }
136
137    /// Flush only the cached writers used by the *ingest* path (the
138    /// engine's monitor/scan write_loop). See [`IngestFlushResult`]
139    /// for the failed/deferred distinction — `failed` PVs lost
140    /// their buffered bytes, `deferred` PVs are still buffered and
141    /// must be retried on the next cycle.
142    ///
143    /// Multi-tier implementations should limit scope to the ingest
144    /// tier (e.g. STS only) so a slow MTS/LTS mount can't stall the
145    /// live archive pipeline. ETL drives MTS/LTS flushing separately.
146    async fn flush_ingest_writes(&self) -> anyhow::Result<IngestFlushResult> {
147        // Default: best-effort fall back to flush_writes. Implementations
148        // that can identify per-PV failures/deferrals should override.
149        self.flush_writes()
150            .await
151            .map(|_| IngestFlushResult::default())
152    }
153
154    /// Per-tier summary scoped to a single PV: name, root folder, granularity,
155    /// and how many `.pb` files this tier holds for that PV. Total size /
156    /// total files are left None.
157    fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>>;
158
159    /// Per-tier summary aggregated across all PVs: total size on disk and
160    /// total file count. `pv_file_count` is left None.
161    fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>>;
162
163    /// Rename `from` → `to` in this storage backend. Implementations may copy
164    /// or rename underlying files; the contract is that after a successful
165    /// return, reads for `to` see all data previously stored under `from` and
166    /// reads for `from` see none. Defaults to error so missing implementations
167    /// surface explicitly.
168    async fn rename_pv(&self, _from: &str, _to: &str) -> anyhow::Result<u64> {
169        anyhow::bail!("rename_pv not implemented for this storage plugin")
170    }
171}
172
173/// Post-processor trait for data reduction (mean, max, min, etc.).
174pub trait PostProcessor: Send {
175    fn name(&self) -> &str;
176    fn interval_secs(&self) -> u64;
177    fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream>;
178}