archiver_core/storage/traits.rs
1use std::path::PathBuf;
2use std::time::SystemTime;
3
4use async_trait::async_trait;
5
6use crate::storage::partition::PartitionGranularity;
7use crate::types::{ArchDbType, ArchiverSample, EventStreamDesc};
8
9/// Per-tier description of a storage stage. Surfaced through the
10/// `getStoresForPV` and `getApplianceMetrics` BPL endpoints so operators
11/// can see tier layout and per-PV file counts without poking the disk.
12#[derive(Debug, Clone)]
13pub struct StoreSummary {
14 pub name: String,
15 pub root_folder: PathBuf,
16 pub granularity: PartitionGranularity,
17 /// Number of `.pb` partition files this tier holds for the given PV.
18 /// `None` when the summary was requested without a PV scope.
19 pub pv_file_count: Option<u64>,
20 /// Sum of `.pb` file sizes (bytes) for the given PV in this tier.
21 /// `None` when the summary was requested without a PV scope.
22 pub pv_size_bytes: Option<u64>,
23 /// Total size on disk of all `.pb` files in this tier (bytes), summed across PVs.
24 /// `None` when the summary is PV-scoped.
25 pub total_size_bytes: Option<u64>,
26 /// Total number of `.pb` files in this tier across all PVs.
27 /// `None` when the summary is PV-scoped.
28 pub total_files: Option<u64>,
29}
30
31/// A stream of archived events (read side).
32pub trait EventStream: Send {
33 fn description(&self) -> &EventStreamDesc;
34 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>>;
35}
36
/// Optional extras to record in the PlainPB `PayloadInfo` header on append.
///
/// `AppendMeta::default()` carries no element count and no extra headers.
#[derive(Clone, Debug, Default)]
pub struct AppendMeta {
    /// Element count to record, when known.
    pub element_count: Option<i32>,
    /// Extra header entries as string pairs (presumably `(name, value)`).
    pub headers: Vec<(String, String)>,
}
43
/// Result of a single `flush_ingest_writes` pass, split by how callers (such
/// as the engine's write_loop) must react.
///
/// The two lists have **different** semantics and must not be conflated; see
/// the field docs.
#[derive(Clone, Debug, Default)]
pub struct IngestFlushResult {
    /// PVs whose flush errored. Their buffered bytes have been discarded (the
    /// writer was evicted via `into_parts`). Callers MUST drop these both from
    /// any pending timestamp commit AND from their `ts_updates` map, so the
    /// registry never claims a `last_event` for samples that never reached disk.
    pub failed: Vec<String>,
    /// PVs whose per-PV writer slot was already locked by an in-flight append.
    /// Their bytes are still buffered and will be flushed on the next cycle.
    /// Callers MUST skip these in THIS cycle's commit but MUST keep them in
    /// `ts_updates` so the timestamp commits on a later cycle. Treating a
    /// deferral as a permanent failure permanently loses the registry
    /// timestamp for any PV that gets busy and then goes silent.
    pub deferred: Vec<String>,
}
67
68/// Storage plugin trait — the primary interface for reading/writing archived data.
69#[async_trait]
70pub trait StoragePlugin: Send + Sync {
71 fn name(&self) -> &str;
72 fn partition_granularity(&self) -> PartitionGranularity;
73
74 /// Append a single sample to storage.
75 async fn append_event(
76 &self,
77 pv: &str,
78 dbr_type: ArchDbType,
79 sample: &ArchiverSample,
80 ) -> anyhow::Result<()>;
81
82 /// Append a single sample with optional metadata for PlainPB headers.
83 async fn append_event_with_meta(
84 &self,
85 pv: &str,
86 dbr_type: ArchDbType,
87 sample: &ArchiverSample,
88 _meta: &AppendMeta,
89 ) -> anyhow::Result<()> {
90 // Default implementation ignores metadata.
91 self.append_event(pv, dbr_type, sample).await
92 }
93
94 /// Read data for a PV within a time range. Returns multiple streams
95 /// (one per partition file).
96 async fn get_data(
97 &self,
98 pv: &str,
99 start: SystemTime,
100 end: SystemTime,
101 ) -> anyhow::Result<Vec<Box<dyn EventStream>>>;
102
103 /// Get the most recent known event for a PV.
104 async fn get_last_known_event(&self, pv: &str) -> anyhow::Result<Option<ArchiverSample>>;
105
106 /// Get the last sample whose timestamp is strictly before `target`.
107 /// Used by retrieval to prepend a continuity sample when the user's
108 /// query window starts in a gap between samples (Java's
109 /// `getLastEventOfPreviousPartitionBeforeTimeAsStream`). Returns None
110 /// if no such sample exists.
111 ///
112 /// Default implementation: walks `get_last_known_event` and returns
113 /// it iff its timestamp is < target. Plugins with cheaper backward
114 /// scans should override.
115 async fn get_last_event_before(
116 &self,
117 pv: &str,
118 target: SystemTime,
119 ) -> anyhow::Result<Option<ArchiverSample>> {
120 match self.get_last_known_event(pv).await? {
121 Some(sample) if sample.timestamp < target => Ok(Some(sample)),
122 _ => Ok(None),
123 }
124 }
125
126 /// Delete all stored data for a PV. Returns the number of files deleted.
127 /// Default implementation returns 0 (no-op for backward compatibility).
128 async fn delete_pv_data(&self, _pv: &str) -> anyhow::Result<u64> {
129 Ok(0)
130 }
131
132 /// Flush any buffered writes to disk. Default is no-op.
133 async fn flush_writes(&self) -> anyhow::Result<()> {
134 Ok(())
135 }
136
137 /// Flush only the cached writers used by the *ingest* path (the
138 /// engine's monitor/scan write_loop). See [`IngestFlushResult`]
139 /// for the failed/deferred distinction — `failed` PVs lost
140 /// their buffered bytes, `deferred` PVs are still buffered and
141 /// must be retried on the next cycle.
142 ///
143 /// Multi-tier implementations should limit scope to the ingest
144 /// tier (e.g. STS only) so a slow MTS/LTS mount can't stall the
145 /// live archive pipeline. ETL drives MTS/LTS flushing separately.
146 async fn flush_ingest_writes(&self) -> anyhow::Result<IngestFlushResult> {
147 // Default: best-effort fall back to flush_writes. Implementations
148 // that can identify per-PV failures/deferrals should override.
149 self.flush_writes()
150 .await
151 .map(|_| IngestFlushResult::default())
152 }
153
154 /// Per-tier summary scoped to a single PV: name, root folder, granularity,
155 /// and how many `.pb` files this tier holds for that PV. Total size /
156 /// total files are left None.
157 fn stores_for_pv(&self, pv: &str) -> anyhow::Result<Vec<StoreSummary>>;
158
159 /// Per-tier summary aggregated across all PVs: total size on disk and
160 /// total file count. `pv_file_count` is left None.
161 fn appliance_metrics(&self) -> anyhow::Result<Vec<StoreSummary>>;
162
163 /// Rename `from` → `to` in this storage backend. Implementations may copy
164 /// or rename underlying files; the contract is that after a successful
165 /// return, reads for `to` see all data previously stored under `from` and
166 /// reads for `from` see none. Defaults to error so missing implementations
167 /// surface explicitly.
168 async fn rename_pv(&self, _from: &str, _to: &str) -> anyhow::Result<u64> {
169 anyhow::bail!("rename_pv not implemented for this storage plugin")
170 }
171}
172
173/// Post-processor trait for data reduction (mean, max, min, etc.).
174pub trait PostProcessor: Send {
175 fn name(&self) -> &str;
176 fn interval_secs(&self) -> u64;
177 fn process(&self, input: Box<dyn EventStream>) -> Box<dyn EventStream>;
178}