Skip to main content

archiver_engine/
channel_manager.rs

1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::pvdata::{PvField, ScalarValue};
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info, warn};
14
15use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
16use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
17use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
18
19use crate::policy::PolicyConfig;
20
/// Timeout for initial CA channel connection.
///
/// Also reused by `archive_pv_inner_pva` as the bound on the PVA
/// `pvconnect` and the one-shot `pvget`, so despite the name it governs
/// first contact for both protocols.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for CA reconnection attempts in the monitor loop.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay before retrying a failed CA subscription.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
27
/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
/// stale uninitialised IOC clock or a sentinel.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC

/// Check a freshly-received sample timestamp against the floor and the
/// wall clock.
///
/// Returns `true` when `ts` should be accepted: it is at or after
/// [`PAST_CUTOFF_UNIX_SECS`] and within ±`drift_secs` of `now`, both
/// compared at whole-second resolution (sub-second parts are truncated).
/// Returns `false` when the sample should be dropped (caller bumps
/// `timestamp_drops`). A `ts` before the Unix epoch is always rejected; a
/// `now` before the Unix epoch is treated as second 0.
///
/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
/// 6538631), so per-site tuning doesn't require recompiling. `now` is
/// passed in (rather than calling `SystemTime::now()` here) so the test
/// suite can pin time deterministically.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    /// Whole seconds since the Unix epoch, or `None` for a pre-epoch instant.
    /// Saturates instead of wrapping so a far-future value can never turn
    /// into a negative (far-past) one.
    fn secs_since_epoch(t: SystemTime) -> Option<i64> {
        t.duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX))
    }

    let Some(unix) = secs_since_epoch(ts) else {
        return false;
    };
    if unix < PAST_CUTOFF_UNIX_SECS {
        return false;
    }
    // Within ±drift_secs of `now`? `abs_diff` cannot overflow, unlike
    // subtract-then-abs.
    let now_unix = secs_since_epoch(now).unwrap_or(0);
    unix.abs_diff(now_unix) <= drift_secs
}
57
/// Discrete connection state surfaced by `getPVDetails` (Java parity
/// dea7acb). Lets a report tell "never connected" apart from "connecting"
/// and from "was connected, now down".
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Nothing has been attempted yet, or no attempt has got past
    /// `wait_connected`.
    #[default]
    Idle,
    /// The first `wait_connected` on this channel handle is in flight.
    Connecting,
    /// The channel reported a successful connect; samples are flowing or
    /// the channel is otherwise live.
    Connected,
    /// The channel connected at least once and the monitor loop has since
    /// dropped. Kept separate from `Idle` so operators can tell a
    /// regression from a name that never resolved.
    Disconnected,
}

impl PvConnectionState {
    /// Static label for this state; the text is exactly the variant name.
    pub fn as_str(self) -> &'static str {
        use PvConnectionState::{Connected, Connecting, Disconnected, Idle};

        match self {
            Idle => "Idle",
            Connecting => "Connecting",
            Connected => "Connected",
            Disconnected => "Disconnected",
        }
    }
}
88
/// Connection state tracked per PV.
///
/// The derived `Default` yields "nothing known yet": both timestamps
/// `None`, `is_connected == false`, and `state == PvConnectionState::Idle`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Wall-clock time the current connection was established, if any.
    /// NOTE(review): presumably cleared on disconnect — confirm against
    /// the monitor/scan loops, which are outside this view.
    pub connected_since: Option<SystemTime>,
    /// Wall-clock time of the most recent event seen for this PV, if any.
    /// NOTE(review): whether this is receive time or the IOC timestamp is
    /// not visible here — confirm at the write site.
    pub last_event_time: Option<SystemTime>,
    /// Coarse connected / not-connected flag. Overlaps with `state`;
    /// writers are expected to keep the two in agreement.
    pub is_connected: bool,
    /// Java parity (dea7acb): discrete connection state for the
    /// PVDetails report. Operators can distinguish never-connected,
    /// currently-connecting, and previously-connected-now-down.
    pub state: PvConnectionState,
}
100
/// Per-PV diagnostic counters backing the BPL drop / rate / connection
/// reports. Every count is monotonic over the PV's lifetime; the rate
/// handlers compute deltas against `first_event_unix_secs`.
///
/// Kept here rather than in `ConnectionInfo` so the values survive
/// transient disconnects and can be read lock-free by the report
/// endpoints.
#[derive(Debug)]
pub struct PvCounters {
    /// Events produced by monitor/scan in total, counting those that
    /// were later dropped before being written.
    pub events_received: AtomicU64,
    /// Events that were successfully written to storage.
    pub events_stored: AtomicU64,
    /// Unix-epoch seconds of the very first event seen for this PV.
    /// 0 stands for "no event yet" (an i64 sentinel because there is no
    /// `Atomic<Option<...>>`).
    pub first_event_unix_secs: AtomicI64,
    /// Events rejected by the bounded sample channel, i.e. the write
    /// loop fell behind the producer. Reported through
    /// `getDroppedEventsBufferOverflowReport`.
    pub buffer_overflow_drops: AtomicU64,
    /// Events whose timestamp went backwards relative to the
    /// previously-stored event. Corresponds to the Java archiver's
    /// `DroppedEventsTimestampReport`.
    pub timestamp_drops: AtomicU64,
    /// Events whose runtime DBR type differed from the type stored in
    /// the PvRecord. The engine drops them because mixing types within
    /// one PB partition would corrupt downstream readers.
    pub type_change_drops: AtomicU64,
    /// Disconnect transitions observed on this PV's CA channel
    /// (`LostConnectionsReport`).
    pub disconnect_count: AtomicU64,
    /// Unix-epoch seconds of the most recent disconnect transition.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Transient subscribe / monitor-recv / scan-read errors (Java
    /// parity 8fe73eb). Unlike `disconnect_count`, these are recoverable
    /// per-attempt failures, not confirmed link drops.
    pub transient_error_count: AtomicU64,
    /// Most recent DBR type seen from CA that did not match the type
    /// recorded at archive time (Java parity 9f2234f). `-1` means no
    /// mismatch has ever been seen.
    pub latest_observed_dbr: AtomicI32,
    /// DBR_CTRL metadata refresh attempts that failed (timeout,
    /// transport error, missing display info). Worth checking for PVs
    /// that show empty PREC/EGU.
    pub metadata_fetch_failures: AtomicU64,
    /// `storage.append_event_with_meta` calls that ran past the
    /// per-sample storage timeout. Separate from `buffer_overflow_drops`
    /// (mpsc channel saturation) so an operator can tell "the storage
    /// tier is wedged" from "the writer can't keep up with the
    /// producer".
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    /// All counters and timestamps start at zero, except
    /// `latest_observed_dbr`, which starts at its `-1` "never seen"
    /// sentinel.
    fn default() -> Self {
        Self {
            events_received: AtomicU64::default(),
            events_stored: AtomicU64::default(),
            first_event_unix_secs: AtomicI64::default(),
            buffer_overflow_drops: AtomicU64::default(),
            timestamp_drops: AtomicU64::default(),
            type_change_drops: AtomicU64::default(),
            disconnect_count: AtomicU64::default(),
            last_disconnect_unix_secs: AtomicI64::default(),
            transient_error_count: AtomicU64::default(),
            // The one non-zero start value: -1 = no type mismatch observed.
            latest_observed_dbr: AtomicI32::new(-1),
            metadata_fetch_failures: AtomicU64::default(),
            storage_append_timeouts: AtomicU64::default(),
        }
    }
}
174
175/// Read-only snapshot of `PvCounters` — owned values so callers can
176/// move them across threads / serialise without reaching into Atomics.
177#[derive(Debug, Clone)]
178pub struct PvCountersSnapshot {
179    pub events_received: u64,
180    pub events_stored: u64,
181    pub first_event_unix_secs: Option<i64>,
182    pub buffer_overflow_drops: u64,
183    pub timestamp_drops: u64,
184    pub type_change_drops: u64,
185    pub disconnect_count: u64,
186    pub last_disconnect_unix_secs: Option<i64>,
187    pub transient_error_count: u64,
188    /// `Some(dbr_type as i32)` if a type-change mismatch has been
189    /// observed; `None` otherwise (Java parity 9f2234f).
190    pub latest_observed_dbr: Option<i32>,
191    pub metadata_fetch_failures: u64,
192    pub storage_append_timeouts: u64,
193}
194
195impl From<&PvCounters> for PvCountersSnapshot {
196    fn from(c: &PvCounters) -> Self {
197        let first = c.first_event_unix_secs.load(Ordering::Relaxed);
198        let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
199        Self {
200            events_received: c.events_received.load(Ordering::Relaxed),
201            events_stored: c.events_stored.load(Ordering::Relaxed),
202            first_event_unix_secs: if first == 0 { None } else { Some(first) },
203            buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
204            timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
205            type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
206            disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
207            last_disconnect_unix_secs: if last_disc == 0 {
208                None
209            } else {
210                Some(last_disc)
211            },
212            transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
213            latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
214                -1 => None,
215                v => Some(v),
216            },
217            metadata_fetch_failures: c.metadata_fetch_failures.load(Ordering::Relaxed),
218            storage_append_timeouts: c.storage_append_timeouts.load(Ordering::Relaxed),
219        }
220    }
221}
222
/// Handle for a running PV archiving task.
///
/// One of these is stored in `ChannelManager::channels` per archived PV.
/// The `Arc` fields are shared with the spawned monitor/scan task and the
/// per-field monitor tasks, which receive clones when the handle is
/// created.
struct PvHandle {
    /// `Some` for CA-acquired PVs (used by `try_get_value` for the
    /// live-value RPC); `None` for PVA-acquired PVs since PVA exposes
    /// no Clone-able channel handle — live_value for those routes
    /// through `pva_client` directly.
    channel: Option<CaChannel>,
    /// Root cancellation token for this PV. A clone is handed to the
    /// sampling task and every per-field token is a child of it.
    cancel_token: CancellationToken,
    // NOTE(review): no reader of this field is visible in this part of the
    // file, hence the lint allowance — confirm it is still wanted.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection state shared with the sampling task, which receives a
    /// clone of this `Arc`.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
    /// every sample emitted for this PV. Populated by per-field monitor tasks
    /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
    /// so stopping the PV stops all of them.
    extras: Arc<ExtraFieldsCache>,
    /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
    /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
    /// field's task without disturbing the others or the main PV.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises concurrent `update_archive_fields` calls for this PV so
    /// add/remove/respawn never race with itself.
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Diagnostic counters surfaced through the BPL drop / rate /
    /// connection reports. Lock-free reads; updates from the producer
    /// (monitor/scan) and the writer happen on different threads.
    counters: Arc<PvCounters>,
}
251
/// Thread-safe cache of latest extra-field values for one PV.
/// Each map entry is `(field_name, stringified_value)`.
///
/// Always used behind an `Arc` (see `PvHandle::extras`) so the per-field
/// monitor tasks and the sampling loop can share one map.
type ExtraFieldsCache = DashMap<String, String>;
255
/// Default capacity for the bounded sample channel.
/// This limits memory usage when producers outpace the storage writer.
/// At ~200 bytes per sample, 500K entries ≈ 100 MB worst-case.
///
/// Passed to `mpsc::channel` in `ChannelManager::new_with_drift`; there is
/// one such channel per manager, shared by all PVs.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
260
261/// RAII guard that removes a key from `pending_archives` on drop,
262/// ensuring cleanup even if the owning future is cancelled.
263struct PendingGuard<'a> {
264    map: &'a DashMap<String, ()>,
265    key: String,
266}
267
268impl Drop for PendingGuard<'_> {
269    fn drop(&mut self) {
270        self.map.remove(&self.key);
271    }
272}
273
/// Manages EPICS Channel Access + pvAccess connections and dispatches
/// archived samples to storage.
///
/// Built via `new` / `new_with_drift`, which also return the receiving
/// half of the sample channel; the manager itself only keeps the sender.
pub struct ChannelManager {
    /// The CA client context.
    ca_client: CaClient,
    /// The PVA client context (lazily used only when a PV's registry
    /// row carries `Protocol::Pva`).
    pva_client: PvaClient,
    /// Active channels: PV name → handle with cancellation.
    channels: DashMap<String, PvHandle>,
    /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
    /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
    pending_archives: DashMap<String, ()>,
    /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
    /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
    /// the registry status and the channel map disagreeing.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend.
    // NOTE(review): no reader of this field is visible in this part of the
    // file (samples leave through `sample_tx`), hence the lint allowance —
    // confirm it is still wanted.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// PV metadata registry.
    registry: Arc<PvRegistry>,
    /// Sample sender for the write thread. Cloned into every monitor /
    /// scan task when it is spawned.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration. When a policy matches a PV name it
    /// overrides the sample mode requested by the `archive_pv` caller.
    policy: Option<PolicyConfig>,
    /// Per-site IOC drift bound (Java parity 6538631).
    server_ioc_drift_secs: u64,
}
303
/// A sample ready to be written to storage.
///
/// This is the item type of the bounded mpsc channel created in
/// `ChannelManager::new_with_drift`.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archive type of the sample.
    /// NOTE(review): presumably the type observed at runtime, which the
    /// writer compares with the registry's stored type — confirm in the
    /// write loop.
    pub dbr_type: ArchDbType,
    /// The sample payload itself.
    pub sample: ArchiverSample,
    /// Element count reported with the sample, when known.
    pub element_count: Option<i32>,
    /// Counter handle used by the write loop to record timestamp /
    /// type-change drops. None on samples produced before counter
    /// support was wired up — write_loop tolerates the absence.
    pub counters: Option<Arc<PvCounters>>,
}
315
316impl ChannelManager {
317    pub async fn new(
318        storage: Arc<dyn StoragePlugin>,
319        registry: Arc<PvRegistry>,
320        policy: Option<PolicyConfig>,
321    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
322        Self::new_with_drift(storage, registry, policy, 30 * 60).await
323    }
324
325    /// Construct with an explicit IOC drift bound. Java parity (6538631):
326    /// keeps tests + sites that don't surface `EngineConfig` on the
327    /// existing default while letting the daemon plumb a configured value.
328    pub async fn new_with_drift(
329        storage: Arc<dyn StoragePlugin>,
330        registry: Arc<PvRegistry>,
331        policy: Option<PolicyConfig>,
332        server_ioc_drift_secs: u64,
333    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
334        let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
335        let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
336        let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
337
338        let mgr = Self {
339            ca_client,
340            pva_client,
341            channels: DashMap::new(),
342            pending_archives: DashMap::new(),
343            op_locks: DashMap::new(),
344            storage,
345            registry,
346            sample_tx: tx,
347            policy,
348            server_ioc_drift_secs,
349        };
350
351        Ok((mgr, rx))
352    }
353
354    /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
355    /// is what callers should `.lock().await` on; holding the entry guard
356    /// (via `entry().or_insert_with`) across the await would deadlock the
357    /// DashMap shard.
358    fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
359        if let Some(existing) = self.op_locks.get(pv_name) {
360            return existing.clone();
361        }
362        self.op_locks
363            .entry(pv_name.to_string())
364            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
365            .clone()
366    }
367
368    /// Restore all active PVs from the registry (called on startup).
369    ///
370    /// `pvs_by_status(Active)` already filters out alias rows (they carry
371    /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
372    /// future schema change can't silently re-introduce double-archiving
373    /// of the underlying IOC PV.
374    pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
375        let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
376        let total = active_pvs.len() as u64;
377        info!(total, "Restoring PVs from registry");
378
379        let mut restored = 0u64;
380        for record in active_pvs {
381            if record.alias_for.is_some() {
382                warn!(
383                    pv = record.pv_name,
384                    target = record.alias_for.as_deref(),
385                    "Skipping alias row in restore; aliases are routed, not archived"
386                );
387                continue;
388            }
389            if let Err(e) = self.start_archiving_internal(&record).await {
390                warn!(pv = record.pv_name, "Failed to restore PV: {e}");
391                self.registry.set_status(&record.pv_name, PvStatus::Error)?;
392            } else {
393                restored += 1;
394            }
395        }
396        metrics::gauge!("archiver_pvs_active").set(restored as f64);
397        if restored < total {
398            warn!(
399                restored,
400                failed = total - restored,
401                "Some PVs failed to restore"
402            );
403        }
404
405        Ok(restored)
406    }
407
408    /// Start archiving a new PV.
409    pub async fn archive_pv(
410        &self,
411        pv_name: &str,
412        sample_mode: &SampleMode,
413        protocol: Protocol,
414    ) -> anyhow::Result<()> {
415        // Serialise with pause/resume/stop/destroy on the same PV.
416        let lock = self.op_lock(pv_name);
417        let _g = lock.lock().await;
418
419        if self.channels.contains_key(pv_name) {
420            anyhow::bail!("PV {pv_name} is already being archived");
421        }
422
423        // Atomically claim the PV to prevent concurrent archive_pv races.
424        // The guard ensures cleanup even if this future is cancelled.
425        if self
426            .pending_archives
427            .insert(pv_name.to_string(), ())
428            .is_some()
429        {
430            anyhow::bail!("PV {pv_name} archive operation already in progress");
431        }
432        let _guard = PendingGuard {
433            map: &self.pending_archives,
434            key: pv_name.to_string(),
435        };
436
437        match protocol {
438            Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
439            Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
440        }
441    }
442
443    /// Inner implementation of archive_pv, separated for cleanup safety.
444    async fn archive_pv_inner(
445        &self,
446        pv_name: &str,
447        sample_mode: &SampleMode,
448    ) -> anyhow::Result<()> {
449        // Re-check after acquiring the pending slot (another task may have completed).
450        if self.channels.contains_key(pv_name) {
451            anyhow::bail!("PV {pv_name} is already being archived");
452        }
453
454        // Check policy override.
455        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
456            if let Some(p) = policy.find_policy(pv_name) {
457                (p.to_sample_mode(), Some(p.policy_name().to_string()))
458            } else {
459                (sample_mode.clone(), None)
460            }
461        } else {
462            (sample_mode.clone(), None)
463        };
464
465        // Connect to discover the native type.
466        let channel = self.ca_client.create_channel(pv_name);
467        channel
468            .wait_connected(CA_CONNECT_TIMEOUT)
469            .await
470            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
471
472        let info = self
473            .ca_client
474            .cainfo(pv_name)
475            .await
476            .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
477
478        let dbr_type = dbr_field_to_arch_type(info.native_type);
479        let element_count = info.element_count as i32;
480
481        // Register in SQLite. (CA path — PVA acquisition uses a separate
482        // entry point, so this branch is always Protocol::Ca.)
483        self.registry
484            .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
485        // Java parity (b30f1a6): persist the matched policy's stable name so
486        // audit / metrics paths know which policy governed this archive.
487        if let Some(ref name) = matched_policy_name {
488            self.registry.update_policy_name(pv_name, Some(name))?;
489        }
490
491        let record = self
492            .registry
493            .get_pv(pv_name)?
494            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
495        self.start_archiving_internal(&record).await?;
496
497        metrics::gauge!("archiver_pvs_active").increment(1.0);
498        info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
499        Ok(())
500    }
501
502    /// PVA equivalent of [`Self::archive_pv_inner`]. Connects via the
503    /// pvAccess client, infers archive type from a one-shot pvget, then
504    /// hands off to a `monitor_loop_pva` that mirrors the CA monitor
505    /// loop's lifecycle (samples → write_loop, cancel_token shutdown).
506    async fn archive_pv_inner_pva(
507        &self,
508        pv_name: &str,
509        sample_mode: &SampleMode,
510    ) -> anyhow::Result<()> {
511        if self.channels.contains_key(pv_name) {
512            anyhow::bail!("PV {pv_name} is already being archived");
513        }
514
515        // Apply policy override (same surface as CA path).
516        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
517            if let Some(p) = policy.find_policy(pv_name) {
518                (p.to_sample_mode(), Some(p.policy_name().to_string()))
519            } else {
520                (sample_mode.clone(), None)
521            }
522        } else {
523            (sample_mode.clone(), None)
524        };
525
526        // Connect + introspect: a one-shot pvget tells us the value
527        // shape. We rely on it instead of a `cainfo` analogue because
528        // PVA channels carry their type only via fetched data.
529        let connect = tokio::time::timeout(
530            CA_CONNECT_TIMEOUT,
531            self.pva_client.pvconnect(pv_name),
532        )
533        .await
534        .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
535        .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
536        debug!(pv = pv_name, server = %connect, "PVA channel connected");
537
538        let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget(pv_name))
539            .await
540            .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
541            .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
542        let (dbr_type, element_count) = pv_field_to_arch_db_type(&initial).ok_or_else(|| {
543            anyhow::anyhow!(
544                "PV {pv_name}: PVA value is not a scalar/scalar-array; structured types not yet supported"
545            )
546        })?;
547
548        // Register with Pva flag.
549        self.registry.register_pv_with_protocol(
550            pv_name,
551            dbr_type,
552            &effective_mode,
553            element_count,
554            Protocol::Pva,
555        )?;
556        if let Some(ref name) = matched_policy_name {
557            self.registry.update_policy_name(pv_name, Some(name))?;
558        }
559        // Persist PREC/EGU extracted from the same pvget — equivalent
560        // to the CA path's DBR_CTRL refresh. Non-fatal on error.
561        let (prec, egu) = pv_field_extract_display(&initial);
562        if (prec.is_some() || egu.is_some())
563            && let Err(e) =
564                self.registry
565                    .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
566        {
567            debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
568        }
569
570        let record = self
571            .registry
572            .get_pv(pv_name)?
573            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
574        self.start_archiving_internal(&record).await?;
575
576        metrics::gauge!("archiver_pvs_active").increment(1.0);
577        info!(
578            pv = pv_name,
579            ?dbr_type,
580            element_count,
581            protocol = "pva",
582            "Started archiving"
583        );
584        Ok(())
585    }
586
    /// Internal: start a subscription for a PV record. Routes by
    /// `record.protocol`; the CA branch keeps the historical behaviour,
    /// the PVA branch goes through `start_archiving_internal_pva`.
    ///
    /// CA branch, in order: build the per-PV shared state, insert the
    /// `PvHandle` into `self.channels`, spawn one extra-field monitor per
    /// entry in `record.archive_fields`, then spawn the sampling task
    /// (`monitor_loop` or `scan_loop`, chosen by `record.sample_mode`).
    /// The spawned tasks are detached — their join handles are not kept —
    /// and are stopped through the cancellation token stored in the handle.
    ///
    /// The CA branch itself contains no fallible step and always returns
    /// `Ok(())`; an `Err` can only come from the PVA branch.
    async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
        if record.protocol == Protocol::Pva {
            return self.start_archiving_internal_pva(record).await;
        }
        let pv_name = record.pv_name.clone();
        let dbr_type = record.dbr_type;
        let element_count = record.element_count;
        let channel = self.ca_client.create_channel(&pv_name);
        let cancel_token = CancellationToken::new();
        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
        let counters = Arc::new(PvCounters::default());

        // Hold update_lock around the whole insert+spawn block so a
        // concurrent update_archive_fields can't observe the empty
        // field_tokens map and spawn its own copies of the same fields
        // before we get to the spawn loop. update_archive_fields acquires
        // the same update_lock before mutating tokens.
        let _guard = update_lock.lock().await;

        // Publish the handle. From here on other methods can find this PV
        // in `channels`; the handle shares every Arc created above.
        self.channels.insert(
            pv_name.clone(),
            PvHandle {
                channel: Some(channel.clone()),
                cancel_token: cancel_token.clone(),
                dbr_type,
                conn_info: conn_info.clone(),
                extras: extras.clone(),
                field_tokens: field_tokens.clone(),
                update_lock: update_lock.clone(),
                counters: counters.clone(),
            },
        );

        // Start one monitor task per archive_field with a child cancel token,
        // tracked so update_archive_fields can stop individual fields.
        for field in &record.archive_fields {
            let child = cancel_token.child_token();
            field_tokens.insert(field.clone(), child.clone());
            spawn_extra_field_monitor(
                &self.ca_client,
                &pv_name,
                field,
                extras.clone(),
                child,
                counters.clone(),
            );
        }
        metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
        // Field tasks are registered; release the lock explicitly before
        // spawning the sampling task rather than at end of scope.
        drop(_guard);

        // Clones moved into the spawned sampling task below.
        let tx = self.sample_tx.clone();
        let token = cancel_token.clone();
        let ci = conn_info.clone();
        let extras_for_loop = extras.clone();
        let counters_for_loop = counters.clone();
        let registry_for_loop = self.registry.clone();

        let drift = self.server_ioc_drift_secs;
        match &record.sample_mode {
            SampleMode::Monitor => {
                tokio::spawn(async move {
                    monitor_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        ci,
                        registry_for_loop,
                        extras_for_loop,
                        counters_for_loop,
                        drift,
                    )
                    .await;
                });
            }
            // NOTE(review): unlike `monitor_loop` (and the PVA scan loop),
            // `scan_loop` is not given the drift bound — confirm that CA
            // scan mode is meant to skip the IOC-timestamp window check.
            SampleMode::Scan { period_secs } => {
                let period = *period_secs;
                tokio::spawn(async move {
                    scan_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        period,
                        ci,
                        registry_for_loop,
                        extras_for_loop,
                        counters_for_loop,
                    )
                    .await;
                });
            }
        }

        Ok(())
    }
693
694    /// PVA equivalent of `start_archiving_internal`. Spawns a single
695    /// callback-driven monitor task; pvAccess auto-reconnect inside
696    /// `pvmonitor_handle` removes the explicit reconnect loop the CA
697    /// path needs. PVA records use `channel: None` on the [`PvHandle`]
698    /// and skip per-field "extras" subscriptions (`.HIHI`/`.LOLO`/…)
699    /// since pvAccess wraps those in the NTScalar value structure
700    /// rather than separate channels.
701    async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
702        let pv_name = record.pv_name.clone();
703        let dbr_type = record.dbr_type;
704        let element_count = record.element_count;
705        let cancel_token = CancellationToken::new();
706        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
707        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
708        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
709        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
710        let counters = Arc::new(PvCounters::default());
711
712        self.channels.insert(
713            pv_name.clone(),
714            PvHandle {
715                channel: None,
716                cancel_token: cancel_token.clone(),
717                dbr_type,
718                conn_info: conn_info.clone(),
719                extras: extras.clone(),
720                field_tokens: field_tokens.clone(),
721                update_lock,
722                counters: counters.clone(),
723            },
724        );
725
726        let tx = self.sample_tx.clone();
727        let pva_client = self.pva_client.clone();
728        let token = cancel_token.clone();
729        let ci = conn_info.clone();
730        let counters_for_loop = counters.clone();
731        let drift = self.server_ioc_drift_secs;
732
733        match &record.sample_mode {
734            SampleMode::Monitor => {
735                let pv_name_loop = pv_name.clone();
736                let archive_fields_loop = record.archive_fields.clone();
737                let extras_for_loop = extras.clone();
738                tokio::spawn(async move {
739                    monitor_loop_pva(
740                        pv_name_loop,
741                        dbr_type,
742                        element_count,
743                        pva_client,
744                        tx,
745                        token,
746                        ci,
747                        counters_for_loop,
748                        drift,
749                        archive_fields_loop,
750                        extras_for_loop,
751                    )
752                    .await;
753                });
754            }
755            SampleMode::Scan { period_secs } => {
756                let period = *period_secs;
757                let pv_name_loop = pv_name.clone();
758                let archive_fields_loop = record.archive_fields.clone();
759                let extras_for_loop = extras.clone();
760                tokio::spawn(async move {
761                    scan_loop_pva(
762                        pv_name_loop,
763                        dbr_type,
764                        pva_client,
765                        tx,
766                        token,
767                        period,
768                        ci,
769                        counters_for_loop,
770                        drift,
771                        archive_fields_loop,
772                        extras_for_loop,
773                    )
774                    .await;
775                });
776            }
777        }
778
779        // Auxiliary periodic refreshers: keep PREC/EGU current after
780        // IOC restarts, and approximate disconnect events from a long
781        // sample gap. Both bind to the same cancel_token so a stop /
782        // delete reaps every PVA-side task in one go.
783        {
784            let pv_name = pv_name.clone();
785            let pva_client = self.pva_client.clone();
786            let registry = self.registry.clone();
787            let cancel = cancel_token.clone();
788            let counters_for_refresh = counters.clone();
789            tokio::spawn(async move {
790                pva_metadata_refresh_loop(
791                    pv_name,
792                    pva_client,
793                    registry,
794                    cancel,
795                    counters_for_refresh,
796                )
797                .await;
798            });
799        }
800        {
801            let pv_name = pv_name.clone();
802            let conn_info = conn_info.clone();
803            let counters_for_watch = counters.clone();
804            let cancel = cancel_token.clone();
805            let sample_mode = record.sample_mode.clone();
806            tokio::spawn(async move {
807                pva_state_watchdog(
808                    pv_name,
809                    conn_info,
810                    counters_for_watch,
811                    cancel,
812                    sample_mode,
813                )
814                .await;
815            });
816        }
817
818        Ok(())
819    }
820
821    /// Replace the archive_fields list for a running PV. Cancels per-field
822    /// monitor tasks for fields that left the set, spawns fresh ones for
823    /// fields that joined, and leaves unchanged fields running. Serialised
824    /// per-PV by an async mutex so concurrent callers can't double-spawn.
825    /// The main PV keeps running.
826    pub async fn update_archive_fields(
827        &self,
828        pv_name: &str,
829        fields: &[String],
830    ) -> anyhow::Result<()> {
831        // Persist first so a restart sees the new set even if the engine
832        // half of the update fails partway through.
833        self.registry.update_archive_fields(pv_name, fields)?;
834
835        // If the PV isn't currently active there's nothing more to do —
836        // start_archiving_internal will pick up the new fields on resume.
837        let (parent_token, extras, field_tokens, update_lock, counters) = {
838            let Some(handle) = self.channels.get(pv_name) else {
839                return Ok(());
840            };
841            (
842                handle.cancel_token.clone(),
843                handle.extras.clone(),
844                handle.field_tokens.clone(),
845                handle.update_lock.clone(),
846                handle.counters.clone(),
847            )
848        };
849
850        // Serialise so two concurrent updates can't both decide the same
851        // field is missing and spawn it twice.
852        let _guard = update_lock.lock().await;
853
854        let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
855
856        // Cancel + drop tasks for fields that left the set. Removing the
857        // entry from `field_tokens` also drops our handle on the child
858        // token; the spawned task observes `cancelled()` and exits.
859        let to_remove: Vec<String> = field_tokens
860            .iter()
861            .filter(|e| !wanted.contains(e.key().as_str()))
862            .map(|e| e.key().clone())
863            .collect();
864        let removed_count = to_remove.len();
865        for key in to_remove {
866            if let Some((_, token)) = field_tokens.remove(&key) {
867                token.cancel();
868            }
869            extras.remove(&key);
870        }
871
872        // Spawn tasks for fields newly added. Existing fields keep their
873        // task and their last cached value.
874        let mut added_count = 0usize;
875        for f in fields {
876            if !field_tokens.contains_key(f) {
877                let child = parent_token.child_token();
878                field_tokens.insert(f.clone(), child.clone());
879                spawn_extra_field_monitor(
880                    &self.ca_client,
881                    pv_name,
882                    f,
883                    extras.clone(),
884                    child,
885                    counters.clone(),
886                );
887                added_count += 1;
888            }
889        }
890        let net = added_count as i64 - removed_count as i64;
891        if net != 0 {
892            metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
893        }
894        Ok(())
895    }
896
897    /// Pause archiving for a PV.
898    pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
899        let lock = self.op_lock(pv_name);
900        let _g = lock.lock().await;
901        if let Some((_key, handle)) = self.channels.remove(pv_name) {
902            let extra_count = handle.field_tokens.len() as f64;
903            handle.cancel_token.cancel();
904            metrics::gauge!("archiver_pvs_active").decrement(1.0);
905            if extra_count > 0.0 {
906                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
907            }
908        }
909        self.registry.set_status(pv_name, PvStatus::Paused)?;
910        info!(pv = pv_name, "Paused archiving");
911        Ok(())
912    }
913
914    /// Resume a paused PV. Only paused or error PVs can be resumed;
915    /// calling resume on an already-active PV is a no-op (returns Ok).
916    pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
917        let lock = self.op_lock(pv_name);
918        let _g = lock.lock().await;
919
920        let record = self
921            .registry
922            .get_pv(pv_name)?
923            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
924
925        // Guard: if already active with a live task, nothing to do.
926        if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
927            info!(
928                pv = pv_name,
929                "PV is already actively archived, skipping resume"
930            );
931            return Ok(());
932        }
933
934        // Cancel any orphaned task to prevent duplicate subscriptions.
935        if let Some((_key, handle)) = self.channels.remove(pv_name) {
936            let extra_count = handle.field_tokens.len() as f64;
937            handle.cancel_token.cancel();
938            if extra_count > 0.0 {
939                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
940            }
941        }
942
943        // Start the task first; only mark Active if it succeeds.
944        self.start_archiving_internal(&record).await?;
945        self.registry.set_status(pv_name, PvStatus::Active)?;
946        metrics::gauge!("archiver_pvs_active").increment(1.0);
947        info!(pv = pv_name, "Resumed archiving");
948        Ok(())
949    }
950
951    /// Stop archiving a PV without removing it from the registry.
952    /// Sets the PV status to Inactive (data retained, monitoring stopped).
953    pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
954        let lock = self.op_lock(pv_name);
955        let _g = lock.lock().await;
956        if let Some((_key, handle)) = self.channels.remove(pv_name) {
957            let extra_count = handle.field_tokens.len() as f64;
958            handle.cancel_token.cancel();
959            metrics::gauge!("archiver_pvs_active").decrement(1.0);
960            if extra_count > 0.0 {
961                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
962            }
963        }
964        self.registry.set_status(pv_name, PvStatus::Inactive)?;
965        info!(pv = pv_name, "Stopped archiving (inactive)");
966        Ok(())
967    }
968
969    /// Remove a PV from archiving entirely.
970    pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
971        let lock = self.op_lock(pv_name);
972        let _g = lock.lock().await;
973        if let Some((_key, handle)) = self.channels.remove(pv_name) {
974            let extra_count = handle.field_tokens.len() as f64;
975            handle.cancel_token.cancel();
976            metrics::gauge!("archiver_pvs_active").decrement(1.0);
977            if extra_count > 0.0 {
978                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
979            }
980        }
981        self.registry.remove_pv(pv_name)?;
982        // Don't remove the op_locks entry here: a concurrent caller may have
983        // already taken a clone of this Arc and be queued on it; removing
984        // would let a fresh caller obtain a new mutex and the queued one
985        // would race them. The map is bounded by the lifetime universe of
986        // PV names, which is acceptable.
987        info!(pv = pv_name, "Destroyed archiving channel");
988        Ok(())
989    }
990
991    /// List all currently archived PV names (from registry).
992    pub fn list_pvs(&self) -> Vec<String> {
993        self.registry.all_pv_names().unwrap_or_else(|e| {
994            warn!("Failed to list PVs: {e}");
995            Vec::new()
996        })
997    }
998
999    /// Match PVs by glob pattern (from registry).
1000    pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
1001        self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
1002            warn!("Failed to match PVs: {e}");
1003            Vec::new()
1004        })
1005    }
1006
1007    /// Get the registry reference.
1008    pub fn registry(&self) -> &Arc<PvRegistry> {
1009        &self.registry
1010    }
1011
1012    /// Get connection info for a PV.
1013    pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1014        self.channels.get(pv).map(|h| {
1015            h.conn_info
1016                .lock()
1017                .unwrap_or_else(|e| e.into_inner())
1018                .clone()
1019        })
1020    }
1021
1022    /// Get PV names that have never received any event (connected_since == None).
1023    pub fn get_never_connected_pvs(&self) -> Vec<String> {
1024        self.channels
1025            .iter()
1026            .filter(|entry| {
1027                let ci = entry
1028                    .value()
1029                    .conn_info
1030                    .lock()
1031                    .unwrap_or_else(|e| e.into_inner());
1032                ci.connected_since.is_none()
1033            })
1034            .map(|entry| entry.key().clone())
1035            .collect()
1036    }
1037
1038    /// Snapshot the diagnostic counters for one PV. Returns None if the
1039    /// PV isn't actively archived. The returned Arc is the live counter
1040    /// — callers read with `Ordering::Relaxed`.
1041    pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1042        self.channels.get(pv_name).map(|h| h.counters.clone())
1043    }
1044
1045    /// Snapshot every active PV's counters. Returns `(pv_name,
1046    /// PvCountersSnapshot)` so callers don't have to handle Arc.
1047    pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1048        self.channels
1049            .iter()
1050            .map(|e| {
1051                (
1052                    e.key().clone(),
1053                    PvCountersSnapshot::from(&*e.value().counters),
1054                )
1055            })
1056            .collect()
1057    }
1058
1059    /// One-shot CA `get` against the running channel for `pv`. Returns
1060    /// `None` if the PV isn't actively archived. The timeout caps how
1061    /// long the caller will wait for a value when the IOC is slow.
1062    /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
1063    pub async fn live_value(
1064        &self,
1065        pv_name: &str,
1066        timeout: Duration,
1067    ) -> Option<anyhow::Result<ArchiverValue>> {
1068        let channel_opt = self.channels.get(pv_name)?.channel.clone();
1069        let Some(channel) = channel_opt else {
1070            // PVA-acquired PV — live_value via pva_client.pvget.
1071            let pva = self.pva_client.clone();
1072            let name = pv_name.to_string();
1073            let res = tokio::time::timeout(timeout, pva.pvget(&name)).await;
1074            return Some(match res {
1075                Ok(Ok(field)) => pv_field_scalar_to_archiver(&field)
1076                    .ok_or_else(|| anyhow::anyhow!("PVA value not a scalar")),
1077                Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1078                Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1079            });
1080        };
1081        // Wait briefly for connection — channel.get on a disconnected
1082        // channel would otherwise return an error from deep in the CA
1083        // stack. Capped by the same timeout the caller chose.
1084        if channel.wait_connected(timeout).await.is_err() {
1085            return Some(Err(anyhow::anyhow!(
1086                "channel not connected within {timeout:?}"
1087            )));
1088        }
1089        match tokio::time::timeout(timeout, channel.get()).await {
1090            Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1091            Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1092            Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1093        }
1094    }
1095
1096    /// Snapshot the latest cached extra-field values for `pv` —
1097    /// `(field_name, stringified_value)` pairs. Empty map when the PV
1098    /// isn't archived or has no archive_fields configured.
1099    pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1100        match self.channels.get(pv_name) {
1101            Some(handle) => handle
1102                .extras
1103                .iter()
1104                .map(|e| (e.key().clone(), e.value().clone()))
1105                .collect(),
1106            None => std::collections::HashMap::new(),
1107        }
1108    }
1109
1110    /// Get PV names that are currently disconnected (is_connected == false).
1111    pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1112        self.channels
1113            .iter()
1114            .filter(|entry| {
1115                let ci = entry
1116                    .value()
1117                    .conn_info
1118                    .lock()
1119                    .unwrap_or_else(|e| e.into_inner());
1120                !ci.is_connected
1121            })
1122            .map(|entry| entry.key().clone())
1123            .collect()
1124    }
1125}
1126
1127/// Read DBR_CTRL metadata from the channel and persist `precision`/`units`
1128/// to the registry when they differ from stored values. Best-effort: any
1129/// failure (timeout, transport error, missing display info, non-numeric
1130/// type) is logged at debug and silently ignored. Skips the SQL write
1131/// entirely when the in-memory record already matches, so reconnect
1132/// storms don't churn the database.
1133async fn refresh_ctrl_metadata(
1134    channel: &CaChannel,
1135    registry: &PvRegistry,
1136    pv_name: &str,
1137    counters: &PvCounters,
1138) {
1139    // 15s — long enough for slow-network IOCs (Java's default `epicsTimeout`
1140    // is 30s; 15s splits the difference). Failures count toward
1141    // `metadata_fetch_failures` so operators can spot PVs that never get
1142    // PREC/EGU populated.
1143    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1144    let snapshot = match tokio::time::timeout(
1145        FETCH_TIMEOUT,
1146        channel.get_with_metadata(DbrClass::Ctrl),
1147    )
1148    .await
1149    {
1150        Ok(Ok(s)) => s,
1151        Ok(Err(e)) => {
1152            counters
1153                .metadata_fetch_failures
1154                .fetch_add(1, Ordering::Relaxed);
1155            debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1156            return;
1157        }
1158        Err(_) => {
1159            counters
1160                .metadata_fetch_failures
1161                .fetch_add(1, Ordering::Relaxed);
1162            debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1163            return;
1164        }
1165    };
1166    let Some(display) = snapshot.display else {
1167        // Non-numeric type (string, enum, …) — DBR_CTRL has no
1168        // DisplayInfo. Not a failure; just nothing to persist.
1169        return;
1170    };
1171
1172    // Negative precision is the Java EAA "no precision info" sentinel —
1173    // persisting "-1" as the PREC string would surface as a literal -1
1174    // in the UI / API. Treat it the same as missing.
1175    let new_prec_opt: Option<String> = if display.precision < 0 {
1176        None
1177    } else {
1178        Some(display.precision.to_string())
1179    };
1180    let new_egu_trimmed = display.units.trim();
1181    let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1182        None
1183    } else {
1184        Some(new_egu_trimmed)
1185    };
1186
1187    let stored = match registry.get_pv(pv_name) {
1188        Ok(Some(r)) => r,
1189        Ok(None) => return,
1190        Err(e) => {
1191            debug!(pv = pv_name, "Registry read for metadata compare failed: {e}");
1192            return;
1193        }
1194    };
1195
1196    let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1197        (Some(s), Some(n)) => s != n,
1198        (None, Some(_)) => true,
1199        // Don't overwrite a populated PREC with the "no info" sentinel.
1200        _ => false,
1201    };
1202    let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1203        (Some(s), Some(n)) => s != n,
1204        (None, Some(_)) => true,
1205        // Don't overwrite a populated EGU with empty, and don't
1206        // bother writing None over None.
1207        _ => false,
1208    };
1209    if !prec_changed && !egu_changed {
1210        return;
1211    }
1212
1213    let prec_arg = if prec_changed {
1214        new_prec_opt.as_deref()
1215    } else {
1216        None
1217    };
1218    let egu_arg = if egu_changed { new_egu_opt } else { None };
1219    if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1220        warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1221    } else {
1222        debug!(
1223            pv = pv_name,
1224            prec = ?prec_arg, egu = ?egu_arg,
1225            "Refreshed PREC/EGU from DBR_CTRL"
1226        );
1227    }
1228}
1229
1230/// Body shared by the two PVA monitor callback variants
1231/// (`pvmonitor_handle` takes `(FieldDesc, PvField)`,
1232/// `pvmonitor_with_request` takes `(PvField)`). Lifted out so we
1233/// can write the once-per-event logic in one place.
1234#[allow(clippy::too_many_arguments)]
1235fn pva_handle_event(
1236    field: &PvField,
1237    pv_name: &str,
1238    dbr_type: ArchDbType,
1239    tx: &mpsc::Sender<PvSample>,
1240    conn_info: &Mutex<ConnectionInfo>,
1241    counters: &Arc<PvCounters>,
1242    extras: &ExtraFieldsCache,
1243    extras_paths: &[(String, &'static str)],
1244    drift_secs: u64,
1245) {
1246    let now = SystemTime::now();
1247    let first_after_connect = {
1248        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1249        let first = ci.last_event_time.is_none();
1250        if ci.connected_since.is_none() {
1251            ci.connected_since = Some(now);
1252        }
1253        ci.is_connected = true;
1254        ci.last_event_time = Some(now);
1255        ci.state = PvConnectionState::Connected;
1256        first
1257    };
1258    if first_after_connect {
1259        counters
1260            .first_event_unix_secs
1261            .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1262            .ok();
1263    }
1264    counters.events_received.fetch_add(1, Ordering::Relaxed);
1265
1266    let Some(value) = pv_field_scalar_to_archiver(field) else {
1267        counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1268        debug!(pv = pv_name, "PVA event has non-scalar value; dropping");
1269        return;
1270    };
1271
1272    let ts = pv_field_extract_timestamp(field);
1273    if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1274        counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1275        debug!(pv = pv_name, ?ts, "Dropping PVA sample with out-of-window timestamp");
1276        return;
1277    }
1278    let elem_count = match pv_field_element_count(field) {
1279        0 => 1,
1280        n => n,
1281    };
1282    for (field_name, path) in extras_paths {
1283        if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1284            extras.insert(field_name.clone(), scalar_value_to_string(s));
1285        }
1286    }
1287    let mut sample = ArchiverSample::new(ts, value);
1288    attach_extras(extras, &mut sample);
1289    let pv_sample = PvSample {
1290        pv_name: pv_name.to_string(),
1291        dbr_type,
1292        sample,
1293        element_count: Some(elem_count),
1294        counters: Some(counters.clone()),
1295    };
1296    if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1297        counters.buffer_overflow_drops.fetch_add(1, Ordering::Relaxed);
1298    }
1299}
1300
1301/// PVA monitor loop: subscribes once and parks until cancellation.
1302/// Each fan-in event is decoded inline, packaged as a [`PvSample`],
1303/// and non-blocking-pushed into the storage write_loop's channel —
1304/// blocking would stall the pvAccess reactor thread.
1305///
1306/// When `archive_fields` is non-empty, the subscription requests an
1307/// explicit pvRequest with the mapped sub-field paths so the IOC
1308/// includes them in every monitor event; the callback then mirrors
1309/// the CA path's "extras" cache by stringifying each requested field.
1310#[allow(clippy::too_many_arguments)]
1311async fn monitor_loop_pva(
1312    pv_name: String,
1313    dbr_type: ArchDbType,
1314    element_count: i32,
1315    pva_client: PvaClient,
1316    tx: mpsc::Sender<PvSample>,
1317    cancel_token: CancellationToken,
1318    conn_info: Arc<Mutex<ConnectionInfo>>,
1319    counters: Arc<PvCounters>,
1320    server_ioc_drift_secs: u64,
1321    archive_fields: Vec<String>,
1322    extras: Arc<ExtraFieldsCache>,
1323) {
1324    let drift_secs = server_ioc_drift_secs;
1325    // Static element_count is unused — each callback derives the
1326    // current array length from the live PvField, so an NTScalarArray
1327    // whose size changes between events still gets tagged correctly.
1328    let _ = element_count;
1329
1330    // Build the (CA name, PVA path) pairs once. Skip CA fields with
1331    // no PVA equivalent so a misconfigured archive_fields list
1332    // doesn't poison the pvRequest.
1333    let extras_paths: Vec<(String, &'static str)> = archive_fields
1334        .iter()
1335        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1336        .collect();
1337    let request_expr = if extras_paths.is_empty() {
1338        None
1339    } else {
1340        let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1341            .field("value")
1342            .field("alarm")
1343            .field("timeStamp");
1344        for (_, path) in &extras_paths {
1345            builder = builder.field(*path);
1346        }
1347        Some(builder.build())
1348    };
1349
1350    // Subscribe loop with retry. The custom-request path uses
1351    // `pvmonitor_with_request` (no SubscriptionHandle) inside a
1352    // tokio::select! so cancellation drops the future cleanly; the
1353    // empty-request path keeps the original SubscriptionHandle form
1354    // since that's the simpler path for the common case.
1355    loop {
1356        if let Some(ref req) = request_expr {
1357            // Custom-request path: 1-arg callback. pvmonitor_with_request
1358            // returns a future that runs to completion; race it against
1359            // the cancel signal.
1360            let pv_name_cb = pv_name.clone();
1361            let tx_cb = tx.clone();
1362            let conn_info_cb = conn_info.clone();
1363            let counters_cb = counters.clone();
1364            let extras_cb = extras.clone();
1365            let extras_paths_cb = extras_paths.clone();
1366            let cb = move |field: &PvField| {
1367                pva_handle_event(
1368                    field,
1369                    &pv_name_cb,
1370                    dbr_type,
1371                    &tx_cb,
1372                    &conn_info_cb,
1373                    &counters_cb,
1374                    &extras_cb,
1375                    &extras_paths_cb,
1376                    drift_secs,
1377                );
1378            };
1379            tokio::select! {
1380                _ = cancel_token.cancelled() => {
1381                    debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1382                    return;
1383                }
1384                res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1385                    match res {
1386                        Ok(()) => {
1387                            debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1388                        }
1389                        Err(e) => {
1390                            counters
1391                                .transient_error_count
1392                                .fetch_add(1, Ordering::Relaxed);
1393                            warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1394                        }
1395                    }
1396                    tokio::select! {
1397                        _ = cancel_token.cancelled() => return,
1398                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1399                    }
1400                }
1401            }
1402        } else {
1403            // Empty-request fast path: 2-arg callback. Keep the
1404            // SubscriptionHandle so the inner reactor task lives
1405            // until our explicit drop.
1406            let pv_name_cb = pv_name.clone();
1407            let tx_cb = tx.clone();
1408            let conn_info_cb = conn_info.clone();
1409            let counters_cb = counters.clone();
1410            let extras_cb = extras.clone();
1411            let extras_paths_cb = extras_paths.clone();
1412            let cb = move |_desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1413                pva_handle_event(
1414                    field,
1415                    &pv_name_cb,
1416                    dbr_type,
1417                    &tx_cb,
1418                    &conn_info_cb,
1419                    &counters_cb,
1420                    &extras_cb,
1421                    &extras_paths_cb,
1422                    drift_secs,
1423                );
1424            };
1425            let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1426                Ok(h) => h,
1427                Err(e) => {
1428                    counters
1429                        .transient_error_count
1430                        .fetch_add(1, Ordering::Relaxed);
1431                    warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1432                    tokio::select! {
1433                        _ = cancel_token.cancelled() => return,
1434                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1435                    }
1436                }
1437            };
1438            debug!(pv = pv_name, "PVA monitor active");
1439            cancel_token.cancelled().await;
1440            debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1441            drop(handle);
1442            return;
1443        }
1444    }
1445}
1446
1447/// PVA Scan loop: periodic `pvget` instead of streaming monitor.
1448/// Mirrors the CA scan_loop's per-tick connection check + sample emit.
1449#[allow(clippy::too_many_arguments)]
1450async fn scan_loop_pva(
1451    pv_name: String,
1452    dbr_type: ArchDbType,
1453    pva_client: PvaClient,
1454    tx: mpsc::Sender<PvSample>,
1455    cancel_token: CancellationToken,
1456    period_secs: f64,
1457    conn_info: Arc<Mutex<ConnectionInfo>>,
1458    counters: Arc<PvCounters>,
1459    server_ioc_drift_secs: u64,
1460    archive_fields: Vec<String>,
1461    extras: Arc<ExtraFieldsCache>,
1462) {
1463    let extras_paths: Vec<(String, &'static str)> = archive_fields
1464        .iter()
1465        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1466        .collect();
1467    let period = Duration::from_secs_f64(period_secs);
1468    let mut interval = tokio::time::interval(period);
1469    let drift_secs = server_ioc_drift_secs;
1470    // Hard timeout per pvget — at most one tick of slack so a stuck
1471    // PVA server can't accumulate pending requests.
1472    let pvget_timeout = period.max(Duration::from_secs(5));
1473
1474    loop {
1475        tokio::select! {
1476            _ = cancel_token.cancelled() => return,
1477            _ = interval.tick() => {}
1478        }
1479
1480        let res = tokio::time::timeout(pvget_timeout, pva_client.pvget(&pv_name)).await;
1481        let field = match res {
1482            Ok(Ok(f)) => f,
1483            Ok(Err(e)) => {
1484                let was_connected = {
1485                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1486                    let prev = ci.is_connected;
1487                    ci.is_connected = false;
1488                    ci.last_event_time = None;
1489                    ci.state = match ci.state {
1490                        PvConnectionState::Connected => PvConnectionState::Disconnected,
1491                        PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1492                        _ => PvConnectionState::Connecting,
1493                    };
1494                    prev
1495                };
1496                if was_connected {
1497                    counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1498                    counters
1499                        .last_disconnect_unix_secs
1500                        .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1501                }
1502                counters
1503                    .transient_error_count
1504                    .fetch_add(1, Ordering::Relaxed);
1505                debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1506                continue;
1507            }
1508            Err(_) => {
1509                counters
1510                    .transient_error_count
1511                    .fetch_add(1, Ordering::Relaxed);
1512                debug!(pv = pv_name, "PVA scan pvget timed out");
1513                continue;
1514            }
1515        };
1516
1517        let now = SystemTime::now();
1518        let first_after_connect = {
1519            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1520            let first = ci.last_event_time.is_none();
1521            if ci.connected_since.is_none() {
1522                ci.connected_since = Some(now);
1523            }
1524            ci.is_connected = true;
1525            ci.last_event_time = Some(now);
1526            ci.state = PvConnectionState::Connected;
1527            first
1528        };
1529        counters.events_received.fetch_add(1, Ordering::Relaxed);
1530        if first_after_connect {
1531            counters
1532                .first_event_unix_secs
1533                .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1534                .ok();
1535        }
1536
1537        let Some(value) = pv_field_scalar_to_archiver(&field) else {
1538            counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1539            debug!(pv = pv_name, "PVA scan returned non-scalar value; dropping");
1540            continue;
1541        };
1542        // Scan mode: timestamp the sample at receive time (no IOC
1543        // timestamp available reliably for periodic pvget, mirroring
1544        // CA scan_loop's `now` policy).
1545        let _ = drift_secs; // referenced for symmetry; not used here
1546        let elem_count = match pv_field_element_count(&field) {
1547            0 => 1,
1548            n => n,
1549        };
1550        // Refresh extras from this scan's pvget result (the pvget
1551        // returns the full NTScalar so all sub-fields are available
1552        // for free — no extra channel spawn needed).
1553        for (field_name, path) in &extras_paths {
1554            if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1555                extras.insert(field_name.clone(), scalar_value_to_string(s));
1556            }
1557        }
1558        let mut sample = ArchiverSample::new(now, value);
1559        attach_extras(&extras, &mut sample);
1560        let pv_sample = PvSample {
1561            pv_name: pv_name.clone(),
1562            dbr_type,
1563            sample,
1564            element_count: Some(elem_count),
1565            counters: Some(counters.clone()),
1566        };
1567        if let Err(rejected) =
1568            try_send_with_overflow_count(&tx, pv_sample, &counters).await
1569        {
1570            // Channel closed (write_loop down). Cooperative shutdown.
1571            let _ = rejected;
1572            return;
1573        }
1574    }
1575}
1576
1577/// Periodic refresh task for PVA-acquired PVs: re-fetches DBR_CTRL-
1578/// equivalent metadata (`display.units`, `display.precision`) every
1579/// few minutes and persists when changed. PVA's monitor callback API
1580/// doesn't surface explicit reconnect events the way CA's
1581/// `ConnectionEvent` does, so a dedicated periodic poll is the
1582/// simplest way to keep PREC/EGU fresh after IOC restarts.
1583async fn pva_metadata_refresh_loop(
1584    pv_name: String,
1585    pva_client: PvaClient,
1586    registry: Arc<PvRegistry>,
1587    cancel_token: CancellationToken,
1588    counters: Arc<PvCounters>,
1589) {
1590    // 5 minutes — operators almost never change PREC/EGU at runtime;
1591    // shorter cadence wastes registry/network and longer leaves UI
1592    // stale across IOC restarts. Same magic number Java EAA uses
1593    // for its `metaFieldsRefresh` cron.
1594    const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1595    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1596    let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1597    // Skip the immediate first tick — initial pvget already populated
1598    // PREC/EGU at archive_pv time.
1599    tick.tick().await;
1600    loop {
1601        tokio::select! {
1602            _ = cancel_token.cancelled() => return,
1603            _ = tick.tick() => {}
1604        }
1605
1606        let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1607        let field = match res {
1608            Ok(Ok(f)) => f,
1609            _ => {
1610                counters
1611                    .metadata_fetch_failures
1612                    .fetch_add(1, Ordering::Relaxed);
1613                continue;
1614            }
1615        };
1616        let (new_prec, new_egu) = pv_field_extract_display(&field);
1617        let stored = match registry.get_pv(&pv_name) {
1618            Ok(Some(r)) => r,
1619            _ => continue,
1620        };
1621        let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1622            (Some(s), Some(n)) => s != n,
1623            (None, Some(_)) => true,
1624            _ => false,
1625        };
1626        let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1627            (Some(s), Some(n)) => s != n,
1628            (None, Some(_)) => true,
1629            _ => false,
1630        };
1631        if !prec_changed && !egu_changed {
1632            continue;
1633        }
1634        let prec_arg = if prec_changed { new_prec.as_deref() } else { None };
1635        let egu_arg = if egu_changed { new_egu.as_deref() } else { None };
1636        if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1637            warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1638        } else {
1639            debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1640        }
1641    }
1642}
1643
1644/// State watchdog for PVA-acquired PVs: a dedicated task that flips
1645/// `conn_info.state` to `Disconnected` when no event has arrived
1646/// within `STALE_THRESHOLD`. PVA's callback API doesn't surface
1647/// explicit channel-state changes — we approximate by treating a
1648/// long event gap as a disconnect. Trade-off: a genuinely silent PV
1649/// (alarm-only, low-rate scan) appears disconnected. Operators can
1650/// raise the threshold by env var if their PV cadence is slower.
1651async fn pva_state_watchdog(
1652    pv_name: String,
1653    conn_info: Arc<Mutex<ConnectionInfo>>,
1654    counters: Arc<PvCounters>,
1655    cancel_token: CancellationToken,
1656    sample_mode: SampleMode,
1657) {
1658    const POLL_INTERVAL: Duration = Duration::from_secs(5);
1659    // Threshold scales with the expected event cadence so a slow Scan
1660    // PV (period=300 s) doesn't flap on every tick. For Monitor mode
1661    // the floor is 60 s — most production PVs change at least that
1662    // often. Operators who archive truly silent PVs (alarm-only) will
1663    // see stale disconnected status; that's a known limitation.
1664    let stale_threshold = match sample_mode {
1665        SampleMode::Monitor => Duration::from_secs(60),
1666        SampleMode::Scan { period_secs } => {
1667            Duration::from_secs_f64((period_secs * 3.0).max(60.0))
1668        }
1669    };
1670    let mut interval = tokio::time::interval(POLL_INTERVAL);
1671    interval.tick().await;
1672    loop {
1673        tokio::select! {
1674            _ = cancel_token.cancelled() => return,
1675            _ = interval.tick() => {}
1676        }
1677        let stale_now = {
1678            let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1679            if !ci.is_connected {
1680                continue;
1681            }
1682            match ci.last_event_time {
1683                Some(t) => SystemTime::now()
1684                    .duration_since(t)
1685                    .map(|d| d > stale_threshold)
1686                    .unwrap_or(false),
1687                None => false,
1688            }
1689        };
1690        if stale_now {
1691            let was_connected = {
1692                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1693                let prev = ci.is_connected;
1694                ci.is_connected = false;
1695                ci.state = PvConnectionState::Disconnected;
1696                prev
1697            };
1698            if was_connected {
1699                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1700                counters
1701                    .last_disconnect_unix_secs
1702                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1703                debug!(
1704                    pv = pv_name,
1705                    "PVA watchdog: marking disconnected after stale heartbeat"
1706                );
1707            }
1708        }
1709    }
1710}
1711
/// Monitor loop: subscribe to a CA channel and forward every received
/// snapshot to the write loop as a `PvSample`.
///
/// Each iteration of the outer loop is one connect/subscribe cycle:
/// 1. wait for the channel to connect (demoting `conn_info` and bumping
///    the disconnect counters if the wait times out);
/// 2. spawn a cancel-bound, fire-and-forget metadata refresh;
/// 3. subscribe, retrying after `CA_RETRY_DELAY` on failure;
/// 4. forward snapshots until the monitor stream ends, then record the
///    disconnect and start over.
///
/// Samples carry the IOC-reported timestamp. The first sample after a
/// (re)connect bypasses the timestamp window check and is tagged with the
/// connection-lost headers; later samples outside the window are dropped
/// and counted in `timestamp_drops`.
///
/// Returns when `cancel_token` is cancelled, when the connection-event
/// stream is closed, or when the sample channel `tx` is closed.
#[allow(clippy::too_many_arguments)]
async fn monitor_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    registry: Arc<PvRegistry>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
) {
    loop {
        // Wait for connection, respecting cancellation.
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
                if result.is_err() {
                    let was_connected = {
                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                        let prev_connected = ci.is_connected;
                        ci.is_connected = false;
                        ci.last_event_time = None;
                        // Connecting if we've never seen a connect, otherwise
                        // demote from Connected to Disconnected on a real loss.
                        ci.state = match ci.state {
                            PvConnectionState::Connected => PvConnectionState::Disconnected,
                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                            _ => PvConnectionState::Connecting,
                        };
                        prev_connected
                    };
                    // A search-failure timeout that demotes us out of
                    // Connected counts as a fresh disconnect — without
                    // this, only the post-monitor `break` path bumps the
                    // counters and a flapping PV silently under-reports
                    // its drops while subsequent `attach_cnx_lost_headers`
                    // calls carry a stale `cnxlostepsecs`.
                    if was_connected {
                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                        counters
                            .last_disconnect_unix_secs
                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
                    }
                    // Subscribe BEFORE re-checking the current state. If we
                    // subscribed after a state check, a Connected event
                    // firing in between would be lost forever, leaving this
                    // loop hung until the next disconnect/reconnect cycle.
                    let mut conn_rx = channel.connection_events();

                    // Close the remaining race: the channel may have become
                    // connected between the outer `wait_connected` timeout
                    // and our `connection_events()` call above, in which
                    // case no new event will arrive. A short re-probe
                    // catches that case.
                    if channel
                        .wait_connected(Duration::from_millis(100))
                        .await
                        .is_err()
                    {
                        // Block on connection events until a Connected
                        // event (or a successful re-probe) is seen.
                        loop {
                            tokio::select! {
                                _ = cancel_token.cancelled() => return,
                                event = conn_rx.recv() => {
                                    use tokio::sync::broadcast::error::RecvError;
                                    match event {
                                        Ok(ConnectionEvent::Connected) => break,
                                        Ok(_) => continue,
                                        // Lagged: we missed some events but
                                        // the channel is still live. Re-probe
                                        // state and otherwise keep waiting.
                                        Err(RecvError::Lagged(_)) => {
                                            if channel
                                                .wait_connected(Duration::from_millis(100))
                                                .await
                                                .is_ok()
                                            {
                                                break;
                                            }
                                            continue;
                                        }
                                        // Event stream closed: nothing more
                                        // will ever arrive, so end the task.
                                        Err(RecvError::Closed) => return,
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Refresh DBR_CTRL metadata once per (re)connect so the registry's
        // PREC/EGU stay in sync with the IOC. Fire-and-forget so a slow
        // CA stack can't gate `subscribe()` (and therefore the first
        // sample) on a metadata round-trip. The spawned task is bound to
        // `cancel_token` so it terminates promptly when the PV is
        // stopped/deleted (otherwise a late metadata write could land on
        // a deleted-or-re-registered PV row).
        {
            let channel = channel.clone();
            let registry = registry.clone();
            let counters = counters.clone();
            let pv_name = pv_name.clone();
            let cancel = cancel_token.clone();
            tokio::spawn(async move {
                tokio::select! {
                    _ = cancel.cancelled() => {}
                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
                }
            });
        }

        // Subscribe. A failure is counted as a transient error and retried
        // from the top of the outer loop after `CA_RETRY_DELAY`.
        let mut monitor = match channel.subscribe().await {
            Ok(m) => m,
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
                tokio::select! {
                    _ = cancel_token.cancelled() => return,
                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                }
            }
        };

        debug!(pv = pv_name, "Monitor subscription active");

        // Receive values with cancellation.
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => return,
                result = monitor.recv() => {
                    match result {
                        Some(Ok(snapshot)) => {
                            let now = SystemTime::now();
                            // Java parity (11e554d0): use the IOC-reported
                            // timestamp, not receive-time, so latency
                            // doesn't smear sample times. First sample
                            // after connect is accepted unconditionally
                            // — legitimate backfill on reconnect can
                            // include older timestamps. Subsequent
                            // samples whose IOC clock is more than
                            // `server_ioc_drift_secs` away from wall clock,
                            // or earlier than the 1991 floor, are
                            // dropped + counted.
                            let first_after_connect = {
                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                                // `last_event_time` is cleared on every
                                // disconnect path, so `None` here means
                                // "first event of this connection".
                                let first = ci.last_event_time.is_none();
                                if ci.connected_since.is_none() {
                                    ci.connected_since = Some(now);
                                }
                                ci.is_connected = true;
                                ci.last_event_time = Some(now);
                                ci.state = PvConnectionState::Connected;
                                first
                            };
                            if !first_after_connect
                                && !ioc_timestamp_in_window(
                                    snapshot.timestamp,
                                    now,
                                    server_ioc_drift_secs,
                                )
                            {
                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_name,
                                    ?snapshot.timestamp,
                                    "Dropping sample with out-of-window IOC timestamp"
                                );
                                continue;
                            }
                            counters.events_received.fetch_add(1, Ordering::Relaxed);
                            // CAS the first-event timestamp once. 0 sentinel
                            // means "no event yet"; replace with now.
                            let now_secs = unix_secs(now);
                            let _ = counters.first_event_unix_secs.compare_exchange(
                                0,
                                now_secs,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            );
                            let archiver_val = epics_value_to_archiver(&snapshot.value);
                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
                            attach_extras(&extras, &mut sample);
                            // Tag the first sample of this connection with
                            // the time the previous connection was lost and
                            // the time it was regained.
                            if first_after_connect {
                                let lost_secs = counters
                                    .last_disconnect_unix_secs
                                    .load(Ordering::Relaxed);
                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                            }
                            let pv_sample = PvSample {
                                pv_name: pv_name.clone(),
                                dbr_type,
                                sample,
                                element_count: Some(element_count),
                                counters: Some(counters.clone()),
                            };
                            if let Err(pv_sample) = try_send_with_overflow_count(
                                &tx,
                                pv_sample,
                                &counters,
                            )
                            .await
                            {
                                // The rejected sample is discarded; there is
                                // no consumer left to hand it to.
                                let _ = pv_sample;
                                return; // Write loop shut down
                            }
                        }
                        // A per-event error is counted and logged; the
                        // subscription itself is kept.
                        Some(Err(e)) => {
                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "Monitor error: {e}");
                        }
                        None => break, // Monitor ended, reconnect
                    }
                }
            }
        }

        // Monitor ended (disconnect) — loop back to reconnect. Reset
        // `last_event_time` so the first sample after reconnect is
        // treated as `first_after_connect` and bypasses the drift
        // filter; without this, an IOC that comes back with a
        // legitimate backfill timestamp older than the 30 min window
        // would be silently dropped.
        {
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            ci.is_connected = false;
            ci.last_event_time = None;
            ci.state = PvConnectionState::Disconnected;
        }
        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
        counters
            .last_disconnect_unix_secs
            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
    }
}
1953
/// Whole seconds between the Unix epoch and `t`.
///
/// A `t` that lies before the epoch yields `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
1959
1960/// Send a sample and count buffer-overflow events. Tries non-blocking
1961/// first; if the bounded channel is full we increment the counter and
1962/// fall back to a blocking send so backpressure still works.
1963async fn try_send_with_overflow_count(
1964    tx: &mpsc::Sender<PvSample>,
1965    pv_sample: PvSample,
1966    counters: &PvCounters,
1967) -> Result<(), PvSample> {
1968    match tx.try_send(pv_sample) {
1969        Ok(()) => Ok(()),
1970        Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1971            counters
1972                .buffer_overflow_drops
1973                .fetch_add(1, Ordering::Relaxed);
1974            // Backpressure to the producer; this awaits until the writer
1975            // drains some space. We count the saturation event but don't
1976            // actually drop the sample.
1977            tx.send(pv_sample).await.map_err(|e| e.0)
1978        }
1979        Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1980    }
1981}
1982
1983/// Scan loop: periodically read a channel value.
1984#[allow(clippy::too_many_arguments)]
1985async fn scan_loop(
1986    pv_name: String,
1987    dbr_type: ArchDbType,
1988    element_count: i32,
1989    channel: CaChannel,
1990    tx: mpsc::Sender<PvSample>,
1991    cancel_token: CancellationToken,
1992    period_secs: f64,
1993    conn_info: Arc<Mutex<ConnectionInfo>>,
1994    registry: Arc<PvRegistry>,
1995    extras: Arc<ExtraFieldsCache>,
1996    counters: Arc<PvCounters>,
1997) {
1998    let period = Duration::from_secs_f64(period_secs);
1999    let mut interval = tokio::time::interval(period);
2000    // Reset on every detected disconnect so we re-fetch metadata after
2001    // each reconnect (mirrors monitor_loop's once-per-(re)connect cadence).
2002    let mut metadata_done = false;
2003
2004    loop {
2005        tokio::select! {
2006            _ = cancel_token.cancelled() => return,
2007            _ = interval.tick() => {}
2008        }
2009
2010        if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2011            let was_connected = {
2012                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2013                let prev = ci.is_connected;
2014                ci.is_connected = false;
2015                ci.last_event_time = None;
2016                ci.state = match ci.state {
2017                    PvConnectionState::Connected => PvConnectionState::Disconnected,
2018                    PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2019                    _ => PvConnectionState::Connecting,
2020                };
2021                prev
2022            };
2023            if was_connected {
2024                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2025                counters
2026                    .last_disconnect_unix_secs
2027                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2028            }
2029            metadata_done = false;
2030            continue;
2031        }
2032
2033        if !metadata_done {
2034            // Fire-and-forget; matches monitor_loop's once-per-(re)connect
2035            // semantics. The local `metadata_done` flag exists because
2036            // scan_loop's outer iteration is per-tick (vs monitor_loop's
2037            // per-reconnect), so we'd otherwise spawn a fetch every tick.
2038            // Bound to `cancel_token` so a stopped/deleted PV doesn't get
2039            // a stale metadata write from an in-flight task.
2040            let channel = channel.clone();
2041            let registry = registry.clone();
2042            let counters = counters.clone();
2043            let pv_name = pv_name.clone();
2044            let cancel = cancel_token.clone();
2045            tokio::spawn(async move {
2046                tokio::select! {
2047                    _ = cancel.cancelled() => {}
2048                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
2049                }
2050            });
2051            metadata_done = true;
2052        }
2053
2054        match channel.get().await {
2055            Ok((_dbr_type, epics_val)) => {
2056                let now = SystemTime::now();
2057                let first_after_connect = {
2058                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2059                    let first = ci.last_event_time.is_none();
2060                    if ci.connected_since.is_none() {
2061                        ci.connected_since = Some(now);
2062                    }
2063                    ci.is_connected = true;
2064                    ci.last_event_time = Some(now);
2065                    ci.state = PvConnectionState::Connected;
2066                    first
2067                };
2068                counters.events_received.fetch_add(1, Ordering::Relaxed);
2069                let now_secs = unix_secs(now);
2070                let _ = counters.first_event_unix_secs.compare_exchange(
2071                    0,
2072                    now_secs,
2073                    Ordering::Relaxed,
2074                    Ordering::Relaxed,
2075                );
2076                let archiver_val = epics_value_to_archiver(&epics_val);
2077                let mut sample = ArchiverSample::new(now, archiver_val);
2078                attach_extras(&extras, &mut sample);
2079                if first_after_connect {
2080                    let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2081                    attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2082                }
2083                let pv_sample = PvSample {
2084                    pv_name: pv_name.clone(),
2085                    dbr_type,
2086                    sample,
2087                    element_count: Some(element_count),
2088                    counters: Some(counters.clone()),
2089                };
2090                if try_send_with_overflow_count(&tx, pv_sample, &counters)
2091                    .await
2092                    .is_err()
2093                {
2094                    return;
2095                }
2096            }
2097            Err(e) => {
2098                counters
2099                    .transient_error_count
2100                    .fetch_add(1, Ordering::Relaxed);
2101                debug!(pv = pv_name, "Scan read error: {e}");
2102            }
2103        }
2104    }
2105}
2106
2107/// Java parity (ed07feb): tag the first sample after (re)connect with
2108/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
2109/// detect archiver restarts and PV resumes. Unconditional emission —
2110/// on a clean startup `lost_secs` is 0, which is itself a valid value
2111/// for downstream gap detection.
2112fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2113    sample
2114        .field_values
2115        .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2116    sample
2117        .field_values
2118        .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2119    sample
2120        .field_values
2121        .push(("startup".to_string(), "true".to_string()));
2122}
2123
2124/// Snapshot the extras cache into `sample.field_values`. We sort for stable
2125/// PB output across runs (the protobuf field is repeated; consumers that
2126/// diff/compare files appreciate determinism).
2127fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2128    if extras.is_empty() {
2129        return;
2130    }
2131    let mut entries: Vec<(String, String)> = extras
2132        .iter()
2133        .map(|e| (e.key().clone(), e.value().clone()))
2134        .collect();
2135    entries.sort_by(|a, b| a.0.cmp(&b.0));
2136    sample.field_values = entries;
2137}
2138
2139/// Render an EpicsValue as the string we'll persist in the metadata field
2140/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
2141/// fall back to JSON-ish bracket notation. Stays in sync with what Java
2142/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
2143fn epics_value_to_field_string(val: &EpicsValue) -> String {
2144    match val {
2145        EpicsValue::String(s) => s.clone(),
2146        EpicsValue::Short(v) => v.to_string(),
2147        EpicsValue::Float(v) => v.to_string(),
2148        EpicsValue::Enum(v) => v.to_string(),
2149        EpicsValue::Char(v) => v.to_string(),
2150        EpicsValue::Long(v) => v.to_string(),
2151        EpicsValue::Int64(v) => v.to_string(),
2152        EpicsValue::Double(v) => v.to_string(),
2153        EpicsValue::ShortArray(v) => format!("{v:?}"),
2154        EpicsValue::FloatArray(v) => format!("{v:?}"),
2155        EpicsValue::EnumArray(v) => format!("{v:?}"),
2156        EpicsValue::DoubleArray(v) => format!("{v:?}"),
2157        EpicsValue::LongArray(v) => format!("{v:?}"),
2158        EpicsValue::Int64Array(v) => format!("{v:?}"),
2159        EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2160        EpicsValue::StringArray(v) => format!("{v:?}"),
2161    }
2162}
2163
2164/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
2165/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
2166/// it up alongside the main PV.
2167fn spawn_extra_field_monitor(
2168    ca_client: &CaClient,
2169    pv_name: &str,
2170    field: &str,
2171    extras: Arc<ExtraFieldsCache>,
2172    parent_token: CancellationToken,
2173    counters: Arc<PvCounters>,
2174) {
2175    let full_name = format!("{pv_name}.{field}");
2176    let channel = ca_client.create_channel(&full_name);
2177    let field_owned = field.to_string();
2178    let pv_owned = pv_name.to_string();
2179
2180    // Catch-unwind boundary: a panic from the CA client (e.g. malformed
2181    // wire frame, allocation failure inside epics_rs) would propagate to
2182    // the runtime and abort sibling tasks of this worker thread. Trap it
2183    // here, log with PV+field context, and return normally so the runtime
2184    // remains healthy.
2185    let panic_pv = pv_owned.clone();
2186    let panic_field = field_owned.clone();
2187    tokio::spawn(async move {
2188        let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2189            channel,
2190            pv_owned,
2191            field_owned,
2192            extras,
2193            parent_token,
2194            counters,
2195        ));
2196        if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2197            let msg = panic_payload_msg(&payload);
2198            error!(
2199                pv = panic_pv,
2200                field = panic_field,
2201                "Extra-field monitor panicked: {msg}"
2202            );
2203        }
2204    });
2205}
2206
/// Body of the spawned extra-field monitor. Split out so the spawn site
/// can wrap it in `catch_unwind`.
///
/// Waits (bounded by `CA_CONNECT_TIMEOUT`) for the channel to connect, then
/// loops: subscribe, forward every received snapshot into `extras` under
/// the key `field_owned`, and resubscribe when the monitor stream ends.
/// Subscribe failures are retried with a doubling delay that starts at
/// `CA_RETRY_DELAY` and is capped at 60 seconds.
///
/// # Arguments
/// * `channel` - CA channel for the `<pv>.<field>` name.
/// * `pv_owned` / `field_owned` - names used for log context; `field_owned`
///   is also the key written into `extras`.
/// * `extras` - cache updated with the stringified value of each snapshot.
/// * `parent_token` - cancellation makes the function return at the next
///   subscribe, sleep, or receive await point.
/// * `counters` - `transient_error_count` is incremented on every failed
///   subscribe and every monitor error event.
async fn extra_field_monitor_body(
    channel: CaChannel,
    pv_owned: String,
    field_owned: String,
    extras: Arc<ExtraFieldsCache>,
    parent_token: CancellationToken,
    counters: Arc<PvCounters>,
) {
    // Initial connect attempt — failure here is non-fatal (the field may
    // not exist on every IOC; we just leave the cache empty).
    if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
        debug!(
            pv = pv_owned,
            field = field_owned,
            "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
        );
    }

    // Exponential backoff for misconfigured fields (e.g. operator
    // listed `.HIHI` on a PV that doesn't expose it). Without this
    // we'd retry every 5s forever, churning CA search packets and
    // file descriptors. The cap is 60s; one warn at the cap so
    // ops know to fix archive_fields.
    let mut backoff = CA_RETRY_DELAY;
    let max_backoff = Duration::from_secs(60);
    let mut warned_at_cap = false;

    loop {
        // Cancel-aware subscribe attempt.
        tokio::select! {
            _ = parent_token.cancelled() => return,
            sub = channel.subscribe() => {
                let mut monitor = match sub {
                    Ok(m) => m,
                    Err(e) => {
                        // Java parity (8fe73eb): bump the transient
                        // counter so a misconfigured `.HIHI` field
                        // shows up in the rate / drop reports.
                        counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                        debug!(
                            pv = pv_owned,
                            field = field_owned,
                            ?backoff,
                            "Extra-field subscribe failed: {e}; retrying"
                        );
                        // Warn only once per run of failures, and only
                        // after the delay has reached the cap.
                        if backoff >= max_backoff && !warned_at_cap {
                            warn!(
                                pv = pv_owned,
                                field = field_owned,
                                "Extra-field repeatedly fails to subscribe; \
                                 check archive_fields config (now retrying every 60s)"
                            );
                            warned_at_cap = true;
                        }
                        // Sleep for the current delay, then double it
                        // (capped) for the next failure.
                        let sleep_for = backoff;
                        backoff = (backoff * 2).min(max_backoff);
                        tokio::select! {
                            _ = parent_token.cancelled() => return,
                            _ = tokio::time::sleep(sleep_for) => continue,
                        }
                    }
                };
                // Subscribe succeeded — reset backoff so the next
                // failure starts at the short delay again.
                backoff = CA_RETRY_DELAY;
                warned_at_cap = false;
                // Event pump: runs until cancelled or the monitor
                // stream ends.
                loop {
                    tokio::select! {
                        _ = parent_token.cancelled() => return,
                        ev = monitor.recv() => match ev {
                            Some(Ok(snapshot)) => {
                                // Latest value wins: overwrite the
                                // cache entry for this field.
                                extras.insert(
                                    field_owned.clone(),
                                    epics_value_to_field_string(&snapshot.value),
                                );
                            }
                            Some(Err(e)) => {
                                // Counted and logged; the subscription
                                // is kept and the pump continues.
                                counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_owned,
                                    field = field_owned,
                                    "Extra-field monitor error: {e}"
                                );
                            }
                            None => break, // resubscribe
                        }
                    }
                }
            }
        }
    }
}
2301
/// Turn a caught panic payload into a printable message.
///
/// Payloads produced by `panic!` are either a `&'static str` (literal
/// message) or a `String` (formatted message); both are returned as-is.
/// Any other payload type yields a fixed placeholder.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
2313
2314/// Resolve the data-bearing sub-field of a PVA value. NTScalar /
2315/// NTScalarArray / NTEnum wrap the raw value in a `value` field; bare
2316/// scalars are passed through.
2317fn pv_value_field(field: &PvField) -> &PvField {
2318    match field {
2319        PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2320        _ => field,
2321    }
2322}
2323
2324/// Map a PVA `ScalarValue` to an archiver `ArchiverValue`.
2325fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2326    match s {
2327        ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2328        ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2329        ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2330        ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2331        ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2332        ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2333        ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2334        ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2335        ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2336        ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2337        ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2338        ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2339    }
2340}
2341
2342/// Extract a scalar `ArchiverValue` from a top-level PVA value (which
2343/// may be an NTScalar wrapper). Returns `None` for array/struct types
2344/// that don't reduce to a scalar.
2345fn pv_field_scalar_to_archiver(field: &PvField) -> Option<ArchiverValue> {
2346    match pv_value_field(field) {
2347        PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2348        _ => None,
2349    }
2350}
2351
2352/// Pick the `(ArchDbType, element_count)` to record at archive_pv time
2353/// from an NTScalar / NTScalarArray's introspection-pvget result.
2354/// Returns `None` for unsupported (struct / union / variant) shapes.
2355fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2356    let value = pv_value_field(field);
2357    Some(match value {
2358        PvField::Scalar(s) => (
2359            match s {
2360                ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2361                ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2362                ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2363                ScalarValue::Int(_)
2364                | ScalarValue::UInt(_)
2365                | ScalarValue::Long(_)
2366                | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2367                ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2368                ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2369                ScalarValue::String(_) => ArchDbType::ScalarString,
2370            },
2371            1,
2372        ),
2373        // Array variants — keep the corresponding scalar element type but
2374        // bump element_count. write_loop / PB encoder doesn't currently
2375        // serialise PVA arrays, so callers should reject these for now.
2376        PvField::ScalarArray(arr) => {
2377            let elem = arr.first()?;
2378            let inner = PvField::Scalar(elem.clone());
2379            let (t, _) = pv_field_to_arch_db_type(&inner)?;
2380            (t, arr.len() as i32)
2381        }
2382        _ => return None,
2383    })
2384}
2385
/// Translate a CA record field name (`HIHI`, `LOLO`, `EGU`, ...) into the
/// dotted pvAccess path of the equivalent member under an NTScalar /
/// NTScalarArray.
///
/// The lookup is exact and case-sensitive. `None` means the field has no
/// PVA counterpart; callers should drop such fields silently instead of
/// building a pvRequest that can never be satisfied.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const CA_TO_PVA: &[(&str, &str)] = &[
        // display sub-structure
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm sub-structure (alarm thresholds)
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control sub-structure (drive limits)
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];

    CA_TO_PVA
        .iter()
        .find(|(ca_name, _)| *ca_name == field)
        .map(|&(_, pva_path)| pva_path)
}
2414
2415/// Walk a dotted PVA field path (`valueAlarm.highAlarmLimit`) from
2416/// the root [`PvField`]. Returns `None` if any segment is missing or
2417/// the path lands on a non-structure intermediate.
2418fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2419    let mut current = root;
2420    for segment in path.split('.') {
2421        let PvField::Structure(s) = current else {
2422            return None;
2423        };
2424        current = s.get_field(segment)?;
2425    }
2426    Some(current)
2427}
2428
2429/// Stringify a [`ScalarValue`] for storage in the per-sample extras
2430/// map. Java archiver's `archiveFields` blob is a string-string map,
2431/// so we mirror that wire shape.
2432fn scalar_value_to_string(s: &ScalarValue) -> String {
2433    match s {
2434        ScalarValue::Boolean(v) => v.to_string(),
2435        ScalarValue::Byte(v) => v.to_string(),
2436        ScalarValue::Short(v) => v.to_string(),
2437        ScalarValue::Int(v) => v.to_string(),
2438        ScalarValue::Long(v) => v.to_string(),
2439        ScalarValue::UByte(v) => v.to_string(),
2440        ScalarValue::UShort(v) => v.to_string(),
2441        ScalarValue::UInt(v) => v.to_string(),
2442        ScalarValue::ULong(v) => v.to_string(),
2443        ScalarValue::Float(v) => v.to_string(),
2444        ScalarValue::Double(v) => v.to_string(),
2445        ScalarValue::String(s) => s.clone(),
2446    }
2447}
2448
2449/// Element count for a PVA value: 1 for scalar, length for arrays,
2450/// 0 for unsupported shapes (treated as "unknown" by callers).
2451fn pv_field_element_count(field: &PvField) -> i32 {
2452    match pv_value_field(field) {
2453        PvField::Scalar(_) => 1,
2454        PvField::ScalarArray(arr) => arr.len() as i32,
2455        PvField::ScalarArrayTyped(t) => t.len() as i32,
2456        _ => 0,
2457    }
2458}
2459
2460/// Pull `(precision, units)` out of an NTScalar / NTScalarArray's
2461/// `display` sub-structure. Returns `(None, None)` for bare scalars
2462/// or structures missing `display` (some servers omit it).
2463/// Negative precision is the Java EAA "no precision info" sentinel —
2464/// treat it the same as missing so we don't surface "-1" to the UI.
2465fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2466    let PvField::Structure(s) = field else {
2467        return (None, None);
2468    };
2469    let Some(PvField::Structure(disp)) = s.get_field("display") else {
2470        return (None, None);
2471    };
2472    let prec = match disp.get_field("precision") {
2473        Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2474        Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2475        Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2476        _ => None,
2477    };
2478    let egu = match disp.get_field("units") {
2479        Some(PvField::Scalar(ScalarValue::String(u))) => {
2480            let t = u.trim();
2481            if t.is_empty() { None } else { Some(t.to_string()) }
2482        }
2483        _ => None,
2484    };
2485    (prec, egu)
2486}
2487
2488/// Pull the IOC-reported timestamp out of an NTScalar / NTScalarArray.
2489/// Falls back to `SystemTime::now()` when the structure lacks a usable
2490/// `timeStamp` sub-field, so a malformed server doesn't drop samples.
2491fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2492    let PvField::Structure(s) = field else {
2493        return SystemTime::now();
2494    };
2495    let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2496        return SystemTime::now();
2497    };
2498    let secs = match ts.get_field("secondsPastEpoch") {
2499        Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2500        Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2501        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2502        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2503        _ => return SystemTime::now(),
2504    };
2505    let nanos = match ts.get_field("nanoseconds") {
2506        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2507        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2508        _ => 0,
2509    };
2510    SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2511}
2512
2513/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
2514fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2515    match field_type {
2516        DbFieldType::String => ArchDbType::ScalarString,
2517        DbFieldType::Short => ArchDbType::ScalarShort,
2518        DbFieldType::Float => ArchDbType::ScalarFloat,
2519        DbFieldType::Enum => ArchDbType::ScalarEnum,
2520        DbFieldType::Char => ArchDbType::ScalarByte,
2521        DbFieldType::Long => ArchDbType::ScalarInt,
2522        // PB PayloadType has no SCALAR_LONG (i64) — values outside i32 range
2523        // are truncated by the i32 cast in epics_value_to_archiver.
2524        DbFieldType::Int64 => ArchDbType::ScalarInt,
2525        DbFieldType::Double => ArchDbType::ScalarDouble,
2526    }
2527}
2528
2529/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
2530fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2531    match val {
2532        EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2533        EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2534        EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2535        EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2536        EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2537        EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2538        EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2539        EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2540        EpicsValue::ShortArray(v) => {
2541            ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2542        }
2543        EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2544        EpicsValue::EnumArray(v) => {
2545            ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2546        }
2547        EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2548        EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2549        EpicsValue::Int64Array(v) => {
2550            ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2551        }
2552        EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2553        EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2554    }
2555}
2556
/// Timing knobs for [`write_loop_with_config`].
///
/// [`write_loop`] runs with the [`Default`] values (overriding only
/// `flush_period`). Tests shrink the timeouts to sub-second values so
/// failure-injection cases complete in a few hundred milliseconds rather
/// than minutes.
#[derive(Debug, Clone)]
pub struct WriteLoopConfig {
    /// Interval between `flush_ingest_writes` calls, which also commit
    /// the pending `last_event` timestamps to the registry.
    pub flush_period: Duration,
    /// Maximum wait for a single `append_event_with_meta` JoinHandle.
    /// When it elapses the sample is logged as
    /// abandoned-but-may-succeed-late and the loop moves on. The
    /// spawn_blocking task stays parked on the blocking pool; per-PV
    /// serialization in PlainPB keeps any late completion ordered
    /// behind subsequent samples of the same PV.
    pub append_timeout: Duration,
    /// Maximum wait for one `flush_ingest_writes` JoinHandle during the
    /// periodic flush. When it elapses, every pending `ts_updates`
    /// entry is deferred to the next cycle, since it is unknown which
    /// PVs reached disk.
    pub flush_timeout: Duration,
    /// Maximum wait for each individual append issued while draining
    /// at shutdown.
    pub drain_per_sample_timeout: Duration,
    /// Overall wall-clock budget for the shutdown drain. Once spent,
    /// the remaining buffered samples are abandoned without attempting
    /// an append, so a wedged STS cannot stretch an orderly stop into
    /// "kill -9".
    pub drain_total_budget: Duration,
    /// Maximum wait for the final `flush_writes` issued at shutdown.
    pub shutdown_flush_timeout: Duration,
}

impl Default for WriteLoopConfig {
    /// Production defaults: 10 s flush period, 30 s append and flush
    /// timeouts, 5 s per drained sample within a 30 s drain budget, and
    /// 15 s for the final shutdown flush.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            // Steady state.
            flush_period: secs(10),
            append_timeout: secs(30),
            flush_timeout: secs(30),
            // Shutdown.
            drain_per_sample_timeout: secs(5),
            drain_total_budget: secs(30),
            shutdown_flush_timeout: secs(15),
        }
    }
}
2602
2603/// Tunables for [`run_sharded_write_pool`].
2604///
2605/// `shards = 1` is the legacy single-worker layout and incurs no
2606/// dispatcher overhead. `shards > 1` spawns a dispatcher that hashes
2607/// each sample's `pv_name` to a fixed shard (so any one PV's samples
2608/// always land in the same per-shard write_loop, preserving
2609/// timestamp ordering) and N parallel shard workers — different PVs
2610/// can append concurrently instead of queueing behind a single task.
2611/// Sites with many active PVs and a fast STS should set this to
2612/// `min(num_cpus, expected concurrent slow appends)`.
2613#[derive(Debug, Clone)]
2614pub struct ShardedWritePoolConfig {
2615    /// Number of parallel shard workers. Must be ≥ 1; `1` is the
2616    /// degenerate case (no dispatcher, behaves identically to a
2617    /// bare `write_loop_with_config`).
2618    pub shards: usize,
2619    /// mpsc capacity of each shard's input channel. The dispatcher
2620    /// `try_send`s into this; when full, the OFFENDING SHARD's
2621    /// drop is recorded under
2622    /// `archiver_dispatcher_shard_overflow_drops_total{shard=N}`
2623    /// and the sample's per-PV `buffer_overflow_drops` counter is
2624    /// bumped. Other shards keep flowing — that's the per-shard
2625    /// isolation guarantee.
2626    pub per_shard_buffer: usize,
2627    /// Per-worker config (timeouts, flush period). Cloned into
2628    /// each shard.
2629    pub write_loop: WriteLoopConfig,
2630}
2631
2632impl Default for ShardedWritePoolConfig {
2633    fn default() -> Self {
2634        Self {
2635            shards: 1,
2636            per_shard_buffer: 4096,
2637            write_loop: WriteLoopConfig::default(),
2638        }
2639    }
2640}
2641
2642// PV-to-shard hash lives in `archiver_core::storage::plainpb` so
2643// the engine's dispatcher and PlainPB's
2644// `flush_ingest_writes_for_shard` agree on which shard owns which
2645// PV. Two definitions would mean shard 0's flush could iterate
2646// PVs the dispatcher routed to shard 1, re-introducing the
2647// misattribution bug.
2648use archiver_core::storage::plainpb::shard_for_pv;
2649
2650/// Run an N-shard write pool: 1 dispatcher (hashes pv_name → shard)
2651/// + N parallel `write_loop_with_config` workers. Each shard has
2652/// independent `ts_updates`, an independent flush ticker, and its
2653/// own per-PV writer slots inside the shared storage plugin (so
2654/// per-PV ordering is naturally preserved by the consistent-hash
2655/// routing — samples for one PV always go to the same shard).
2656///
2657/// `shards == 1` short-circuits the dispatcher and runs a single
2658/// `write_loop_with_config` directly so single-worker deployments
2659/// pay zero overhead.
2660pub async fn run_sharded_write_pool(
2661    storage: Arc<dyn StoragePlugin>,
2662    registry: Arc<PvRegistry>,
2663    rx: mpsc::Receiver<PvSample>,
2664    shutdown: tokio::sync::watch::Receiver<bool>,
2665    cfg: ShardedWritePoolConfig,
2666) {
2667    let n = cfg.shards.max(1);
2668    let WriteLoopConfig {
2669        flush_period,
2670        append_timeout,
2671        flush_timeout,
2672        drain_per_sample_timeout,
2673        drain_total_budget,
2674        shutdown_flush_timeout,
2675    } = cfg.write_loop.clone();
2676
2677    // Coalescing per-PV pending-timestamp map shared between the
2678    // flush owner, every shard worker, and every shard worker's
2679    // spawn_blocking late-success closure. Replaces the previous
2680    // mpsc report channel — see [`PendingReports`] for why.
2681    let pending = Arc::new(PendingReports::new());
2682
2683    // Spawn the global flush owner first so it's ready to flush
2684    // as soon as shards start appending.
2685    let flush_owner_handle = tokio::spawn(flush_owner_loop(
2686        storage.clone(),
2687        registry.clone(),
2688        pending.clone(),
2689        shutdown.clone(),
2690        flush_period,
2691        flush_timeout,
2692        shutdown_flush_timeout,
2693        drain_total_budget,
2694    ));
2695
2696    if n == 1 {
2697        // Fast path: single shard, no dispatcher hop. We become
2698        // the shard worker ourselves and read from `rx` directly.
2699        shard_append_loop(
2700            0,
2701            storage.clone(),
2702            rx,
2703            pending.clone(),
2704            shutdown.clone(),
2705            append_timeout,
2706            drain_per_sample_timeout,
2707            drain_total_budget,
2708        )
2709        .await;
2710    } else {
2711        // Multi-shard path: dispatcher + N shard workers.
2712        let mut shard_txs = Vec::with_capacity(n);
2713        let mut shard_handles = Vec::with_capacity(n);
2714        for shard_idx in 0..n {
2715            let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2716            shard_txs.push(s_tx);
2717            let storage = storage.clone();
2718            let pending = pending.clone();
2719            let shard_shutdown = shutdown.clone();
2720            shard_handles.push(tokio::spawn(shard_append_loop(
2721                shard_idx,
2722                storage,
2723                s_rx,
2724                pending,
2725                shard_shutdown,
2726                append_timeout,
2727                drain_per_sample_timeout,
2728                drain_total_budget,
2729            )));
2730        }
2731
2732        dispatch_loop(rx, shard_txs, shutdown.clone()).await;
2733
2734        // Dispatcher exited → shard sample receivers see EOS or
2735        // the shutdown signal directly via their watch::Receiver.
2736        // Wait for shards to finish their drain branches.
2737        for h in shard_handles {
2738            let _ = h.await;
2739        }
2740    }
2741
2742    // Flush owner uses its own shutdown-watch + drain_total_budget
2743    // grace timer, not an EOS signal — see flush_owner_loop. Wait
2744    // for it to finish.
2745    let _ = flush_owner_handle.await;
2746}
2747
2748/// Reads the upstream main mpsc, hashes each sample's `pv_name`,
2749/// and forwards to the appropriate shard channel via
2750/// `Sender::try_send` for **per-shard isolation**.
2751///
2752/// `try_send` (not `send().await`) is the load-balancing
2753/// invariant: if one slow shard's channel is full, the dispatcher
2754/// drops THAT shard's overflow into `buffer_overflow_drops` and
2755/// keeps routing for the other shards. Using `send().await` would
2756/// block the dispatcher on the slow shard, back-pressuring the
2757/// upstream main channel and starving every other shard's PVs —
2758/// the exact failure mode the sharded layout is meant to prevent.
2759async fn dispatch_loop(
2760    mut rx: mpsc::Receiver<PvSample>,
2761    shard_txs: Vec<mpsc::Sender<PvSample>>,
2762    mut shutdown: tokio::sync::watch::Receiver<bool>,
2763) {
2764    let n = shard_txs.len();
2765    info!(shards = n, "Sharded write dispatcher started");
2766    loop {
2767        tokio::select! {
2768            biased;
2769            // `biased` so we always check shutdown before draining
2770            // another sample — keeps the shutdown latency bounded
2771            // even under a sustained sample storm.
2772            _ = shutdown.changed() => {
2773                if *shutdown.borrow() {
2774                    // Drain remaining samples (try_send for the
2775                    // same reason as the steady-state branch).
2776                    // Mirror the steady-state overflow accounting
2777                    // so a saturated shard's drain-time drops are
2778                    // visible to operators — silent loss here
2779                    // would shadow real samples lost during
2780                    // shutdown.
2781                    while let Ok(sample) = rx.try_recv() {
2782                        let idx = shard_for_pv(&sample.pv_name, n);
2783                        match shard_txs[idx].try_send(sample) {
2784                            Ok(()) => {}
2785                            Err(mpsc::error::TrySendError::Full(s)) => {
2786                                if let Some(c) = s.counters.as_ref() {
2787                                    c.buffer_overflow_drops
2788                                        .fetch_add(1, Ordering::Relaxed);
2789                                }
2790                                metrics::counter!(
2791                                    "archiver_dispatcher_shard_overflow_drops_total",
2792                                    "shard" => idx.to_string(),
2793                                    "phase" => "shutdown_drain",
2794                                )
2795                                .increment(1);
2796                                debug!(
2797                                    shard = idx,
2798                                    pv = s.pv_name,
2799                                    "Shutdown-drain shard channel full; sample dropped"
2800                                );
2801                            }
2802                            Err(mpsc::error::TrySendError::Closed(s)) => {
2803                                warn!(
2804                                    shard = idx,
2805                                    pv = s.pv_name,
2806                                    "Shutdown-drain shard channel closed; sample dropped"
2807                                );
2808                            }
2809                        }
2810                    }
2811                    break;
2812                }
2813            }
2814            maybe = rx.recv() => {
2815                match maybe {
2816                    Some(sample) => {
2817                        let idx = shard_for_pv(&sample.pv_name, n);
2818                        match shard_txs[idx].try_send(sample) {
2819                            Ok(()) => {}
2820                            Err(mpsc::error::TrySendError::Full(s)) => {
2821                                // Shard saturated. Surface the
2822                                // drop on the SAMPLE's per-PV
2823                                // counter so operators can pin
2824                                // the loss to a specific PV+shard.
2825                                if let Some(c) = s.counters.as_ref() {
2826                                    c.buffer_overflow_drops
2827                                        .fetch_add(1, Ordering::Relaxed);
2828                                }
2829                                metrics::counter!(
2830                                    "archiver_dispatcher_shard_overflow_drops_total",
2831                                    "shard" => idx.to_string(),
2832                                )
2833                                .increment(1);
2834                                debug!(
2835                                    shard = idx,
2836                                    pv = s.pv_name,
2837                                    "Shard channel full; sample dropped \
2838                                     (per-shard isolation)"
2839                                );
2840                            }
2841                            Err(mpsc::error::TrySendError::Closed(s)) => {
2842                                // Shard worker died (panic or
2843                                // shutdown race). We'll lose this
2844                                // sample but keep the dispatcher
2845                                // alive for OTHER shards.
2846                                warn!(
2847                                    shard = idx,
2848                                    pv = s.pv_name,
2849                                    "Shard channel closed; sample dropped"
2850                                );
2851                            }
2852                        }
2853                    }
2854                    None => break, // Upstream closed.
2855                }
2856            }
2857        }
2858    }
2859    info!("Sharded write dispatcher exiting");
2860}
2861
2862/// Background writer task — drains samples and writes to storage.
2863///
2864/// Thin wrapper that fills [`WriteLoopConfig`] from [`Default`] and
2865/// the caller-supplied `flush_period`. Production callers use this
2866/// form; the test suite uses [`write_loop_with_config`] directly to
2867/// dial timeouts down.
2868pub async fn write_loop(
2869    storage: Arc<dyn StoragePlugin>,
2870    registry: Arc<PvRegistry>,
2871    rx: mpsc::Receiver<PvSample>,
2872    shutdown: tokio::sync::watch::Receiver<bool>,
2873    flush_period: Duration,
2874) {
2875    let cfg = WriteLoopConfig {
2876        flush_period,
2877        ..Default::default()
2878    };
2879    write_loop_with_config(storage, registry, rx, shutdown, cfg).await
2880}
2881
/// Drop guard for the `flush_in_flight` flag.
///
/// Holding a value of this type means "a flush is running". When the
/// value is dropped — on a normal return or while a panic unwinds
/// through the owning scope — the flag is reset to `false`. Relying on
/// `Drop` instead of a manual `store(false)` at the end of the flush
/// closure is what keeps a panic inside the flush from leaving the
/// flag stuck `true`, which would make every later ticker skip its
/// flush and freeze the registry's `last_event`.
struct FlushInFlightGuard {
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    fn drop(&mut self) {
        // Release pairs with the Acquire loads performed by whoever
        // polls the flag before launching the next flush.
        let Self { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
2897
2898/// Run one flush + commit cycle.
2899///
2900/// Spawns `flush_ingest_writes` on the blocking pool with a bounded
2901/// timeout, then commits the pending registry timestamps for every
2902/// PV the flush returned cleanly. Maintains an `in_flight` flag set
2903/// by the spawn_blocking task so a caller (the ticker branch) can
2904/// avoid stacking concurrent flushes when the previous one is still
2905/// running on a wedged FS.
2906///
2907/// Returns `true` if a flush was actually launched, `false` if the
2908/// helper was a no-op (in_flight already set, ts_updates empty).
2909///
2910/// Mutates `ts_updates`:
2911///   * Clean flush: drops `failed` PVs (bytes lost), commits all
2912///     other entries, drops committed entries on success.
2913///   * Deferred PVs: STAY in `ts_updates` for the next cycle.
2914///     Their bytes are still buffered, not lost.
2915///   * Flush error / panic: leaves every entry intact for retry.
2916///   * Flush timeout: CLEARS every entry. We don't know which PVs
2917///     reached disk; the in-flight task may still be running and
2918///     may even fail late, removing a writer from the cache. The
2919///     next flush would then see a "clean" state for that PV and
2920///     wrongly commit. Conservative drop is the only safe move
2921///     (Java archiver has the same trade-off — under-commit on
2922///     timeout, never over-commit). Future samples will repopulate
2923///     `ts_updates` and the next clean flush catches the registry up.
2924async fn run_flush_and_commit(
2925    storage: &Arc<dyn StoragePlugin>,
2926    registry: &Arc<PvRegistry>,
2927    pending: &Arc<PendingReports>,
2928    flush_timeout: Duration,
2929    in_flight: &Arc<std::sync::atomic::AtomicBool>,
2930) -> bool {
2931    if in_flight.load(Ordering::Acquire) {
2932        // Previous flush hasn't finished. Don't queue another —
2933        // we'd just stack spawn_blocking tasks on the wedged FS
2934        // and saturate the blocking pool. Wait for the existing
2935        // one to drain.
2936        return false;
2937    }
2938    // Snapshot the current pending map BEFORE setting in_flight
2939    // so concurrent shards reporting after this point survive
2940    // into the next cycle (their entries will be in
2941    // `pending` but not in `snapshot`, so we won't remove them
2942    // when we clean up). The snapshot is the authoritative view
2943    // of "what we're trying to commit this cycle".
2944    let snapshot = pending.snapshot();
2945    if snapshot.is_empty() {
2946        return false;
2947    }
2948    in_flight.store(true, Ordering::Release);
2949
2950    let storage_for_flush = storage.clone();
2951    let in_flight_for_task = in_flight.clone();
2952    let flush_join = tokio::task::spawn_blocking(move || {
2953        // RAII guard: clears `in_flight` on every exit path
2954        // (return, panic, future-cancellation). The guard's
2955        // existence is the contract — manually storing false
2956        // BEFORE returning would skip cleanup on a panic inside
2957        // block_on / inside flush_ingest_writes, leaving the flag
2958        // stuck `true` and silently freezing all future flushes.
2959        let _guard = FlushInFlightGuard {
2960            flag: in_flight_for_task,
2961        };
2962        let rt = tokio::runtime::Handle::current();
2963        rt.block_on(storage_for_flush.flush_ingest_writes())
2964    });
2965    match tokio::time::timeout(flush_timeout, flush_join).await {
2966        Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
2967            // Failed PVs: bytes lost. Drop every failed PV's
2968            // pending entry UNCONDITIONALLY — regardless of
2969            // whether the snapshot's value still matches.
2970            //
2971            // The previous "remove only if matches snapshot"
2972            // policy assumed a concurrent shard's post-snapshot
2973            // report meant "newer bytes ARE on disk". That
2974            // assumption fails for the eviction/loss-queue path:
2975            // the loss queue records only PV names, so a loss
2976            // event recorded between snapshot and flush-return
2977            // has no way to communicate which writer generation
2978            // (or timestamp horizon) it applies to. A
2979            // post-snapshot pending entry could refer to bytes
2980            // that ARE on disk (fresh writer after eviction) OR
2981            // bytes that were also lost — we can't tell.
2982            //
2983            // Conservative drop is the safe choice: never
2984            // commit a `last_event` for a PV the storage just
2985            // told us had lost bytes, even at the cost of an
2986            // occasional under-commit that the next sample
2987            // resolves.
2988            if !failed.is_empty() {
2989                pending.remove_failed(&failed);
2990                error!(
2991                    "STS flush dropped {} PV(s) from timestamp commit \
2992                     (bytes never reached disk): {:?}",
2993                    failed.len(),
2994                    failed
2995                );
2996            }
2997            // Deferred PVs: writer slot busy, bytes still buffered.
2998            // We do NOT remove them from `pending` (next cycle
2999            // will catch them) and we EXCLUDE them from this
3000            // cycle's commit batch.
3001            if !deferred.is_empty() {
3002                debug!(
3003                    "STS flush deferred {} PV(s); keeping in pending \
3004                     for next cycle: {:?}",
3005                    deferred.len(),
3006                    deferred
3007                );
3008            }
3009            // Build commit batch from the snapshot, excluding
3010            // failed (bytes lost) and deferred (bytes not on disk
3011            // yet).
3012            let failed_set: std::collections::HashSet<&str> =
3013                failed.iter().map(|s| s.as_str()).collect();
3014            let deferred_set: std::collections::HashSet<&str> =
3015                deferred.iter().map(|s| s.as_str()).collect();
3016            let to_commit: Vec<(&str, SystemTime)> = snapshot
3017                .iter()
3018                .filter(|(pv, _)| {
3019                    !failed_set.contains(pv.as_str())
3020                        && !deferred_set.contains(pv.as_str())
3021                })
3022                .map(|(pv, ts)| (pv.as_str(), *ts))
3023                .collect();
3024            if to_commit.is_empty() {
3025                return true;
3026            }
3027            match registry.batch_update_timestamps(&to_commit) {
3028                Ok(()) => {
3029                    // Remove committed entries from the shared
3030                    // map — but only if their current value still
3031                    // matches the snapshot's. This preserves
3032                    // concurrent updates that arrived between
3033                    // snapshot and remove.
3034                    let committed_map: std::collections::HashMap<String, SystemTime> =
3035                        to_commit
3036                            .iter()
3037                            .map(|(pv, ts)| ((*pv).to_string(), *ts))
3038                            .collect();
3039                    pending.remove_committed(&committed_map);
3040                }
3041                Err(e) => {
3042                    // Don't remove — retry on next cycle.
3043                    // Otherwise a transient SQLite error orphans
3044                    // timestamps the disk has but the registry
3045                    // doesn't know about, with no retry path.
3046                    error!(
3047                        "Registry timestamp commit failed; \
3048                         keeping {} pending for retry: {e}",
3049                        snapshot.len()
3050                    );
3051                }
3052            }
3053        }
3054        Ok(Ok(Err(e))) => {
3055            error!(
3056                "STS ingest flush errored; deferring all {} \
3057                 pending timestamp commits: {e}",
3058                snapshot.len()
3059            );
3060        }
3061        Ok(Err(join_err)) => {
3062            error!(
3063                "STS ingest flush task panicked; deferring all {} \
3064                 pending timestamp commits: {join_err}",
3065                snapshot.len()
3066            );
3067        }
3068        Err(_) => {
3069            // Flush wedged. Conservative drop: the spawn_blocking
3070            // task may still be running and may even fail late
3071            // (which would remove a PV's writer from the cache,
3072            // making the NEXT flush return "clean" and tricking
3073            // the owner into committing a stale value for a PV
3074            // whose old bytes are gone). Drop the snapshot's
3075            // entries from `pending` (only if unchanged), so the
3076            // owner doesn't trust them; future samples rebuild
3077            // pending and the next clean flush catches the
3078            // registry up.
3079            //
3080            // The `in_flight` flag stays TRUE until the
3081            // spawn_blocking task naturally finishes — this
3082            // throttles us from queueing yet another flush onto
3083            // the wedged FS until it clears.
3084            metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
3085            let dropped = snapshot.len();
3086            pending.remove_committed(&snapshot);
3087            error!(
3088                "STS ingest flush timed out after {flush_timeout:?}; \
3089                 conservatively dropped {dropped} pending timestamp \
3090                 commit(s) (task remains on blocking pool; \
3091                 timestamps will be rebuilt from subsequent samples)"
3092            );
3093        }
3094    }
3095    true
3096}
3097
3098/// Coalescing per-PV pending-timestamp map shared between every
3099/// shard worker (incl. their spawn_blocking late-success closures)
3100/// and the single global flush owner.
3101///
3102/// **Why a shared map instead of an mpsc channel?** With a channel,
3103/// a slow flush owner causes the buffer to fill and `try_send`
3104/// drops happen — for a PV that goes silent right after a dropped
3105/// report, the registry's `last_event` is permanently
3106/// under-committed. With a coalescing map keyed by PV, every
3107/// shard call is an `entry().and_modify().or_insert()` that
3108/// always succeeds; concurrent updates from different shards
3109/// never lose data because the map is bounded by **PV count**
3110/// (typical: thousands), not by sample rate (typical: 100k/s).
3111///
3112/// **Coalescing semantics.** A successful append for `(pv, ts)`
3113/// upserts the entry to `max(current, ts)`. A stale late-success
3114/// report (e.g. from a spawn_blocking task that finished after
3115/// shard already wrote a newer sample) does NOT clobber a newer
3116/// committed value.
3117#[derive(Default)]
3118pub struct PendingReports {
3119    inner: dashmap::DashMap<String, SystemTime>,
3120}
3121
3122impl PendingReports {
3123    pub fn new() -> Self {
3124        Self::default()
3125    }
3126
3127    /// Coalescing report. If `pv` already has a newer timestamp,
3128    /// no-op. Always succeeds — this is the channel-replacement
3129    /// invariant that makes silent-PV under-commit impossible.
3130    pub fn report(&self, pv: &str, ts: SystemTime) {
3131        // DashMap's entry() locks one shard internally. Fast-path
3132        // updates use `and_modify`; brand-new PVs go through
3133        // `or_insert`. No external lock contention across shards.
3134        self.inner
3135            .entry(pv.to_string())
3136            .and_modify(|cur| {
3137                if *cur < ts {
3138                    *cur = ts;
3139                }
3140            })
3141            .or_insert(ts);
3142    }
3143
3144    /// Snapshot the entire map for a flush cycle. Returns a
3145    /// owned HashMap; the shared map stays intact so concurrent
3146    /// shards can keep reporting during the flush.
3147    pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3148        self.inner
3149            .iter()
3150            .map(|kv| (kv.key().clone(), *kv.value()))
3151            .collect()
3152    }
3153
3154    /// Remove entries whose CURRENT value still equals what we
3155    /// committed. If a concurrent shard advanced an entry to a
3156    /// newer ts after the snapshot, leave it alone — the next
3157    /// flush will pick it up.
3158    pub fn remove_committed(
3159        &self,
3160        committed: &std::collections::HashMap<String, SystemTime>,
3161    ) {
3162        for (pv, &committed_ts) in committed {
3163            // remove_if applies the predicate atomically inside
3164            // the DashMap shard lock.
3165            let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3166        }
3167    }
3168
3169    /// Unconditionally remove every failed PV's entry from the
3170    /// pending map, regardless of its current timestamp.
3171    ///
3172    /// **Why unconditional.** The storage's loss queue carries
3173    /// only PV names — it cannot tell us whether a concurrent
3174    /// shard's post-snapshot report was for bytes that ARE on
3175    /// disk (e.g., a fresh writer opened after eviction) versus
3176    /// bytes that are also gone. The matching `remove_committed`
3177    /// path can leave a post-snapshot entry behind, and the next
3178    /// clean flush would commit it — turning a "PV's bytes were
3179    /// lost" report into a `last_event` lie.
3180    ///
3181    /// The trade-off: a brand-new sample whose bytes truly DID
3182    /// reach disk after the loss event gets dropped from this
3183    /// commit cycle. Future samples re-populate `pending` and the
3184    /// registry catches up. Under-commit is the safe direction;
3185    /// over-commit is never acceptable.
3186    pub fn remove_failed(&self, failed: &[String]) {
3187        for pv in failed {
3188            let _ = self.inner.remove(pv);
3189        }
3190    }
3191
3192    pub fn is_empty(&self) -> bool {
3193        self.inner.is_empty()
3194    }
3195
3196    pub fn len(&self) -> usize {
3197        self.inner.len()
3198    }
3199}
3200
3201/// Same as [`write_loop`] but accepts the full [`WriteLoopConfig`].
3202///
3203/// Thin wrapper that runs a 1-shard sharded pool — kept on the
3204/// public surface for tests and any direct callers. The actual
3205/// work happens in [`run_sharded_write_pool`].
3206pub async fn write_loop_with_config(
3207    storage: Arc<dyn StoragePlugin>,
3208    registry: Arc<PvRegistry>,
3209    rx: mpsc::Receiver<PvSample>,
3210    shutdown: tokio::sync::watch::Receiver<bool>,
3211    cfg: WriteLoopConfig,
3212) {
3213    let pool_cfg = ShardedWritePoolConfig {
3214        shards: 1,
3215        // unused on the 1-shard fast path (rx is forwarded directly)
3216        per_shard_buffer: 4096,
3217        write_loop: cfg,
3218    };
3219    run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3220}
3221
3222/// Per-shard append worker. Drains samples from `sample_rx`,
3223/// drops out-of-order or type-changed samples, then runs each
3224/// `append_event_with_meta` on the blocking pool with a bounded
3225/// timeout. On every success — including the late-success path
3226/// where write_loop's outer timeout already abandoned the
3227/// JoinHandle — coalesces `(pv, ts)` into the shared
3228/// [`PendingReports`] map so the global flush owner sees it on
3229/// the next flush cycle.
3230///
3231/// The shard worker holds NO ts_updates / flush state of its own.
3232/// It tracks `last_ts` and `last_dbr_type` only for the
3233/// per-PV ordering / type-change drop checks; those are local
3234/// to the shard because the dispatcher's consistent hash pins each
3235/// PV to one shard.
3236async fn shard_append_loop(
3237    shard_idx: usize,
3238    storage: Arc<dyn StoragePlugin>,
3239    mut sample_rx: mpsc::Receiver<PvSample>,
3240    pending: Arc<PendingReports>,
3241    mut shutdown: tokio::sync::watch::Receiver<bool>,
3242    append_timeout: Duration,
3243    drain_per_sample_timeout: Duration,
3244    drain_total_budget: Duration,
3245) {
3246    info!(shard = shard_idx, "Shard append loop started");
3247    // Per-PV state for the in-shard sanity drops. The dispatcher's
3248    // consistent hash keeps each PV in one shard, so these maps
3249    // never see cross-shard PVs.
3250    let mut last_ts: std::collections::HashMap<String, SystemTime> =
3251        std::collections::HashMap::new();
3252    let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
3253        std::collections::HashMap::new();
3254
3255    loop {
3256        tokio::select! {
3257            Some(pv_sample) = sample_rx.recv() => {
3258                shard_handle_sample(
3259                    shard_idx,
3260                    &storage,
3261                    &pending,
3262                    pv_sample,
3263                    &mut last_ts,
3264                    &mut last_dbr_type,
3265                    append_timeout,
3266                )
3267                .await;
3268            }
3269            _ = shutdown.changed() => {
3270                shard_drain_on_shutdown(
3271                    shard_idx,
3272                    &storage,
3273                    &pending,
3274                    &mut sample_rx,
3275                    &mut last_ts,
3276                    &mut last_dbr_type,
3277                    drain_per_sample_timeout,
3278                    drain_total_budget,
3279                )
3280                .await;
3281                break;
3282            }
3283        }
3284    }
3285    info!(shard = shard_idx, "Shard append loop exited");
3286}
3287
3288/// Process one sample on the shard's hot path. Extracted so the
3289/// steady-state and shutdown-drain branches can share the same
3290/// "drop checks → spawn_blocking append → report on success"
3291/// recipe.
3292async fn shard_handle_sample(
3293    shard_idx: usize,
3294    storage: &Arc<dyn StoragePlugin>,
3295    pending: &Arc<PendingReports>,
3296    pv_sample: PvSample,
3297    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3298    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3299    append_timeout: Duration,
3300) {
3301    let ts = pv_sample.sample.timestamp;
3302
3303    // Out-of-order timestamp drop. Storage requires monotonic
3304    // appends per PV; an older timestamp would produce a corrupt
3305    // partition. Tracked via shard-local `last_ts` so the check
3306    // survives flush cycles (the legacy ts_updates-based check
3307    // only caught within-cycle reorderings).
3308    if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
3309        && ts < *prev_ts
3310    {
3311        if let Some(ref c) = pv_sample.counters {
3312            c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
3313        }
3314        debug!(
3315            shard = shard_idx,
3316            pv = pv_sample.pv_name,
3317            ?ts,
3318            ?prev_ts,
3319            "Dropping out-of-order sample"
3320        );
3321        return;
3322    }
3323
3324    // Type-change drop. The first sample defines the PV's wire
3325    // type; later samples with a different DBR get dropped
3326    // (operator must changeTypeForPV first).
3327    let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
3328    if let Some(prev) = prev_type
3329        && prev != pv_sample.dbr_type
3330    {
3331        if let Some(ref c) = pv_sample.counters {
3332            c.type_change_drops.fetch_add(1, Ordering::Relaxed);
3333            // Java parity (9f2234f): record the latest observed
3334            // DBR so the dropped-events report can show what the
3335            // IOC is now sending vs the archived type.
3336            c.latest_observed_dbr
3337                .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
3338        }
3339        debug!(
3340            shard = shard_idx,
3341            pv = pv_sample.pv_name,
3342            ?prev,
3343            new = ?pv_sample.dbr_type,
3344            "Dropping type-changed sample"
3345        );
3346        // Restore prev_type so a single mismatched sample doesn't
3347        // permanently flip our recorded type.
3348        last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
3349        return;
3350    }
3351
3352    let meta = AppendMeta {
3353        element_count: pv_sample.element_count,
3354        ..Default::default()
3355    };
3356    let pv_name_for_post = pv_sample.pv_name.clone();
3357    let counters_for_post = pv_sample.counters.clone();
3358    let counters_in_task = pv_sample.counters.clone();
3359    let storage_for_task = storage.clone();
3360    let pending_for_task = pending.clone();
3361
3362    // Bound each append + isolate sync syscall hangs.
3363    // `spawn_blocking` moves the actual I/O to the blocking
3364    // thread pool, so a stuck NFS write parks a blocking-pool
3365    // thread instead of a runtime worker. The
3366    // `tokio::time::timeout` then bounds how long the shard waits
3367    // for that join — a stuck task is abandoned and the shard
3368    // continues.
3369    //
3370    // Late-success path: a timed-out task may still complete its
3371    // write later. The closure ALWAYS reports into the shared
3372    // `PendingReports` map on Ok (whether or not the shard
3373    // already moved on), so the global flush owner picks up late
3374    // successes too. Per-PV serialization inside PlainPB keeps
3375    // any late completion ordered behind subsequent same-PV
3376    // samples. The map's coalescing semantics ensure a stale late
3377    // success does NOT clobber a newer hot-path commit.
3378    let join = tokio::task::spawn_blocking(move || {
3379        let rt = tokio::runtime::Handle::current();
3380        let res = rt.block_on(storage_for_task.append_event_with_meta(
3381            &pv_sample.pv_name,
3382            pv_sample.dbr_type,
3383            &pv_sample.sample,
3384            &meta,
3385        ));
3386        if res.is_ok() {
3387            metrics::counter!("archiver_events_stored_total").increment(1);
3388            if let Some(c) = counters_in_task.as_ref() {
3389                c.events_stored.fetch_add(1, Ordering::Relaxed);
3390            }
3391            // Coalesce into the shared map. Always succeeds —
3392            // unlike the previous mpsc try_send, a saturated
3393            // flush owner cannot cause this report to drop, so
3394            // a PV that goes silent right after a successful
3395            // append still gets its `last_event` committed once
3396            // the owner gets to its next flush cycle.
3397            pending_for_task.report(&pv_sample.pv_name, ts);
3398        }
3399        res
3400    });
3401    let res = tokio::time::timeout(append_timeout, join).await;
3402    // Conservative ordering high-water (principle 5). Bump
3403    // `last_ts` on EVERY storage-layer return path — success,
3404    // error, panic, timeout — not just on Ok and timeout.
3405    //
3406    // Why all four:
3407    //   * Ok       — sample on disk; obvious bump.
3408    //   * Timeout  — sample MAY land on disk via late success;
3409    //                bump to keep on-disk order monotonic.
3410    //   * Error    — current storage contract is binary, but a
3411    //                future partial-success-with-error variant
3412    //                could leave bytes on disk. Bumping now
3413    //                hedges against that.
3414    //   * Panic    — closure aborted mid-write; storage state
3415    //                is undefined. Bumping is the safe choice.
3416    //
3417    // The pre-check earlier in this function already drops
3418    // out-of-order and type-changed samples BEFORE we get here,
3419    // so this bump only ever sets the high-water to a ts the
3420    // shard explicitly accepted into the storage layer.
3421    last_ts.insert(pv_name_for_post.clone(), ts);
3422    match res {
3423        Ok(Ok(Ok(()))) => {
3424            // Report already went out from inside the closure.
3425        }
3426        Ok(Ok(Err(e))) => {
3427            error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
3428        }
3429        Ok(Err(join_err)) => {
3430            error!(
3431                shard = shard_idx,
3432                pv = pv_name_for_post,
3433                "Storage write task panicked: {join_err}"
3434            );
3435        }
3436        Err(_) => {
3437            // Sample MAY still land on disk later (per-PV
3438            // serialization keeps order). The closure will report
3439            // into the shared pending map whenever it eventually
3440            // completes, so registry's `last_event` catches up
3441            // via the late path even though we move on now.
3442            error!(
3443                shard = shard_idx,
3444                pv = pv_name_for_post,
3445                "Storage append timed out after {append_timeout:?}; \
3446                 shard abandoning (task remains on blocking pool, \
3447                 may still succeed late)"
3448            );
3449            if let Some(ref c) = counters_for_post {
3450                c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
3451            }
3452        }
3453    }
3454}
3455
3456/// Drain the shard's remaining buffered samples on shutdown,
3457/// applying the same bounded-timeout discipline as the hot path.
3458async fn shard_drain_on_shutdown(
3459    shard_idx: usize,
3460    storage: &Arc<dyn StoragePlugin>,
3461    pending: &Arc<PendingReports>,
3462    sample_rx: &mut mpsc::Receiver<PvSample>,
3463    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3464    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3465    drain_per_sample_timeout: Duration,
3466    drain_total_budget: Duration,
3467) {
3468    let drain_start = std::time::Instant::now();
3469    let mut drained_skipped = 0usize;
3470    while let Ok(pv_sample) = sample_rx.try_recv() {
3471        if drain_start.elapsed() > drain_total_budget {
3472            drained_skipped += 1;
3473            continue;
3474        }
3475        // Reuse the hot-path handler with a tighter timeout.
3476        // Late-success reports still flow into `pending` because
3477        // the closure clones the Arc independently of this
3478        // shard's lifecycle.
3479        shard_handle_sample(
3480            shard_idx,
3481            storage,
3482            pending,
3483            pv_sample,
3484            last_ts,
3485            last_dbr_type,
3486            drain_per_sample_timeout,
3487        )
3488        .await;
3489    }
3490    if drained_skipped > 0 {
3491        warn!(
3492            shard = shard_idx,
3493            "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3494             {drained_skipped} buffered sample(s) abandoned without \
3495             attempting append"
3496        );
3497    }
3498}
3499
3500/// Single global flush owner. Reads from the shared
3501/// [`PendingReports`] map (which all shards coalesce into),
3502/// drives the periodic `flush_ingest_writes`, and commits to the
3503/// registry. Because ALL shards report into this one map and the
3504/// owner is the only consumer, the flush result and the
3505/// pending-commit data live in the same task — no shard 0 vs
3506/// shard 1 attribution skew.
3507async fn flush_owner_loop(
3508    storage: Arc<dyn StoragePlugin>,
3509    registry: Arc<PvRegistry>,
3510    pending: Arc<PendingReports>,
3511    mut shutdown: tokio::sync::watch::Receiver<bool>,
3512    flush_period: Duration,
3513    flush_timeout: Duration,
3514    shutdown_flush_timeout: Duration,
3515    drain_total_budget: Duration,
3516) {
3517    // tokio::time::interval(Duration::ZERO) panics. Config-side
3518    // validation rejects 0 at load time, but defending in depth
3519    // here keeps tests and direct callers (which build configs by
3520    // hand) from crashing the engine on a misconfigured value.
3521    let flush_period = if flush_period.is_zero() {
3522        warn!(
3523            "flush_owner: flush_period was Duration::ZERO; \
3524             clamping to 1s to avoid tokio::time::interval panic"
3525        );
3526        Duration::from_secs(1)
3527    } else {
3528        flush_period
3529    };
3530    info!("Flush owner started");
3531
3532    let mut flush_ticker = tokio::time::interval(flush_period);
3533    flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3534    let _ = flush_ticker.tick().await;
3535
3536    let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3537
3538    // Phase 1: steady state. Ticker-driven flush only. Reports
3539    // accumulate in the shared `PendingReports` map continuously
3540    // (no recv loop — shards write directly), so we don't need
3541    // to multiplex report consumption against the flush.
3542    loop {
3543        tokio::select! {
3544            biased;
3545            _ = shutdown.changed() => {
3546                if *shutdown.borrow() {
3547                    break;
3548                }
3549            }
3550            _ = flush_ticker.tick() => {
3551                if !pending.is_empty() {
3552                    run_flush_and_commit(
3553                        &storage,
3554                        &registry,
3555                        &pending,
3556                        flush_timeout,
3557                        &flush_in_flight,
3558                    )
3559                    .await;
3560                }
3561            }
3562        }
3563    }
3564
3565    // Phase 2: shutdown grace. Two windows in one budget:
3566    //
3567    //   1. A brief minimum sleep so any in-flight spawn_blocking
3568    //      late-success closures can coalesce their reports into
3569    //      `pending` before the final flush snapshots it.
3570    //
3571    //   2. Wait for any still-running ticker flush to drain
3572    //      (`flush_in_flight` cleared by the spawn_blocking task's
3573    //      RAII guard). Without this, the final
3574    //      `run_flush_and_commit` would see `in_flight == true`,
3575    //      short-circuit, and skip every entry in `pending` —
3576    //      data on disk but no registry commit.
3577    //
3578    // Both windows fit inside the operator-configured
3579    // `drain_total_budget`. If a flush is genuinely wedged past
3580    // the budget, we proceed to final flush; that call will see
3581    // `in_flight` still set and short-circuit, but at least we
3582    // didn't pin the process forever. Future samples on restart
3583    // will re-populate `pending` and the registry catches up.
3584    let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3585    let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3586    if !min_grace.is_zero() {
3587        tokio::time::sleep(min_grace).await;
3588    }
3589    while flush_in_flight.load(Ordering::Acquire)
3590        && std::time::Instant::now() < phase2_deadline
3591    {
3592        tokio::time::sleep(Duration::from_millis(50)).await;
3593    }
3594    if flush_in_flight.load(Ordering::Acquire) {
3595        warn!(
3596            "Shutdown grace exhausted while a flush is still in flight; \
3597             final flush will be skipped (pending entries left for next \
3598             process restart to rebuild from re-archived samples)"
3599        );
3600    }
3601
3602    // Final flush + commit. Use shutdown_flush_timeout (typically
3603    // shorter than the steady-state flush_timeout) so a wedged FS
3604    // doesn't block shutdown indefinitely.
3605    info!(
3606        pending_at_shutdown = pending.len(),
3607        "Flush owner running final flush + commit"
3608    );
3609    if !pending.is_empty() {
3610        run_flush_and_commit(
3611            &storage,
3612            &registry,
3613            &pending,
3614            shutdown_flush_timeout,
3615            &flush_in_flight,
3616        )
3617        .await;
3618    }
3619    info!("Flush owner exiting");
3620}