Skip to main content

archiver_engine/
channel_manager.rs

1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::pvdata::{PvField, ScalarValue};
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info, warn};
14
15use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
16use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
17use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
18
19use crate::policy::PolicyConfig;
20
/// Timeout for initial CA channel connection (10 s). Despite the `CA_`
/// prefix, the PVA archive path also uses it to bound its `pvconnect`
/// and one-shot `pvget` calls.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for CA reconnection attempts in the monitor loop (30 s).
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay before retrying a failed CA subscription (5 s).
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
27
/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
/// stale uninitialised IOC clock or a sentinel.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC

/// Check a freshly-received sample timestamp against the floor and the
/// wall clock.
///
/// Returns `true` when `ts` is at or after [`PAST_CUTOFF_UNIX_SECS`] and
/// lies within ±`drift_secs` of `now`; returns `false` when the sample
/// should be dropped (caller bumps `timestamp_drops`). Both instants are
/// truncated to whole seconds before they are compared, so the window
/// edge is inclusive at one-second granularity.
///
/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
/// 6538631), so per-site tuning doesn't require recompiling. `now` is
/// passed in (rather than calling `SystemTime::now()` here) so the test
/// suite can pin time deterministically.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // A pre-epoch `ts` makes `duration_since` fail; map it to `i64::MIN`
    // so the floor check below rejects it.
    let unix = ts
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(i64::MIN);
    if unix < PAST_CUTOFF_UNIX_SECS {
        return false;
    }
    // A pre-epoch `now` is treated as the epoch itself.
    let now_unix = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|d| d.as_secs() as i64)
        .unwrap_or(0);
    // Within ±drift_secs of `now`? `abs_diff` yields the unsigned distance
    // directly and cannot overflow the way a signed subtraction can.
    unix.abs_diff(now_unix) <= drift_secs
}
57
/// Discrete connection state reported through `getPVDetails` (Java parity
/// dea7acb). Keeps "never connected", "connecting" and "confirmed down"
/// apart from one another.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Nothing has been attempted yet, or no attempt has made it past
    /// `wait_connected` so far. This is the default.
    #[default]
    Idle,
    /// The first `wait_connected` on this channel handle is still
    /// pending.
    Connecting,
    /// The channel reported a successful connect; samples are flowing or
    /// the channel is otherwise live.
    Connected,
    /// The channel connected at least once, but the monitor loop has
    /// dropped since. Kept separate from `Idle` so operators can tell a
    /// regression from a name that never resolved.
    Disconnected,
}

impl PvConnectionState {
    /// Static, human-readable name of the variant (identical to the
    /// variant identifier).
    pub fn as_str(self) -> &'static str {
        match self {
            PvConnectionState::Disconnected => "Disconnected",
            PvConnectionState::Connected => "Connected",
            PvConnectionState::Connecting => "Connecting",
            PvConnectionState::Idle => "Idle",
        }
    }
}
88
/// Connection state tracked per PV.
///
/// The derived `Default` yields "no timestamps, not connected,
/// `PvConnectionState::Idle`".
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// NOTE(review): presumably the instant the current connection was
    /// established, `None` while not connected — the writers are not in
    /// this struct; confirm against them.
    pub connected_since: Option<SystemTime>,
    /// NOTE(review): presumably the instant of the most recent event seen
    /// for this PV — confirm against the monitor/scan loops.
    pub last_event_time: Option<SystemTime>,
    /// NOTE(review): looks like a boolean companion to
    /// `state == Connected`; confirm the two are always updated together.
    pub is_connected: bool,
    /// Java parity (dea7acb): discrete connection state for the
    /// PVDetails report. Operators can distinguish never-connected,
    /// currently-connecting, and previously-connected-now-down.
    pub state: PvConnectionState,
}
100
/// Diagnostic counters kept per PV for the BPL drop / rate / connection
/// reports. Every count only grows over the PV's lifetime; the rate
/// handlers work from deltas against `first_event_unix_secs`.
///
/// They live here rather than in `ConnectionInfo` so that transient
/// disconnects do not reset them and the report endpoints can read them
/// without taking a lock.
#[derive(Debug)]
pub struct PvCounters {
    /// Events produced by monitor/scan, counting the ones that were
    /// dropped later on, before the write.
    pub events_received: AtomicU64,
    /// Events that were successfully written to storage.
    pub events_stored: AtomicU64,
    /// Unix-epoch seconds of the very first event seen for this PV.
    /// `0` stands for "no event yet" (there is no atomic `Option`).
    pub first_event_unix_secs: AtomicI64,
    /// Events rejected by the bounded sample channel because the write
    /// loop fell behind the producer. Reported by
    /// `getDroppedEventsBufferOverflowReport`.
    pub buffer_overflow_drops: AtomicU64,
    /// Events whose timestamp went backwards relative to the previously
    /// stored event. Counterpart of the Java archiver's
    /// `DroppedEventsTimestampReport`.
    pub timestamp_drops: AtomicU64,
    /// Events whose runtime DBR type differed from the type stored in the
    /// PvRecord. The engine drops them because mixed types inside one PB
    /// partition would corrupt downstream readers.
    pub type_change_drops: AtomicU64,
    /// Disconnect transitions observed on this PV's CA channel
    /// (`LostConnectionsReport`).
    pub disconnect_count: AtomicU64,
    /// Unix-epoch seconds of the most recent disconnect transition.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Transient subscribe / monitor-recv / scan-read errors (Java parity
    /// 8fe73eb). Unlike `disconnect_count`, these are recoverable
    /// per-attempt failures, not confirmed link drops.
    pub transient_error_count: AtomicU64,
    /// Most recent DBR type seen from CA that disagreed with the type
    /// recorded at archive time (Java parity 9f2234f). `-1` means no
    /// mismatch has been seen.
    pub latest_observed_dbr: AtomicI32,
    /// Failed DBR_CTRL metadata refresh attempts (timeout, transport
    /// error, missing display info). Worth checking for PVs that show an
    /// empty PREC/EGU.
    pub metadata_fetch_failures: AtomicU64,
    /// `storage.append_event_with_meta` calls that ran past the
    /// per-sample storage timeout. Kept apart from
    /// `buffer_overflow_drops` (mpsc channel saturation) so that "the
    /// storage tier is wedged" can be told from "the writer can't keep up
    /// with the producer".
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    /// All counters and timestamps start at zero; `latest_observed_dbr`
    /// starts at its `-1` "no mismatch" sentinel.
    fn default() -> Self {
        // Sentinel meaning "no type mismatch ever observed".
        const NO_DBR_MISMATCH: i32 = -1;
        let zeroed_count = || AtomicU64::new(0);
        let unset_stamp = || AtomicI64::new(0);

        Self {
            events_received: zeroed_count(),
            events_stored: zeroed_count(),
            first_event_unix_secs: unset_stamp(),
            buffer_overflow_drops: zeroed_count(),
            timestamp_drops: zeroed_count(),
            type_change_drops: zeroed_count(),
            disconnect_count: zeroed_count(),
            last_disconnect_unix_secs: unset_stamp(),
            transient_error_count: zeroed_count(),
            latest_observed_dbr: AtomicI32::new(NO_DBR_MISMATCH),
            metadata_fetch_failures: zeroed_count(),
            storage_append_timeouts: zeroed_count(),
        }
    }
}
174
175/// Read-only snapshot of `PvCounters` — owned values so callers can
176/// move them across threads / serialise without reaching into Atomics.
177#[derive(Debug, Clone)]
178pub struct PvCountersSnapshot {
179    pub events_received: u64,
180    pub events_stored: u64,
181    pub first_event_unix_secs: Option<i64>,
182    pub buffer_overflow_drops: u64,
183    pub timestamp_drops: u64,
184    pub type_change_drops: u64,
185    pub disconnect_count: u64,
186    pub last_disconnect_unix_secs: Option<i64>,
187    pub transient_error_count: u64,
188    /// `Some(dbr_type as i32)` if a type-change mismatch has been
189    /// observed; `None` otherwise (Java parity 9f2234f).
190    pub latest_observed_dbr: Option<i32>,
191    pub metadata_fetch_failures: u64,
192    pub storage_append_timeouts: u64,
193}
194
195impl From<&PvCounters> for PvCountersSnapshot {
196    fn from(c: &PvCounters) -> Self {
197        let first = c.first_event_unix_secs.load(Ordering::Relaxed);
198        let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
199        Self {
200            events_received: c.events_received.load(Ordering::Relaxed),
201            events_stored: c.events_stored.load(Ordering::Relaxed),
202            first_event_unix_secs: if first == 0 { None } else { Some(first) },
203            buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
204            timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
205            type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
206            disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
207            last_disconnect_unix_secs: if last_disc == 0 {
208                None
209            } else {
210                Some(last_disc)
211            },
212            transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
213            latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
214                -1 => None,
215                v => Some(v),
216            },
217            metadata_fetch_failures: c.metadata_fetch_failures.load(Ordering::Relaxed),
218            storage_append_timeouts: c.storage_append_timeouts.load(Ordering::Relaxed),
219        }
220    }
221}
222
/// Handle for a running PV archiving task.
///
/// One handle is stored in `ChannelManager::channels` per archived PV;
/// the `Arc`-wrapped members are shared with the tasks spawned for it.
struct PvHandle {
    /// `Some` for CA-acquired PVs (used by `try_get_value` for the
    /// live-value RPC); `None` for PVA-acquired PVs since PVA exposes
    /// no Clone-able channel handle — live_value for those routes
    /// through `pva_client` directly.
    channel: Option<CaChannel>,
    /// Root cancellation token for this PV. A clone is handed to the
    /// spawned monitor/scan task, and the entries in `field_tokens` are
    /// child tokens of it.
    cancel_token: CancellationToken,
    /// Archive type copied from the registry record when the handle is
    /// created. Kept under `allow(dead_code)`, i.e. presumably not read
    /// anywhere yet.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection state shared with the PV's monitor/scan task (a clone
    /// of this `Arc` is passed to it at spawn time).
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
    /// every sample emitted for this PV. Populated by per-field monitor tasks
    /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
    /// so stopping the PV stops all of them.
    extras: Arc<ExtraFieldsCache>,
    /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
    /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
    /// field's task without disturbing the others or the main PV.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises concurrent `update_archive_fields` calls for this PV so
    /// add/remove/respawn never race with itself. Also held by the CA
    /// start-up path while it inserts the handle and spawns the initial
    /// field monitors.
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Diagnostic counters surfaced through the BPL drop / rate /
    /// connection reports. Lock-free reads; updates from the producer
    /// (monitor/scan) and the writer happen on different threads.
    counters: Arc<PvCounters>,
}
251
/// Thread-safe cache of latest extra-field values for one PV.
/// Each map entry is `(field_name, stringified_value)`; both key and
/// value are owned `String`s.
type ExtraFieldsCache = DashMap<String, String>;

/// Default capacity for the bounded sample channel (the value handed to
/// `mpsc::channel` in `ChannelManager::new_with_drift`).
/// This limits memory usage when producers outpace the storage writer.
/// At ~200 bytes per sample, 500K entries ≈ 100 MB worst-case.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
260
261/// RAII guard that removes a key from `pending_archives` on drop,
262/// ensuring cleanup even if the owning future is cancelled.
263struct PendingGuard<'a> {
264    map: &'a DashMap<String, ()>,
265    key: String,
266}
267
268impl Drop for PendingGuard<'_> {
269    fn drop(&mut self) {
270        self.map.remove(&self.key);
271    }
272}
273
/// Manages EPICS Channel Access + pvAccess connections and dispatches
/// archived samples to storage.
///
/// Samples are not written here directly: producers push `PvSample`s
/// into `sample_tx`, and the matching receiver is returned to the caller
/// by the constructors.
pub struct ChannelManager {
    /// The CA client context.
    ca_client: CaClient,
    /// The PVA client context (lazily used only when a PV's registry
    /// row carries `Protocol::Pva`).
    pva_client: PvaClient,
    /// Active channels: PV name → handle with cancellation.
    channels: DashMap<String, PvHandle>,
    /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
    /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
    pending_archives: DashMap<String, ()>,
    /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
    /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
    /// the registry status and the channel map disagreeing.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend.
    /// NOTE(review): carries `allow(dead_code)`, so presumably the manager
    /// itself never reads it — confirm whether the field is still needed.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// PV metadata registry.
    registry: Arc<PvRegistry>,
    /// Sample sender for the write thread. Cloned into every spawned
    /// monitor/scan task; the channel is bounded by
    /// `SAMPLE_CHANNEL_CAPACITY`.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration. When a policy matches a PV name it
    /// overrides the sample mode requested by the caller.
    policy: Option<PolicyConfig>,
    /// Per-site IOC drift bound (Java parity 6538631), in seconds.
    /// `new` uses 30 minutes; `new_with_drift` takes it explicitly.
    server_ioc_drift_secs: u64,
}
303
/// A sample ready to be written to storage.
///
/// This is the message type carried by the bounded sample channel
/// between the per-PV producer tasks and the write loop.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archive type of the sample.
    /// NOTE(review): presumably the type recorded for the PV at archive
    /// time rather than the type observed on the wire — confirm against
    /// the producers.
    pub dbr_type: ArchDbType,
    /// The sample payload itself.
    pub sample: ArchiverSample,
    /// Element count for the PV, if known.
    /// NOTE(review): `None` presumably means "not reported" — confirm.
    pub element_count: Option<i32>,
    /// Counter handle used by the write loop to record timestamp /
    /// type-change drops. None on samples produced before counter
    /// support was wired up — write_loop tolerates the absence.
    pub counters: Option<Arc<PvCounters>>,
}
315
316impl ChannelManager {
317    pub async fn new(
318        storage: Arc<dyn StoragePlugin>,
319        registry: Arc<PvRegistry>,
320        policy: Option<PolicyConfig>,
321    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
322        Self::new_with_drift(storage, registry, policy, 30 * 60).await
323    }
324
325    /// Construct with an explicit IOC drift bound. Java parity (6538631):
326    /// keeps tests + sites that don't surface `EngineConfig` on the
327    /// existing default while letting the daemon plumb a configured value.
328    pub async fn new_with_drift(
329        storage: Arc<dyn StoragePlugin>,
330        registry: Arc<PvRegistry>,
331        policy: Option<PolicyConfig>,
332        server_ioc_drift_secs: u64,
333    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
334        let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
335        let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
336        let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
337
338        let mgr = Self {
339            ca_client,
340            pva_client,
341            channels: DashMap::new(),
342            pending_archives: DashMap::new(),
343            op_locks: DashMap::new(),
344            storage,
345            registry,
346            sample_tx: tx,
347            policy,
348            server_ioc_drift_secs,
349        };
350
351        Ok((mgr, rx))
352    }
353
354    /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
355    /// is what callers should `.lock().await` on; holding the entry guard
356    /// (via `entry().or_insert_with`) across the await would deadlock the
357    /// DashMap shard.
358    fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
359        if let Some(existing) = self.op_locks.get(pv_name) {
360            return existing.clone();
361        }
362        self.op_locks
363            .entry(pv_name.to_string())
364            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
365            .clone()
366    }
367
368    /// Restore all active PVs from the registry (called on startup).
369    ///
370    /// `pvs_by_status(Active)` already filters out alias rows (they carry
371    /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
372    /// future schema change can't silently re-introduce double-archiving
373    /// of the underlying IOC PV.
374    pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
375        let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
376        let total = active_pvs.len() as u64;
377        info!(total, "Restoring PVs from registry");
378
379        let mut restored = 0u64;
380        for record in active_pvs {
381            if record.alias_for.is_some() {
382                warn!(
383                    pv = record.pv_name,
384                    target = record.alias_for.as_deref(),
385                    "Skipping alias row in restore; aliases are routed, not archived"
386                );
387                continue;
388            }
389            if let Err(e) = self.start_archiving_internal(&record).await {
390                warn!(pv = record.pv_name, "Failed to restore PV: {e}");
391                self.registry.set_status(&record.pv_name, PvStatus::Error)?;
392            } else {
393                restored += 1;
394            }
395        }
396        metrics::gauge!("archiver_pvs_active").set(restored as f64);
397        if restored < total {
398            warn!(
399                restored,
400                failed = total - restored,
401                "Some PVs failed to restore"
402            );
403        }
404
405        Ok(restored)
406    }
407
408    /// Start archiving a new PV.
409    pub async fn archive_pv(
410        &self,
411        pv_name: &str,
412        sample_mode: &SampleMode,
413        protocol: Protocol,
414    ) -> anyhow::Result<()> {
415        // Serialise with pause/resume/stop/destroy on the same PV.
416        let lock = self.op_lock(pv_name);
417        let _g = lock.lock().await;
418
419        if self.channels.contains_key(pv_name) {
420            anyhow::bail!("PV {pv_name} is already being archived");
421        }
422
423        // Atomically claim the PV to prevent concurrent archive_pv races.
424        // The guard ensures cleanup even if this future is cancelled.
425        if self
426            .pending_archives
427            .insert(pv_name.to_string(), ())
428            .is_some()
429        {
430            anyhow::bail!("PV {pv_name} archive operation already in progress");
431        }
432        let _guard = PendingGuard {
433            map: &self.pending_archives,
434            key: pv_name.to_string(),
435        };
436
437        match protocol {
438            Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
439            Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
440        }
441    }
442
443    /// Inner implementation of archive_pv, separated for cleanup safety.
444    async fn archive_pv_inner(
445        &self,
446        pv_name: &str,
447        sample_mode: &SampleMode,
448    ) -> anyhow::Result<()> {
449        // Re-check after acquiring the pending slot (another task may have completed).
450        if self.channels.contains_key(pv_name) {
451            anyhow::bail!("PV {pv_name} is already being archived");
452        }
453
454        // Check policy override.
455        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
456            if let Some(p) = policy.find_policy(pv_name) {
457                (p.to_sample_mode(), Some(p.policy_name().to_string()))
458            } else {
459                (sample_mode.clone(), None)
460            }
461        } else {
462            (sample_mode.clone(), None)
463        };
464
465        // Connect to discover the native type.
466        let channel = self.ca_client.create_channel(pv_name);
467        channel
468            .wait_connected(CA_CONNECT_TIMEOUT)
469            .await
470            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
471
472        let info = self
473            .ca_client
474            .cainfo(pv_name)
475            .await
476            .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
477
478        let dbr_type = dbr_field_to_arch_type(info.native_type);
479        let element_count = info.element_count as i32;
480
481        // Register in SQLite. (CA path — PVA acquisition uses a separate
482        // entry point, so this branch is always Protocol::Ca.)
483        self.registry
484            .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
485        // Java parity (b30f1a6): persist the matched policy's stable name so
486        // audit / metrics paths know which policy governed this archive.
487        if let Some(ref name) = matched_policy_name {
488            self.registry.update_policy_name(pv_name, Some(name))?;
489        }
490
491        let record = self
492            .registry
493            .get_pv(pv_name)?
494            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
495        self.start_archiving_internal(&record).await?;
496
497        metrics::gauge!("archiver_pvs_active").increment(1.0);
498        info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
499        Ok(())
500    }
501
502    /// PVA equivalent of [`Self::archive_pv_inner`]. Connects via the
503    /// pvAccess client, infers archive type from a one-shot pvget, then
504    /// hands off to a `monitor_loop_pva` that mirrors the CA monitor
505    /// loop's lifecycle (samples → write_loop, cancel_token shutdown).
506    async fn archive_pv_inner_pva(
507        &self,
508        pv_name: &str,
509        sample_mode: &SampleMode,
510    ) -> anyhow::Result<()> {
511        if self.channels.contains_key(pv_name) {
512            anyhow::bail!("PV {pv_name} is already being archived");
513        }
514
515        // Apply policy override (same surface as CA path).
516        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
517            if let Some(p) = policy.find_policy(pv_name) {
518                (p.to_sample_mode(), Some(p.policy_name().to_string()))
519            } else {
520                (sample_mode.clone(), None)
521            }
522        } else {
523            (sample_mode.clone(), None)
524        };
525
526        // Connect + introspect: a one-shot pvget tells us the value
527        // shape. We rely on it instead of a `cainfo` analogue because
528        // PVA channels carry their type only via fetched data.
529        let connect = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvconnect(pv_name))
530            .await
531            .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
532            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
533        debug!(pv = pv_name, server = %connect, "PVA channel connected");
534
535        let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget(pv_name))
536            .await
537            .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
538            .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
539        let (dbr_type, element_count) = pv_field_to_arch_db_type(&initial).ok_or_else(|| {
540            anyhow::anyhow!(
541                "PV {pv_name}: PVA value is not a scalar/scalar-array; structured types not yet supported"
542            )
543        })?;
544
545        // Register with Pva flag.
546        self.registry.register_pv_with_protocol(
547            pv_name,
548            dbr_type,
549            &effective_mode,
550            element_count,
551            Protocol::Pva,
552        )?;
553        if let Some(ref name) = matched_policy_name {
554            self.registry.update_policy_name(pv_name, Some(name))?;
555        }
556        // Persist PREC/EGU extracted from the same pvget — equivalent
557        // to the CA path's DBR_CTRL refresh. Non-fatal on error.
558        let (prec, egu) = pv_field_extract_display(&initial);
559        if (prec.is_some() || egu.is_some())
560            && let Err(e) = self
561                .registry
562                .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
563        {
564            debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
565        }
566
567        let record = self
568            .registry
569            .get_pv(pv_name)?
570            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
571        self.start_archiving_internal(&record).await?;
572
573        metrics::gauge!("archiver_pvs_active").increment(1.0);
574        info!(
575            pv = pv_name,
576            ?dbr_type,
577            element_count,
578            protocol = "pva",
579            "Started archiving"
580        );
581        Ok(())
582    }
583
    /// Internal: start a subscription for a PV record. Routes by
    /// `record.protocol`; the CA branch keeps the historical behaviour,
    /// the PVA branch goes through `start_archiving_internal_pva`.
    ///
    /// CA branch, in order: build the per-PV shared state, insert the
    /// `PvHandle` into `channels`, spawn one extra-field monitor per
    /// `record.archive_fields` entry, then spawn the monitor or scan task
    /// that matches `record.sample_mode`. The spawned tasks are detached
    /// (their join handles are dropped); they are handed `cancel_token`
    /// or one of its child tokens.
    ///
    /// The CA branch itself has no fallible step and always returns
    /// `Ok(())`; an error can only come from the PVA delegate.
    async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
        if record.protocol == Protocol::Pva {
            return self.start_archiving_internal_pva(record).await;
        }
        let pv_name = record.pv_name.clone();
        let dbr_type = record.dbr_type;
        let element_count = record.element_count;
        let channel = self.ca_client.create_channel(&pv_name);
        let cancel_token = CancellationToken::new();
        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
        let counters = Arc::new(PvCounters::default());

        // Hold update_lock around the whole insert+spawn block so a
        // concurrent update_archive_fields can't observe the empty
        // field_tokens map and spawn its own copies of the same fields
        // before we get to the spawn loop. update_archive_fields acquires
        // the same update_lock before mutating tokens.
        // The mutex was created just above and is not shared yet, so this
        // lock is uncontended.
        let _guard = update_lock.lock().await;

        // NOTE(review): `insert` replaces any handle already stored under
        // this name, and nothing here cancels the replaced handle's token.
        // `archive_pv` checks `contains_key` first; confirm that every
        // other caller does too.
        self.channels.insert(
            pv_name.clone(),
            PvHandle {
                channel: Some(channel.clone()),
                cancel_token: cancel_token.clone(),
                dbr_type,
                conn_info: conn_info.clone(),
                extras: extras.clone(),
                field_tokens: field_tokens.clone(),
                update_lock: update_lock.clone(),
                counters: counters.clone(),
            },
        );

        // Start one monitor task per archive_field with a child cancel token,
        // tracked so update_archive_fields can stop individual fields.
        for field in &record.archive_fields {
            let child = cancel_token.child_token();
            field_tokens.insert(field.clone(), child.clone());
            spawn_extra_field_monitor(
                &self.ca_client,
                &pv_name,
                field,
                extras.clone(),
                child,
                counters.clone(),
            );
        }
        // The gauge goes up by the number of field monitors just spawned.
        metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
        // Release update_lock before the main task is spawned; from here on
        // update_archive_fields may run.
        drop(_guard);

        // Clones moved into the spawned task below.
        let tx = self.sample_tx.clone();
        let token = cancel_token.clone();
        let ci = conn_info.clone();
        let extras_for_loop = extras.clone();
        let counters_for_loop = counters.clone();
        let registry_for_loop = self.registry.clone();

        let drift = self.server_ioc_drift_secs;
        match &record.sample_mode {
            SampleMode::Monitor => {
                tokio::spawn(async move {
                    monitor_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        ci,
                        registry_for_loop,
                        extras_for_loop,
                        counters_for_loop,
                        drift,
                    )
                    .await;
                });
            }
            SampleMode::Scan { period_secs } => {
                let period = *period_secs;
                // NOTE(review): unlike `monitor_loop`, `scan_loop` is not
                // handed `drift`; whether scan-mode CA samples get the
                // timestamp-window check cannot be told from here.
                tokio::spawn(async move {
                    scan_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        period,
                        ci,
                        registry_for_loop,
                        extras_for_loop,
                        counters_for_loop,
                    )
                    .await;
                });
            }
        }

        Ok(())
    }
690
691    /// PVA equivalent of `start_archiving_internal`. Spawns a single
692    /// callback-driven monitor task; pvAccess auto-reconnect inside
693    /// `pvmonitor_handle` removes the explicit reconnect loop the CA
694    /// path needs. PVA records use `channel: None` on the [`PvHandle`]
695    /// and skip per-field "extras" subscriptions (`.HIHI`/`.LOLO`/…)
696    /// since pvAccess wraps those in the NTScalar value structure
697    /// rather than separate channels.
698    async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
699        let pv_name = record.pv_name.clone();
700        let dbr_type = record.dbr_type;
701        let element_count = record.element_count;
702        let cancel_token = CancellationToken::new();
703        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
704        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
705        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
706        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
707        let counters = Arc::new(PvCounters::default());
708
709        self.channels.insert(
710            pv_name.clone(),
711            PvHandle {
712                channel: None,
713                cancel_token: cancel_token.clone(),
714                dbr_type,
715                conn_info: conn_info.clone(),
716                extras: extras.clone(),
717                field_tokens: field_tokens.clone(),
718                update_lock,
719                counters: counters.clone(),
720            },
721        );
722
723        let tx = self.sample_tx.clone();
724        let pva_client = self.pva_client.clone();
725        let token = cancel_token.clone();
726        let ci = conn_info.clone();
727        let counters_for_loop = counters.clone();
728        let drift = self.server_ioc_drift_secs;
729
730        match &record.sample_mode {
731            SampleMode::Monitor => {
732                let pv_name_loop = pv_name.clone();
733                let archive_fields_loop = record.archive_fields.clone();
734                let extras_for_loop = extras.clone();
735                tokio::spawn(async move {
736                    monitor_loop_pva(
737                        pv_name_loop,
738                        dbr_type,
739                        element_count,
740                        pva_client,
741                        tx,
742                        token,
743                        ci,
744                        counters_for_loop,
745                        drift,
746                        archive_fields_loop,
747                        extras_for_loop,
748                    )
749                    .await;
750                });
751            }
752            SampleMode::Scan { period_secs } => {
753                let period = *period_secs;
754                let pv_name_loop = pv_name.clone();
755                let archive_fields_loop = record.archive_fields.clone();
756                let extras_for_loop = extras.clone();
757                tokio::spawn(async move {
758                    scan_loop_pva(
759                        pv_name_loop,
760                        dbr_type,
761                        pva_client,
762                        tx,
763                        token,
764                        period,
765                        ci,
766                        counters_for_loop,
767                        drift,
768                        archive_fields_loop,
769                        extras_for_loop,
770                    )
771                    .await;
772                });
773            }
774        }
775
776        // Auxiliary periodic refreshers: keep PREC/EGU current after
777        // IOC restarts, and approximate disconnect events from a long
778        // sample gap. Both bind to the same cancel_token so a stop /
779        // delete reaps every PVA-side task in one go.
780        {
781            let pv_name = pv_name.clone();
782            let pva_client = self.pva_client.clone();
783            let registry = self.registry.clone();
784            let cancel = cancel_token.clone();
785            let counters_for_refresh = counters.clone();
786            tokio::spawn(async move {
787                pva_metadata_refresh_loop(
788                    pv_name,
789                    pva_client,
790                    registry,
791                    cancel,
792                    counters_for_refresh,
793                )
794                .await;
795            });
796        }
797        {
798            let pv_name = pv_name.clone();
799            let conn_info = conn_info.clone();
800            let counters_for_watch = counters.clone();
801            let cancel = cancel_token.clone();
802            let sample_mode = record.sample_mode.clone();
803            tokio::spawn(async move {
804                pva_state_watchdog(pv_name, conn_info, counters_for_watch, cancel, sample_mode)
805                    .await;
806            });
807        }
808
809        Ok(())
810    }
811
812    /// Replace the archive_fields list for a running PV. Cancels per-field
813    /// monitor tasks for fields that left the set, spawns fresh ones for
814    /// fields that joined, and leaves unchanged fields running. Serialised
815    /// per-PV by an async mutex so concurrent callers can't double-spawn.
816    /// The main PV keeps running.
817    pub async fn update_archive_fields(
818        &self,
819        pv_name: &str,
820        fields: &[String],
821    ) -> anyhow::Result<()> {
822        // Persist first so a restart sees the new set even if the engine
823        // half of the update fails partway through.
824        self.registry.update_archive_fields(pv_name, fields)?;
825
826        // If the PV isn't currently active there's nothing more to do —
827        // start_archiving_internal will pick up the new fields on resume.
828        let (parent_token, extras, field_tokens, update_lock, counters) = {
829            let Some(handle) = self.channels.get(pv_name) else {
830                return Ok(());
831            };
832            (
833                handle.cancel_token.clone(),
834                handle.extras.clone(),
835                handle.field_tokens.clone(),
836                handle.update_lock.clone(),
837                handle.counters.clone(),
838            )
839        };
840
841        // Serialise so two concurrent updates can't both decide the same
842        // field is missing and spawn it twice.
843        let _guard = update_lock.lock().await;
844
845        let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
846
847        // Cancel + drop tasks for fields that left the set. Removing the
848        // entry from `field_tokens` also drops our handle on the child
849        // token; the spawned task observes `cancelled()` and exits.
850        let to_remove: Vec<String> = field_tokens
851            .iter()
852            .filter(|e| !wanted.contains(e.key().as_str()))
853            .map(|e| e.key().clone())
854            .collect();
855        let removed_count = to_remove.len();
856        for key in to_remove {
857            if let Some((_, token)) = field_tokens.remove(&key) {
858                token.cancel();
859            }
860            extras.remove(&key);
861        }
862
863        // Spawn tasks for fields newly added. Existing fields keep their
864        // task and their last cached value.
865        let mut added_count = 0usize;
866        for f in fields {
867            if !field_tokens.contains_key(f) {
868                let child = parent_token.child_token();
869                field_tokens.insert(f.clone(), child.clone());
870                spawn_extra_field_monitor(
871                    &self.ca_client,
872                    pv_name,
873                    f,
874                    extras.clone(),
875                    child,
876                    counters.clone(),
877                );
878                added_count += 1;
879            }
880        }
881        let net = added_count as i64 - removed_count as i64;
882        if net != 0 {
883            metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
884        }
885        Ok(())
886    }
887
888    /// Pause archiving for a PV.
889    pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
890        let lock = self.op_lock(pv_name);
891        let _g = lock.lock().await;
892        if let Some((_key, handle)) = self.channels.remove(pv_name) {
893            let extra_count = handle.field_tokens.len() as f64;
894            handle.cancel_token.cancel();
895            metrics::gauge!("archiver_pvs_active").decrement(1.0);
896            if extra_count > 0.0 {
897                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
898            }
899        }
900        self.registry.set_status(pv_name, PvStatus::Paused)?;
901        info!(pv = pv_name, "Paused archiving");
902        Ok(())
903    }
904
905    /// Resume a paused PV. Only paused or error PVs can be resumed;
906    /// calling resume on an already-active PV is a no-op (returns Ok).
907    pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
908        let lock = self.op_lock(pv_name);
909        let _g = lock.lock().await;
910
911        let record = self
912            .registry
913            .get_pv(pv_name)?
914            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
915
916        // Guard: if already active with a live task, nothing to do.
917        if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
918            info!(
919                pv = pv_name,
920                "PV is already actively archived, skipping resume"
921            );
922            return Ok(());
923        }
924
925        // Cancel any orphaned task to prevent duplicate subscriptions.
926        if let Some((_key, handle)) = self.channels.remove(pv_name) {
927            let extra_count = handle.field_tokens.len() as f64;
928            handle.cancel_token.cancel();
929            if extra_count > 0.0 {
930                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
931            }
932        }
933
934        // Start the task first; only mark Active if it succeeds.
935        self.start_archiving_internal(&record).await?;
936        self.registry.set_status(pv_name, PvStatus::Active)?;
937        metrics::gauge!("archiver_pvs_active").increment(1.0);
938        info!(pv = pv_name, "Resumed archiving");
939        Ok(())
940    }
941
942    /// Stop archiving a PV without removing it from the registry.
943    /// Sets the PV status to Inactive (data retained, monitoring stopped).
944    pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
945        let lock = self.op_lock(pv_name);
946        let _g = lock.lock().await;
947        if let Some((_key, handle)) = self.channels.remove(pv_name) {
948            let extra_count = handle.field_tokens.len() as f64;
949            handle.cancel_token.cancel();
950            metrics::gauge!("archiver_pvs_active").decrement(1.0);
951            if extra_count > 0.0 {
952                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
953            }
954        }
955        self.registry.set_status(pv_name, PvStatus::Inactive)?;
956        info!(pv = pv_name, "Stopped archiving (inactive)");
957        Ok(())
958    }
959
960    /// Remove a PV from archiving entirely.
961    pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
962        let lock = self.op_lock(pv_name);
963        let _g = lock.lock().await;
964        if let Some((_key, handle)) = self.channels.remove(pv_name) {
965            let extra_count = handle.field_tokens.len() as f64;
966            handle.cancel_token.cancel();
967            metrics::gauge!("archiver_pvs_active").decrement(1.0);
968            if extra_count > 0.0 {
969                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
970            }
971        }
972        self.registry.remove_pv(pv_name)?;
973        // Don't remove the op_locks entry here: a concurrent caller may have
974        // already taken a clone of this Arc and be queued on it; removing
975        // would let a fresh caller obtain a new mutex and the queued one
976        // would race them. The map is bounded by the lifetime universe of
977        // PV names, which is acceptable.
978        info!(pv = pv_name, "Destroyed archiving channel");
979        Ok(())
980    }
981
982    /// List all currently archived PV names (from registry).
983    pub fn list_pvs(&self) -> Vec<String> {
984        self.registry.all_pv_names().unwrap_or_else(|e| {
985            warn!("Failed to list PVs: {e}");
986            Vec::new()
987        })
988    }
989
990    /// Match PVs by glob pattern (from registry).
991    pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
992        self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
993            warn!("Failed to match PVs: {e}");
994            Vec::new()
995        })
996    }
997
    /// Borrow the shared PV registry this manager reads from and writes
    /// to. Returns a reference to the `Arc`, so callers that need their
    /// own handle can clone it.
    pub fn registry(&self) -> &Arc<PvRegistry> {
        &self.registry
    }
1002
1003    /// Get connection info for a PV.
1004    pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1005        self.channels.get(pv).map(|h| {
1006            h.conn_info
1007                .lock()
1008                .unwrap_or_else(|e| e.into_inner())
1009                .clone()
1010        })
1011    }
1012
1013    /// Get PV names that have never received any event (connected_since == None).
1014    pub fn get_never_connected_pvs(&self) -> Vec<String> {
1015        self.channels
1016            .iter()
1017            .filter(|entry| {
1018                let ci = entry
1019                    .value()
1020                    .conn_info
1021                    .lock()
1022                    .unwrap_or_else(|e| e.into_inner());
1023                ci.connected_since.is_none()
1024            })
1025            .map(|entry| entry.key().clone())
1026            .collect()
1027    }
1028
1029    /// Snapshot the diagnostic counters for one PV. Returns None if the
1030    /// PV isn't actively archived. The returned Arc is the live counter
1031    /// — callers read with `Ordering::Relaxed`.
1032    pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1033        self.channels.get(pv_name).map(|h| h.counters.clone())
1034    }
1035
1036    /// Snapshot every active PV's counters. Returns `(pv_name,
1037    /// PvCountersSnapshot)` so callers don't have to handle Arc.
1038    pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1039        self.channels
1040            .iter()
1041            .map(|e| {
1042                (
1043                    e.key().clone(),
1044                    PvCountersSnapshot::from(&*e.value().counters),
1045                )
1046            })
1047            .collect()
1048    }
1049
1050    /// One-shot CA `get` against the running channel for `pv`. Returns
1051    /// `None` if the PV isn't actively archived. The timeout caps how
1052    /// long the caller will wait for a value when the IOC is slow.
1053    /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
1054    pub async fn live_value(
1055        &self,
1056        pv_name: &str,
1057        timeout: Duration,
1058    ) -> Option<anyhow::Result<ArchiverValue>> {
1059        let channel_opt = self.channels.get(pv_name)?.channel.clone();
1060        let Some(channel) = channel_opt else {
1061            // PVA-acquired PV — live_value via pva_client.pvget.
1062            let pva = self.pva_client.clone();
1063            let name = pv_name.to_string();
1064            let res = tokio::time::timeout(timeout, pva.pvget(&name)).await;
1065            return Some(match res {
1066                Ok(Ok(field)) => pv_field_scalar_to_archiver(&field)
1067                    .ok_or_else(|| anyhow::anyhow!("PVA value not a scalar")),
1068                Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1069                Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1070            });
1071        };
1072        // Wait briefly for connection — channel.get on a disconnected
1073        // channel would otherwise return an error from deep in the CA
1074        // stack. Capped by the same timeout the caller chose.
1075        if channel.wait_connected(timeout).await.is_err() {
1076            return Some(Err(anyhow::anyhow!(
1077                "channel not connected within {timeout:?}"
1078            )));
1079        }
1080        match tokio::time::timeout(timeout, channel.get()).await {
1081            Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1082            Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1083            Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1084        }
1085    }
1086
1087    /// Snapshot the latest cached extra-field values for `pv` —
1088    /// `(field_name, stringified_value)` pairs. Empty map when the PV
1089    /// isn't archived or has no archive_fields configured.
1090    pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1091        match self.channels.get(pv_name) {
1092            Some(handle) => handle
1093                .extras
1094                .iter()
1095                .map(|e| (e.key().clone(), e.value().clone()))
1096                .collect(),
1097            None => std::collections::HashMap::new(),
1098        }
1099    }
1100
1101    /// Get PV names that are currently disconnected (is_connected == false).
1102    pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1103        self.channels
1104            .iter()
1105            .filter(|entry| {
1106                let ci = entry
1107                    .value()
1108                    .conn_info
1109                    .lock()
1110                    .unwrap_or_else(|e| e.into_inner());
1111                !ci.is_connected
1112            })
1113            .map(|entry| entry.key().clone())
1114            .collect()
1115    }
1116}
1117
1118/// Read DBR_CTRL metadata from the channel and persist `precision`/`units`
1119/// to the registry when they differ from stored values. Best-effort: any
1120/// failure (timeout, transport error, missing display info, non-numeric
1121/// type) is logged at debug and silently ignored. Skips the SQL write
1122/// entirely when the in-memory record already matches, so reconnect
1123/// storms don't churn the database.
1124async fn refresh_ctrl_metadata(
1125    channel: &CaChannel,
1126    registry: &PvRegistry,
1127    pv_name: &str,
1128    counters: &PvCounters,
1129) {
1130    // 15s — long enough for slow-network IOCs (Java's default `epicsTimeout`
1131    // is 30s; 15s splits the difference). Failures count toward
1132    // `metadata_fetch_failures` so operators can spot PVs that never get
1133    // PREC/EGU populated.
1134    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1135    let snapshot = match tokio::time::timeout(
1136        FETCH_TIMEOUT,
1137        channel.get_with_metadata(DbrClass::Ctrl),
1138    )
1139    .await
1140    {
1141        Ok(Ok(s)) => s,
1142        Ok(Err(e)) => {
1143            counters
1144                .metadata_fetch_failures
1145                .fetch_add(1, Ordering::Relaxed);
1146            debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1147            return;
1148        }
1149        Err(_) => {
1150            counters
1151                .metadata_fetch_failures
1152                .fetch_add(1, Ordering::Relaxed);
1153            debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1154            return;
1155        }
1156    };
1157    let Some(display) = snapshot.display else {
1158        // Non-numeric type (string, enum, …) — DBR_CTRL has no
1159        // DisplayInfo. Not a failure; just nothing to persist.
1160        return;
1161    };
1162
1163    // Negative precision is the Java EAA "no precision info" sentinel —
1164    // persisting "-1" as the PREC string would surface as a literal -1
1165    // in the UI / API. Treat it the same as missing.
1166    let new_prec_opt: Option<String> = if display.precision < 0 {
1167        None
1168    } else {
1169        Some(display.precision.to_string())
1170    };
1171    let new_egu_trimmed = display.units.trim();
1172    let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1173        None
1174    } else {
1175        Some(new_egu_trimmed)
1176    };
1177
1178    let stored = match registry.get_pv(pv_name) {
1179        Ok(Some(r)) => r,
1180        Ok(None) => return,
1181        Err(e) => {
1182            debug!(
1183                pv = pv_name,
1184                "Registry read for metadata compare failed: {e}"
1185            );
1186            return;
1187        }
1188    };
1189
1190    let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1191        (Some(s), Some(n)) => s != n,
1192        (None, Some(_)) => true,
1193        // Don't overwrite a populated PREC with the "no info" sentinel.
1194        _ => false,
1195    };
1196    let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1197        (Some(s), Some(n)) => s != n,
1198        (None, Some(_)) => true,
1199        // Don't overwrite a populated EGU with empty, and don't
1200        // bother writing None over None.
1201        _ => false,
1202    };
1203    if !prec_changed && !egu_changed {
1204        return;
1205    }
1206
1207    let prec_arg = if prec_changed {
1208        new_prec_opt.as_deref()
1209    } else {
1210        None
1211    };
1212    let egu_arg = if egu_changed { new_egu_opt } else { None };
1213    if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1214        warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1215    } else {
1216        debug!(
1217            pv = pv_name,
1218            prec = ?prec_arg, egu = ?egu_arg,
1219            "Refreshed PREC/EGU from DBR_CTRL"
1220        );
1221    }
1222}
1223
1224/// Body shared by the two PVA monitor callback variants
1225/// (`pvmonitor_handle` takes `(FieldDesc, PvField)`,
1226/// `pvmonitor_with_request` takes `(PvField)`). Lifted out so we
1227/// can write the once-per-event logic in one place.
1228#[allow(clippy::too_many_arguments)]
1229fn pva_handle_event(
1230    field: &PvField,
1231    pv_name: &str,
1232    dbr_type: ArchDbType,
1233    tx: &mpsc::Sender<PvSample>,
1234    conn_info: &Mutex<ConnectionInfo>,
1235    counters: &Arc<PvCounters>,
1236    extras: &ExtraFieldsCache,
1237    extras_paths: &[(String, &'static str)],
1238    drift_secs: u64,
1239) {
1240    let now = SystemTime::now();
1241    let first_after_connect = {
1242        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1243        let first = ci.last_event_time.is_none();
1244        if ci.connected_since.is_none() {
1245            ci.connected_since = Some(now);
1246        }
1247        ci.is_connected = true;
1248        ci.last_event_time = Some(now);
1249        ci.state = PvConnectionState::Connected;
1250        first
1251    };
1252    if first_after_connect {
1253        counters
1254            .first_event_unix_secs
1255            .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1256            .ok();
1257    }
1258    counters.events_received.fetch_add(1, Ordering::Relaxed);
1259
1260    let Some(value) = pv_field_scalar_to_archiver(field) else {
1261        counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1262        debug!(pv = pv_name, "PVA event has non-scalar value; dropping");
1263        return;
1264    };
1265
1266    let ts = pv_field_extract_timestamp(field);
1267    if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1268        counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1269        debug!(
1270            pv = pv_name,
1271            ?ts,
1272            "Dropping PVA sample with out-of-window timestamp"
1273        );
1274        return;
1275    }
1276    let elem_count = match pv_field_element_count(field) {
1277        0 => 1,
1278        n => n,
1279    };
1280    for (field_name, path) in extras_paths {
1281        if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1282            extras.insert(field_name.clone(), scalar_value_to_string(s));
1283        }
1284    }
1285    let mut sample = ArchiverSample::new(ts, value);
1286    attach_extras(extras, &mut sample);
1287    let pv_sample = PvSample {
1288        pv_name: pv_name.to_string(),
1289        dbr_type,
1290        sample,
1291        element_count: Some(elem_count),
1292        counters: Some(counters.clone()),
1293    };
1294    if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1295        counters
1296            .buffer_overflow_drops
1297            .fetch_add(1, Ordering::Relaxed);
1298    }
1299}
1300
1301/// PVA monitor loop: subscribes once and parks until cancellation.
1302/// Each fan-in event is decoded inline, packaged as a [`PvSample`],
1303/// and non-blocking-pushed into the storage write_loop's channel —
1304/// blocking would stall the pvAccess reactor thread.
1305///
1306/// When `archive_fields` is non-empty, the subscription requests an
1307/// explicit pvRequest with the mapped sub-field paths so the IOC
1308/// includes them in every monitor event; the callback then mirrors
1309/// the CA path's "extras" cache by stringifying each requested field.
1310#[allow(clippy::too_many_arguments)]
1311async fn monitor_loop_pva(
1312    pv_name: String,
1313    dbr_type: ArchDbType,
1314    element_count: i32,
1315    pva_client: PvaClient,
1316    tx: mpsc::Sender<PvSample>,
1317    cancel_token: CancellationToken,
1318    conn_info: Arc<Mutex<ConnectionInfo>>,
1319    counters: Arc<PvCounters>,
1320    server_ioc_drift_secs: u64,
1321    archive_fields: Vec<String>,
1322    extras: Arc<ExtraFieldsCache>,
1323) {
1324    let drift_secs = server_ioc_drift_secs;
1325    // Static element_count is unused — each callback derives the
1326    // current array length from the live PvField, so an NTScalarArray
1327    // whose size changes between events still gets tagged correctly.
1328    let _ = element_count;
1329
1330    // Build the (CA name, PVA path) pairs once. Skip CA fields with
1331    // no PVA equivalent so a misconfigured archive_fields list
1332    // doesn't poison the pvRequest.
1333    let extras_paths: Vec<(String, &'static str)> = archive_fields
1334        .iter()
1335        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1336        .collect();
1337    let request_expr = if extras_paths.is_empty() {
1338        None
1339    } else {
1340        let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1341            .field("value")
1342            .field("alarm")
1343            .field("timeStamp");
1344        for (_, path) in &extras_paths {
1345            builder = builder.field(*path);
1346        }
1347        Some(builder.build())
1348    };
1349
1350    // Subscribe loop with retry. The custom-request path uses
1351    // `pvmonitor_with_request` (no SubscriptionHandle) inside a
1352    // tokio::select! so cancellation drops the future cleanly; the
1353    // empty-request path keeps the original SubscriptionHandle form
1354    // since that's the simpler path for the common case.
1355    loop {
1356        if let Some(ref req) = request_expr {
1357            // Custom-request path: 1-arg callback. pvmonitor_with_request
1358            // returns a future that runs to completion; race it against
1359            // the cancel signal.
1360            let pv_name_cb = pv_name.clone();
1361            let tx_cb = tx.clone();
1362            let conn_info_cb = conn_info.clone();
1363            let counters_cb = counters.clone();
1364            let extras_cb = extras.clone();
1365            let extras_paths_cb = extras_paths.clone();
1366            let cb = move |field: &PvField| {
1367                pva_handle_event(
1368                    field,
1369                    &pv_name_cb,
1370                    dbr_type,
1371                    &tx_cb,
1372                    &conn_info_cb,
1373                    &counters_cb,
1374                    &extras_cb,
1375                    &extras_paths_cb,
1376                    drift_secs,
1377                );
1378            };
1379            tokio::select! {
1380                _ = cancel_token.cancelled() => {
1381                    debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1382                    return;
1383                }
1384                res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1385                    match res {
1386                        Ok(()) => {
1387                            debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1388                        }
1389                        Err(e) => {
1390                            counters
1391                                .transient_error_count
1392                                .fetch_add(1, Ordering::Relaxed);
1393                            warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1394                        }
1395                    }
1396                    tokio::select! {
1397                        _ = cancel_token.cancelled() => return,
1398                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1399                    }
1400                }
1401            }
1402        } else {
1403            // Empty-request fast path: 2-arg callback. Keep the
1404            // SubscriptionHandle so the inner reactor task lives
1405            // until our explicit drop.
1406            let pv_name_cb = pv_name.clone();
1407            let tx_cb = tx.clone();
1408            let conn_info_cb = conn_info.clone();
1409            let counters_cb = counters.clone();
1410            let extras_cb = extras.clone();
1411            let extras_paths_cb = extras_paths.clone();
1412            let cb = move |_desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1413                pva_handle_event(
1414                    field,
1415                    &pv_name_cb,
1416                    dbr_type,
1417                    &tx_cb,
1418                    &conn_info_cb,
1419                    &counters_cb,
1420                    &extras_cb,
1421                    &extras_paths_cb,
1422                    drift_secs,
1423                );
1424            };
1425            let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1426                Ok(h) => h,
1427                Err(e) => {
1428                    counters
1429                        .transient_error_count
1430                        .fetch_add(1, Ordering::Relaxed);
1431                    warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1432                    tokio::select! {
1433                        _ = cancel_token.cancelled() => return,
1434                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1435                    }
1436                }
1437            };
1438            debug!(pv = pv_name, "PVA monitor active");
1439            cancel_token.cancelled().await;
1440            debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1441            drop(handle);
1442            return;
1443        }
1444    }
1445}
1446
1447/// PVA Scan loop: periodic `pvget` instead of streaming monitor.
1448/// Mirrors the CA scan_loop's per-tick connection check + sample emit.
1449#[allow(clippy::too_many_arguments)]
1450async fn scan_loop_pva(
1451    pv_name: String,
1452    dbr_type: ArchDbType,
1453    pva_client: PvaClient,
1454    tx: mpsc::Sender<PvSample>,
1455    cancel_token: CancellationToken,
1456    period_secs: f64,
1457    conn_info: Arc<Mutex<ConnectionInfo>>,
1458    counters: Arc<PvCounters>,
1459    server_ioc_drift_secs: u64,
1460    archive_fields: Vec<String>,
1461    extras: Arc<ExtraFieldsCache>,
1462) {
1463    let extras_paths: Vec<(String, &'static str)> = archive_fields
1464        .iter()
1465        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1466        .collect();
1467    let period = Duration::from_secs_f64(period_secs);
1468    let mut interval = tokio::time::interval(period);
1469    let drift_secs = server_ioc_drift_secs;
1470    // Hard timeout per pvget — at most one tick of slack so a stuck
1471    // PVA server can't accumulate pending requests.
1472    let pvget_timeout = period.max(Duration::from_secs(5));
1473
1474    loop {
1475        tokio::select! {
1476            _ = cancel_token.cancelled() => return,
1477            _ = interval.tick() => {}
1478        }
1479
1480        let res = tokio::time::timeout(pvget_timeout, pva_client.pvget(&pv_name)).await;
1481        let field = match res {
1482            Ok(Ok(f)) => f,
1483            Ok(Err(e)) => {
1484                let was_connected = {
1485                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1486                    let prev = ci.is_connected;
1487                    ci.is_connected = false;
1488                    ci.last_event_time = None;
1489                    ci.state = match ci.state {
1490                        PvConnectionState::Connected => PvConnectionState::Disconnected,
1491                        PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1492                        _ => PvConnectionState::Connecting,
1493                    };
1494                    prev
1495                };
1496                if was_connected {
1497                    counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1498                    counters
1499                        .last_disconnect_unix_secs
1500                        .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1501                }
1502                counters
1503                    .transient_error_count
1504                    .fetch_add(1, Ordering::Relaxed);
1505                debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1506                continue;
1507            }
1508            Err(_) => {
1509                counters
1510                    .transient_error_count
1511                    .fetch_add(1, Ordering::Relaxed);
1512                debug!(pv = pv_name, "PVA scan pvget timed out");
1513                continue;
1514            }
1515        };
1516
1517        let now = SystemTime::now();
1518        let first_after_connect = {
1519            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1520            let first = ci.last_event_time.is_none();
1521            if ci.connected_since.is_none() {
1522                ci.connected_since = Some(now);
1523            }
1524            ci.is_connected = true;
1525            ci.last_event_time = Some(now);
1526            ci.state = PvConnectionState::Connected;
1527            first
1528        };
1529        counters.events_received.fetch_add(1, Ordering::Relaxed);
1530        if first_after_connect {
1531            counters
1532                .first_event_unix_secs
1533                .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1534                .ok();
1535        }
1536
1537        let Some(value) = pv_field_scalar_to_archiver(&field) else {
1538            counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1539            debug!(pv = pv_name, "PVA scan returned non-scalar value; dropping");
1540            continue;
1541        };
1542        // Scan mode: timestamp the sample at receive time (no IOC
1543        // timestamp available reliably for periodic pvget, mirroring
1544        // CA scan_loop's `now` policy).
1545        let _ = drift_secs; // referenced for symmetry; not used here
1546        let elem_count = match pv_field_element_count(&field) {
1547            0 => 1,
1548            n => n,
1549        };
1550        // Refresh extras from this scan's pvget result (the pvget
1551        // returns the full NTScalar so all sub-fields are available
1552        // for free — no extra channel spawn needed).
1553        for (field_name, path) in &extras_paths {
1554            if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1555                extras.insert(field_name.clone(), scalar_value_to_string(s));
1556            }
1557        }
1558        let mut sample = ArchiverSample::new(now, value);
1559        attach_extras(&extras, &mut sample);
1560        let pv_sample = PvSample {
1561            pv_name: pv_name.clone(),
1562            dbr_type,
1563            sample,
1564            element_count: Some(elem_count),
1565            counters: Some(counters.clone()),
1566        };
1567        if let Err(rejected) = try_send_with_overflow_count(&tx, pv_sample, &counters).await {
1568            // Channel closed (write_loop down). Cooperative shutdown.
1569            let _ = rejected;
1570            return;
1571        }
1572    }
1573}
1574
1575/// Periodic refresh task for PVA-acquired PVs: re-fetches DBR_CTRL-
1576/// equivalent metadata (`display.units`, `display.precision`) every
1577/// few minutes and persists when changed. PVA's monitor callback API
1578/// doesn't surface explicit reconnect events the way CA's
1579/// `ConnectionEvent` does, so a dedicated periodic poll is the
1580/// simplest way to keep PREC/EGU fresh after IOC restarts.
1581async fn pva_metadata_refresh_loop(
1582    pv_name: String,
1583    pva_client: PvaClient,
1584    registry: Arc<PvRegistry>,
1585    cancel_token: CancellationToken,
1586    counters: Arc<PvCounters>,
1587) {
1588    // 5 minutes — operators almost never change PREC/EGU at runtime;
1589    // shorter cadence wastes registry/network and longer leaves UI
1590    // stale across IOC restarts. Same magic number Java EAA uses
1591    // for its `metaFieldsRefresh` cron.
1592    const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1593    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1594    let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1595    // Skip the immediate first tick — initial pvget already populated
1596    // PREC/EGU at archive_pv time.
1597    tick.tick().await;
1598    loop {
1599        tokio::select! {
1600            _ = cancel_token.cancelled() => return,
1601            _ = tick.tick() => {}
1602        }
1603
1604        let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1605        let field = match res {
1606            Ok(Ok(f)) => f,
1607            _ => {
1608                counters
1609                    .metadata_fetch_failures
1610                    .fetch_add(1, Ordering::Relaxed);
1611                continue;
1612            }
1613        };
1614        let (new_prec, new_egu) = pv_field_extract_display(&field);
1615        let stored = match registry.get_pv(&pv_name) {
1616            Ok(Some(r)) => r,
1617            _ => continue,
1618        };
1619        let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1620            (Some(s), Some(n)) => s != n,
1621            (None, Some(_)) => true,
1622            _ => false,
1623        };
1624        let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1625            (Some(s), Some(n)) => s != n,
1626            (None, Some(_)) => true,
1627            _ => false,
1628        };
1629        if !prec_changed && !egu_changed {
1630            continue;
1631        }
1632        let prec_arg = if prec_changed {
1633            new_prec.as_deref()
1634        } else {
1635            None
1636        };
1637        let egu_arg = if egu_changed {
1638            new_egu.as_deref()
1639        } else {
1640            None
1641        };
1642        if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1643            warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1644        } else {
1645            debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1646        }
1647    }
1648}
1649
1650/// State watchdog for PVA-acquired PVs: a dedicated task that flips
1651/// `conn_info.state` to `Disconnected` when no event has arrived
1652/// within `STALE_THRESHOLD`. PVA's callback API doesn't surface
1653/// explicit channel-state changes — we approximate by treating a
1654/// long event gap as a disconnect. Trade-off: a genuinely silent PV
1655/// (alarm-only, low-rate scan) appears disconnected. Operators can
1656/// raise the threshold by env var if their PV cadence is slower.
1657async fn pva_state_watchdog(
1658    pv_name: String,
1659    conn_info: Arc<Mutex<ConnectionInfo>>,
1660    counters: Arc<PvCounters>,
1661    cancel_token: CancellationToken,
1662    sample_mode: SampleMode,
1663) {
1664    const POLL_INTERVAL: Duration = Duration::from_secs(5);
1665    // Threshold scales with the expected event cadence so a slow Scan
1666    // PV (period=300 s) doesn't flap on every tick. For Monitor mode
1667    // the floor is 60 s — most production PVs change at least that
1668    // often. Operators who archive truly silent PVs (alarm-only) will
1669    // see stale disconnected status; that's a known limitation.
1670    let stale_threshold = match sample_mode {
1671        SampleMode::Monitor => Duration::from_secs(60),
1672        SampleMode::Scan { period_secs } => Duration::from_secs_f64((period_secs * 3.0).max(60.0)),
1673    };
1674    let mut interval = tokio::time::interval(POLL_INTERVAL);
1675    interval.tick().await;
1676    loop {
1677        tokio::select! {
1678            _ = cancel_token.cancelled() => return,
1679            _ = interval.tick() => {}
1680        }
1681        let stale_now = {
1682            let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1683            if !ci.is_connected {
1684                continue;
1685            }
1686            match ci.last_event_time {
1687                Some(t) => SystemTime::now()
1688                    .duration_since(t)
1689                    .map(|d| d > stale_threshold)
1690                    .unwrap_or(false),
1691                None => false,
1692            }
1693        };
1694        if stale_now {
1695            let was_connected = {
1696                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1697                let prev = ci.is_connected;
1698                ci.is_connected = false;
1699                ci.state = PvConnectionState::Disconnected;
1700                prev
1701            };
1702            if was_connected {
1703                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1704                counters
1705                    .last_disconnect_unix_secs
1706                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1707                debug!(
1708                    pv = pv_name,
1709                    "PVA watchdog: marking disconnected after stale heartbeat"
1710                );
1711            }
1712        }
1713    }
1714}
1715
/// Monitor loop: subscribe to a CA channel and forward every received
/// value to the write loop as a `PvSample`.
///
/// Each pass of the outer loop:
/// 1. waits for the channel to be connected (recording a disconnect if
///    the wait times out),
/// 2. spawns a fire-and-forget `refresh_ctrl_metadata` task,
/// 3. subscribes, retrying after `CA_RETRY_DELAY` on failure,
/// 4. forwards snapshots until the monitor stream ends, then marks the
///    PV disconnected and starts over.
///
/// Samples carry the IOC-reported timestamp. Except for the first
/// sample after a (re)connect, a sample whose timestamp fails
/// `ioc_timestamp_in_window` is dropped and counted in
/// `timestamp_drops`.
///
/// The function returns when `cancel_token` is cancelled, when the
/// connection-event stream reports `Closed`, or when the send to `tx`
/// fails (write loop shut down).
#[allow(clippy::too_many_arguments)]
async fn monitor_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    registry: Arc<PvRegistry>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
) {
    loop {
        // Wait for connection, respecting cancellation.
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
                if result.is_err() {
                    let was_connected = {
                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                        let prev_connected = ci.is_connected;
                        ci.is_connected = false;
                        ci.last_event_time = None;
                        // Connecting if we've never seen a connect, otherwise
                        // demote from Connected to Disconnected on a real loss.
                        ci.state = match ci.state {
                            PvConnectionState::Connected => PvConnectionState::Disconnected,
                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                            _ => PvConnectionState::Connecting,
                        };
                        prev_connected
                    };
                    // A search-failure timeout that demotes us out of
                    // Connected counts as a fresh disconnect — without
                    // this, only the post-monitor `break` path bumps the
                    // counters and a flapping PV silently under-reports
                    // its drops while subsequent `attach_cnx_lost_headers`
                    // calls carry a stale `cnxlostepsecs`.
                    if was_connected {
                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                        counters
                            .last_disconnect_unix_secs
                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
                    }
                    // Subscribe BEFORE re-checking the current state. If we
                    // subscribed after a state check, a Connected event
                    // firing in between would be lost forever, leaving this
                    // loop hung until the next disconnect/reconnect cycle.
                    let mut conn_rx = channel.connection_events();

                    // Close the remaining race: the channel may have become
                    // connected between the outer `wait_connected` timeout
                    // and our `subscribe()` above, in which case no new event
                    // will arrive. A short re-probe catches that case.
                    if channel
                        .wait_connected(Duration::from_millis(100))
                        .await
                        .is_err()
                    {
                        // Still not connected: block on connection events
                        // until a `Connected` arrives or we are cancelled.
                        loop {
                            tokio::select! {
                                _ = cancel_token.cancelled() => return,
                                event = conn_rx.recv() => {
                                    use tokio::sync::broadcast::error::RecvError;
                                    match event {
                                        Ok(ConnectionEvent::Connected) => break,
                                        Ok(_) => continue,
                                        // Lagged: we missed some events but
                                        // the channel is still live. Re-probe
                                        // state and otherwise keep waiting.
                                        Err(RecvError::Lagged(_)) => {
                                            if channel
                                                .wait_connected(Duration::from_millis(100))
                                                .await
                                                .is_ok()
                                            {
                                                break;
                                            }
                                            continue;
                                        }
                                        // Event stream closed: nothing more
                                        // will ever arrive, so end the task.
                                        Err(RecvError::Closed) => return,
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Refresh DBR_CTRL metadata once per (re)connect so the registry's
        // PREC/EGU stay in sync with the IOC. Fire-and-forget so a slow
        // CA stack can't gate `subscribe()` (and therefore the first
        // sample) on a metadata round-trip. The spawned task is bound to
        // `cancel_token` so it terminates promptly when the PV is
        // stopped/deleted (otherwise a late metadata write could land on
        // a deleted-or-re-registered PV row).
        // NOTE(review): this block also runs again after every failed
        // `subscribe()` retry below, since that path `continue`s to the
        // top of the outer loop.
        {
            let channel = channel.clone();
            let registry = registry.clone();
            let counters = counters.clone();
            let pv_name = pv_name.clone();
            let cancel = cancel_token.clone();
            tokio::spawn(async move {
                tokio::select! {
                    _ = cancel.cancelled() => {}
                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
                }
            });
        }

        // Subscribe. A failure is counted as a transient error and retried
        // from the top of the outer loop after `CA_RETRY_DELAY`.
        let mut monitor = match channel.subscribe().await {
            Ok(m) => m,
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
                tokio::select! {
                    _ = cancel_token.cancelled() => return,
                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                }
            }
        };

        debug!(pv = pv_name, "Monitor subscription active");

        // Receive values with cancellation.
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => return,
                result = monitor.recv() => {
                    match result {
                        Some(Ok(snapshot)) => {
                            let now = SystemTime::now();
                            // Java parity (11e554d0): use the IOC-reported
                            // timestamp, not receive-time, so latency
                            // doesn't smear sample times. First sample
                            // after connect is accepted unconditionally
                            // — legitimate backfill on reconnect can
                            // include older timestamps. Subsequent
                            // samples whose IOC clock is more than
                            // SERVER_IOC_DRIFT_SECS away from wall clock,
                            // or earlier than the 1991 floor, are
                            // dropped + counted.
                            let first_after_connect = {
                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                                // `last_event_time` is cleared on every
                                // disconnect path in this function, so
                                // `None` here means "first since connect".
                                let first = ci.last_event_time.is_none();
                                if ci.connected_since.is_none() {
                                    ci.connected_since = Some(now);
                                }
                                ci.is_connected = true;
                                ci.last_event_time = Some(now);
                                ci.state = PvConnectionState::Connected;
                                first
                            };
                            if !first_after_connect
                                && !ioc_timestamp_in_window(
                                    snapshot.timestamp,
                                    now,
                                    server_ioc_drift_secs,
                                )
                            {
                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_name,
                                    ?snapshot.timestamp,
                                    "Dropping sample with out-of-window IOC timestamp"
                                );
                                continue;
                            }
                            counters.events_received.fetch_add(1, Ordering::Relaxed);
                            // CAS the first-event timestamp once. 0 sentinel
                            // means "no event yet"; replace with now.
                            let now_secs = unix_secs(now);
                            let _ = counters.first_event_unix_secs.compare_exchange(
                                0,
                                now_secs,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            );
                            let archiver_val = epics_value_to_archiver(&snapshot.value);
                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
                            // Extras first: `attach_extras` assigns
                            // `field_values` wholesale, while the cnx-lost
                            // headers below are appended to it.
                            attach_extras(&extras, &mut sample);
                            if first_after_connect {
                                let lost_secs = counters
                                    .last_disconnect_unix_secs
                                    .load(Ordering::Relaxed);
                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                            }
                            let pv_sample = PvSample {
                                pv_name: pv_name.clone(),
                                dbr_type,
                                sample,
                                element_count: Some(element_count),
                                counters: Some(counters.clone()),
                            };
                            if let Err(pv_sample) = try_send_with_overflow_count(
                                &tx,
                                pv_sample,
                                &counters,
                            )
                            .await
                            {
                                let _ = pv_sample;
                                return; // Write loop shut down
                            }
                        }
                        // A per-event error is counted and logged; the
                        // subscription stays in use.
                        Some(Err(e)) => {
                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "Monitor error: {e}");
                        }
                        None => break, // Monitor ended, reconnect
                    }
                }
            }
        }

        // Monitor ended (disconnect) — loop back to reconnect. Reset
        // `last_event_time` so the first sample after reconnect is
        // treated as `first_after_connect` and bypasses the drift
        // filter; without this, an IOC that comes back with a
        // legitimate backfill timestamp older than the 30 min window
        // would be silently dropped.
        {
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            ci.is_connected = false;
            ci.last_event_time = None;
            ci.state = PvConnectionState::Disconnected;
        }
        // Unlike the wait-timeout path above, this path counts the
        // disconnect unconditionally.
        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
        counters
            .last_disconnect_unix_secs
            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
    }
}
1957
/// Whole seconds elapsed between the Unix epoch and `t`.
///
/// Sub-second precision is truncated. A `t` that lies before the epoch
/// yields `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
1963
1964/// Send a sample and count buffer-overflow events. Tries non-blocking
1965/// first; if the bounded channel is full we increment the counter and
1966/// fall back to a blocking send so backpressure still works.
1967async fn try_send_with_overflow_count(
1968    tx: &mpsc::Sender<PvSample>,
1969    pv_sample: PvSample,
1970    counters: &PvCounters,
1971) -> Result<(), PvSample> {
1972    match tx.try_send(pv_sample) {
1973        Ok(()) => Ok(()),
1974        Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1975            counters
1976                .buffer_overflow_drops
1977                .fetch_add(1, Ordering::Relaxed);
1978            // Backpressure to the producer; this awaits until the writer
1979            // drains some space. We count the saturation event but don't
1980            // actually drop the sample.
1981            tx.send(pv_sample).await.map_err(|e| e.0)
1982        }
1983        Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1984    }
1985}
1986
1987/// Scan loop: periodically read a channel value.
1988#[allow(clippy::too_many_arguments)]
1989async fn scan_loop(
1990    pv_name: String,
1991    dbr_type: ArchDbType,
1992    element_count: i32,
1993    channel: CaChannel,
1994    tx: mpsc::Sender<PvSample>,
1995    cancel_token: CancellationToken,
1996    period_secs: f64,
1997    conn_info: Arc<Mutex<ConnectionInfo>>,
1998    registry: Arc<PvRegistry>,
1999    extras: Arc<ExtraFieldsCache>,
2000    counters: Arc<PvCounters>,
2001) {
2002    let period = Duration::from_secs_f64(period_secs);
2003    let mut interval = tokio::time::interval(period);
2004    // Reset on every detected disconnect so we re-fetch metadata after
2005    // each reconnect (mirrors monitor_loop's once-per-(re)connect cadence).
2006    let mut metadata_done = false;
2007
2008    loop {
2009        tokio::select! {
2010            _ = cancel_token.cancelled() => return,
2011            _ = interval.tick() => {}
2012        }
2013
2014        if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2015            let was_connected = {
2016                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2017                let prev = ci.is_connected;
2018                ci.is_connected = false;
2019                ci.last_event_time = None;
2020                ci.state = match ci.state {
2021                    PvConnectionState::Connected => PvConnectionState::Disconnected,
2022                    PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2023                    _ => PvConnectionState::Connecting,
2024                };
2025                prev
2026            };
2027            if was_connected {
2028                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2029                counters
2030                    .last_disconnect_unix_secs
2031                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2032            }
2033            metadata_done = false;
2034            continue;
2035        }
2036
2037        if !metadata_done {
2038            // Fire-and-forget; matches monitor_loop's once-per-(re)connect
2039            // semantics. The local `metadata_done` flag exists because
2040            // scan_loop's outer iteration is per-tick (vs monitor_loop's
2041            // per-reconnect), so we'd otherwise spawn a fetch every tick.
2042            // Bound to `cancel_token` so a stopped/deleted PV doesn't get
2043            // a stale metadata write from an in-flight task.
2044            let channel = channel.clone();
2045            let registry = registry.clone();
2046            let counters = counters.clone();
2047            let pv_name = pv_name.clone();
2048            let cancel = cancel_token.clone();
2049            tokio::spawn(async move {
2050                tokio::select! {
2051                    _ = cancel.cancelled() => {}
2052                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
2053                }
2054            });
2055            metadata_done = true;
2056        }
2057
2058        match channel.get().await {
2059            Ok((_dbr_type, epics_val)) => {
2060                let now = SystemTime::now();
2061                let first_after_connect = {
2062                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2063                    let first = ci.last_event_time.is_none();
2064                    if ci.connected_since.is_none() {
2065                        ci.connected_since = Some(now);
2066                    }
2067                    ci.is_connected = true;
2068                    ci.last_event_time = Some(now);
2069                    ci.state = PvConnectionState::Connected;
2070                    first
2071                };
2072                counters.events_received.fetch_add(1, Ordering::Relaxed);
2073                let now_secs = unix_secs(now);
2074                let _ = counters.first_event_unix_secs.compare_exchange(
2075                    0,
2076                    now_secs,
2077                    Ordering::Relaxed,
2078                    Ordering::Relaxed,
2079                );
2080                let archiver_val = epics_value_to_archiver(&epics_val);
2081                let mut sample = ArchiverSample::new(now, archiver_val);
2082                attach_extras(&extras, &mut sample);
2083                if first_after_connect {
2084                    let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2085                    attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2086                }
2087                let pv_sample = PvSample {
2088                    pv_name: pv_name.clone(),
2089                    dbr_type,
2090                    sample,
2091                    element_count: Some(element_count),
2092                    counters: Some(counters.clone()),
2093                };
2094                if try_send_with_overflow_count(&tx, pv_sample, &counters)
2095                    .await
2096                    .is_err()
2097                {
2098                    return;
2099                }
2100            }
2101            Err(e) => {
2102                counters
2103                    .transient_error_count
2104                    .fetch_add(1, Ordering::Relaxed);
2105                debug!(pv = pv_name, "Scan read error: {e}");
2106            }
2107        }
2108    }
2109}
2110
2111/// Java parity (ed07feb): tag the first sample after (re)connect with
2112/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
2113/// detect archiver restarts and PV resumes. Unconditional emission —
2114/// on a clean startup `lost_secs` is 0, which is itself a valid value
2115/// for downstream gap detection.
2116fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2117    sample
2118        .field_values
2119        .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2120    sample
2121        .field_values
2122        .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2123    sample
2124        .field_values
2125        .push(("startup".to_string(), "true".to_string()));
2126}
2127
2128/// Snapshot the extras cache into `sample.field_values`. We sort for stable
2129/// PB output across runs (the protobuf field is repeated; consumers that
2130/// diff/compare files appreciate determinism).
2131fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2132    if extras.is_empty() {
2133        return;
2134    }
2135    let mut entries: Vec<(String, String)> = extras
2136        .iter()
2137        .map(|e| (e.key().clone(), e.value().clone()))
2138        .collect();
2139    entries.sort_by(|a, b| a.0.cmp(&b.0));
2140    sample.field_values = entries;
2141}
2142
2143/// Render an EpicsValue as the string we'll persist in the metadata field
2144/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
2145/// fall back to JSON-ish bracket notation. Stays in sync with what Java
2146/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
2147fn epics_value_to_field_string(val: &EpicsValue) -> String {
2148    match val {
2149        EpicsValue::String(s) => s.clone(),
2150        EpicsValue::Short(v) => v.to_string(),
2151        EpicsValue::Float(v) => v.to_string(),
2152        EpicsValue::Enum(v) => v.to_string(),
2153        EpicsValue::Char(v) => v.to_string(),
2154        EpicsValue::Long(v) => v.to_string(),
2155        EpicsValue::Int64(v) => v.to_string(),
2156        EpicsValue::Double(v) => v.to_string(),
2157        EpicsValue::ShortArray(v) => format!("{v:?}"),
2158        EpicsValue::FloatArray(v) => format!("{v:?}"),
2159        EpicsValue::EnumArray(v) => format!("{v:?}"),
2160        EpicsValue::DoubleArray(v) => format!("{v:?}"),
2161        EpicsValue::LongArray(v) => format!("{v:?}"),
2162        EpicsValue::Int64Array(v) => format!("{v:?}"),
2163        EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2164        EpicsValue::StringArray(v) => format!("{v:?}"),
2165    }
2166}
2167
2168/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
2169/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
2170/// it up alongside the main PV.
2171fn spawn_extra_field_monitor(
2172    ca_client: &CaClient,
2173    pv_name: &str,
2174    field: &str,
2175    extras: Arc<ExtraFieldsCache>,
2176    parent_token: CancellationToken,
2177    counters: Arc<PvCounters>,
2178) {
2179    let full_name = format!("{pv_name}.{field}");
2180    let channel = ca_client.create_channel(&full_name);
2181    let field_owned = field.to_string();
2182    let pv_owned = pv_name.to_string();
2183
2184    // Catch-unwind boundary: a panic from the CA client (e.g. malformed
2185    // wire frame, allocation failure inside epics_rs) would propagate to
2186    // the runtime and abort sibling tasks of this worker thread. Trap it
2187    // here, log with PV+field context, and return normally so the runtime
2188    // remains healthy.
2189    let panic_pv = pv_owned.clone();
2190    let panic_field = field_owned.clone();
2191    tokio::spawn(async move {
2192        let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2193            channel,
2194            pv_owned,
2195            field_owned,
2196            extras,
2197            parent_token,
2198            counters,
2199        ));
2200        if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2201            let msg = panic_payload_msg(&payload);
2202            error!(
2203                pv = panic_pv,
2204                field = panic_field,
2205                "Extra-field monitor panicked: {msg}"
2206            );
2207        }
2208    });
2209}
2210
2211/// Body of the spawned extra-field monitor. Split out so the spawn site
2212/// can wrap it in `catch_unwind`.
2213async fn extra_field_monitor_body(
2214    channel: CaChannel,
2215    pv_owned: String,
2216    field_owned: String,
2217    extras: Arc<ExtraFieldsCache>,
2218    parent_token: CancellationToken,
2219    counters: Arc<PvCounters>,
2220) {
2221    // Initial connect attempt — failure here is non-fatal (the field may
2222    // not exist on every IOC; we just leave the cache empty).
2223    if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
2224        debug!(
2225            pv = pv_owned,
2226            field = field_owned,
2227            "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
2228        );
2229    }
2230
2231    // Exponential backoff for misconfigured fields (e.g. operator
2232    // listed `.HIHI` on a PV that doesn't expose it). Without this
2233    // we'd retry every 5s forever, churning CA search packets and
2234    // file descriptors. The cap is 60s; one warn at the cap so
2235    // ops know to fix archive_fields.
2236    let mut backoff = CA_RETRY_DELAY;
2237    let max_backoff = Duration::from_secs(60);
2238    let mut warned_at_cap = false;
2239
2240    loop {
2241        // Cancel-aware subscribe attempt.
2242        tokio::select! {
2243            _ = parent_token.cancelled() => return,
2244            sub = channel.subscribe() => {
2245                let mut monitor = match sub {
2246                    Ok(m) => m,
2247                    Err(e) => {
2248                        // Java parity (8fe73eb): bump the transient
2249                        // counter so a misconfigured `.HIHI` field
2250                        // shows up in the rate / drop reports.
2251                        counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2252                        debug!(
2253                            pv = pv_owned,
2254                            field = field_owned,
2255                            ?backoff,
2256                            "Extra-field subscribe failed: {e}; retrying"
2257                        );
2258                        if backoff >= max_backoff && !warned_at_cap {
2259                            warn!(
2260                                pv = pv_owned,
2261                                field = field_owned,
2262                                "Extra-field repeatedly fails to subscribe; \
2263                                 check archive_fields config (now retrying every 60s)"
2264                            );
2265                            warned_at_cap = true;
2266                        }
2267                        let sleep_for = backoff;
2268                        backoff = (backoff * 2).min(max_backoff);
2269                        tokio::select! {
2270                            _ = parent_token.cancelled() => return,
2271                            _ = tokio::time::sleep(sleep_for) => continue,
2272                        }
2273                    }
2274                };
2275                // Subscribe succeeded — reset backoff so the next
2276                // failure starts at the short delay again.
2277                backoff = CA_RETRY_DELAY;
2278                warned_at_cap = false;
2279                loop {
2280                    tokio::select! {
2281                        _ = parent_token.cancelled() => return,
2282                        ev = monitor.recv() => match ev {
2283                            Some(Ok(snapshot)) => {
2284                                extras.insert(
2285                                    field_owned.clone(),
2286                                    epics_value_to_field_string(&snapshot.value),
2287                                );
2288                            }
2289                            Some(Err(e)) => {
2290                                counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2291                                debug!(
2292                                    pv = pv_owned,
2293                                    field = field_owned,
2294                                    "Extra-field monitor error: {e}"
2295                                );
2296                            }
2297                            None => break, // resubscribe
2298                        }
2299                    }
2300                }
2301            }
2302        }
2303    }
2304}
2305
/// Produce a printable message from a panic payload (`Box<dyn Any>`).
///
/// Recognises the two payload types produced by the `panic!` macro —
/// `&'static str` and `String` — and falls back to a fixed placeholder
/// for anything else.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
2317
2318/// Resolve the data-bearing sub-field of a PVA value. NTScalar /
2319/// NTScalarArray / NTEnum wrap the raw value in a `value` field; bare
2320/// scalars are passed through.
2321fn pv_value_field(field: &PvField) -> &PvField {
2322    match field {
2323        PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2324        _ => field,
2325    }
2326}
2327
2328/// Map a PVA `ScalarValue` to an archiver `ArchiverValue`.
2329fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2330    match s {
2331        ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2332        ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2333        ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2334        ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2335        ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2336        ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2337        ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2338        ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2339        ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2340        ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2341        ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2342        ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2343    }
2344}
2345
2346/// Extract a scalar `ArchiverValue` from a top-level PVA value (which
2347/// may be an NTScalar wrapper). Returns `None` for array/struct types
2348/// that don't reduce to a scalar.
2349fn pv_field_scalar_to_archiver(field: &PvField) -> Option<ArchiverValue> {
2350    match pv_value_field(field) {
2351        PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2352        _ => None,
2353    }
2354}
2355
2356/// Pick the `(ArchDbType, element_count)` to record at archive_pv time
2357/// from an NTScalar / NTScalarArray's introspection-pvget result.
2358/// Returns `None` for unsupported (struct / union / variant) shapes.
2359fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2360    let value = pv_value_field(field);
2361    Some(match value {
2362        PvField::Scalar(s) => (
2363            match s {
2364                ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2365                ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2366                ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2367                ScalarValue::Int(_)
2368                | ScalarValue::UInt(_)
2369                | ScalarValue::Long(_)
2370                | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2371                ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2372                ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2373                ScalarValue::String(_) => ArchDbType::ScalarString,
2374            },
2375            1,
2376        ),
2377        // Array variants — keep the corresponding scalar element type but
2378        // bump element_count. write_loop / PB encoder doesn't currently
2379        // serialise PVA arrays, so callers should reject these for now.
2380        PvField::ScalarArray(arr) => {
2381            let elem = arr.first()?;
2382            let inner = PvField::Scalar(elem.clone());
2383            let (t, _) = pv_field_to_arch_db_type(&inner)?;
2384            (t, arr.len() as i32)
2385        }
2386        _ => return None,
2387    })
2388}
2389
/// Translate a CA record field name (`HIHI`, `LOLO`, `EGU`, …) into the
/// dotted pvAccess path of the same quantity under an NTScalar /
/// NTScalarArray.
///
/// The lookup is exact and case-sensitive. `None` means the field has no
/// PVA counterpart in the table; callers should drop such fields silently
/// rather than build a pvRequest that can never be satisfied.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const CA_TO_PVA: &[(&str, &str)] = &[
        // display sub-structure
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm sub-structure (alarm thresholds)
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control sub-structure (drive limits)
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];

    CA_TO_PVA
        .iter()
        .find(|(ca_name, _)| *ca_name == field)
        .map(|(_, pva_path)| *pva_path)
}
2418
2419/// Walk a dotted PVA field path (`valueAlarm.highAlarmLimit`) from
2420/// the root [`PvField`]. Returns `None` if any segment is missing or
2421/// the path lands on a non-structure intermediate.
2422fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2423    let mut current = root;
2424    for segment in path.split('.') {
2425        let PvField::Structure(s) = current else {
2426            return None;
2427        };
2428        current = s.get_field(segment)?;
2429    }
2430    Some(current)
2431}
2432
2433/// Stringify a [`ScalarValue`] for storage in the per-sample extras
2434/// map. Java archiver's `archiveFields` blob is a string-string map,
2435/// so we mirror that wire shape.
2436fn scalar_value_to_string(s: &ScalarValue) -> String {
2437    match s {
2438        ScalarValue::Boolean(v) => v.to_string(),
2439        ScalarValue::Byte(v) => v.to_string(),
2440        ScalarValue::Short(v) => v.to_string(),
2441        ScalarValue::Int(v) => v.to_string(),
2442        ScalarValue::Long(v) => v.to_string(),
2443        ScalarValue::UByte(v) => v.to_string(),
2444        ScalarValue::UShort(v) => v.to_string(),
2445        ScalarValue::UInt(v) => v.to_string(),
2446        ScalarValue::ULong(v) => v.to_string(),
2447        ScalarValue::Float(v) => v.to_string(),
2448        ScalarValue::Double(v) => v.to_string(),
2449        ScalarValue::String(s) => s.clone(),
2450    }
2451}
2452
2453/// Element count for a PVA value: 1 for scalar, length for arrays,
2454/// 0 for unsupported shapes (treated as "unknown" by callers).
2455fn pv_field_element_count(field: &PvField) -> i32 {
2456    match pv_value_field(field) {
2457        PvField::Scalar(_) => 1,
2458        PvField::ScalarArray(arr) => arr.len() as i32,
2459        PvField::ScalarArrayTyped(t) => t.len() as i32,
2460        _ => 0,
2461    }
2462}
2463
2464/// Pull `(precision, units)` out of an NTScalar / NTScalarArray's
2465/// `display` sub-structure. Returns `(None, None)` for bare scalars
2466/// or structures missing `display` (some servers omit it).
2467/// Negative precision is the Java EAA "no precision info" sentinel —
2468/// treat it the same as missing so we don't surface "-1" to the UI.
2469fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2470    let PvField::Structure(s) = field else {
2471        return (None, None);
2472    };
2473    let Some(PvField::Structure(disp)) = s.get_field("display") else {
2474        return (None, None);
2475    };
2476    let prec = match disp.get_field("precision") {
2477        Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2478        Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2479        Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2480        _ => None,
2481    };
2482    let egu = match disp.get_field("units") {
2483        Some(PvField::Scalar(ScalarValue::String(u))) => {
2484            let t = u.trim();
2485            if t.is_empty() {
2486                None
2487            } else {
2488                Some(t.to_string())
2489            }
2490        }
2491        _ => None,
2492    };
2493    (prec, egu)
2494}
2495
2496/// Pull the IOC-reported timestamp out of an NTScalar / NTScalarArray.
2497/// Falls back to `SystemTime::now()` when the structure lacks a usable
2498/// `timeStamp` sub-field, so a malformed server doesn't drop samples.
2499fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2500    let PvField::Structure(s) = field else {
2501        return SystemTime::now();
2502    };
2503    let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2504        return SystemTime::now();
2505    };
2506    let secs = match ts.get_field("secondsPastEpoch") {
2507        Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2508        Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2509        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2510        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2511        _ => return SystemTime::now(),
2512    };
2513    let nanos = match ts.get_field("nanoseconds") {
2514        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2515        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2516        _ => 0,
2517    };
2518    SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2519}
2520
2521/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
2522fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2523    match field_type {
2524        DbFieldType::String => ArchDbType::ScalarString,
2525        DbFieldType::Short => ArchDbType::ScalarShort,
2526        DbFieldType::Float => ArchDbType::ScalarFloat,
2527        DbFieldType::Enum => ArchDbType::ScalarEnum,
2528        DbFieldType::Char => ArchDbType::ScalarByte,
2529        DbFieldType::Long => ArchDbType::ScalarInt,
2530        // PB PayloadType has no SCALAR_LONG (i64) — values outside i32 range
2531        // are truncated by the i32 cast in epics_value_to_archiver.
2532        DbFieldType::Int64 => ArchDbType::ScalarInt,
2533        DbFieldType::Double => ArchDbType::ScalarDouble,
2534    }
2535}
2536
2537/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
2538fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2539    match val {
2540        EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2541        EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2542        EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2543        EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2544        EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2545        EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2546        EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2547        EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2548        EpicsValue::ShortArray(v) => {
2549            ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2550        }
2551        EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2552        EpicsValue::EnumArray(v) => {
2553            ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2554        }
2555        EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2556        EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2557        EpicsValue::Int64Array(v) => {
2558            ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2559        }
2560        EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2561        EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2562    }
2563}
2564
/// Timing knobs for [`write_loop_with_config`].
///
/// Production goes through [`write_loop`], which starts from
/// [`Default`]; tests shrink the timeouts to sub-second values so
/// failure-injection cases take a few hundred milliseconds rather than
/// minutes.
#[derive(Debug, Clone)]
pub struct WriteLoopConfig {
    /// Interval between periodic flushes: each one calls
    /// `flush_ingest_writes` and commits the pending `last_event`
    /// timestamps to the registry.
    pub flush_period: Duration,
    /// Longest write_loop waits on the JoinHandle of a single
    /// `append_event_with_meta`. After a timeout the sample is logged as
    /// abandoned-but-may-succeed-late and the loop continues; the
    /// spawn_blocking task stays parked on the blocking pool, and
    /// per-PV serialization in PlainPB keeps a late completion ordered
    /// behind later samples of the same PV.
    pub append_timeout: Duration,
    /// Longest write_loop waits on the JoinHandle of one periodic
    /// `flush_ingest_writes`. After a timeout every pending `ts_updates`
    /// entry is deferred to the next cycle, because it is unknown which
    /// PVs reached disk.
    pub flush_timeout: Duration,
    /// Per-append limit while draining at shutdown.
    pub drain_per_sample_timeout: Duration,
    /// Overall wall-clock limit for the shutdown drain. Once it is used
    /// up, samples still buffered are abandoned without attempting an
    /// append, so a wedged STS can't stretch an orderly stop into
    /// "kill -9".
    pub drain_total_budget: Duration,
    /// Limit on the final `flush_writes` issued at shutdown.
    pub shutdown_flush_timeout: Duration,
}

impl Default for WriteLoopConfig {
    /// Production defaults; all values are whole seconds.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            flush_period: secs(10),
            append_timeout: secs(30),
            flush_timeout: secs(30),
            drain_per_sample_timeout: secs(5),
            drain_total_budget: secs(30),
            shutdown_flush_timeout: secs(15),
        }
    }
}
2610
2611/// Tunables for [`run_sharded_write_pool`].
2612///
2613/// `shards = 1` is the legacy single-worker layout and incurs no
2614/// dispatcher overhead. `shards > 1` spawns a dispatcher that hashes
2615/// each sample's `pv_name` to a fixed shard (so any one PV's samples
2616/// always land in the same per-shard write_loop, preserving
2617/// timestamp ordering) and N parallel shard workers — different PVs
2618/// can append concurrently instead of queueing behind a single task.
2619/// Sites with many active PVs and a fast STS should set this to
2620/// `min(num_cpus, expected concurrent slow appends)`.
2621#[derive(Debug, Clone)]
2622pub struct ShardedWritePoolConfig {
2623    /// Number of parallel shard workers. Must be ≥ 1; `1` is the
2624    /// degenerate case (no dispatcher, behaves identically to a
2625    /// bare `write_loop_with_config`).
2626    pub shards: usize,
2627    /// mpsc capacity of each shard's input channel. The dispatcher
2628    /// `try_send`s into this; when full, the OFFENDING SHARD's
2629    /// drop is recorded under
2630    /// `archiver_dispatcher_shard_overflow_drops_total{shard=N}`
2631    /// and the sample's per-PV `buffer_overflow_drops` counter is
2632    /// bumped. Other shards keep flowing — that's the per-shard
2633    /// isolation guarantee.
2634    pub per_shard_buffer: usize,
2635    /// Per-worker config (timeouts, flush period). Cloned into
2636    /// each shard.
2637    pub write_loop: WriteLoopConfig,
2638}
2639
2640impl Default for ShardedWritePoolConfig {
2641    fn default() -> Self {
2642        Self {
2643            shards: 1,
2644            per_shard_buffer: 4096,
2645            write_loop: WriteLoopConfig::default(),
2646        }
2647    }
2648}
2649
2650// PV-to-shard hash lives in `archiver_core::storage::plainpb` so
2651// the engine's dispatcher and PlainPB's
2652// `flush_ingest_writes_for_shard` agree on which shard owns which
2653// PV. Two definitions would mean shard 0's flush could iterate
2654// PVs the dispatcher routed to shard 1, re-introducing the
2655// misattribution bug.
2656use archiver_core::storage::plainpb::shard_for_pv;
2657
2658/// Run an N-shard write pool: 1 dispatcher (hashes pv_name → shard)
2659/// plus N parallel `write_loop_with_config` workers. Each shard has
2660/// independent `ts_updates`, an independent flush ticker, and its
2661/// own per-PV writer slots inside the shared storage plugin (so
2662/// per-PV ordering is naturally preserved by the consistent-hash
2663/// routing — samples for one PV always go to the same shard).
2664///
2665/// `shards == 1` short-circuits the dispatcher and runs a single
2666/// `write_loop_with_config` directly so single-worker deployments
2667/// pay zero overhead.
2668pub async fn run_sharded_write_pool(
2669    storage: Arc<dyn StoragePlugin>,
2670    registry: Arc<PvRegistry>,
2671    rx: mpsc::Receiver<PvSample>,
2672    shutdown: tokio::sync::watch::Receiver<bool>,
2673    cfg: ShardedWritePoolConfig,
2674) {
2675    let n = cfg.shards.max(1);
2676
2677    // Coalescing per-PV pending-timestamp map shared between the
2678    // flush owner, every shard worker, and every shard worker's
2679    // spawn_blocking late-success closure. Replaces the previous
2680    // mpsc report channel — see [`PendingReports`] for why.
2681    let pending = Arc::new(PendingReports::new());
2682
2683    // Spawn the global flush owner first so it's ready to flush
2684    // as soon as shards start appending.
2685    let flush_owner_handle = tokio::spawn(flush_owner_loop(
2686        storage.clone(),
2687        registry.clone(),
2688        pending.clone(),
2689        shutdown.clone(),
2690        cfg.write_loop.clone(),
2691    ));
2692
2693    if n == 1 {
2694        // Fast path: single shard, no dispatcher hop. We become
2695        // the shard worker ourselves and read from `rx` directly.
2696        shard_append_loop(
2697            0,
2698            storage.clone(),
2699            rx,
2700            pending.clone(),
2701            shutdown.clone(),
2702            cfg.write_loop.clone(),
2703        )
2704        .await;
2705    } else {
2706        // Multi-shard path: dispatcher + N shard workers.
2707        let mut shard_txs = Vec::with_capacity(n);
2708        let mut shard_handles = Vec::with_capacity(n);
2709        for shard_idx in 0..n {
2710            let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2711            shard_txs.push(s_tx);
2712            let storage = storage.clone();
2713            let pending = pending.clone();
2714            let shard_shutdown = shutdown.clone();
2715            let shard_cfg = cfg.write_loop.clone();
2716            shard_handles.push(tokio::spawn(shard_append_loop(
2717                shard_idx,
2718                storage,
2719                s_rx,
2720                pending,
2721                shard_shutdown,
2722                shard_cfg,
2723            )));
2724        }
2725
2726        dispatch_loop(rx, shard_txs, shutdown.clone()).await;
2727
2728        // Dispatcher exited → shard sample receivers see EOS or
2729        // the shutdown signal directly via their watch::Receiver.
2730        // Wait for shards to finish their drain branches.
2731        for h in shard_handles {
2732            let _ = h.await;
2733        }
2734    }
2735
2736    // Flush owner uses its own shutdown-watch + drain_total_budget
2737    // grace timer, not an EOS signal — see flush_owner_loop. Wait
2738    // for it to finish.
2739    let _ = flush_owner_handle.await;
2740}
2741
2742/// Reads the upstream main mpsc, hashes each sample's `pv_name`,
2743/// and forwards to the appropriate shard channel via
2744/// `Sender::try_send` for **per-shard isolation**.
2745///
2746/// `try_send` (not `send().await`) is the load-balancing
2747/// invariant: if one slow shard's channel is full, the dispatcher
2748/// drops THAT shard's overflow into `buffer_overflow_drops` and
2749/// keeps routing for the other shards. Using `send().await` would
2750/// block the dispatcher on the slow shard, back-pressuring the
2751/// upstream main channel and starving every other shard's PVs —
2752/// the exact failure mode the sharded layout is meant to prevent.
2753async fn dispatch_loop(
2754    mut rx: mpsc::Receiver<PvSample>,
2755    shard_txs: Vec<mpsc::Sender<PvSample>>,
2756    mut shutdown: tokio::sync::watch::Receiver<bool>,
2757) {
2758    let n = shard_txs.len();
2759    info!(shards = n, "Sharded write dispatcher started");
2760    loop {
2761        tokio::select! {
2762            biased;
2763            // `biased` so we always check shutdown before draining
2764            // another sample — keeps the shutdown latency bounded
2765            // even under a sustained sample storm.
2766            _ = shutdown.changed() => {
2767                if *shutdown.borrow() {
2768                    // Drain remaining samples (try_send for the
2769                    // same reason as the steady-state branch).
2770                    // Mirror the steady-state overflow accounting
2771                    // so a saturated shard's drain-time drops are
2772                    // visible to operators — silent loss here
2773                    // would shadow real samples lost during
2774                    // shutdown.
2775                    while let Ok(sample) = rx.try_recv() {
2776                        let idx = shard_for_pv(&sample.pv_name, n);
2777                        match shard_txs[idx].try_send(sample) {
2778                            Ok(()) => {}
2779                            Err(mpsc::error::TrySendError::Full(s)) => {
2780                                if let Some(c) = s.counters.as_ref() {
2781                                    c.buffer_overflow_drops
2782                                        .fetch_add(1, Ordering::Relaxed);
2783                                }
2784                                metrics::counter!(
2785                                    "archiver_dispatcher_shard_overflow_drops_total",
2786                                    "shard" => idx.to_string(),
2787                                    "phase" => "shutdown_drain",
2788                                )
2789                                .increment(1);
2790                                debug!(
2791                                    shard = idx,
2792                                    pv = s.pv_name,
2793                                    "Shutdown-drain shard channel full; sample dropped"
2794                                );
2795                            }
2796                            Err(mpsc::error::TrySendError::Closed(s)) => {
2797                                warn!(
2798                                    shard = idx,
2799                                    pv = s.pv_name,
2800                                    "Shutdown-drain shard channel closed; sample dropped"
2801                                );
2802                            }
2803                        }
2804                    }
2805                    break;
2806                }
2807            }
2808            maybe = rx.recv() => {
2809                match maybe {
2810                    Some(sample) => {
2811                        let idx = shard_for_pv(&sample.pv_name, n);
2812                        match shard_txs[idx].try_send(sample) {
2813                            Ok(()) => {}
2814                            Err(mpsc::error::TrySendError::Full(s)) => {
2815                                // Shard saturated. Surface the
2816                                // drop on the SAMPLE's per-PV
2817                                // counter so operators can pin
2818                                // the loss to a specific PV+shard.
2819                                if let Some(c) = s.counters.as_ref() {
2820                                    c.buffer_overflow_drops
2821                                        .fetch_add(1, Ordering::Relaxed);
2822                                }
2823                                metrics::counter!(
2824                                    "archiver_dispatcher_shard_overflow_drops_total",
2825                                    "shard" => idx.to_string(),
2826                                )
2827                                .increment(1);
2828                                debug!(
2829                                    shard = idx,
2830                                    pv = s.pv_name,
2831                                    "Shard channel full; sample dropped \
2832                                     (per-shard isolation)"
2833                                );
2834                            }
2835                            Err(mpsc::error::TrySendError::Closed(s)) => {
2836                                // Shard worker died (panic or
2837                                // shutdown race). We'll lose this
2838                                // sample but keep the dispatcher
2839                                // alive for OTHER shards.
2840                                warn!(
2841                                    shard = idx,
2842                                    pv = s.pv_name,
2843                                    "Shard channel closed; sample dropped"
2844                                );
2845                            }
2846                        }
2847                    }
2848                    None => break, // Upstream closed.
2849                }
2850            }
2851        }
2852    }
2853    info!("Sharded write dispatcher exiting");
2854}
2855
2856/// Background writer task — drains samples and writes to storage.
2857///
2858/// Thin wrapper that fills [`WriteLoopConfig`] from [`Default`] and
2859/// the caller-supplied `flush_period`. Production callers use this
2860/// form; the test suite uses [`write_loop_with_config`] directly to
2861/// dial timeouts down.
2862pub async fn write_loop(
2863    storage: Arc<dyn StoragePlugin>,
2864    registry: Arc<PvRegistry>,
2865    rx: mpsc::Receiver<PvSample>,
2866    shutdown: tokio::sync::watch::Receiver<bool>,
2867    flush_period: Duration,
2868) {
2869    let cfg = WriteLoopConfig {
2870        flush_period,
2871        ..Default::default()
2872    };
2873    write_loop_with_config(storage, registry, rx, shutdown, cfg).await
2874}
2875
/// RAII guard that clears the `flush_in_flight` flag when dropped,
/// even if the surrounding `spawn_blocking` closure panics.
///
/// Without this, an unwind inside `block_on(flush_ingest_writes)`
/// would skip a manual `store(false)` and leave the flag stuck
/// `true` forever — every subsequent ticker would see "still in
/// flight" and silently skip flushing, freezing the registry's
/// `last_event`.
///
/// The guard must be bound to a named local (`let _guard = ...`) for
/// the duration of the flush; `#[must_use]` flags the mistake of
/// constructing it as a bare expression statement, which would drop
/// it — and clear the flag — immediately.
#[derive(Debug)]
#[must_use = "the flag is cleared as soon as the guard is dropped; bind it to a local"]
struct FlushInFlightGuard {
    /// Shared "a flush is currently running" flag to reset on drop.
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    /// Clears the flag. `Release` pairs with the `Acquire` loads
    /// performed by whoever polls the flag.
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Release);
    }
}
2891
2892/// Run one flush + commit cycle.
2893///
2894/// Spawns `flush_ingest_writes` on the blocking pool with a bounded
2895/// timeout, then commits the pending registry timestamps for every
2896/// PV the flush returned cleanly. Maintains an `in_flight` flag set
2897/// by the spawn_blocking task so a caller (the ticker branch) can
2898/// avoid stacking concurrent flushes when the previous one is still
2899/// running on a wedged FS.
2900///
2901/// Returns `true` if a flush was actually launched, `false` if the
2902/// helper was a no-op (in_flight already set, ts_updates empty).
2903///
2904/// Mutates `ts_updates`:
2905///   * Clean flush: drops `failed` PVs (bytes lost), commits all
2906///     other entries, drops committed entries on success.
2907///   * Deferred PVs: STAY in `ts_updates` for the next cycle.
2908///     Their bytes are still buffered, not lost.
2909///   * Flush error / panic: leaves every entry intact for retry.
2910///   * Flush timeout: CLEARS every entry. We don't know which PVs
2911///     reached disk; the in-flight task may still be running and
2912///     may even fail late, removing a writer from the cache. The
2913///     next flush would then see a "clean" state for that PV and
2914///     wrongly commit. Conservative drop is the only safe move
2915///     (Java archiver has the same trade-off — under-commit on
2916///     timeout, never over-commit). Future samples will repopulate
2917///     `ts_updates` and the next clean flush catches the registry up.
2918async fn run_flush_and_commit(
2919    storage: &Arc<dyn StoragePlugin>,
2920    registry: &Arc<PvRegistry>,
2921    pending: &Arc<PendingReports>,
2922    flush_timeout: Duration,
2923    in_flight: &Arc<std::sync::atomic::AtomicBool>,
2924) -> bool {
2925    if in_flight.load(Ordering::Acquire) {
2926        // Previous flush hasn't finished. Don't queue another —
2927        // we'd just stack spawn_blocking tasks on the wedged FS
2928        // and saturate the blocking pool. Wait for the existing
2929        // one to drain.
2930        return false;
2931    }
2932    // Snapshot the current pending map BEFORE setting in_flight
2933    // so concurrent shards reporting after this point survive
2934    // into the next cycle (their entries will be in
2935    // `pending` but not in `snapshot`, so we won't remove them
2936    // when we clean up). The snapshot is the authoritative view
2937    // of "what we're trying to commit this cycle".
2938    let snapshot = pending.snapshot();
2939    if snapshot.is_empty() {
2940        return false;
2941    }
2942    in_flight.store(true, Ordering::Release);
2943
2944    let storage_for_flush = storage.clone();
2945    let in_flight_for_task = in_flight.clone();
2946    let flush_join = tokio::task::spawn_blocking(move || {
2947        // RAII guard: clears `in_flight` on every exit path
2948        // (return, panic, future-cancellation). The guard's
2949        // existence is the contract — manually storing false
2950        // BEFORE returning would skip cleanup on a panic inside
2951        // block_on / inside flush_ingest_writes, leaving the flag
2952        // stuck `true` and silently freezing all future flushes.
2953        let _guard = FlushInFlightGuard {
2954            flag: in_flight_for_task,
2955        };
2956        let rt = tokio::runtime::Handle::current();
2957        rt.block_on(storage_for_flush.flush_ingest_writes())
2958    });
2959    match tokio::time::timeout(flush_timeout, flush_join).await {
2960        Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
2961            // Failed PVs: bytes lost. Drop every failed PV's
2962            // pending entry UNCONDITIONALLY — regardless of
2963            // whether the snapshot's value still matches.
2964            //
2965            // The previous "remove only if matches snapshot"
2966            // policy assumed a concurrent shard's post-snapshot
2967            // report meant "newer bytes ARE on disk". That
2968            // assumption fails for the eviction/loss-queue path:
2969            // the loss queue records only PV names, so a loss
2970            // event recorded between snapshot and flush-return
2971            // has no way to communicate which writer generation
2972            // (or timestamp horizon) it applies to. A
2973            // post-snapshot pending entry could refer to bytes
2974            // that ARE on disk (fresh writer after eviction) OR
2975            // bytes that were also lost — we can't tell.
2976            //
2977            // Conservative drop is the safe choice: never
2978            // commit a `last_event` for a PV the storage just
2979            // told us had lost bytes, even at the cost of an
2980            // occasional under-commit that the next sample
2981            // resolves.
2982            if !failed.is_empty() {
2983                pending.remove_failed(&failed);
2984                error!(
2985                    "STS flush dropped {} PV(s) from timestamp commit \
2986                     (bytes never reached disk): {:?}",
2987                    failed.len(),
2988                    failed
2989                );
2990            }
2991            // Deferred PVs: writer slot busy, bytes still buffered.
2992            // We do NOT remove them from `pending` (next cycle
2993            // will catch them) and we EXCLUDE them from this
2994            // cycle's commit batch.
2995            if !deferred.is_empty() {
2996                debug!(
2997                    "STS flush deferred {} PV(s); keeping in pending \
2998                     for next cycle: {:?}",
2999                    deferred.len(),
3000                    deferred
3001                );
3002            }
3003            // Build commit batch from the snapshot, excluding
3004            // failed (bytes lost) and deferred (bytes not on disk
3005            // yet).
3006            let failed_set: std::collections::HashSet<&str> =
3007                failed.iter().map(|s| s.as_str()).collect();
3008            let deferred_set: std::collections::HashSet<&str> =
3009                deferred.iter().map(|s| s.as_str()).collect();
3010            let to_commit: Vec<(&str, SystemTime)> = snapshot
3011                .iter()
3012                .filter(|(pv, _)| {
3013                    !failed_set.contains(pv.as_str()) && !deferred_set.contains(pv.as_str())
3014                })
3015                .map(|(pv, ts)| (pv.as_str(), *ts))
3016                .collect();
3017            if to_commit.is_empty() {
3018                return true;
3019            }
3020            match registry.batch_update_timestamps(&to_commit) {
3021                Ok(()) => {
3022                    // Remove committed entries from the shared
3023                    // map — but only if their current value still
3024                    // matches the snapshot's. This preserves
3025                    // concurrent updates that arrived between
3026                    // snapshot and remove.
3027                    let committed_map: std::collections::HashMap<String, SystemTime> = to_commit
3028                        .iter()
3029                        .map(|(pv, ts)| ((*pv).to_string(), *ts))
3030                        .collect();
3031                    pending.remove_committed(&committed_map);
3032                }
3033                Err(e) => {
3034                    // Don't remove — retry on next cycle.
3035                    // Otherwise a transient SQLite error orphans
3036                    // timestamps the disk has but the registry
3037                    // doesn't know about, with no retry path.
3038                    error!(
3039                        "Registry timestamp commit failed; \
3040                         keeping {} pending for retry: {e}",
3041                        snapshot.len()
3042                    );
3043                }
3044            }
3045        }
3046        Ok(Ok(Err(e))) => {
3047            error!(
3048                "STS ingest flush errored; deferring all {} \
3049                 pending timestamp commits: {e}",
3050                snapshot.len()
3051            );
3052        }
3053        Ok(Err(join_err)) => {
3054            error!(
3055                "STS ingest flush task panicked; deferring all {} \
3056                 pending timestamp commits: {join_err}",
3057                snapshot.len()
3058            );
3059        }
3060        Err(_) => {
3061            // Flush wedged. Conservative drop: the spawn_blocking
3062            // task may still be running and may even fail late
3063            // (which would remove a PV's writer from the cache,
3064            // making the NEXT flush return "clean" and tricking
3065            // the owner into committing a stale value for a PV
3066            // whose old bytes are gone). Drop the snapshot's
3067            // entries from `pending` (only if unchanged), so the
3068            // owner doesn't trust them; future samples rebuild
3069            // pending and the next clean flush catches the
3070            // registry up.
3071            //
3072            // The `in_flight` flag stays TRUE until the
3073            // spawn_blocking task naturally finishes — this
3074            // throttles us from queueing yet another flush onto
3075            // the wedged FS until it clears.
3076            metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
3077            let dropped = snapshot.len();
3078            pending.remove_committed(&snapshot);
3079            error!(
3080                "STS ingest flush timed out after {flush_timeout:?}; \
3081                 conservatively dropped {dropped} pending timestamp \
3082                 commit(s) (task remains on blocking pool; \
3083                 timestamps will be rebuilt from subsequent samples)"
3084            );
3085        }
3086    }
3087    true
3088}
3089
3090/// Coalescing per-PV pending-timestamp map shared between every
3091/// shard worker (incl. their spawn_blocking late-success closures)
3092/// and the single global flush owner.
3093///
3094/// **Why a shared map instead of an mpsc channel?** With a channel,
3095/// a slow flush owner causes the buffer to fill and `try_send`
3096/// drops happen — for a PV that goes silent right after a dropped
3097/// report, the registry's `last_event` is permanently
3098/// under-committed. With a coalescing map keyed by PV, every
3099/// shard call is an `entry().and_modify().or_insert()` that
3100/// always succeeds; concurrent updates from different shards
3101/// never lose data because the map is bounded by **PV count**
3102/// (typical: thousands), not by sample rate (typical: 100k/s).
3103///
3104/// **Coalescing semantics.** A successful append for `(pv, ts)`
3105/// upserts the entry to `max(current, ts)`. A stale late-success
3106/// report (e.g. from a spawn_blocking task that finished after
3107/// shard already wrote a newer sample) does NOT clobber a newer
3108/// committed value.
3109#[derive(Default)]
3110pub struct PendingReports {
3111    inner: dashmap::DashMap<String, SystemTime>,
3112}
3113
3114impl PendingReports {
3115    pub fn new() -> Self {
3116        Self::default()
3117    }
3118
3119    /// Coalescing report. If `pv` already has a newer timestamp,
3120    /// no-op. Always succeeds — this is the channel-replacement
3121    /// invariant that makes silent-PV under-commit impossible.
3122    pub fn report(&self, pv: &str, ts: SystemTime) {
3123        // DashMap's entry() locks one shard internally. Fast-path
3124        // updates use `and_modify`; brand-new PVs go through
3125        // `or_insert`. No external lock contention across shards.
3126        self.inner
3127            .entry(pv.to_string())
3128            .and_modify(|cur| {
3129                if *cur < ts {
3130                    *cur = ts;
3131                }
3132            })
3133            .or_insert(ts);
3134    }
3135
3136    /// Snapshot the entire map for a flush cycle. Returns a
3137    /// owned HashMap; the shared map stays intact so concurrent
3138    /// shards can keep reporting during the flush.
3139    pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3140        self.inner
3141            .iter()
3142            .map(|kv| (kv.key().clone(), *kv.value()))
3143            .collect()
3144    }
3145
3146    /// Remove entries whose CURRENT value still equals what we
3147    /// committed. If a concurrent shard advanced an entry to a
3148    /// newer ts after the snapshot, leave it alone — the next
3149    /// flush will pick it up.
3150    pub fn remove_committed(&self, committed: &std::collections::HashMap<String, SystemTime>) {
3151        for (pv, &committed_ts) in committed {
3152            // remove_if applies the predicate atomically inside
3153            // the DashMap shard lock.
3154            let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3155        }
3156    }
3157
3158    /// Unconditionally remove every failed PV's entry from the
3159    /// pending map, regardless of its current timestamp.
3160    ///
3161    /// **Why unconditional.** The storage's loss queue carries
3162    /// only PV names — it cannot tell us whether a concurrent
3163    /// shard's post-snapshot report was for bytes that ARE on
3164    /// disk (e.g., a fresh writer opened after eviction) versus
3165    /// bytes that are also gone. The matching `remove_committed`
3166    /// path can leave a post-snapshot entry behind, and the next
3167    /// clean flush would commit it — turning a "PV's bytes were
3168    /// lost" report into a `last_event` lie.
3169    ///
3170    /// The trade-off: a brand-new sample whose bytes truly DID
3171    /// reach disk after the loss event gets dropped from this
3172    /// commit cycle. Future samples re-populate `pending` and the
3173    /// registry catches up. Under-commit is the safe direction;
3174    /// over-commit is never acceptable.
3175    pub fn remove_failed(&self, failed: &[String]) {
3176        for pv in failed {
3177            let _ = self.inner.remove(pv);
3178        }
3179    }
3180
3181    pub fn is_empty(&self) -> bool {
3182        self.inner.is_empty()
3183    }
3184
3185    pub fn len(&self) -> usize {
3186        self.inner.len()
3187    }
3188}
3189
3190/// Same as [`write_loop`] but accepts the full [`WriteLoopConfig`].
3191///
3192/// Thin wrapper that runs a 1-shard sharded pool — kept on the
3193/// public surface for tests and any direct callers. The actual
3194/// work happens in [`run_sharded_write_pool`].
3195pub async fn write_loop_with_config(
3196    storage: Arc<dyn StoragePlugin>,
3197    registry: Arc<PvRegistry>,
3198    rx: mpsc::Receiver<PvSample>,
3199    shutdown: tokio::sync::watch::Receiver<bool>,
3200    cfg: WriteLoopConfig,
3201) {
3202    let pool_cfg = ShardedWritePoolConfig {
3203        shards: 1,
3204        // unused on the 1-shard fast path (rx is forwarded directly)
3205        per_shard_buffer: 4096,
3206        write_loop: cfg,
3207    };
3208    run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3209}
3210
3211/// Per-shard append worker. Drains samples from `sample_rx`,
3212/// drops out-of-order or type-changed samples, then runs each
3213/// `append_event_with_meta` on the blocking pool with a bounded
3214/// timeout. On every success — including the late-success path
3215/// where write_loop's outer timeout already abandoned the
3216/// JoinHandle — coalesces `(pv, ts)` into the shared
3217/// [`PendingReports`] map so the global flush owner sees it on
3218/// the next flush cycle.
3219///
3220/// The shard worker holds NO ts_updates / flush state of its own.
3221/// It tracks `last_ts` and `last_dbr_type` only for the
3222/// per-PV ordering / type-change drop checks; those are local
3223/// to the shard because the dispatcher's consistent hash pins each
3224/// PV to one shard.
3225async fn shard_append_loop(
3226    shard_idx: usize,
3227    storage: Arc<dyn StoragePlugin>,
3228    mut sample_rx: mpsc::Receiver<PvSample>,
3229    pending: Arc<PendingReports>,
3230    mut shutdown: tokio::sync::watch::Receiver<bool>,
3231    cfg: WriteLoopConfig,
3232) {
3233    let append_timeout = cfg.append_timeout;
3234    info!(shard = shard_idx, "Shard append loop started");
3235    // Per-PV state for the in-shard sanity drops. The dispatcher's
3236    // consistent hash keeps each PV in one shard, so these maps
3237    // never see cross-shard PVs.
3238    let mut last_ts: std::collections::HashMap<String, SystemTime> =
3239        std::collections::HashMap::new();
3240    let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
3241        std::collections::HashMap::new();
3242
3243    loop {
3244        tokio::select! {
3245            Some(pv_sample) = sample_rx.recv() => {
3246                shard_handle_sample(
3247                    shard_idx,
3248                    &storage,
3249                    &pending,
3250                    pv_sample,
3251                    &mut last_ts,
3252                    &mut last_dbr_type,
3253                    append_timeout,
3254                )
3255                .await;
3256            }
3257            _ = shutdown.changed() => {
3258                shard_drain_on_shutdown(
3259                    shard_idx,
3260                    &storage,
3261                    &pending,
3262                    &mut sample_rx,
3263                    &mut last_ts,
3264                    &mut last_dbr_type,
3265                    &cfg,
3266                )
3267                .await;
3268                break;
3269            }
3270        }
3271    }
3272    info!(shard = shard_idx, "Shard append loop exited");
3273}
3274
3275/// Process one sample on the shard's hot path. Extracted so the
3276/// steady-state and shutdown-drain branches can share the same
3277/// "drop checks → spawn_blocking append → report on success"
3278/// recipe.
3279async fn shard_handle_sample(
3280    shard_idx: usize,
3281    storage: &Arc<dyn StoragePlugin>,
3282    pending: &Arc<PendingReports>,
3283    pv_sample: PvSample,
3284    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3285    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3286    append_timeout: Duration,
3287) {
3288    let ts = pv_sample.sample.timestamp;
3289
3290    // Out-of-order timestamp drop. Storage requires monotonic
3291    // appends per PV; an older timestamp would produce a corrupt
3292    // partition. Tracked via shard-local `last_ts` so the check
3293    // survives flush cycles (the legacy ts_updates-based check
3294    // only caught within-cycle reorderings).
3295    if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
3296        && ts < *prev_ts
3297    {
3298        if let Some(ref c) = pv_sample.counters {
3299            c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
3300        }
3301        debug!(
3302            shard = shard_idx,
3303            pv = pv_sample.pv_name,
3304            ?ts,
3305            ?prev_ts,
3306            "Dropping out-of-order sample"
3307        );
3308        return;
3309    }
3310
3311    // Type-change drop. The first sample defines the PV's wire
3312    // type; later samples with a different DBR get dropped
3313    // (operator must changeTypeForPV first).
3314    let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
3315    if let Some(prev) = prev_type
3316        && prev != pv_sample.dbr_type
3317    {
3318        if let Some(ref c) = pv_sample.counters {
3319            c.type_change_drops.fetch_add(1, Ordering::Relaxed);
3320            // Java parity (9f2234f): record the latest observed
3321            // DBR so the dropped-events report can show what the
3322            // IOC is now sending vs the archived type.
3323            c.latest_observed_dbr
3324                .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
3325        }
3326        debug!(
3327            shard = shard_idx,
3328            pv = pv_sample.pv_name,
3329            ?prev,
3330            new = ?pv_sample.dbr_type,
3331            "Dropping type-changed sample"
3332        );
3333        // Restore prev_type so a single mismatched sample doesn't
3334        // permanently flip our recorded type.
3335        last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
3336        return;
3337    }
3338
3339    let meta = AppendMeta {
3340        element_count: pv_sample.element_count,
3341        ..Default::default()
3342    };
3343    let pv_name_for_post = pv_sample.pv_name.clone();
3344    let counters_for_post = pv_sample.counters.clone();
3345    let counters_in_task = pv_sample.counters.clone();
3346    let storage_for_task = storage.clone();
3347    let pending_for_task = pending.clone();
3348
3349    // Bound each append + isolate sync syscall hangs.
3350    // `spawn_blocking` moves the actual I/O to the blocking
3351    // thread pool, so a stuck NFS write parks a blocking-pool
3352    // thread instead of a runtime worker. The
3353    // `tokio::time::timeout` then bounds how long the shard waits
3354    // for that join — a stuck task is abandoned and the shard
3355    // continues.
3356    //
3357    // Late-success path: a timed-out task may still complete its
3358    // write later. The closure ALWAYS reports into the shared
3359    // `PendingReports` map on Ok (whether or not the shard
3360    // already moved on), so the global flush owner picks up late
3361    // successes too. Per-PV serialization inside PlainPB keeps
3362    // any late completion ordered behind subsequent same-PV
3363    // samples. The map's coalescing semantics ensure a stale late
3364    // success does NOT clobber a newer hot-path commit.
3365    let join = tokio::task::spawn_blocking(move || {
3366        let rt = tokio::runtime::Handle::current();
3367        let res = rt.block_on(storage_for_task.append_event_with_meta(
3368            &pv_sample.pv_name,
3369            pv_sample.dbr_type,
3370            &pv_sample.sample,
3371            &meta,
3372        ));
3373        if res.is_ok() {
3374            metrics::counter!("archiver_events_stored_total").increment(1);
3375            if let Some(c) = counters_in_task.as_ref() {
3376                c.events_stored.fetch_add(1, Ordering::Relaxed);
3377            }
3378            // Coalesce into the shared map. Always succeeds —
3379            // unlike the previous mpsc try_send, a saturated
3380            // flush owner cannot cause this report to drop, so
3381            // a PV that goes silent right after a successful
3382            // append still gets its `last_event` committed once
3383            // the owner gets to its next flush cycle.
3384            pending_for_task.report(&pv_sample.pv_name, ts);
3385        }
3386        res
3387    });
3388    let res = tokio::time::timeout(append_timeout, join).await;
3389    // Conservative ordering high-water (principle 5). Bump
3390    // `last_ts` on EVERY storage-layer return path — success,
3391    // error, panic, timeout — not just on Ok and timeout.
3392    //
3393    // Why all four:
3394    //   * Ok       — sample on disk; obvious bump.
3395    //   * Timeout  — sample MAY land on disk via late success;
3396    //                bump to keep on-disk order monotonic.
3397    //   * Error    — current storage contract is binary, but a
3398    //                future partial-success-with-error variant
3399    //                could leave bytes on disk. Bumping now
3400    //                hedges against that.
3401    //   * Panic    — closure aborted mid-write; storage state
3402    //                is undefined. Bumping is the safe choice.
3403    //
3404    // The pre-check earlier in this function already drops
3405    // out-of-order and type-changed samples BEFORE we get here,
3406    // so this bump only ever sets the high-water to a ts the
3407    // shard explicitly accepted into the storage layer.
3408    last_ts.insert(pv_name_for_post.clone(), ts);
3409    match res {
3410        Ok(Ok(Ok(()))) => {
3411            // Report already went out from inside the closure.
3412        }
3413        Ok(Ok(Err(e))) => {
3414            error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
3415        }
3416        Ok(Err(join_err)) => {
3417            error!(
3418                shard = shard_idx,
3419                pv = pv_name_for_post,
3420                "Storage write task panicked: {join_err}"
3421            );
3422        }
3423        Err(_) => {
3424            // Sample MAY still land on disk later (per-PV
3425            // serialization keeps order). The closure will report
3426            // into the shared pending map whenever it eventually
3427            // completes, so registry's `last_event` catches up
3428            // via the late path even though we move on now.
3429            error!(
3430                shard = shard_idx,
3431                pv = pv_name_for_post,
3432                "Storage append timed out after {append_timeout:?}; \
3433                 shard abandoning (task remains on blocking pool, \
3434                 may still succeed late)"
3435            );
3436            if let Some(ref c) = counters_for_post {
3437                c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
3438            }
3439        }
3440    }
3441}
3442
3443/// Drain the shard's remaining buffered samples on shutdown,
3444/// applying the same bounded-timeout discipline as the hot path.
3445async fn shard_drain_on_shutdown(
3446    shard_idx: usize,
3447    storage: &Arc<dyn StoragePlugin>,
3448    pending: &Arc<PendingReports>,
3449    sample_rx: &mut mpsc::Receiver<PvSample>,
3450    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3451    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3452    cfg: &WriteLoopConfig,
3453) {
3454    let drain_per_sample_timeout = cfg.drain_per_sample_timeout;
3455    let drain_total_budget = cfg.drain_total_budget;
3456    let drain_start = std::time::Instant::now();
3457    let mut drained_skipped = 0usize;
3458    while let Ok(pv_sample) = sample_rx.try_recv() {
3459        if drain_start.elapsed() > drain_total_budget {
3460            drained_skipped += 1;
3461            continue;
3462        }
3463        // Reuse the hot-path handler with a tighter timeout.
3464        // Late-success reports still flow into `pending` because
3465        // the closure clones the Arc independently of this
3466        // shard's lifecycle.
3467        shard_handle_sample(
3468            shard_idx,
3469            storage,
3470            pending,
3471            pv_sample,
3472            last_ts,
3473            last_dbr_type,
3474            drain_per_sample_timeout,
3475        )
3476        .await;
3477    }
3478    if drained_skipped > 0 {
3479        warn!(
3480            shard = shard_idx,
3481            "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3482             {drained_skipped} buffered sample(s) abandoned without \
3483             attempting append"
3484        );
3485    }
3486}
3487
3488/// Single global flush owner. Reads from the shared
3489/// [`PendingReports`] map (which all shards coalesce into),
3490/// drives the periodic `flush_ingest_writes`, and commits to the
3491/// registry. Because ALL shards report into this one map and the
3492/// owner is the only consumer, the flush result and the
3493/// pending-commit data live in the same task — no shard 0 vs
3494/// shard 1 attribution skew.
3495async fn flush_owner_loop(
3496    storage: Arc<dyn StoragePlugin>,
3497    registry: Arc<PvRegistry>,
3498    pending: Arc<PendingReports>,
3499    mut shutdown: tokio::sync::watch::Receiver<bool>,
3500    cfg: WriteLoopConfig,
3501) {
3502    let flush_period = cfg.flush_period;
3503    let flush_timeout = cfg.flush_timeout;
3504    let shutdown_flush_timeout = cfg.shutdown_flush_timeout;
3505    let drain_total_budget = cfg.drain_total_budget;
3506    // tokio::time::interval(Duration::ZERO) panics. Config-side
3507    // validation rejects 0 at load time, but defending in depth
3508    // here keeps tests and direct callers (which build configs by
3509    // hand) from crashing the engine on a misconfigured value.
3510    let flush_period = if flush_period.is_zero() {
3511        warn!(
3512            "flush_owner: flush_period was Duration::ZERO; \
3513             clamping to 1s to avoid tokio::time::interval panic"
3514        );
3515        Duration::from_secs(1)
3516    } else {
3517        flush_period
3518    };
3519    info!("Flush owner started");
3520
3521    let mut flush_ticker = tokio::time::interval(flush_period);
3522    flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3523    let _ = flush_ticker.tick().await;
3524
3525    let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3526
3527    // Phase 1: steady state. Ticker-driven flush only. Reports
3528    // accumulate in the shared `PendingReports` map continuously
3529    // (no recv loop — shards write directly), so we don't need
3530    // to multiplex report consumption against the flush.
3531    loop {
3532        tokio::select! {
3533            biased;
3534            _ = shutdown.changed() => {
3535                if *shutdown.borrow() {
3536                    break;
3537                }
3538            }
3539            _ = flush_ticker.tick() => {
3540                if !pending.is_empty() {
3541                    run_flush_and_commit(
3542                        &storage,
3543                        &registry,
3544                        &pending,
3545                        flush_timeout,
3546                        &flush_in_flight,
3547                    )
3548                    .await;
3549                }
3550            }
3551        }
3552    }
3553
3554    // Phase 2: shutdown grace. Two windows in one budget:
3555    //
3556    //   1. A brief minimum sleep so any in-flight spawn_blocking
3557    //      late-success closures can coalesce their reports into
3558    //      `pending` before the final flush snapshots it.
3559    //
3560    //   2. Wait for any still-running ticker flush to drain
3561    //      (`flush_in_flight` cleared by the spawn_blocking task's
3562    //      RAII guard). Without this, the final
3563    //      `run_flush_and_commit` would see `in_flight == true`,
3564    //      short-circuit, and skip every entry in `pending` —
3565    //      data on disk but no registry commit.
3566    //
3567    // Both windows fit inside the operator-configured
3568    // `drain_total_budget`. If a flush is genuinely wedged past
3569    // the budget, we proceed to final flush; that call will see
3570    // `in_flight` still set and short-circuit, but at least we
3571    // didn't pin the process forever. Future samples on restart
3572    // will re-populate `pending` and the registry catches up.
3573    let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3574    let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3575    if !min_grace.is_zero() {
3576        tokio::time::sleep(min_grace).await;
3577    }
3578    while flush_in_flight.load(Ordering::Acquire) && std::time::Instant::now() < phase2_deadline {
3579        tokio::time::sleep(Duration::from_millis(50)).await;
3580    }
3581    if flush_in_flight.load(Ordering::Acquire) {
3582        warn!(
3583            "Shutdown grace exhausted while a flush is still in flight; \
3584             final flush will be skipped (pending entries left for next \
3585             process restart to rebuild from re-archived samples)"
3586        );
3587    }
3588
3589    // Final flush + commit. Use shutdown_flush_timeout (typically
3590    // shorter than the steady-state flush_timeout) so a wedged FS
3591    // doesn't block shutdown indefinitely.
3592    info!(
3593        pending_at_shutdown = pending.len(),
3594        "Flush owner running final flush + commit"
3595    );
3596    if !pending.is_empty() {
3597        run_flush_and_commit(
3598            &storage,
3599            &registry,
3600            &pending,
3601            shutdown_flush_timeout,
3602            &flush_in_flight,
3603        )
3604        .await;
3605    }
3606    info!("Flush owner exiting");
3607}