Skip to main content

archiver_engine/
channel_manager.rs

1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::proto::ByteOrder;
11use epics_rs::pva::pvdata::encode::{encode_pv_field, encode_type_desc};
12use epics_rs::pva::pvdata::{PvField, ScalarType, ScalarValue, TypedScalarArray};
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, warn};
16
17use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
18use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
19use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
20
21use crate::policy::PolicyConfig;
22
/// Timeout for the initial channel connection. Despite the `CA_` prefix it
/// also bounds the PVA connect and the introspecting first `pvget_full` in
/// `archive_pv_inner_pva`.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for CA reconnection attempts in the monitor loop.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay before retrying a failed CA subscription.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
29
/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
/// stale uninitialised IOC clock or a sentinel.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC

/// Decide whether a freshly-received sample timestamp is plausible.
///
/// Returns `true` when `ts` is at or after `PAST_CUTOFF_UNIX_SECS` and lies
/// within `±drift_secs` of `now` (compared at whole-second resolution).
/// Returns `false` when the sample should be dropped; the caller bumps
/// `timestamp_drops` in that case.
///
/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
/// 6538631), so per-site tuning doesn't require recompiling. `now` is
/// passed in (rather than calling `SystemTime::now()` here) so the test
/// suite can pin time deterministically.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // Pre-epoch timestamps, and ones too far in the future to fit an i64
    // second count, are never plausible.
    let Some(unix) = ts
        .duration_since(SystemTime::UNIX_EPOCH)
        .ok()
        .and_then(|d| i64::try_from(d.as_secs()).ok())
    else {
        return false;
    };
    if unix < PAST_CUTOFF_UNIX_SECS {
        return false;
    }
    // A pre-epoch wall clock is treated as the epoch itself.
    let now_unix = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX));
    // `abs_diff` cannot overflow, unlike `(unix - now_unix).unsigned_abs()`.
    unix.abs_diff(now_unix) <= drift_secs
}
59
/// Discrete connection state reported by `getPVDetails` (Java parity
/// dea7acb). Separates never-connected, connecting and confirmed-down PVs.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PvConnectionState {
    /// No connection attempt has progressed past `wait_connected` yet.
    #[default]
    Idle,
    /// Waiting on `wait_connected` for the first time on this channel
    /// handle.
    Connecting,
    /// The channel reported a successful connect; samples are flowing or
    /// the channel is otherwise live.
    Connected,
    /// The channel connected at least once, but the monitor loop has since
    /// dropped. Kept distinct from `Idle` so operators can tell a
    /// regression from a name that never resolved.
    Disconnected,
}

impl PvConnectionState {
    /// Label for the PVDetails report; identical to the variant name.
    pub fn as_str(self) -> &'static str {
        use PvConnectionState::{Connected, Connecting, Disconnected, Idle};
        match self {
            Idle => "Idle",
            Connecting => "Connecting",
            Connected => "Connected",
            Disconnected => "Disconnected",
        }
    }
}
90
91/// Connection state tracked per PV.
92#[derive(Debug, Clone, Default)]
93pub struct ConnectionInfo {
94    pub connected_since: Option<SystemTime>,
95    pub last_event_time: Option<SystemTime>,
96    pub is_connected: bool,
97    /// Java parity (dea7acb): discrete connection state for the
98    /// PVDetails report. Operators can distinguish never-connected,
99    /// currently-connecting, and previously-connected-now-down.
100    pub state: PvConnectionState,
101}
102
/// Per-PV diagnostic counters feeding the BPL drop / rate / connection
/// reports.
///
/// Counts only ever increase over the PV's lifetime; the rate handlers
/// compute deltas against `first_event_unix_secs`. They live apart from
/// `ConnectionInfo` so they survive transient disconnects and can be read
/// lock-free by the report endpoints.
#[derive(Debug)]
pub struct PvCounters {
    /// Events produced by monitor/scan, including ones later dropped
    /// before being written.
    pub events_received: AtomicU64,
    /// Events successfully written to storage.
    pub events_stored: AtomicU64,
    /// Unix-epoch seconds of the first event ever seen for this PV; `0`
    /// means "no event yet" (there is no atomic `Option`).
    pub first_event_unix_secs: AtomicI64,
    /// Events rejected by the bounded sample channel because the write
    /// loop fell behind the producer. Backs
    /// `getDroppedEventsBufferOverflowReport`.
    pub buffer_overflow_drops: AtomicU64,
    /// Events whose timestamp went backwards relative to the previously
    /// stored event (Java archiver's `DroppedEventsTimestampReport`). Per
    /// the docs of `ioc_timestamp_in_window`, samples it rejects are
    /// counted here too.
    pub timestamp_drops: AtomicU64,
    /// Events whose runtime DBR type differed from the PvRecord's stored
    /// type. They are dropped because mixing types in one PB partition
    /// would corrupt downstream readers.
    pub type_change_drops: AtomicU64,
    /// Disconnect transitions seen on this PV's CA channel
    /// (`LostConnectionsReport`).
    pub disconnect_count: AtomicU64,
    /// Unix-epoch seconds of the latest disconnect transition; `0` means
    /// none yet.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Transient subscribe / monitor-recv / scan-read errors (Java parity
    /// 8fe73eb). These are recoverable per-attempt failures, not confirmed
    /// link drops, hence separate from `disconnect_count`.
    pub transient_error_count: AtomicU64,
    /// Latest CA DBR type that did not match the archive-time recorded
    /// type (Java parity 9f2234f); `-1` means no mismatch ever seen.
    pub latest_observed_dbr: AtomicI32,
    /// Failed DBR_CTRL metadata refresh attempts (timeout, transport
    /// error, missing display info). Worth checking for PVs with empty
    /// PREC/EGU.
    pub metadata_fetch_failures: AtomicU64,
    /// `storage.append_event_with_meta` calls that exceeded the per-sample
    /// storage timeout. Kept apart from `buffer_overflow_drops` (mpsc
    /// saturation) so "the storage tier is wedged" can be told apart from
    /// "the writer can't keep up with the producer".
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    fn default() -> Self {
        let zero_u64 = || AtomicU64::new(0);
        let zero_i64 = || AtomicI64::new(0);
        Self {
            events_received: zero_u64(),
            events_stored: zero_u64(),
            first_event_unix_secs: zero_i64(),
            buffer_overflow_drops: zero_u64(),
            timestamp_drops: zero_u64(),
            type_change_drops: zero_u64(),
            disconnect_count: zero_u64(),
            last_disconnect_unix_secs: zero_i64(),
            transient_error_count: zero_u64(),
            // Sentinel: no type mismatch observed yet.
            latest_observed_dbr: AtomicI32::new(-1),
            metadata_fetch_failures: zero_u64(),
            storage_append_timeouts: zero_u64(),
        }
    }
}

/// Owned, point-in-time copy of [`PvCounters`] that can be moved across
/// threads or serialised without touching the atomics.
///
/// Sentinels become `Option`s: a zero timestamp or a `-1` DBR type is
/// reported as `None`.
#[derive(Debug, Clone)]
pub struct PvCountersSnapshot {
    pub events_received: u64,
    pub events_stored: u64,
    pub first_event_unix_secs: Option<i64>,
    pub buffer_overflow_drops: u64,
    pub timestamp_drops: u64,
    pub type_change_drops: u64,
    pub disconnect_count: u64,
    pub last_disconnect_unix_secs: Option<i64>,
    pub transient_error_count: u64,
    /// `Some(dbr_type as i32)` once a type-change mismatch has been
    /// observed, otherwise `None` (Java parity 9f2234f).
    pub latest_observed_dbr: Option<i32>,
    pub metadata_fetch_failures: u64,
    pub storage_append_timeouts: u64,
}

impl From<&PvCounters> for PvCountersSnapshot {
    /// Loads every counter with `Relaxed` ordering. Fields are read one at
    /// a time, so the snapshot is not atomic across fields.
    fn from(c: &PvCounters) -> Self {
        let count = |cell: &AtomicU64| cell.load(Ordering::Relaxed);
        let unix_secs = |cell: &AtomicI64| match cell.load(Ordering::Relaxed) {
            0 => None,
            secs => Some(secs),
        };
        let dbr = c.latest_observed_dbr.load(Ordering::Relaxed);
        Self {
            events_received: count(&c.events_received),
            events_stored: count(&c.events_stored),
            first_event_unix_secs: unix_secs(&c.first_event_unix_secs),
            buffer_overflow_drops: count(&c.buffer_overflow_drops),
            timestamp_drops: count(&c.timestamp_drops),
            type_change_drops: count(&c.type_change_drops),
            disconnect_count: count(&c.disconnect_count),
            last_disconnect_unix_secs: unix_secs(&c.last_disconnect_unix_secs),
            transient_error_count: count(&c.transient_error_count),
            latest_observed_dbr: (dbr != -1).then_some(dbr),
            metadata_fetch_failures: count(&c.metadata_fetch_failures),
            storage_append_timeouts: count(&c.storage_append_timeouts),
        }
    }
}
224
225/// Handle for a running PV archiving task.
226struct PvHandle {
227    /// `Some` for CA-acquired PVs (used by `try_get_value` for the
228    /// live-value RPC); `None` for PVA-acquired PVs since PVA exposes
229    /// no Clone-able channel handle — live_value for those routes
230    /// through `pva_client` directly.
231    channel: Option<CaChannel>,
232    cancel_token: CancellationToken,
233    #[allow(dead_code)]
234    dbr_type: ArchDbType,
235    conn_info: Arc<Mutex<ConnectionInfo>>,
236    /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
237    /// every sample emitted for this PV. Populated by per-field monitor tasks
238    /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
239    /// so stopping the PV stops all of them.
240    extras: Arc<ExtraFieldsCache>,
241    /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
242    /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
243    /// field's task without disturbing the others or the main PV.
244    field_tokens: Arc<DashMap<String, CancellationToken>>,
245    /// Serialises concurrent `update_archive_fields` calls for this PV so
246    /// add/remove/respawn never race with itself.
247    update_lock: Arc<tokio::sync::Mutex<()>>,
248    /// Diagnostic counters surfaced through the BPL drop / rate /
249    /// connection reports. Lock-free reads; updates from the producer
250    /// (monitor/scan) and the writer happen on different threads.
251    counters: Arc<PvCounters>,
252}
253
254/// Thread-safe cache of latest extra-field values for one PV.
255/// Each map entry is `(field_name, stringified_value)`.
256type ExtraFieldsCache = DashMap<String, String>;
257
/// Capacity of the bounded sample channel between producers and the
/// storage writer. It caps memory when producers outpace the writer:
/// assuming roughly 200 bytes per queued sample, a full channel holds about
/// 100 MB.
const SAMPLE_CHANNEL_CAPACITY: usize = 500 * 1_000;
262
263/// RAII guard that removes a key from `pending_archives` on drop,
264/// ensuring cleanup even if the owning future is cancelled.
265struct PendingGuard<'a> {
266    map: &'a DashMap<String, ()>,
267    key: String,
268}
269
270impl Drop for PendingGuard<'_> {
271    fn drop(&mut self) {
272        self.map.remove(&self.key);
273    }
274}
275
276/// Manages EPICS Channel Access + pvAccess connections and dispatches
277/// archived samples to storage.
278pub struct ChannelManager {
279    /// The CA client context.
280    ca_client: CaClient,
281    /// The PVA client context (lazily used only when a PV's registry
282    /// row carries `Protocol::Pva`).
283    pva_client: PvaClient,
284    /// Active channels: PV name → handle with cancellation.
285    channels: DashMap<String, PvHandle>,
286    /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
287    /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
288    pending_archives: DashMap<String, ()>,
289    /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
290    /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
291    /// the registry status and the channel map disagreeing.
292    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
293    /// Storage backend.
294    #[allow(dead_code)]
295    storage: Arc<dyn StoragePlugin>,
296    /// PV metadata registry.
297    registry: Arc<PvRegistry>,
298    /// Sample sender for the write thread.
299    sample_tx: mpsc::Sender<PvSample>,
300    /// Optional policy configuration.
301    policy: Option<PolicyConfig>,
302    /// Per-site IOC drift bound (Java parity 6538631).
303    server_ioc_drift_secs: u64,
304}
305
306/// A sample ready to be written to storage.
307pub struct PvSample {
308    pub pv_name: String,
309    pub dbr_type: ArchDbType,
310    pub sample: ArchiverSample,
311    pub element_count: Option<i32>,
312    /// Counter handle used by the write loop to record timestamp /
313    /// type-change drops. None on samples produced before counter
314    /// support was wired up — write_loop tolerates the absence.
315    pub counters: Option<Arc<PvCounters>>,
316}
317
318impl ChannelManager {
319    pub async fn new(
320        storage: Arc<dyn StoragePlugin>,
321        registry: Arc<PvRegistry>,
322        policy: Option<PolicyConfig>,
323    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
324        Self::new_with_drift(storage, registry, policy, 30 * 60).await
325    }
326
327    /// Construct with an explicit IOC drift bound. Java parity (6538631):
328    /// keeps tests + sites that don't surface `EngineConfig` on the
329    /// existing default while letting the daemon plumb a configured value.
330    pub async fn new_with_drift(
331        storage: Arc<dyn StoragePlugin>,
332        registry: Arc<PvRegistry>,
333        policy: Option<PolicyConfig>,
334        server_ioc_drift_secs: u64,
335    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
336        let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
337        let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
338        let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
339
340        let mgr = Self {
341            ca_client,
342            pva_client,
343            channels: DashMap::new(),
344            pending_archives: DashMap::new(),
345            op_locks: DashMap::new(),
346            storage,
347            registry,
348            sample_tx: tx,
349            policy,
350            server_ioc_drift_secs,
351        };
352
353        Ok((mgr, rx))
354    }
355
356    /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
357    /// is what callers should `.lock().await` on; holding the entry guard
358    /// (via `entry().or_insert_with`) across the await would deadlock the
359    /// DashMap shard.
360    fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
361        if let Some(existing) = self.op_locks.get(pv_name) {
362            return existing.clone();
363        }
364        self.op_locks
365            .entry(pv_name.to_string())
366            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
367            .clone()
368    }
369
370    /// Restore all active PVs from the registry (called on startup).
371    ///
372    /// `pvs_by_status(Active)` already filters out alias rows (they carry
373    /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
374    /// future schema change can't silently re-introduce double-archiving
375    /// of the underlying IOC PV.
376    pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
377        let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
378        let total = active_pvs.len() as u64;
379        info!(total, "Restoring PVs from registry");
380
381        let mut restored = 0u64;
382        for record in active_pvs {
383            if record.alias_for.is_some() {
384                warn!(
385                    pv = record.pv_name,
386                    target = record.alias_for.as_deref(),
387                    "Skipping alias row in restore; aliases are routed, not archived"
388                );
389                continue;
390            }
391            if let Err(e) = self.start_archiving_internal(&record).await {
392                warn!(pv = record.pv_name, "Failed to restore PV: {e}");
393                self.registry.set_status(&record.pv_name, PvStatus::Error)?;
394            } else {
395                restored += 1;
396            }
397        }
398        metrics::gauge!("archiver_pvs_active").set(restored as f64);
399        if restored < total {
400            warn!(
401                restored,
402                failed = total - restored,
403                "Some PVs failed to restore"
404            );
405        }
406
407        Ok(restored)
408    }
409
410    /// Start archiving a new PV.
411    pub async fn archive_pv(
412        &self,
413        pv_name: &str,
414        sample_mode: &SampleMode,
415        protocol: Protocol,
416    ) -> anyhow::Result<()> {
417        // Serialise with pause/resume/stop/destroy on the same PV.
418        let lock = self.op_lock(pv_name);
419        let _g = lock.lock().await;
420
421        if self.channels.contains_key(pv_name) {
422            anyhow::bail!("PV {pv_name} is already being archived");
423        }
424
425        // Atomically claim the PV to prevent concurrent archive_pv races.
426        // The guard ensures cleanup even if this future is cancelled.
427        if self
428            .pending_archives
429            .insert(pv_name.to_string(), ())
430            .is_some()
431        {
432            anyhow::bail!("PV {pv_name} archive operation already in progress");
433        }
434        let _guard = PendingGuard {
435            map: &self.pending_archives,
436            key: pv_name.to_string(),
437        };
438
439        match protocol {
440            Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
441            Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
442        }
443    }
444
445    /// Inner implementation of archive_pv, separated for cleanup safety.
446    async fn archive_pv_inner(
447        &self,
448        pv_name: &str,
449        sample_mode: &SampleMode,
450    ) -> anyhow::Result<()> {
451        // Re-check after acquiring the pending slot (another task may have completed).
452        if self.channels.contains_key(pv_name) {
453            anyhow::bail!("PV {pv_name} is already being archived");
454        }
455
456        // Check policy override.
457        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
458            if let Some(p) = policy.find_policy(pv_name) {
459                (p.to_sample_mode(), Some(p.policy_name().to_string()))
460            } else {
461                (sample_mode.clone(), None)
462            }
463        } else {
464            (sample_mode.clone(), None)
465        };
466
467        // Connect to discover the native type.
468        let channel = self.ca_client.create_channel(pv_name);
469        channel
470            .wait_connected(CA_CONNECT_TIMEOUT)
471            .await
472            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
473
474        let info = self
475            .ca_client
476            .cainfo(pv_name)
477            .await
478            .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
479
480        let dbr_type = dbr_field_to_arch_type(info.native_type);
481        let element_count = info.element_count as i32;
482
483        // Register in SQLite. (CA path — PVA acquisition uses a separate
484        // entry point, so this branch is always Protocol::Ca.)
485        self.registry
486            .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
487        // Java parity (b30f1a6): persist the matched policy's stable name so
488        // audit / metrics paths know which policy governed this archive.
489        if let Some(ref name) = matched_policy_name {
490            self.registry.update_policy_name(pv_name, Some(name))?;
491        }
492
493        let record = self
494            .registry
495            .get_pv(pv_name)?
496            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
497        self.start_archiving_internal(&record).await?;
498
499        metrics::gauge!("archiver_pvs_active").increment(1.0);
500        info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
501        Ok(())
502    }
503
504    /// PVA equivalent of [`Self::archive_pv_inner`]. Connects via the
505    /// pvAccess client, infers archive type from a one-shot pvget, then
506    /// hands off to a `monitor_loop_pva` that mirrors the CA monitor
507    /// loop's lifecycle (samples → write_loop, cancel_token shutdown).
508    async fn archive_pv_inner_pva(
509        &self,
510        pv_name: &str,
511        sample_mode: &SampleMode,
512    ) -> anyhow::Result<()> {
513        if self.channels.contains_key(pv_name) {
514            anyhow::bail!("PV {pv_name} is already being archived");
515        }
516
517        // Apply policy override (same surface as CA path).
518        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
519            if let Some(p) = policy.find_policy(pv_name) {
520                (p.to_sample_mode(), Some(p.policy_name().to_string()))
521            } else {
522                (sample_mode.clone(), None)
523            }
524        } else {
525            (sample_mode.clone(), None)
526        };
527
528        // Connect + introspect: a one-shot pvget tells us the value
529        // shape. We rely on it instead of a `cainfo` analogue because
530        // PVA channels carry their type only via fetched data.
531        let connect = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvconnect(pv_name))
532            .await
533            .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
534            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
535        debug!(pv = pv_name, server = %connect, "PVA channel connected");
536
537        // pvget_full (not pvget) so the introspection round-trips
538        // alongside the value: every PVA read in this crate goes
539        // through a desc-aware API, matching the no-fallback invariant
540        // enforced by `pv_field_scalar_to_archiver`. The same op_get
541        // wire op underlies both, so this is a free strengthening.
542        let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget_full(pv_name))
543            .await
544            .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
545            .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
546        let (dbr_type, element_count) =
547            pv_field_to_arch_db_type(&initial.value).ok_or_else(|| {
548                anyhow::anyhow!("PV {pv_name}: empty PVA scalar-array; cannot infer element type")
549            })?;
550
551        // Register with Pva flag.
552        self.registry.register_pv_with_protocol(
553            pv_name,
554            dbr_type,
555            &effective_mode,
556            element_count,
557            Protocol::Pva,
558        )?;
559        if let Some(ref name) = matched_policy_name {
560            self.registry.update_policy_name(pv_name, Some(name))?;
561        }
562        // Persist PREC/EGU extracted from the same pvget — equivalent
563        // to the CA path's DBR_CTRL refresh. Non-fatal on error.
564        let (prec, egu) = pv_field_extract_display(&initial.value);
565        if (prec.is_some() || egu.is_some())
566            && let Err(e) = self
567                .registry
568                .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
569        {
570            debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
571        }
572
573        let record = self
574            .registry
575            .get_pv(pv_name)?
576            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
577        self.start_archiving_internal(&record).await?;
578
579        metrics::gauge!("archiver_pvs_active").increment(1.0);
580        info!(
581            pv = pv_name,
582            ?dbr_type,
583            element_count,
584            protocol = "pva",
585            "Started archiving"
586        );
587        Ok(())
588    }
589
590    /// Internal: start a subscription for a PV record. Routes by
591    /// `record.protocol`; the CA branch keeps the historical behaviour,
592    /// the PVA branch goes through `start_archiving_internal_pva`.
593    async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
594        if record.protocol == Protocol::Pva {
595            return self.start_archiving_internal_pva(record).await;
596        }
597        let pv_name = record.pv_name.clone();
598        let dbr_type = record.dbr_type;
599        let element_count = record.element_count;
600        let channel = self.ca_client.create_channel(&pv_name);
601        let cancel_token = CancellationToken::new();
602        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
603        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
604        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
605        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
606        let counters = Arc::new(PvCounters::default());
607
608        // Hold update_lock around the whole insert+spawn block so a
609        // concurrent update_archive_fields can't observe the empty
610        // field_tokens map and spawn its own copies of the same fields
611        // before we get to the spawn loop. update_archive_fields acquires
612        // the same update_lock before mutating tokens.
613        let _guard = update_lock.lock().await;
614
615        self.channels.insert(
616            pv_name.clone(),
617            PvHandle {
618                channel: Some(channel.clone()),
619                cancel_token: cancel_token.clone(),
620                dbr_type,
621                conn_info: conn_info.clone(),
622                extras: extras.clone(),
623                field_tokens: field_tokens.clone(),
624                update_lock: update_lock.clone(),
625                counters: counters.clone(),
626            },
627        );
628
629        // Start one monitor task per archive_field with a child cancel token,
630        // tracked so update_archive_fields can stop individual fields.
631        for field in &record.archive_fields {
632            let child = cancel_token.child_token();
633            field_tokens.insert(field.clone(), child.clone());
634            spawn_extra_field_monitor(
635                &self.ca_client,
636                &pv_name,
637                field,
638                extras.clone(),
639                child,
640                counters.clone(),
641            );
642        }
643        metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
644        drop(_guard);
645
646        let tx = self.sample_tx.clone();
647        let token = cancel_token.clone();
648        let ci = conn_info.clone();
649        let extras_for_loop = extras.clone();
650        let counters_for_loop = counters.clone();
651        let registry_for_loop = self.registry.clone();
652
653        let drift = self.server_ioc_drift_secs;
654        match &record.sample_mode {
655            SampleMode::Monitor => {
656                tokio::spawn(async move {
657                    monitor_loop(
658                        pv_name,
659                        dbr_type,
660                        element_count,
661                        channel,
662                        tx,
663                        token,
664                        ci,
665                        registry_for_loop,
666                        extras_for_loop,
667                        counters_for_loop,
668                        drift,
669                    )
670                    .await;
671                });
672            }
673            SampleMode::Scan { period_secs } => {
674                let period = *period_secs;
675                tokio::spawn(async move {
676                    scan_loop(
677                        pv_name,
678                        dbr_type,
679                        element_count,
680                        channel,
681                        tx,
682                        token,
683                        period,
684                        ci,
685                        registry_for_loop,
686                        extras_for_loop,
687                        counters_for_loop,
688                    )
689                    .await;
690                });
691            }
692        }
693
694        Ok(())
695    }
696
697    /// PVA equivalent of `start_archiving_internal`. Spawns a single
698    /// callback-driven monitor task; pvAccess auto-reconnect inside
699    /// `pvmonitor_handle` removes the explicit reconnect loop the CA
700    /// path needs. PVA records use `channel: None` on the [`PvHandle`]
701    /// and skip per-field "extras" subscriptions (`.HIHI`/`.LOLO`/…)
702    /// since pvAccess wraps those in the NTScalar value structure
703    /// rather than separate channels.
704    async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
705        let pv_name = record.pv_name.clone();
706        let dbr_type = record.dbr_type;
707        let element_count = record.element_count;
708        let cancel_token = CancellationToken::new();
709        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
710        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
711        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
712        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
713        let counters = Arc::new(PvCounters::default());
714
715        self.channels.insert(
716            pv_name.clone(),
717            PvHandle {
718                channel: None,
719                cancel_token: cancel_token.clone(),
720                dbr_type,
721                conn_info: conn_info.clone(),
722                extras: extras.clone(),
723                field_tokens: field_tokens.clone(),
724                update_lock,
725                counters: counters.clone(),
726            },
727        );
728
729        let tx = self.sample_tx.clone();
730        let pva_client = self.pva_client.clone();
731        let token = cancel_token.clone();
732        let ci = conn_info.clone();
733        let counters_for_loop = counters.clone();
734        let drift = self.server_ioc_drift_secs;
735
736        match &record.sample_mode {
737            SampleMode::Monitor => {
738                let pv_name_loop = pv_name.clone();
739                let archive_fields_loop = record.archive_fields.clone();
740                let extras_for_loop = extras.clone();
741                tokio::spawn(async move {
742                    monitor_loop_pva(
743                        pv_name_loop,
744                        dbr_type,
745                        element_count,
746                        pva_client,
747                        tx,
748                        token,
749                        ci,
750                        counters_for_loop,
751                        drift,
752                        archive_fields_loop,
753                        extras_for_loop,
754                    )
755                    .await;
756                });
757            }
758            SampleMode::Scan { period_secs } => {
759                let period = *period_secs;
760                let pv_name_loop = pv_name.clone();
761                let archive_fields_loop = record.archive_fields.clone();
762                let extras_for_loop = extras.clone();
763                tokio::spawn(async move {
764                    scan_loop_pva(
765                        pv_name_loop,
766                        dbr_type,
767                        pva_client,
768                        tx,
769                        token,
770                        period,
771                        ci,
772                        counters_for_loop,
773                        drift,
774                        archive_fields_loop,
775                        extras_for_loop,
776                    )
777                    .await;
778                });
779            }
780        }
781
782        // Auxiliary periodic refreshers: keep PREC/EGU current after
783        // IOC restarts, and approximate disconnect events from a long
784        // sample gap. Both bind to the same cancel_token so a stop /
785        // delete reaps every PVA-side task in one go.
786        {
787            let pv_name = pv_name.clone();
788            let pva_client = self.pva_client.clone();
789            let registry = self.registry.clone();
790            let cancel = cancel_token.clone();
791            let counters_for_refresh = counters.clone();
792            tokio::spawn(async move {
793                pva_metadata_refresh_loop(
794                    pv_name,
795                    pva_client,
796                    registry,
797                    cancel,
798                    counters_for_refresh,
799                )
800                .await;
801            });
802        }
803        {
804            let pv_name = pv_name.clone();
805            let conn_info = conn_info.clone();
806            let counters_for_watch = counters.clone();
807            let cancel = cancel_token.clone();
808            let sample_mode = record.sample_mode.clone();
809            tokio::spawn(async move {
810                pva_state_watchdog(pv_name, conn_info, counters_for_watch, cancel, sample_mode)
811                    .await;
812            });
813        }
814
815        Ok(())
816    }
817
818    /// Replace the archive_fields list for a running PV. Cancels per-field
819    /// monitor tasks for fields that left the set, spawns fresh ones for
820    /// fields that joined, and leaves unchanged fields running. Serialised
821    /// per-PV by an async mutex so concurrent callers can't double-spawn.
822    /// The main PV keeps running.
823    pub async fn update_archive_fields(
824        &self,
825        pv_name: &str,
826        fields: &[String],
827    ) -> anyhow::Result<()> {
828        // Persist first so a restart sees the new set even if the engine
829        // half of the update fails partway through.
830        self.registry.update_archive_fields(pv_name, fields)?;
831
832        // If the PV isn't currently active there's nothing more to do —
833        // start_archiving_internal will pick up the new fields on resume.
834        let (parent_token, extras, field_tokens, update_lock, counters) = {
835            let Some(handle) = self.channels.get(pv_name) else {
836                return Ok(());
837            };
838            (
839                handle.cancel_token.clone(),
840                handle.extras.clone(),
841                handle.field_tokens.clone(),
842                handle.update_lock.clone(),
843                handle.counters.clone(),
844            )
845        };
846
847        // Serialise so two concurrent updates can't both decide the same
848        // field is missing and spawn it twice.
849        let _guard = update_lock.lock().await;
850
851        let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
852
853        // Cancel + drop tasks for fields that left the set. Removing the
854        // entry from `field_tokens` also drops our handle on the child
855        // token; the spawned task observes `cancelled()` and exits.
856        let to_remove: Vec<String> = field_tokens
857            .iter()
858            .filter(|e| !wanted.contains(e.key().as_str()))
859            .map(|e| e.key().clone())
860            .collect();
861        let removed_count = to_remove.len();
862        for key in to_remove {
863            if let Some((_, token)) = field_tokens.remove(&key) {
864                token.cancel();
865            }
866            extras.remove(&key);
867        }
868
869        // Spawn tasks for fields newly added. Existing fields keep their
870        // task and their last cached value.
871        let mut added_count = 0usize;
872        for f in fields {
873            if !field_tokens.contains_key(f) {
874                let child = parent_token.child_token();
875                field_tokens.insert(f.clone(), child.clone());
876                spawn_extra_field_monitor(
877                    &self.ca_client,
878                    pv_name,
879                    f,
880                    extras.clone(),
881                    child,
882                    counters.clone(),
883                );
884                added_count += 1;
885            }
886        }
887        let net = added_count as i64 - removed_count as i64;
888        if net != 0 {
889            metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
890        }
891        Ok(())
892    }
893
894    /// Pause archiving for a PV.
895    pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
896        let lock = self.op_lock(pv_name);
897        let _g = lock.lock().await;
898        if let Some((_key, handle)) = self.channels.remove(pv_name) {
899            let extra_count = handle.field_tokens.len() as f64;
900            handle.cancel_token.cancel();
901            metrics::gauge!("archiver_pvs_active").decrement(1.0);
902            if extra_count > 0.0 {
903                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
904            }
905        }
906        self.registry.set_status(pv_name, PvStatus::Paused)?;
907        info!(pv = pv_name, "Paused archiving");
908        Ok(())
909    }
910
911    /// Resume a paused PV. Only paused or error PVs can be resumed;
912    /// calling resume on an already-active PV is a no-op (returns Ok).
913    pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
914        let lock = self.op_lock(pv_name);
915        let _g = lock.lock().await;
916
917        let record = self
918            .registry
919            .get_pv(pv_name)?
920            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
921
922        // Guard: if already active with a live task, nothing to do.
923        if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
924            info!(
925                pv = pv_name,
926                "PV is already actively archived, skipping resume"
927            );
928            return Ok(());
929        }
930
931        // Cancel any orphaned task to prevent duplicate subscriptions.
932        if let Some((_key, handle)) = self.channels.remove(pv_name) {
933            let extra_count = handle.field_tokens.len() as f64;
934            handle.cancel_token.cancel();
935            if extra_count > 0.0 {
936                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
937            }
938        }
939
940        // Start the task first; only mark Active if it succeeds.
941        self.start_archiving_internal(&record).await?;
942        self.registry.set_status(pv_name, PvStatus::Active)?;
943        metrics::gauge!("archiver_pvs_active").increment(1.0);
944        info!(pv = pv_name, "Resumed archiving");
945        Ok(())
946    }
947
948    /// Stop archiving a PV without removing it from the registry.
949    /// Sets the PV status to Inactive (data retained, monitoring stopped).
950    pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
951        let lock = self.op_lock(pv_name);
952        let _g = lock.lock().await;
953        if let Some((_key, handle)) = self.channels.remove(pv_name) {
954            let extra_count = handle.field_tokens.len() as f64;
955            handle.cancel_token.cancel();
956            metrics::gauge!("archiver_pvs_active").decrement(1.0);
957            if extra_count > 0.0 {
958                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
959            }
960        }
961        self.registry.set_status(pv_name, PvStatus::Inactive)?;
962        info!(pv = pv_name, "Stopped archiving (inactive)");
963        Ok(())
964    }
965
966    /// Remove a PV from archiving entirely.
967    pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
968        let lock = self.op_lock(pv_name);
969        let _g = lock.lock().await;
970        if let Some((_key, handle)) = self.channels.remove(pv_name) {
971            let extra_count = handle.field_tokens.len() as f64;
972            handle.cancel_token.cancel();
973            metrics::gauge!("archiver_pvs_active").decrement(1.0);
974            if extra_count > 0.0 {
975                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
976            }
977        }
978        self.registry.remove_pv(pv_name)?;
979        // Don't remove the op_locks entry here: a concurrent caller may have
980        // already taken a clone of this Arc and be queued on it; removing
981        // would let a fresh caller obtain a new mutex and the queued one
982        // would race them. The map is bounded by the lifetime universe of
983        // PV names, which is acceptable.
984        info!(pv = pv_name, "Destroyed archiving channel");
985        Ok(())
986    }
987
988    /// List all currently archived PV names (from registry).
989    pub fn list_pvs(&self) -> Vec<String> {
990        self.registry.all_pv_names().unwrap_or_else(|e| {
991            warn!("Failed to list PVs: {e}");
992            Vec::new()
993        })
994    }
995
996    /// Match PVs by glob pattern (from registry).
997    pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
998        self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
999            warn!("Failed to match PVs: {e}");
1000            Vec::new()
1001        })
1002    }
1003
1004    /// Get the registry reference.
1005    pub fn registry(&self) -> &Arc<PvRegistry> {
1006        &self.registry
1007    }
1008
1009    /// Get connection info for a PV.
1010    pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1011        self.channels.get(pv).map(|h| {
1012            h.conn_info
1013                .lock()
1014                .unwrap_or_else(|e| e.into_inner())
1015                .clone()
1016        })
1017    }
1018
1019    /// Get PV names that have never received any event (connected_since == None).
1020    pub fn get_never_connected_pvs(&self) -> Vec<String> {
1021        self.channels
1022            .iter()
1023            .filter(|entry| {
1024                let ci = entry
1025                    .value()
1026                    .conn_info
1027                    .lock()
1028                    .unwrap_or_else(|e| e.into_inner());
1029                ci.connected_since.is_none()
1030            })
1031            .map(|entry| entry.key().clone())
1032            .collect()
1033    }
1034
1035    /// Snapshot the diagnostic counters for one PV. Returns None if the
1036    /// PV isn't actively archived. The returned Arc is the live counter
1037    /// — callers read with `Ordering::Relaxed`.
1038    pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1039        self.channels.get(pv_name).map(|h| h.counters.clone())
1040    }
1041
1042    /// Snapshot every active PV's counters. Returns `(pv_name,
1043    /// PvCountersSnapshot)` so callers don't have to handle Arc.
1044    pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1045        self.channels
1046            .iter()
1047            .map(|e| {
1048                (
1049                    e.key().clone(),
1050                    PvCountersSnapshot::from(&*e.value().counters),
1051                )
1052            })
1053            .collect()
1054    }
1055
1056    /// One-shot CA `get` against the running channel for `pv`. Returns
1057    /// `None` if the PV isn't actively archived. The timeout caps how
1058    /// long the caller will wait for a value when the IOC is slow.
1059    /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
1060    pub async fn live_value(
1061        &self,
1062        pv_name: &str,
1063        timeout: Duration,
1064    ) -> Option<anyhow::Result<ArchiverValue>> {
1065        let channel_opt = self.channels.get(pv_name)?.channel.clone();
1066        let Some(channel) = channel_opt else {
1067            // PVA-acquired PV — live_value via pvget_full so we get
1068            // the canonical channel descriptor alongside the value;
1069            // V4GenericBytes encoding then preserves Union / Variant
1070            // schemas instead of using lossy value-recovery.
1071            let pva = self.pva_client.clone();
1072            let name = pv_name.to_string();
1073            let res = tokio::time::timeout(timeout, pva.pvget_full(&name)).await;
1074            return Some(match res {
1075                Ok(Ok(result)) => pv_field_scalar_to_archiver(&result.value, &result.introspection)
1076                    .ok_or_else(|| anyhow::anyhow!("PVA value has no archiver mapping")),
1077                Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1078                Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1079            });
1080        };
1081        // Wait briefly for connection — channel.get on a disconnected
1082        // channel would otherwise return an error from deep in the CA
1083        // stack. Capped by the same timeout the caller chose.
1084        if channel.wait_connected(timeout).await.is_err() {
1085            return Some(Err(anyhow::anyhow!(
1086                "channel not connected within {timeout:?}"
1087            )));
1088        }
1089        match tokio::time::timeout(timeout, channel.get()).await {
1090            Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1091            Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1092            Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1093        }
1094    }
1095
1096    /// Snapshot the latest cached extra-field values for `pv` —
1097    /// `(field_name, stringified_value)` pairs. Empty map when the PV
1098    /// isn't archived or has no archive_fields configured.
1099    pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1100        match self.channels.get(pv_name) {
1101            Some(handle) => handle
1102                .extras
1103                .iter()
1104                .map(|e| (e.key().clone(), e.value().clone()))
1105                .collect(),
1106            None => std::collections::HashMap::new(),
1107        }
1108    }
1109
1110    /// Get PV names that are currently disconnected (is_connected == false).
1111    pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1112        self.channels
1113            .iter()
1114            .filter(|entry| {
1115                let ci = entry
1116                    .value()
1117                    .conn_info
1118                    .lock()
1119                    .unwrap_or_else(|e| e.into_inner());
1120                !ci.is_connected
1121            })
1122            .map(|entry| entry.key().clone())
1123            .collect()
1124    }
1125}
1126
1127/// Read DBR_CTRL metadata from the channel and persist `precision`/`units`
1128/// to the registry when they differ from stored values. Best-effort: any
1129/// failure (timeout, transport error, missing display info, non-numeric
1130/// type) is logged at debug and silently ignored. Skips the SQL write
1131/// entirely when the in-memory record already matches, so reconnect
1132/// storms don't churn the database.
1133async fn refresh_ctrl_metadata(
1134    channel: &CaChannel,
1135    registry: &PvRegistry,
1136    pv_name: &str,
1137    counters: &PvCounters,
1138) {
1139    // 15s — long enough for slow-network IOCs (Java's default `epicsTimeout`
1140    // is 30s; 15s splits the difference). Failures count toward
1141    // `metadata_fetch_failures` so operators can spot PVs that never get
1142    // PREC/EGU populated.
1143    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1144    let snapshot = match tokio::time::timeout(
1145        FETCH_TIMEOUT,
1146        channel.get_with_metadata(DbrClass::Ctrl),
1147    )
1148    .await
1149    {
1150        Ok(Ok(s)) => s,
1151        Ok(Err(e)) => {
1152            counters
1153                .metadata_fetch_failures
1154                .fetch_add(1, Ordering::Relaxed);
1155            debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1156            return;
1157        }
1158        Err(_) => {
1159            counters
1160                .metadata_fetch_failures
1161                .fetch_add(1, Ordering::Relaxed);
1162            debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1163            return;
1164        }
1165    };
1166    let Some(display) = snapshot.display else {
1167        // Non-numeric type (string, enum, …) — DBR_CTRL has no
1168        // DisplayInfo. Not a failure; just nothing to persist.
1169        return;
1170    };
1171
1172    // Negative precision is the Java EAA "no precision info" sentinel —
1173    // persisting "-1" as the PREC string would surface as a literal -1
1174    // in the UI / API. Treat it the same as missing.
1175    let new_prec_opt: Option<String> = if display.precision < 0 {
1176        None
1177    } else {
1178        Some(display.precision.to_string())
1179    };
1180    let new_egu_trimmed = display.units.trim();
1181    let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1182        None
1183    } else {
1184        Some(new_egu_trimmed)
1185    };
1186
1187    let stored = match registry.get_pv(pv_name) {
1188        Ok(Some(r)) => r,
1189        Ok(None) => return,
1190        Err(e) => {
1191            debug!(
1192                pv = pv_name,
1193                "Registry read for metadata compare failed: {e}"
1194            );
1195            return;
1196        }
1197    };
1198
1199    let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1200        (Some(s), Some(n)) => s != n,
1201        (None, Some(_)) => true,
1202        // Don't overwrite a populated PREC with the "no info" sentinel.
1203        _ => false,
1204    };
1205    let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1206        (Some(s), Some(n)) => s != n,
1207        (None, Some(_)) => true,
1208        // Don't overwrite a populated EGU with empty, and don't
1209        // bother writing None over None.
1210        _ => false,
1211    };
1212    if !prec_changed && !egu_changed {
1213        return;
1214    }
1215
1216    let prec_arg = if prec_changed {
1217        new_prec_opt.as_deref()
1218    } else {
1219        None
1220    };
1221    let egu_arg = if egu_changed { new_egu_opt } else { None };
1222    if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1223        warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1224    } else {
1225        debug!(
1226            pv = pv_name,
1227            prec = ?prec_arg, egu = ?egu_arg,
1228            "Refreshed PREC/EGU from DBR_CTRL"
1229        );
1230    }
1231}
1232
1233/// Body shared by the two PVA monitor callback variants
1234/// (`pvmonitor_handle` takes `(FieldDesc, PvField)`,
1235/// `pvmonitor_with_request` takes `(PvField)`). Lifted out so we
1236/// can write the once-per-event logic in one place.
1237///
1238/// `canonical_desc` is the channel-INIT introspection descriptor —
1239/// required at every callsite. The fast-path callback receives one
1240/// per event from the pvxs reactor; the custom-request path
1241/// pre-fetches it once via `pvinfo` and reuses the same `Arc`
1242/// across every event in a subscribe generation (a failed `pvinfo`
1243/// aborts subscribe + retries rather than degrading to lossy
1244/// value-recovery, so the on-disk invariant always holds).
1245#[allow(clippy::too_many_arguments)]
1246fn pva_handle_event(
1247    field: &PvField,
1248    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
1249    pv_name: &str,
1250    dbr_type: ArchDbType,
1251    tx: &mpsc::Sender<PvSample>,
1252    conn_info: &Mutex<ConnectionInfo>,
1253    counters: &Arc<PvCounters>,
1254    extras: &ExtraFieldsCache,
1255    extras_paths: &[(String, &'static str)],
1256    drift_secs: u64,
1257) {
1258    let now = SystemTime::now();
1259    let first_after_connect = {
1260        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1261        let first = ci.last_event_time.is_none();
1262        if ci.connected_since.is_none() {
1263            ci.connected_since = Some(now);
1264        }
1265        ci.is_connected = true;
1266        ci.last_event_time = Some(now);
1267        ci.state = PvConnectionState::Connected;
1268        first
1269    };
1270    if first_after_connect {
1271        counters
1272            .first_event_unix_secs
1273            .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1274            .ok();
1275    }
1276    counters.events_received.fetch_add(1, Ordering::Relaxed);
1277
1278    let Some(value) = pv_field_scalar_to_archiver(field, canonical_desc) else {
1279        counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1280        debug!(pv = pv_name, "PVA event has no archiver mapping; dropping");
1281        return;
1282    };
1283
1284    let ts = pv_field_extract_timestamp(field);
1285    if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1286        counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1287        debug!(
1288            pv = pv_name,
1289            ?ts,
1290            "Dropping PVA sample with out-of-window timestamp"
1291        );
1292        return;
1293    }
1294    let elem_count = match pv_field_element_count(field) {
1295        0 => 1,
1296        n => n,
1297    };
1298    refresh_nt_enum_extras(field, extras);
1299    for (field_name, path) in extras_paths {
1300        if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1301            extras.insert(field_name.clone(), scalar_value_to_string(s));
1302        }
1303    }
1304    let mut sample = ArchiverSample::new(ts, value);
1305    attach_extras(extras, &mut sample);
1306    let pv_sample = PvSample {
1307        pv_name: pv_name.to_string(),
1308        dbr_type,
1309        sample,
1310        element_count: Some(elem_count),
1311        counters: Some(counters.clone()),
1312    };
1313    if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1314        counters
1315            .buffer_overflow_drops
1316            .fetch_add(1, Ordering::Relaxed);
1317    }
1318}
1319
1320/// PVA monitor loop: subscribes once and parks until cancellation.
1321/// Each fan-in event is decoded inline, packaged as a [`PvSample`],
1322/// and non-blocking-pushed into the storage write_loop's channel —
1323/// blocking would stall the pvAccess reactor thread.
1324///
1325/// When `archive_fields` is non-empty, the subscription requests an
1326/// explicit pvRequest with the mapped sub-field paths so the IOC
1327/// includes them in every monitor event; the callback then mirrors
1328/// the CA path's "extras" cache by stringifying each requested field.
1329#[allow(clippy::too_many_arguments)]
1330async fn monitor_loop_pva(
1331    pv_name: String,
1332    dbr_type: ArchDbType,
1333    element_count: i32,
1334    pva_client: PvaClient,
1335    tx: mpsc::Sender<PvSample>,
1336    cancel_token: CancellationToken,
1337    conn_info: Arc<Mutex<ConnectionInfo>>,
1338    counters: Arc<PvCounters>,
1339    server_ioc_drift_secs: u64,
1340    archive_fields: Vec<String>,
1341    extras: Arc<ExtraFieldsCache>,
1342) {
1343    let drift_secs = server_ioc_drift_secs;
1344    // Static element_count is unused — each callback derives the
1345    // current array length from the live PvField, so an NTScalarArray
1346    // whose size changes between events still gets tagged correctly.
1347    let _ = element_count;
1348
1349    // Build the (CA name, PVA path) pairs once. Skip CA fields with
1350    // no PVA equivalent so a misconfigured archive_fields list
1351    // doesn't poison the pvRequest.
1352    let extras_paths: Vec<(String, &'static str)> = archive_fields
1353        .iter()
1354        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1355        .collect();
1356    let request_expr = if extras_paths.is_empty() {
1357        None
1358    } else {
1359        let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1360            .field("value")
1361            .field("alarm")
1362            .field("timeStamp");
1363        for (_, path) in &extras_paths {
1364            builder = builder.field(*path);
1365        }
1366        Some(builder.build())
1367    };
1368
1369    // Subscribe loop with retry. The custom-request path uses
1370    // `pvmonitor_with_request` (no SubscriptionHandle) inside a
1371    // tokio::select! so cancellation drops the future cleanly; the
1372    // empty-request path keeps the original SubscriptionHandle form
1373    // since that's the simpler path for the common case.
1374    loop {
1375        if let Some(ref req) = request_expr {
1376            // Custom-request path: 1-arg callback. `pvmonitor_with_request`
1377            // does not surface the channel's FieldDesc per event, so we
1378            // pre-fetch it once via `pvinfo` and reuse the Arc across
1379            // every event in this subscription generation. A failed
1380            // pvinfo aborts this subscribe attempt and retries — the
1381            // archiver invariant is that every V4 sample on disk
1382            // carries the channel-INIT descriptor, so degrading to
1383            // value-recovery is not allowed.
1384            let canonical = match pva_client.pvinfo(&pv_name).await {
1385                Ok(d) => Arc::new(d),
1386                Err(e) => {
1387                    counters
1388                        .transient_error_count
1389                        .fetch_add(1, Ordering::Relaxed);
1390                    warn!(
1391                        pv = pv_name,
1392                        "PVA pvinfo failed: {e}; cannot capture canonical descriptor — retrying subscribe"
1393                    );
1394                    tokio::select! {
1395                        _ = cancel_token.cancelled() => return,
1396                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1397                    }
1398                }
1399            };
1400            let pv_name_cb = pv_name.clone();
1401            let tx_cb = tx.clone();
1402            let conn_info_cb = conn_info.clone();
1403            let counters_cb = counters.clone();
1404            let extras_cb = extras.clone();
1405            let extras_paths_cb = extras_paths.clone();
1406            let canonical_cb = canonical.clone();
1407            let cb = move |field: &PvField| {
1408                pva_handle_event(
1409                    field,
1410                    &canonical_cb,
1411                    &pv_name_cb,
1412                    dbr_type,
1413                    &tx_cb,
1414                    &conn_info_cb,
1415                    &counters_cb,
1416                    &extras_cb,
1417                    &extras_paths_cb,
1418                    drift_secs,
1419                );
1420            };
1421            tokio::select! {
1422                _ = cancel_token.cancelled() => {
1423                    debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1424                    return;
1425                }
1426                res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1427                    match res {
1428                        Ok(()) => {
1429                            debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1430                        }
1431                        Err(e) => {
1432                            counters
1433                                .transient_error_count
1434                                .fetch_add(1, Ordering::Relaxed);
1435                            warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1436                        }
1437                    }
1438                    tokio::select! {
1439                        _ = cancel_token.cancelled() => return,
1440                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1441                    }
1442                }
1443            }
1444        } else {
1445            // Empty-request fast path: 2-arg callback. Keep the
1446            // SubscriptionHandle so the inner reactor task lives
1447            // until our explicit drop. The per-event FieldDesc the
1448            // reactor hands us IS the channel-INIT descriptor (pvxs
1449            // refreshes only on type-change, which forces reconnect
1450            // upstream), so it's wire-faithful for Union / Variant
1451            // shapes that `PvField::descriptor()` would degrade.
1452            let pv_name_cb = pv_name.clone();
1453            let tx_cb = tx.clone();
1454            let conn_info_cb = conn_info.clone();
1455            let counters_cb = counters.clone();
1456            let extras_cb = extras.clone();
1457            let extras_paths_cb = extras_paths.clone();
1458            let cb = move |desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1459                pva_handle_event(
1460                    field,
1461                    desc,
1462                    &pv_name_cb,
1463                    dbr_type,
1464                    &tx_cb,
1465                    &conn_info_cb,
1466                    &counters_cb,
1467                    &extras_cb,
1468                    &extras_paths_cb,
1469                    drift_secs,
1470                );
1471            };
1472            let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1473                Ok(h) => h,
1474                Err(e) => {
1475                    counters
1476                        .transient_error_count
1477                        .fetch_add(1, Ordering::Relaxed);
1478                    warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1479                    tokio::select! {
1480                        _ = cancel_token.cancelled() => return,
1481                        _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1482                    }
1483                }
1484            };
1485            debug!(pv = pv_name, "PVA monitor active");
1486            cancel_token.cancelled().await;
1487            debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1488            drop(handle);
1489            return;
1490        }
1491    }
1492}
1493
1494/// PVA Scan loop: periodic `pvget` instead of streaming monitor.
1495/// Mirrors the CA scan_loop's per-tick connection check + sample emit.
1496#[allow(clippy::too_many_arguments)]
1497async fn scan_loop_pva(
1498    pv_name: String,
1499    dbr_type: ArchDbType,
1500    pva_client: PvaClient,
1501    tx: mpsc::Sender<PvSample>,
1502    cancel_token: CancellationToken,
1503    period_secs: f64,
1504    conn_info: Arc<Mutex<ConnectionInfo>>,
1505    counters: Arc<PvCounters>,
1506    server_ioc_drift_secs: u64,
1507    archive_fields: Vec<String>,
1508    extras: Arc<ExtraFieldsCache>,
1509) {
1510    let extras_paths: Vec<(String, &'static str)> = archive_fields
1511        .iter()
1512        .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1513        .collect();
1514    let period = Duration::from_secs_f64(period_secs);
1515    let mut interval = tokio::time::interval(period);
1516    let drift_secs = server_ioc_drift_secs;
1517    // Hard timeout per pvget — at most one tick of slack so a stuck
1518    // PVA server can't accumulate pending requests.
1519    let pvget_timeout = period.max(Duration::from_secs(5));
1520
1521    loop {
1522        tokio::select! {
1523            _ = cancel_token.cancelled() => return,
1524            _ = interval.tick() => {}
1525        }
1526
1527        // pvget_full carries the channel's canonical FieldDesc alongside
1528        // the value, so V4GenericBytes encoding can preserve Union /
1529        // UnionArray / Variant schemas that `PvField::descriptor()`
1530        // would otherwise degrade.
1531        let res = tokio::time::timeout(pvget_timeout, pva_client.pvget_full(&pv_name)).await;
1532        let (field, canonical) = match res {
1533            Ok(Ok(r)) => (r.value, r.introspection),
1534            Ok(Err(e)) => {
1535                let was_connected = {
1536                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1537                    let prev = ci.is_connected;
1538                    ci.is_connected = false;
1539                    ci.last_event_time = None;
1540                    ci.state = match ci.state {
1541                        PvConnectionState::Connected => PvConnectionState::Disconnected,
1542                        PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1543                        _ => PvConnectionState::Connecting,
1544                    };
1545                    prev
1546                };
1547                if was_connected {
1548                    counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1549                    counters
1550                        .last_disconnect_unix_secs
1551                        .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1552                }
1553                counters
1554                    .transient_error_count
1555                    .fetch_add(1, Ordering::Relaxed);
1556                debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1557                continue;
1558            }
1559            Err(_) => {
1560                counters
1561                    .transient_error_count
1562                    .fetch_add(1, Ordering::Relaxed);
1563                debug!(pv = pv_name, "PVA scan pvget timed out");
1564                continue;
1565            }
1566        };
1567
1568        let now = SystemTime::now();
1569        let first_after_connect = {
1570            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1571            let first = ci.last_event_time.is_none();
1572            if ci.connected_since.is_none() {
1573                ci.connected_since = Some(now);
1574            }
1575            ci.is_connected = true;
1576            ci.last_event_time = Some(now);
1577            ci.state = PvConnectionState::Connected;
1578            first
1579        };
1580        counters.events_received.fetch_add(1, Ordering::Relaxed);
1581        if first_after_connect {
1582            counters
1583                .first_event_unix_secs
1584                .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1585                .ok();
1586        }
1587
1588        let Some(value) = pv_field_scalar_to_archiver(&field, &canonical) else {
1589            counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1590            debug!(
1591                pv = pv_name,
1592                "PVA scan value has no archiver mapping; dropping"
1593            );
1594            continue;
1595        };
1596        // Scan mode: timestamp the sample at receive time (no IOC
1597        // timestamp available reliably for periodic pvget, mirroring
1598        // CA scan_loop's `now` policy).
1599        let _ = drift_secs; // referenced for symmetry; not used here
1600        let elem_count = match pv_field_element_count(&field) {
1601            0 => 1,
1602            n => n,
1603        };
1604        // Refresh extras from this scan's pvget result (the pvget
1605        // returns the full NTScalar so all sub-fields are available
1606        // for free — no extra channel spawn needed).
1607        refresh_nt_enum_extras(&field, &extras);
1608        for (field_name, path) in &extras_paths {
1609            if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1610                extras.insert(field_name.clone(), scalar_value_to_string(s));
1611            }
1612        }
1613        let mut sample = ArchiverSample::new(now, value);
1614        attach_extras(&extras, &mut sample);
1615        let pv_sample = PvSample {
1616            pv_name: pv_name.clone(),
1617            dbr_type,
1618            sample,
1619            element_count: Some(elem_count),
1620            counters: Some(counters.clone()),
1621        };
1622        if let Err(rejected) = try_send_with_overflow_count(&tx, pv_sample, &counters).await {
1623            // Channel closed (write_loop down). Cooperative shutdown.
1624            let _ = rejected;
1625            return;
1626        }
1627    }
1628}
1629
1630/// Periodic refresh task for PVA-acquired PVs: re-fetches DBR_CTRL-
1631/// equivalent metadata (`display.units`, `display.precision`) every
1632/// few minutes and persists when changed. PVA's monitor callback API
1633/// doesn't surface explicit reconnect events the way CA's
1634/// `ConnectionEvent` does, so a dedicated periodic poll is the
1635/// simplest way to keep PREC/EGU fresh after IOC restarts.
1636async fn pva_metadata_refresh_loop(
1637    pv_name: String,
1638    pva_client: PvaClient,
1639    registry: Arc<PvRegistry>,
1640    cancel_token: CancellationToken,
1641    counters: Arc<PvCounters>,
1642) {
1643    // 5 minutes — operators almost never change PREC/EGU at runtime;
1644    // shorter cadence wastes registry/network and longer leaves UI
1645    // stale across IOC restarts. Same magic number Java EAA uses
1646    // for its `metaFieldsRefresh` cron.
1647    const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1648    const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1649    let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1650    // Skip the immediate first tick — initial pvget already populated
1651    // PREC/EGU at archive_pv time.
1652    tick.tick().await;
1653    loop {
1654        tokio::select! {
1655            _ = cancel_token.cancelled() => return,
1656            _ = tick.tick() => {}
1657        }
1658
1659        let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1660        let field = match res {
1661            Ok(Ok(f)) => f,
1662            _ => {
1663                counters
1664                    .metadata_fetch_failures
1665                    .fetch_add(1, Ordering::Relaxed);
1666                continue;
1667            }
1668        };
1669        let (new_prec, new_egu) = pv_field_extract_display(&field);
1670        let stored = match registry.get_pv(&pv_name) {
1671            Ok(Some(r)) => r,
1672            _ => continue,
1673        };
1674        let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1675            (Some(s), Some(n)) => s != n,
1676            (None, Some(_)) => true,
1677            _ => false,
1678        };
1679        let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1680            (Some(s), Some(n)) => s != n,
1681            (None, Some(_)) => true,
1682            _ => false,
1683        };
1684        if !prec_changed && !egu_changed {
1685            continue;
1686        }
1687        let prec_arg = if prec_changed {
1688            new_prec.as_deref()
1689        } else {
1690            None
1691        };
1692        let egu_arg = if egu_changed {
1693            new_egu.as_deref()
1694        } else {
1695            None
1696        };
1697        if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1698            warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1699        } else {
1700            debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1701        }
1702    }
1703}
1704
1705/// State watchdog for PVA-acquired PVs: a dedicated task that flips
1706/// `conn_info.state` to `Disconnected` when no event has arrived
1707/// within `STALE_THRESHOLD`. PVA's callback API doesn't surface
1708/// explicit channel-state changes — we approximate by treating a
1709/// long event gap as a disconnect. Trade-off: a genuinely silent PV
1710/// (alarm-only, low-rate scan) appears disconnected. Operators can
1711/// raise the threshold by env var if their PV cadence is slower.
1712async fn pva_state_watchdog(
1713    pv_name: String,
1714    conn_info: Arc<Mutex<ConnectionInfo>>,
1715    counters: Arc<PvCounters>,
1716    cancel_token: CancellationToken,
1717    sample_mode: SampleMode,
1718) {
1719    const POLL_INTERVAL: Duration = Duration::from_secs(5);
1720    // Threshold scales with the expected event cadence so a slow Scan
1721    // PV (period=300 s) doesn't flap on every tick. For Monitor mode
1722    // the floor is 60 s — most production PVs change at least that
1723    // often. Operators who archive truly silent PVs (alarm-only) will
1724    // see stale disconnected status; that's a known limitation.
1725    let stale_threshold = match sample_mode {
1726        SampleMode::Monitor => Duration::from_secs(60),
1727        SampleMode::Scan { period_secs } => Duration::from_secs_f64((period_secs * 3.0).max(60.0)),
1728    };
1729    let mut interval = tokio::time::interval(POLL_INTERVAL);
1730    interval.tick().await;
1731    loop {
1732        tokio::select! {
1733            _ = cancel_token.cancelled() => return,
1734            _ = interval.tick() => {}
1735        }
1736        let stale_now = {
1737            let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1738            if !ci.is_connected {
1739                continue;
1740            }
1741            match ci.last_event_time {
1742                Some(t) => SystemTime::now()
1743                    .duration_since(t)
1744                    .map(|d| d > stale_threshold)
1745                    .unwrap_or(false),
1746                None => false,
1747            }
1748        };
1749        if stale_now {
1750            let was_connected = {
1751                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1752                let prev = ci.is_connected;
1753                ci.is_connected = false;
1754                ci.state = PvConnectionState::Disconnected;
1755                prev
1756            };
1757            if was_connected {
1758                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1759                counters
1760                    .last_disconnect_unix_secs
1761                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1762                debug!(
1763                    pv = pv_name,
1764                    "PVA watchdog: marking disconnected after stale heartbeat"
1765                );
1766            }
1767        }
1768    }
1769}
1770
/// Monitor loop: subscribe to a channel and forward values.
///
/// Runs until `cancel_token` fires, the sample channel `tx` is closed, or
/// the channel's connection-event stream closes. Each outer iteration is
/// one (re)connect cycle:
///
/// 1. Wait for the channel to connect (`CA_RECONNECT_TIMEOUT`). On timeout,
///    demote `conn_info`, count a disconnect if we were connected, then
///    block until a `ConnectionEvent::Connected` arrives.
/// 2. Spawn a cancellation-bound `refresh_ctrl_metadata` task.
/// 3. Subscribe (retrying after `CA_RETRY_DELAY` on failure) and forward
///    each snapshot as a `PvSample` stamped with the IOC timestamp,
///    dropping out-of-window timestamps except on the first sample after
///    a connect.
/// 4. When the monitor stream ends, mark the PV disconnected and loop.
#[allow(clippy::too_many_arguments)]
async fn monitor_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    registry: Arc<PvRegistry>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
) {
    loop {
        // Wait for connection, respecting cancellation.
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
                if result.is_err() {
                    let was_connected = {
                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                        let prev_connected = ci.is_connected;
                        ci.is_connected = false;
                        ci.last_event_time = None;
                        // Connecting if we've never seen a connect, otherwise
                        // demote from Connected to Disconnected on a real loss.
                        ci.state = match ci.state {
                            PvConnectionState::Connected => PvConnectionState::Disconnected,
                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                            _ => PvConnectionState::Connecting,
                        };
                        prev_connected
                    };
                    // A search-failure timeout that demotes us out of
                    // Connected counts as a fresh disconnect — without
                    // this, only the post-monitor `break` path bumps the
                    // counters and a flapping PV silently under-reports
                    // its drops while subsequent `attach_cnx_lost_headers`
                    // calls carry a stale `cnxlostepsecs`.
                    if was_connected {
                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                        counters
                            .last_disconnect_unix_secs
                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
                    }
                    // Subscribe BEFORE re-checking the current state. If we
                    // subscribed after a state check, a Connected event
                    // firing in between would be lost forever, leaving this
                    // loop hung until the next disconnect/reconnect cycle.
                    let mut conn_rx = channel.connection_events();

                    // Close the remaining race: the channel may have become
                    // connected between the outer `wait_connected` timeout
                    // and our `subscribe()` above, in which case no new event
                    // will arrive. A short re-probe catches that case.
                    if channel
                        .wait_connected(Duration::from_millis(100))
                        .await
                        .is_err()
                    {
                        // Block (cancellably) until the event stream reports
                        // a connect; there is no timeout on this wait.
                        loop {
                            tokio::select! {
                                _ = cancel_token.cancelled() => return,
                                event = conn_rx.recv() => {
                                    use tokio::sync::broadcast::error::RecvError;
                                    match event {
                                        Ok(ConnectionEvent::Connected) => break,
                                        // Any other connection event: keep waiting.
                                        Ok(_) => continue,
                                        // Lagged: we missed some events but
                                        // the channel is still live. Re-probe
                                        // state and otherwise keep waiting.
                                        Err(RecvError::Lagged(_)) => {
                                            if channel
                                                .wait_connected(Duration::from_millis(100))
                                                .await
                                                .is_ok()
                                            {
                                                break;
                                            }
                                            continue;
                                        }
                                        // Event source gone: nothing can ever
                                        // reconnect us, so end the task.
                                        Err(RecvError::Closed) => return,
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Refresh DBR_CTRL metadata once per (re)connect so the registry's
        // PREC/EGU stay in sync with the IOC. Fire-and-forget so a slow
        // CA stack can't gate `subscribe()` (and therefore the first
        // sample) on a metadata round-trip. The spawned task is bound to
        // `cancel_token` so it terminates promptly when the PV is
        // stopped/deleted (otherwise a late metadata write could land on
        // a deleted-or-re-registered PV row).
        {
            let channel = channel.clone();
            let registry = registry.clone();
            let counters = counters.clone();
            let pv_name = pv_name.clone();
            let cancel = cancel_token.clone();
            tokio::spawn(async move {
                tokio::select! {
                    _ = cancel.cancelled() => {}
                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
                }
            });
        }

        // Subscribe.
        let mut monitor = match channel.subscribe().await {
            Ok(m) => m,
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
                // NOTE(review): `continue` restarts the outer loop, so each
                // subscribe retry also spawns another metadata refresh above.
                tokio::select! {
                    _ = cancel_token.cancelled() => return,
                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                }
            }
        };

        debug!(pv = pv_name, "Monitor subscription active");

        // Receive values with cancellation.
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => return,
                result = monitor.recv() => {
                    match result {
                        Some(Ok(snapshot)) => {
                            let now = SystemTime::now();
                            // Java parity (11e554d0): use the IOC-reported
                            // timestamp, not receive-time, so latency
                            // doesn't smear sample times. First sample
                            // after connect is accepted unconditionally
                            // — legitimate backfill on reconnect can
                            // include older timestamps. Subsequent
                            // samples whose IOC clock is more than
                            // SERVER_IOC_DRIFT_SECS away from wall clock,
                            // or earlier than the 1991 floor, are
                            // dropped + counted.
                            let first_after_connect = {
                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                                let first = ci.last_event_time.is_none();
                                if ci.connected_since.is_none() {
                                    ci.connected_since = Some(now);
                                }
                                ci.is_connected = true;
                                ci.last_event_time = Some(now);
                                ci.state = PvConnectionState::Connected;
                                first
                            };
                            if !first_after_connect
                                && !ioc_timestamp_in_window(
                                    snapshot.timestamp,
                                    now,
                                    server_ioc_drift_secs,
                                )
                            {
                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_name,
                                    ?snapshot.timestamp,
                                    "Dropping sample with out-of-window IOC timestamp"
                                );
                                continue;
                            }
                            counters.events_received.fetch_add(1, Ordering::Relaxed);
                            // CAS the first-event timestamp once. 0 sentinel
                            // means "no event yet"; replace with now.
                            let now_secs = unix_secs(now);
                            let _ = counters.first_event_unix_secs.compare_exchange(
                                0,
                                now_secs,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            );
                            let archiver_val = epics_value_to_archiver(&snapshot.value);
                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
                            attach_extras(&extras, &mut sample);
                            if first_after_connect {
                                let lost_secs = counters
                                    .last_disconnect_unix_secs
                                    .load(Ordering::Relaxed);
                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                            }
                            let pv_sample = PvSample {
                                pv_name: pv_name.clone(),
                                dbr_type,
                                sample,
                                element_count: Some(element_count),
                                counters: Some(counters.clone()),
                            };
                            if let Err(pv_sample) = try_send_with_overflow_count(
                                &tx,
                                pv_sample,
                                &counters,
                            )
                            .await
                            {
                                let _ = pv_sample;
                                return; // Write loop shut down
                            }
                        }
                        // Per-event error: count it and keep the subscription.
                        Some(Err(e)) => {
                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "Monitor error: {e}");
                        }
                        None => break, // Monitor ended, reconnect
                    }
                }
            }
        }

        // Monitor ended (disconnect) — loop back to reconnect. Reset
        // `last_event_time` so the first sample after reconnect is
        // treated as `first_after_connect` and bypasses the drift
        // filter; without this, an IOC that comes back with a
        // legitimate backfill timestamp older than the 30 min window
        // would be silently dropped.
        {
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            ci.is_connected = false;
            ci.last_event_time = None;
            ci.state = PvConnectionState::Disconnected;
        }
        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
        counters
            .last_disconnect_unix_secs
            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
    }
}
2012
/// Whole seconds elapsed between the Unix epoch and `t`, truncated.
///
/// Times before the epoch map to `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
2018
2019/// Send a sample and count buffer-overflow events. Tries non-blocking
2020/// first; if the bounded channel is full we increment the counter and
2021/// fall back to a blocking send so backpressure still works.
2022async fn try_send_with_overflow_count(
2023    tx: &mpsc::Sender<PvSample>,
2024    pv_sample: PvSample,
2025    counters: &PvCounters,
2026) -> Result<(), PvSample> {
2027    match tx.try_send(pv_sample) {
2028        Ok(()) => Ok(()),
2029        Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
2030            counters
2031                .buffer_overflow_drops
2032                .fetch_add(1, Ordering::Relaxed);
2033            // Backpressure to the producer; this awaits until the writer
2034            // drains some space. We count the saturation event but don't
2035            // actually drop the sample.
2036            tx.send(pv_sample).await.map_err(|e| e.0)
2037        }
2038        Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
2039    }
2040}
2041
2042/// Scan loop: periodically read a channel value.
2043#[allow(clippy::too_many_arguments)]
2044async fn scan_loop(
2045    pv_name: String,
2046    dbr_type: ArchDbType,
2047    element_count: i32,
2048    channel: CaChannel,
2049    tx: mpsc::Sender<PvSample>,
2050    cancel_token: CancellationToken,
2051    period_secs: f64,
2052    conn_info: Arc<Mutex<ConnectionInfo>>,
2053    registry: Arc<PvRegistry>,
2054    extras: Arc<ExtraFieldsCache>,
2055    counters: Arc<PvCounters>,
2056) {
2057    let period = Duration::from_secs_f64(period_secs);
2058    let mut interval = tokio::time::interval(period);
2059    // Reset on every detected disconnect so we re-fetch metadata after
2060    // each reconnect (mirrors monitor_loop's once-per-(re)connect cadence).
2061    let mut metadata_done = false;
2062
2063    loop {
2064        tokio::select! {
2065            _ = cancel_token.cancelled() => return,
2066            _ = interval.tick() => {}
2067        }
2068
2069        if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2070            let was_connected = {
2071                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2072                let prev = ci.is_connected;
2073                ci.is_connected = false;
2074                ci.last_event_time = None;
2075                ci.state = match ci.state {
2076                    PvConnectionState::Connected => PvConnectionState::Disconnected,
2077                    PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2078                    _ => PvConnectionState::Connecting,
2079                };
2080                prev
2081            };
2082            if was_connected {
2083                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2084                counters
2085                    .last_disconnect_unix_secs
2086                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2087            }
2088            metadata_done = false;
2089            continue;
2090        }
2091
2092        if !metadata_done {
2093            // Fire-and-forget; matches monitor_loop's once-per-(re)connect
2094            // semantics. The local `metadata_done` flag exists because
2095            // scan_loop's outer iteration is per-tick (vs monitor_loop's
2096            // per-reconnect), so we'd otherwise spawn a fetch every tick.
2097            // Bound to `cancel_token` so a stopped/deleted PV doesn't get
2098            // a stale metadata write from an in-flight task.
2099            let channel = channel.clone();
2100            let registry = registry.clone();
2101            let counters = counters.clone();
2102            let pv_name = pv_name.clone();
2103            let cancel = cancel_token.clone();
2104            tokio::spawn(async move {
2105                tokio::select! {
2106                    _ = cancel.cancelled() => {}
2107                    _ = refresh_ctrl_metadata(&channel, &registry, &pv_name, &counters) => {}
2108                }
2109            });
2110            metadata_done = true;
2111        }
2112
2113        match channel.get().await {
2114            Ok((_dbr_type, epics_val)) => {
2115                let now = SystemTime::now();
2116                let first_after_connect = {
2117                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2118                    let first = ci.last_event_time.is_none();
2119                    if ci.connected_since.is_none() {
2120                        ci.connected_since = Some(now);
2121                    }
2122                    ci.is_connected = true;
2123                    ci.last_event_time = Some(now);
2124                    ci.state = PvConnectionState::Connected;
2125                    first
2126                };
2127                counters.events_received.fetch_add(1, Ordering::Relaxed);
2128                let now_secs = unix_secs(now);
2129                let _ = counters.first_event_unix_secs.compare_exchange(
2130                    0,
2131                    now_secs,
2132                    Ordering::Relaxed,
2133                    Ordering::Relaxed,
2134                );
2135                let archiver_val = epics_value_to_archiver(&epics_val);
2136                let mut sample = ArchiverSample::new(now, archiver_val);
2137                attach_extras(&extras, &mut sample);
2138                if first_after_connect {
2139                    let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2140                    attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2141                }
2142                let pv_sample = PvSample {
2143                    pv_name: pv_name.clone(),
2144                    dbr_type,
2145                    sample,
2146                    element_count: Some(element_count),
2147                    counters: Some(counters.clone()),
2148                };
2149                if try_send_with_overflow_count(&tx, pv_sample, &counters)
2150                    .await
2151                    .is_err()
2152                {
2153                    return;
2154                }
2155            }
2156            Err(e) => {
2157                counters
2158                    .transient_error_count
2159                    .fetch_add(1, Ordering::Relaxed);
2160                debug!(pv = pv_name, "Scan read error: {e}");
2161            }
2162        }
2163    }
2164}
2165
2166/// Java parity (ed07feb): tag the first sample after (re)connect with
2167/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
2168/// detect archiver restarts and PV resumes. Unconditional emission —
2169/// on a clean startup `lost_secs` is 0, which is itself a valid value
2170/// for downstream gap detection.
2171fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2172    sample
2173        .field_values
2174        .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2175    sample
2176        .field_values
2177        .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2178    sample
2179        .field_values
2180        .push(("startup".to_string(), "true".to_string()));
2181}
2182
2183/// Snapshot the extras cache into `sample.field_values`. We sort for stable
2184/// PB output across runs (the protobuf field is repeated; consumers that
2185/// diff/compare files appreciate determinism).
2186fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2187    if extras.is_empty() {
2188        return;
2189    }
2190    let mut entries: Vec<(String, String)> = extras
2191        .iter()
2192        .map(|e| (e.key().clone(), e.value().clone()))
2193        .collect();
2194    entries.sort_by(|a, b| a.0.cmp(&b.0));
2195    sample.field_values = entries;
2196}
2197
2198/// Render an EpicsValue as the string we'll persist in the metadata field
2199/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
2200/// fall back to JSON-ish bracket notation. Stays in sync with what Java
2201/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
2202fn epics_value_to_field_string(val: &EpicsValue) -> String {
2203    match val {
2204        EpicsValue::String(s) => s.clone(),
2205        EpicsValue::Short(v) => v.to_string(),
2206        EpicsValue::Float(v) => v.to_string(),
2207        EpicsValue::Enum(v) => v.to_string(),
2208        EpicsValue::Char(v) => v.to_string(),
2209        EpicsValue::Long(v) => v.to_string(),
2210        EpicsValue::Int64(v) => v.to_string(),
2211        EpicsValue::Double(v) => v.to_string(),
2212        EpicsValue::ShortArray(v) => format!("{v:?}"),
2213        EpicsValue::FloatArray(v) => format!("{v:?}"),
2214        EpicsValue::EnumArray(v) => format!("{v:?}"),
2215        EpicsValue::DoubleArray(v) => format!("{v:?}"),
2216        EpicsValue::LongArray(v) => format!("{v:?}"),
2217        EpicsValue::Int64Array(v) => format!("{v:?}"),
2218        EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2219        EpicsValue::StringArray(v) => format!("{v:?}"),
2220    }
2221}
2222
2223/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
2224/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
2225/// it up alongside the main PV.
2226fn spawn_extra_field_monitor(
2227    ca_client: &CaClient,
2228    pv_name: &str,
2229    field: &str,
2230    extras: Arc<ExtraFieldsCache>,
2231    parent_token: CancellationToken,
2232    counters: Arc<PvCounters>,
2233) {
2234    let full_name = format!("{pv_name}.{field}");
2235    let channel = ca_client.create_channel(&full_name);
2236    let field_owned = field.to_string();
2237    let pv_owned = pv_name.to_string();
2238
2239    // Catch-unwind boundary: a panic from the CA client (e.g. malformed
2240    // wire frame, allocation failure inside epics_rs) would propagate to
2241    // the runtime and abort sibling tasks of this worker thread. Trap it
2242    // here, log with PV+field context, and return normally so the runtime
2243    // remains healthy.
2244    let panic_pv = pv_owned.clone();
2245    let panic_field = field_owned.clone();
2246    tokio::spawn(async move {
2247        let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2248            channel,
2249            pv_owned,
2250            field_owned,
2251            extras,
2252            parent_token,
2253            counters,
2254        ));
2255        if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2256            let msg = panic_payload_msg(&payload);
2257            error!(
2258                pv = panic_pv,
2259                field = panic_field,
2260                "Extra-field monitor panicked: {msg}"
2261            );
2262        }
2263    });
2264}
2265
/// Body of the spawned extra-field monitor. Split out so the spawn site
/// can wrap it in `catch_unwind`.
///
/// Lifecycle:
/// 1. One bounded `wait_connected` attempt (`CA_CONNECT_TIMEOUT`). A timeout
///    is only logged, because the subscribe loop below keeps retrying.
/// 2. Outer loop: subscribe. On failure, bump `transient_error_count` and
///    sleep with exponential backoff (`CA_RETRY_DELAY` doubling, capped at
///    60s), then retry.
/// 3. Inner loop: each successful monitor event overwrites the cache entry
///    keyed by `field_owned`. A monitor error is counted and logged. End of
///    stream (`None`) drops back to the outer loop to resubscribe.
///
/// Returns only when `parent_token` is cancelled; every await point is raced
/// against cancellation.
async fn extra_field_monitor_body(
    channel: CaChannel,
    pv_owned: String,
    field_owned: String,
    extras: Arc<ExtraFieldsCache>,
    parent_token: CancellationToken,
    counters: Arc<PvCounters>,
) {
    // Initial connect attempt — failure here is non-fatal (the field may
    // not exist on every IOC; we just leave the cache empty).
    if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
        debug!(
            pv = pv_owned,
            field = field_owned,
            "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
        );
    }

    // Exponential backoff for misconfigured fields (e.g. operator
    // listed `.HIHI` on a PV that doesn't expose it). Without this
    // we'd retry every 5s forever, churning CA search packets and
    // file descriptors. The cap is 60s; one warn at the cap so
    // ops know to fix archive_fields.
    // Sleep sequence is 5s, 10s, 20s, 40s, 60s, 60s, ... so the warn fires
    // on the 5th consecutive failure.
    let mut backoff = CA_RETRY_DELAY;
    let max_backoff = Duration::from_secs(60);
    let mut warned_at_cap = false;

    loop {
        // Cancel-aware subscribe attempt.
        tokio::select! {
            _ = parent_token.cancelled() => return,
            sub = channel.subscribe() => {
                let mut monitor = match sub {
                    Ok(m) => m,
                    Err(e) => {
                        // Java parity (8fe73eb): bump the transient
                        // counter so a misconfigured `.HIHI` field
                        // shows up in the rate / drop reports.
                        counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                        debug!(
                            pv = pv_owned,
                            field = field_owned,
                            ?backoff,
                            "Extra-field subscribe failed: {e}; retrying"
                        );
                        // `backoff` is checked before doubling, so the warn
                        // coincides with the first 60s sleep.
                        if backoff >= max_backoff && !warned_at_cap {
                            warn!(
                                pv = pv_owned,
                                field = field_owned,
                                "Extra-field repeatedly fails to subscribe; \
                                 check archive_fields config (now retrying every 60s)"
                            );
                            warned_at_cap = true;
                        }
                        let sleep_for = backoff;
                        backoff = (backoff * 2).min(max_backoff);
                        // `continue` restarts the outer loop (next subscribe
                        // attempt); cancellation during the sleep exits.
                        tokio::select! {
                            _ = parent_token.cancelled() => return,
                            _ = tokio::time::sleep(sleep_for) => continue,
                        }
                    }
                };
                // Subscribe succeeded — reset backoff so the next
                // failure starts at the short delay again.
                // Resetting `warned_at_cap` means a later failure streak
                // that reaches the cap warns again.
                backoff = CA_RETRY_DELAY;
                warned_at_cap = false;
                loop {
                    tokio::select! {
                        _ = parent_token.cancelled() => return,
                        ev = monitor.recv() => match ev {
                            Some(Ok(snapshot)) => {
                                // Last event wins; the cache holds only the
                                // latest stringified value for this field.
                                extras.insert(
                                    field_owned.clone(),
                                    epics_value_to_field_string(&snapshot.value),
                                );
                            }
                            Some(Err(e)) => {
                                counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_owned,
                                    field = field_owned,
                                    "Extra-field monitor error: {e}"
                                );
                            }
                            // NOTE(review): the cached value is not cleared
                            // here, so samples keep the last-known value
                            // while disconnected. Resubscribe happens with
                            // no delay. If `subscribe()` can succeed and the
                            // stream end immediately, this loops hot.
                            // Confirm epics_rs subscribe semantics.
                            None => break, // resubscribe
                        }
                    }
                }
            }
        }
    }
}
2360
/// Turn a caught panic payload into a printable message.
///
/// `panic!("literal")` produces a `&'static str` payload, and
/// `panic!("{x}")` produces a `String`. Both are returned verbatim. Any
/// other payload type yields a fixed placeholder. This is the same
/// extraction the default panic hook performs.
///
/// The parameter stays `&Box<..>` on purpose. Coercing a `&Box<dyn Any>`
/// to `&dyn Any` can silently make the *Box itself* the `Any` value, and
/// then every downcast fails.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    let inner: &(dyn std::any::Any + Send) = &**payload;
    inner
        .downcast_ref::<&'static str>()
        .map(|s| (*s).to_string())
        .or_else(|| inner.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
2372
2373/// Resolve the data-bearing sub-field of a PVA value. NTScalar /
2374/// NTScalarArray / NTEnum wrap the raw value in a `value` field; bare
2375/// scalars are passed through.
2376fn pv_value_field(field: &PvField) -> &PvField {
2377    match field {
2378        PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2379        _ => field,
2380    }
2381}
2382
2383/// Map a PVA `ScalarValue` to an archiver `ArchiverValue`.
2384fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2385    match s {
2386        ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2387        ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2388        ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2389        ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2390        ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2391        ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2392        ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2393        ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2394        ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2395        ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2396        ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2397        ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2398    }
2399}
2400
2401/// Extract an `ArchiverValue` from a top-level PVA value. NTScalar /
2402/// NTScalarArray are unwrapped via `value`; NTEnum collapses to
2403/// `ScalarEnum(index)` (Java archiver parity — choices land in the
2404/// monitor callback's extras cache); bare Structure roots (NTTable,
2405/// NTNDArray, user-defined NTs) wire-encode the whole root as opaque
2406/// [`ArchiverValue::V4GenericBytes`] so retrieval can reconstruct the
2407/// full structure.
2408///
2409/// `canonical_desc` is the channel-INIT descriptor that producers
2410/// (monitor/scan/live_value callsites) plumb through so
2411/// [`encode_pv_field_self_describing`] preserves Union / UnionArray
2412/// / Variant shapes that [`PvField::descriptor()`] would degrade.
2413/// Required at every callsite — there is no value-recovery fallback.
2414fn pv_field_scalar_to_archiver(
2415    field: &PvField,
2416    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
2417) -> Option<ArchiverValue> {
2418    // NTEnum special-case: take the `value.index` int as the scalar
2419    // sample. Must run before `pv_value_field` peeling, because
2420    // peeling on an NTEnum returns the inner `enum_t` Structure,
2421    // which would fall through to the V4GenericBytes path.
2422    if let Some((index, _)) = nt_enum_parts(field) {
2423        return Some(ArchiverValue::ScalarEnum(index));
2424    }
2425    match pv_value_field(field) {
2426        PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2427        PvField::ScalarArrayTyped(arr) => Some(typed_scalar_array_to_archiver(arr)),
2428        PvField::ScalarArray(items) => {
2429            // Legacy untyped scalar-array path (rare with pvxs 0.14+,
2430            // which decodes into `ScalarArrayTyped`). Promote to the
2431            // typed form so a single converter covers both.
2432            let st = items.first()?.scalar_type();
2433            let typed = TypedScalarArray::from_scalar_values(items, st)?;
2434            Some(typed_scalar_array_to_archiver(&typed))
2435        }
2436        // Any other shape — NTTable (`value` is itself a Structure of
2437        // column arrays), NTNDArray, user-defined NTs — archive the
2438        // root PvField (NOT the unwrapped value, which would lose
2439        // labels / alarm / timeStamp) as opaque V4GenericBytes.
2440        _ => Some(ArchiverValue::V4GenericBytes(
2441            encode_pv_field_self_describing(field, canonical_desc),
2442        )),
2443    }
2444}
2445
2446/// Pick the `(ArchDbType, element_count)` to record at archive_pv time
2447/// from a PVA introspection-pvget result.
2448///
2449/// - NTScalar / NTScalarArray: typed by the unwrapped `value` field.
2450/// - Bare scalar / scalar-array root: typed directly.
2451/// - Anything else (NTTable, NTNDArray, user-defined NT structures):
2452///   archived as [`ArchDbType::V4GenericBytes`] with element_count = 1
2453///   (the structure as a whole is one sample; inner cardinality is
2454///   carried inside the wire-encoded bytes).
2455fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2456    // NTEnum: register as ScalarEnum (the index is the archived
2457    // value); choices live in extras_cache rather than the dbr_type.
2458    if nt_enum_parts(field).is_some() {
2459        return Some((ArchDbType::ScalarEnum, 1));
2460    }
2461    let value = pv_value_field(field);
2462    Some(match value {
2463        PvField::Scalar(s) => (scalar_value_to_arch_db_type(s), 1),
2464        PvField::ScalarArray(arr) => {
2465            let elem = arr.first()?;
2466            (
2467                scalar_type_to_waveform(elem.scalar_type()),
2468                arr.len() as i32,
2469            )
2470        }
2471        PvField::ScalarArrayTyped(arr) => {
2472            (scalar_type_to_waveform(arr.scalar_type()), arr.len() as i32)
2473        }
2474        _ => (ArchDbType::V4GenericBytes, 1),
2475    })
2476}
2477
2478/// Scalar-form ArchDbType for a [`ScalarValue`].
2479fn scalar_value_to_arch_db_type(s: &ScalarValue) -> ArchDbType {
2480    match s {
2481        ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2482        ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2483        ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2484        ScalarValue::Int(_)
2485        | ScalarValue::UInt(_)
2486        | ScalarValue::Long(_)
2487        | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2488        ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2489        ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2490        ScalarValue::String(_) => ArchDbType::ScalarString,
2491    }
2492}
2493
2494/// Waveform-form ArchDbType for a [`ScalarType`].
2495fn scalar_type_to_waveform(st: ScalarType) -> ArchDbType {
2496    match st {
2497        ScalarType::Boolean => ArchDbType::WaveformEnum,
2498        ScalarType::Byte | ScalarType::UByte => ArchDbType::WaveformByte,
2499        ScalarType::Short | ScalarType::UShort => ArchDbType::WaveformShort,
2500        ScalarType::Int | ScalarType::UInt | ScalarType::Long | ScalarType::ULong => {
2501            ArchDbType::WaveformInt
2502        }
2503        ScalarType::Float => ArchDbType::WaveformFloat,
2504        ScalarType::Double => ArchDbType::WaveformDouble,
2505        ScalarType::String => ArchDbType::WaveformString,
2506    }
2507}
2508
2509/// Convert a [`TypedScalarArray`] into the matching [`ArchiverValue`]
2510/// vector variant. Widening (i8/i16/i64 → i32) mirrors the scalar path
2511/// in [`scalar_value_to_archiver`], so a waveform of `Long` stores as
2512/// `VectorInt` (out-of-range values truncate, same caveat as the CA
2513/// `DbFieldType::Int64` mapping).
2514fn typed_scalar_array_to_archiver(arr: &TypedScalarArray) -> ArchiverValue {
2515    match arr {
2516        TypedScalarArray::Boolean(a) => {
2517            ArchiverValue::VectorEnum(a.iter().map(|&b| b as i32).collect())
2518        }
2519        TypedScalarArray::Byte(a) => {
2520            ArchiverValue::VectorChar(a.iter().map(|&v| v as u8).collect())
2521        }
2522        TypedScalarArray::UByte(a) => ArchiverValue::VectorChar(a.to_vec()),
2523        TypedScalarArray::Short(a) => {
2524            ArchiverValue::VectorShort(a.iter().map(|&v| v as i32).collect())
2525        }
2526        TypedScalarArray::UShort(a) => {
2527            ArchiverValue::VectorShort(a.iter().map(|&v| v as i32).collect())
2528        }
2529        TypedScalarArray::Int(a) => ArchiverValue::VectorInt(a.to_vec()),
2530        TypedScalarArray::UInt(a) => {
2531            ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2532        }
2533        TypedScalarArray::Long(a) => {
2534            ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2535        }
2536        TypedScalarArray::ULong(a) => {
2537            ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2538        }
2539        TypedScalarArray::Float(a) => ArchiverValue::VectorFloat(a.to_vec()),
2540        TypedScalarArray::Double(a) => ArchiverValue::VectorDouble(a.to_vec()),
2541        TypedScalarArray::String(a) => ArchiverValue::VectorString(a.to_vec()),
2542    }
2543}
2544
2545/// NTEnum shape detection. Returns `Some((index, choices))` when
2546/// `field` is a Structure whose `struct_id` starts with
2547/// `"epics:nt/NTEnum"` AND carries a `value` substructure of the
2548/// `enum_t` shape (`index: integer`, `choices: string[]`). Java
2549/// archiver follows the same dual gate — explicit normative ID +
2550/// shape match — to avoid mis-classifying user structs that happen
2551/// to expose an `index` field.
2552///
2553/// `choices` is `None` when the substructure omits the field; the
2554/// monitor callback then leaves the prior cached value in place.
2555fn nt_enum_parts(field: &PvField) -> Option<(i32, Option<Vec<String>>)> {
2556    let PvField::Structure(root) = field else {
2557        return None;
2558    };
2559    if !root.struct_id.starts_with("epics:nt/NTEnum") {
2560        return None;
2561    }
2562    let PvField::Structure(inner) = root.get_field("value")? else {
2563        return None;
2564    };
2565    let index = match inner.get_field("index")? {
2566        PvField::Scalar(ScalarValue::Int(v)) => *v,
2567        PvField::Scalar(ScalarValue::Long(v)) => *v as i32,
2568        PvField::Scalar(ScalarValue::Short(v)) => *v as i32,
2569        PvField::Scalar(ScalarValue::UInt(v)) => *v as i32,
2570        PvField::Scalar(ScalarValue::ULong(v)) => *v as i32,
2571        PvField::Scalar(ScalarValue::UShort(v)) => *v as i32,
2572        _ => return None,
2573    };
2574    let choices = match inner.get_field("choices") {
2575        Some(PvField::ScalarArrayTyped(TypedScalarArray::String(arr))) => Some(arr.to_vec()),
2576        Some(PvField::ScalarArray(items)) => {
2577            let mut out = Vec::with_capacity(items.len());
2578            for s in items {
2579                if let ScalarValue::String(c) = s {
2580                    out.push(c.clone());
2581                } else {
2582                    return None; // mixed-type choices array — reject
2583                }
2584            }
2585            Some(out)
2586        }
2587        Some(_) => return None,
2588        None => None,
2589    };
2590    Some((index, choices))
2591}
2592
/// Serialize NTEnum `choices` as a JSON array of strings for the
/// extras-cache payload that lands in each sample's `field_values`.
///
/// JSON is used so retrieval clients can `JSON.parse(meta.enum_strs)`
/// directly; pipe or comma delimiters would break on choice names that
/// contain those characters. `"`, `\`, `\n`, `\r`, `\t` get their short
/// escapes, and other C0 control characters become `\u00XX`. Everything
/// else, including non-ASCII, is emitted verbatim. Hand-rolled to avoid
/// pulling `serde_json` into archiver-engine for a single string.
fn nt_enum_choices_to_extras(choices: &[String]) -> String {
    use std::fmt::Write;

    // Two brackets plus, per choice, its bytes + two quotes + a comma.
    let budget = choices.iter().fold(2, |acc, s| acc + s.len() + 3);
    let mut json = String::with_capacity(budget);
    json.push('[');
    let mut first = true;
    for choice in choices {
        if !first {
            json.push(',');
        }
        first = false;
        json.push('"');
        for ch in choice.chars() {
            let short_escape = match ch {
                '"' => Some("\\\""),
                '\\' => Some("\\\\"),
                '\n' => Some("\\n"),
                '\r' => Some("\\r"),
                '\t' => Some("\\t"),
                _ => None,
            };
            if let Some(seq) = short_escape {
                json.push_str(seq);
            } else if u32::from(ch) < 0x20 {
                let _ = write!(json, "\\u{:04x}", u32::from(ch));
            } else {
                json.push(ch);
            }
        }
        json.push('"');
    }
    json.push(']');
    json
}
2626
2627/// Refresh the extras cache for an NTEnum-shaped monitor event so
2628/// downstream retrieval can surface `enum_strs` alongside the
2629/// archived index. No-op when `field` is not an NTEnum or its
2630/// `choices` substructure is absent (cached value from the last
2631/// successful event stays put).
2632fn refresh_nt_enum_extras(field: &PvField, extras: &ExtraFieldsCache) {
2633    if let Some((_, Some(choices))) = nt_enum_parts(field) {
2634        extras.insert("enum_strs".to_string(), nt_enum_choices_to_extras(&choices));
2635    }
2636}
2637
2638/// Serialize a top-level [`PvField`] (NTTable, NTNDArray, user-defined
2639/// structures) as the self-describing payload stored under
2640/// [`ArchDbType::V4GenericBytes`]. Layout is `type_desc || value` in
2641/// PVA wire format (BigEndian), so each sample on disk is decodable
2642/// in isolation — no shared introspection cache needed across the
2643/// archive day-roll boundary.
2644///
2645/// `canonical_desc` is the channel's introspection descriptor captured
2646/// at INIT (via `pvinfo` / `pvget_full` / per-event `pvmonitor_handle`
2647/// callback). It is **required**: `PvField::descriptor()` recovery
2648/// is lossy for `Union` (drops sibling variants), `UnionArray`
2649/// (empties variants list), and `Variant` (degrades to bare
2650/// `Variant`) — see epics-pva-rs `structure.rs:descriptor` doc. The
2651/// archiver invariant is that every V4 sample on disk preserves the
2652/// channel-level descriptor, so callers must plumb it through rather
2653/// than relying on value-recovery.
2654fn encode_pv_field_self_describing(
2655    field: &PvField,
2656    canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
2657) -> Vec<u8> {
2658    let mut out = Vec::with_capacity(256);
2659    encode_type_desc(canonical_desc, ByteOrder::Big, &mut out);
2660    encode_pv_field(field, canonical_desc, ByteOrder::Big, &mut out);
2661    out
2662}
2663
/// Translate a CA record field name (`HIHI`, `LOLO`, `EGU`, …) into the
/// dotted pvAccess path under an NTScalar / NTScalarArray.
///
/// Matching is exact and case-sensitive. Returns `None` for fields with no
/// PVA equivalent; callers should drop those silently rather than build an
/// unsatisfiable pvRequest.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const PVA_PATHS: [(&str, &str); 16] = [
        // display sub-structure
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm sub-structure (alarm thresholds and severities)
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control sub-structure (drive limits)
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];
    PVA_PATHS
        .iter()
        .find(|&&(ca_name, _)| ca_name == field)
        .map(|&(_, path)| path)
}
2692
2693/// Walk a dotted PVA field path (`valueAlarm.highAlarmLimit`) from
2694/// the root [`PvField`]. Returns `None` if any segment is missing or
2695/// the path lands on a non-structure intermediate.
2696fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2697    let mut current = root;
2698    for segment in path.split('.') {
2699        let PvField::Structure(s) = current else {
2700            return None;
2701        };
2702        current = s.get_field(segment)?;
2703    }
2704    Some(current)
2705}
2706
2707/// Stringify a [`ScalarValue`] for storage in the per-sample extras
2708/// map. Java archiver's `archiveFields` blob is a string-string map,
2709/// so we mirror that wire shape.
2710fn scalar_value_to_string(s: &ScalarValue) -> String {
2711    match s {
2712        ScalarValue::Boolean(v) => v.to_string(),
2713        ScalarValue::Byte(v) => v.to_string(),
2714        ScalarValue::Short(v) => v.to_string(),
2715        ScalarValue::Int(v) => v.to_string(),
2716        ScalarValue::Long(v) => v.to_string(),
2717        ScalarValue::UByte(v) => v.to_string(),
2718        ScalarValue::UShort(v) => v.to_string(),
2719        ScalarValue::UInt(v) => v.to_string(),
2720        ScalarValue::ULong(v) => v.to_string(),
2721        ScalarValue::Float(v) => v.to_string(),
2722        ScalarValue::Double(v) => v.to_string(),
2723        ScalarValue::String(s) => s.clone(),
2724    }
2725}
2726
2727/// Element count for a PVA value: 1 for scalar, length for arrays,
2728/// 0 for unsupported shapes (treated as "unknown" by callers).
2729fn pv_field_element_count(field: &PvField) -> i32 {
2730    match pv_value_field(field) {
2731        PvField::Scalar(_) => 1,
2732        PvField::ScalarArray(arr) => arr.len() as i32,
2733        PvField::ScalarArrayTyped(t) => t.len() as i32,
2734        _ => 0,
2735    }
2736}
2737
2738/// Pull `(precision, units)` out of an NTScalar / NTScalarArray's
2739/// `display` sub-structure. Returns `(None, None)` for bare scalars
2740/// or structures missing `display` (some servers omit it).
2741/// Negative precision is the Java EAA "no precision info" sentinel —
2742/// treat it the same as missing so we don't surface "-1" to the UI.
2743fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2744    let PvField::Structure(s) = field else {
2745        return (None, None);
2746    };
2747    let Some(PvField::Structure(disp)) = s.get_field("display") else {
2748        return (None, None);
2749    };
2750    let prec = match disp.get_field("precision") {
2751        Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2752        Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2753        Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2754        _ => None,
2755    };
2756    let egu = match disp.get_field("units") {
2757        Some(PvField::Scalar(ScalarValue::String(u))) => {
2758            let t = u.trim();
2759            if t.is_empty() {
2760                None
2761            } else {
2762                Some(t.to_string())
2763            }
2764        }
2765        _ => None,
2766    };
2767    (prec, egu)
2768}
2769
2770/// Pull the IOC-reported timestamp out of an NTScalar / NTScalarArray.
2771/// Falls back to `SystemTime::now()` when the structure lacks a usable
2772/// `timeStamp` sub-field, so a malformed server doesn't drop samples.
2773fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2774    let PvField::Structure(s) = field else {
2775        return SystemTime::now();
2776    };
2777    let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2778        return SystemTime::now();
2779    };
2780    let secs = match ts.get_field("secondsPastEpoch") {
2781        Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2782        Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2783        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2784        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2785        _ => return SystemTime::now(),
2786    };
2787    let nanos = match ts.get_field("nanoseconds") {
2788        Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2789        Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2790        _ => 0,
2791    };
2792    SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2793}
2794
2795/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
2796fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2797    match field_type {
2798        DbFieldType::String => ArchDbType::ScalarString,
2799        DbFieldType::Short => ArchDbType::ScalarShort,
2800        DbFieldType::Float => ArchDbType::ScalarFloat,
2801        DbFieldType::Enum => ArchDbType::ScalarEnum,
2802        DbFieldType::Char => ArchDbType::ScalarByte,
2803        DbFieldType::Long => ArchDbType::ScalarInt,
2804        // PB PayloadType has no SCALAR_LONG (i64) — values outside i32 range
2805        // are truncated by the i32 cast in epics_value_to_archiver.
2806        DbFieldType::Int64 => ArchDbType::ScalarInt,
2807        DbFieldType::Double => ArchDbType::ScalarDouble,
2808    }
2809}
2810
2811/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
2812fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2813    match val {
2814        EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2815        EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2816        EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2817        EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2818        EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2819        EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2820        EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2821        EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2822        EpicsValue::ShortArray(v) => {
2823            ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2824        }
2825        EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2826        EpicsValue::EnumArray(v) => {
2827            ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2828        }
2829        EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2830        EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2831        EpicsValue::Int64Array(v) => {
2832            ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2833        }
2834        EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2835        EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2836    }
2837}
2838
/// Tunables for [`write_loop_with_config`]. Production uses the
/// defaults via [`write_loop`]; tests dial the timeouts down to
/// sub-second values so failure-injection cases finish in a few
/// hundred milliseconds instead of minutes.
#[derive(Debug, Clone)]
pub struct WriteLoopConfig {
    /// How often the flush owner calls `flush_ingest_writes` and
    /// commits the pending `last_event` timestamps to the registry.
    pub flush_period: Duration,
    /// Bound on how long a shard worker waits for a single
    /// `append_event_with_meta` JoinHandle. On timeout the sample
    /// is logged as abandoned-but-may-succeed-late and the worker
    /// moves on (the spawn_blocking task remains parked on the
    /// blocking pool — per-PV serialization in PlainPB keeps any
    /// late completion ordered behind subsequent same-PV samples).
    pub append_timeout: Duration,
    /// Bound on how long one periodic flush (`flush_ingest_writes` on
    /// the blocking pool) may run before the flush owner stops
    /// waiting for it. On timeout, the pending timestamps captured
    /// for that cycle are conservatively DROPPED, not deferred: we
    /// can't tell which PVs reached disk. Later samples repopulate
    /// the pending map and the next clean flush catches the registry
    /// up. No new flush is launched while the timed-out one is still
    /// running.
    pub flush_timeout: Duration,
    /// Bound on each individual append issued during the shutdown
    /// drain.
    pub drain_per_sample_timeout: Duration,
    /// Total wall-clock budget for the shutdown drain. Once
    /// exceeded, remaining buffered samples are abandoned without
    /// even attempting an append, so a wedged STS can't stretch an
    /// orderly stop into "kill -9". The flush owner also uses this
    /// as its shutdown grace timer.
    pub drain_total_budget: Duration,
    /// Bound on the final `flush_writes` issued at shutdown.
    pub shutdown_flush_timeout: Duration,
}

impl Default for WriteLoopConfig {
    fn default() -> Self {
        Self {
            flush_period: Duration::from_secs(10),
            append_timeout: Duration::from_secs(30),
            flush_timeout: Duration::from_secs(30),
            drain_per_sample_timeout: Duration::from_secs(5),
            drain_total_budget: Duration::from_secs(30),
            shutdown_flush_timeout: Duration::from_secs(15),
        }
    }
}
2884
2885/// Tunables for [`run_sharded_write_pool`].
2886///
2887/// `shards = 1` is the legacy single-worker layout and incurs no
2888/// dispatcher overhead. `shards > 1` spawns a dispatcher that hashes
2889/// each sample's `pv_name` to a fixed shard (so any one PV's samples
2890/// always land in the same per-shard write_loop, preserving
2891/// timestamp ordering) and N parallel shard workers — different PVs
2892/// can append concurrently instead of queueing behind a single task.
2893/// Sites with many active PVs and a fast STS should set this to
2894/// `min(num_cpus, expected concurrent slow appends)`.
2895#[derive(Debug, Clone)]
2896pub struct ShardedWritePoolConfig {
2897    /// Number of parallel shard workers. Must be ≥ 1; `1` is the
2898    /// degenerate case (no dispatcher, behaves identically to a
2899    /// bare `write_loop_with_config`).
2900    pub shards: usize,
2901    /// mpsc capacity of each shard's input channel. The dispatcher
2902    /// `try_send`s into this; when full, the OFFENDING SHARD's
2903    /// drop is recorded under
2904    /// `archiver_dispatcher_shard_overflow_drops_total{shard=N}`
2905    /// and the sample's per-PV `buffer_overflow_drops` counter is
2906    /// bumped. Other shards keep flowing — that's the per-shard
2907    /// isolation guarantee.
2908    pub per_shard_buffer: usize,
2909    /// Per-worker config (timeouts, flush period). Cloned into
2910    /// each shard.
2911    pub write_loop: WriteLoopConfig,
2912}
2913
2914impl Default for ShardedWritePoolConfig {
2915    fn default() -> Self {
2916        Self {
2917            shards: 1,
2918            per_shard_buffer: 4096,
2919            write_loop: WriteLoopConfig::default(),
2920        }
2921    }
2922}
2923
2924// PV-to-shard hash lives in `archiver_core::storage::plainpb` so
2925// the engine's dispatcher and PlainPB's
2926// `flush_ingest_writes_for_shard` agree on which shard owns which
2927// PV. Two definitions would mean shard 0's flush could iterate
2928// PVs the dispatcher routed to shard 1, re-introducing the
2929// misattribution bug.
2930use archiver_core::storage::plainpb::shard_for_pv;
2931
2932/// Run an N-shard write pool: 1 dispatcher (hashes pv_name → shard)
2933/// plus N parallel `write_loop_with_config` workers. Each shard has
2934/// independent `ts_updates`, an independent flush ticker, and its
2935/// own per-PV writer slots inside the shared storage plugin (so
2936/// per-PV ordering is naturally preserved by the consistent-hash
2937/// routing — samples for one PV always go to the same shard).
2938///
2939/// `shards == 1` short-circuits the dispatcher and runs a single
2940/// `write_loop_with_config` directly so single-worker deployments
2941/// pay zero overhead.
2942pub async fn run_sharded_write_pool(
2943    storage: Arc<dyn StoragePlugin>,
2944    registry: Arc<PvRegistry>,
2945    rx: mpsc::Receiver<PvSample>,
2946    shutdown: tokio::sync::watch::Receiver<bool>,
2947    cfg: ShardedWritePoolConfig,
2948) {
2949    let n = cfg.shards.max(1);
2950
2951    // Coalescing per-PV pending-timestamp map shared between the
2952    // flush owner, every shard worker, and every shard worker's
2953    // spawn_blocking late-success closure. Replaces the previous
2954    // mpsc report channel — see [`PendingReports`] for why.
2955    let pending = Arc::new(PendingReports::new());
2956
2957    // Spawn the global flush owner first so it's ready to flush
2958    // as soon as shards start appending.
2959    let flush_owner_handle = tokio::spawn(flush_owner_loop(
2960        storage.clone(),
2961        registry.clone(),
2962        pending.clone(),
2963        shutdown.clone(),
2964        cfg.write_loop.clone(),
2965    ));
2966
2967    if n == 1 {
2968        // Fast path: single shard, no dispatcher hop. We become
2969        // the shard worker ourselves and read from `rx` directly.
2970        shard_append_loop(
2971            0,
2972            storage.clone(),
2973            rx,
2974            pending.clone(),
2975            shutdown.clone(),
2976            cfg.write_loop.clone(),
2977        )
2978        .await;
2979    } else {
2980        // Multi-shard path: dispatcher + N shard workers.
2981        let mut shard_txs = Vec::with_capacity(n);
2982        let mut shard_handles = Vec::with_capacity(n);
2983        for shard_idx in 0..n {
2984            let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2985            shard_txs.push(s_tx);
2986            let storage = storage.clone();
2987            let pending = pending.clone();
2988            let shard_shutdown = shutdown.clone();
2989            let shard_cfg = cfg.write_loop.clone();
2990            shard_handles.push(tokio::spawn(shard_append_loop(
2991                shard_idx,
2992                storage,
2993                s_rx,
2994                pending,
2995                shard_shutdown,
2996                shard_cfg,
2997            )));
2998        }
2999
3000        dispatch_loop(rx, shard_txs, shutdown.clone()).await;
3001
3002        // Dispatcher exited → shard sample receivers see EOS or
3003        // the shutdown signal directly via their watch::Receiver.
3004        // Wait for shards to finish their drain branches.
3005        for h in shard_handles {
3006            let _ = h.await;
3007        }
3008    }
3009
3010    // Flush owner uses its own shutdown-watch + drain_total_budget
3011    // grace timer, not an EOS signal — see flush_owner_loop. Wait
3012    // for it to finish.
3013    let _ = flush_owner_handle.await;
3014}
3015
3016/// Reads the upstream main mpsc, hashes each sample's `pv_name`,
3017/// and forwards to the appropriate shard channel via
3018/// `Sender::try_send` for **per-shard isolation**.
3019///
3020/// `try_send` (not `send().await`) is the load-balancing
3021/// invariant: if one slow shard's channel is full, the dispatcher
3022/// drops THAT shard's overflow into `buffer_overflow_drops` and
3023/// keeps routing for the other shards. Using `send().await` would
3024/// block the dispatcher on the slow shard, back-pressuring the
3025/// upstream main channel and starving every other shard's PVs —
3026/// the exact failure mode the sharded layout is meant to prevent.
3027async fn dispatch_loop(
3028    mut rx: mpsc::Receiver<PvSample>,
3029    shard_txs: Vec<mpsc::Sender<PvSample>>,
3030    mut shutdown: tokio::sync::watch::Receiver<bool>,
3031) {
3032    let n = shard_txs.len();
3033    info!(shards = n, "Sharded write dispatcher started");
3034    loop {
3035        tokio::select! {
3036            biased;
3037            // `biased` so we always check shutdown before draining
3038            // another sample — keeps the shutdown latency bounded
3039            // even under a sustained sample storm.
3040            _ = shutdown.changed() => {
3041                if *shutdown.borrow() {
3042                    // Drain remaining samples (try_send for the
3043                    // same reason as the steady-state branch).
3044                    // Mirror the steady-state overflow accounting
3045                    // so a saturated shard's drain-time drops are
3046                    // visible to operators — silent loss here
3047                    // would shadow real samples lost during
3048                    // shutdown.
3049                    while let Ok(sample) = rx.try_recv() {
3050                        let idx = shard_for_pv(&sample.pv_name, n);
3051                        match shard_txs[idx].try_send(sample) {
3052                            Ok(()) => {}
3053                            Err(mpsc::error::TrySendError::Full(s)) => {
3054                                if let Some(c) = s.counters.as_ref() {
3055                                    c.buffer_overflow_drops
3056                                        .fetch_add(1, Ordering::Relaxed);
3057                                }
3058                                metrics::counter!(
3059                                    "archiver_dispatcher_shard_overflow_drops_total",
3060                                    "shard" => idx.to_string(),
3061                                    "phase" => "shutdown_drain",
3062                                )
3063                                .increment(1);
3064                                debug!(
3065                                    shard = idx,
3066                                    pv = s.pv_name,
3067                                    "Shutdown-drain shard channel full; sample dropped"
3068                                );
3069                            }
3070                            Err(mpsc::error::TrySendError::Closed(s)) => {
3071                                warn!(
3072                                    shard = idx,
3073                                    pv = s.pv_name,
3074                                    "Shutdown-drain shard channel closed; sample dropped"
3075                                );
3076                            }
3077                        }
3078                    }
3079                    break;
3080                }
3081            }
3082            maybe = rx.recv() => {
3083                match maybe {
3084                    Some(sample) => {
3085                        let idx = shard_for_pv(&sample.pv_name, n);
3086                        match shard_txs[idx].try_send(sample) {
3087                            Ok(()) => {}
3088                            Err(mpsc::error::TrySendError::Full(s)) => {
3089                                // Shard saturated. Surface the
3090                                // drop on the SAMPLE's per-PV
3091                                // counter so operators can pin
3092                                // the loss to a specific PV+shard.
3093                                if let Some(c) = s.counters.as_ref() {
3094                                    c.buffer_overflow_drops
3095                                        .fetch_add(1, Ordering::Relaxed);
3096                                }
3097                                metrics::counter!(
3098                                    "archiver_dispatcher_shard_overflow_drops_total",
3099                                    "shard" => idx.to_string(),
3100                                )
3101                                .increment(1);
3102                                debug!(
3103                                    shard = idx,
3104                                    pv = s.pv_name,
3105                                    "Shard channel full; sample dropped \
3106                                     (per-shard isolation)"
3107                                );
3108                            }
3109                            Err(mpsc::error::TrySendError::Closed(s)) => {
3110                                // Shard worker died (panic or
3111                                // shutdown race). We'll lose this
3112                                // sample but keep the dispatcher
3113                                // alive for OTHER shards.
3114                                warn!(
3115                                    shard = idx,
3116                                    pv = s.pv_name,
3117                                    "Shard channel closed; sample dropped"
3118                                );
3119                            }
3120                        }
3121                    }
3122                    None => break, // Upstream closed.
3123                }
3124            }
3125        }
3126    }
3127    info!("Sharded write dispatcher exiting");
3128}
3129
3130/// Background writer task — drains samples and writes to storage.
3131///
3132/// Thin wrapper that fills [`WriteLoopConfig`] from [`Default`] and
3133/// the caller-supplied `flush_period`. Production callers use this
3134/// form; the test suite uses [`write_loop_with_config`] directly to
3135/// dial timeouts down.
3136pub async fn write_loop(
3137    storage: Arc<dyn StoragePlugin>,
3138    registry: Arc<PvRegistry>,
3139    rx: mpsc::Receiver<PvSample>,
3140    shutdown: tokio::sync::watch::Receiver<bool>,
3141    flush_period: Duration,
3142) {
3143    let cfg = WriteLoopConfig {
3144        flush_period,
3145        ..Default::default()
3146    };
3147    write_loop_with_config(storage, registry, rx, shutdown, cfg).await
3148}
3149
/// Drop guard that resets the shared `flush_in_flight` flag to `false`.
///
/// The reset lives in `Drop`, so it runs on a normal return AND while a
/// panic unwinds through the owning scope. A manual `store(false)` at
/// the end of the flush closure would be skipped by an unwind inside
/// `block_on(flush_ingest_writes)`. That would leave the flag stuck at
/// `true`: every later ticker would see "still in flight", skip
/// flushing, and the registry's `last_event` would freeze.
struct FlushInFlightGuard {
    /// Flag shared with the flush scheduler; reset with `Release` on drop.
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    fn drop(&mut self) {
        let FlushInFlightGuard { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
3165
3166/// Run one flush + commit cycle.
3167///
3168/// Spawns `flush_ingest_writes` on the blocking pool with a bounded
3169/// timeout, then commits the pending registry timestamps for every
3170/// PV the flush returned cleanly. Maintains an `in_flight` flag set
3171/// by the spawn_blocking task so a caller (the ticker branch) can
3172/// avoid stacking concurrent flushes when the previous one is still
3173/// running on a wedged FS.
3174///
3175/// Returns `true` if a flush was actually launched, `false` if the
3176/// helper was a no-op (in_flight already set, ts_updates empty).
3177///
3178/// Mutates `ts_updates`:
3179///   * Clean flush: drops `failed` PVs (bytes lost), commits all
3180///     other entries, drops committed entries on success.
3181///   * Deferred PVs: STAY in `ts_updates` for the next cycle.
3182///     Their bytes are still buffered, not lost.
3183///   * Flush error / panic: leaves every entry intact for retry.
3184///   * Flush timeout: CLEARS every entry. We don't know which PVs
3185///     reached disk; the in-flight task may still be running and
3186///     may even fail late, removing a writer from the cache. The
3187///     next flush would then see a "clean" state for that PV and
3188///     wrongly commit. Conservative drop is the only safe move
3189///     (Java archiver has the same trade-off — under-commit on
3190///     timeout, never over-commit). Future samples will repopulate
3191///     `ts_updates` and the next clean flush catches the registry up.
3192async fn run_flush_and_commit(
3193    storage: &Arc<dyn StoragePlugin>,
3194    registry: &Arc<PvRegistry>,
3195    pending: &Arc<PendingReports>,
3196    flush_timeout: Duration,
3197    in_flight: &Arc<std::sync::atomic::AtomicBool>,
3198) -> bool {
3199    if in_flight.load(Ordering::Acquire) {
3200        // Previous flush hasn't finished. Don't queue another —
3201        // we'd just stack spawn_blocking tasks on the wedged FS
3202        // and saturate the blocking pool. Wait for the existing
3203        // one to drain.
3204        return false;
3205    }
3206    // Snapshot the current pending map BEFORE setting in_flight
3207    // so concurrent shards reporting after this point survive
3208    // into the next cycle (their entries will be in
3209    // `pending` but not in `snapshot`, so we won't remove them
3210    // when we clean up). The snapshot is the authoritative view
3211    // of "what we're trying to commit this cycle".
3212    let snapshot = pending.snapshot();
3213    if snapshot.is_empty() {
3214        return false;
3215    }
3216    in_flight.store(true, Ordering::Release);
3217
3218    let storage_for_flush = storage.clone();
3219    let in_flight_for_task = in_flight.clone();
3220    let flush_join = tokio::task::spawn_blocking(move || {
3221        // RAII guard: clears `in_flight` on every exit path
3222        // (return, panic, future-cancellation). The guard's
3223        // existence is the contract — manually storing false
3224        // BEFORE returning would skip cleanup on a panic inside
3225        // block_on / inside flush_ingest_writes, leaving the flag
3226        // stuck `true` and silently freezing all future flushes.
3227        let _guard = FlushInFlightGuard {
3228            flag: in_flight_for_task,
3229        };
3230        let rt = tokio::runtime::Handle::current();
3231        rt.block_on(storage_for_flush.flush_ingest_writes())
3232    });
3233    match tokio::time::timeout(flush_timeout, flush_join).await {
3234        Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
3235            // Failed PVs: bytes lost. Drop every failed PV's
3236            // pending entry UNCONDITIONALLY — regardless of
3237            // whether the snapshot's value still matches.
3238            //
3239            // The previous "remove only if matches snapshot"
3240            // policy assumed a concurrent shard's post-snapshot
3241            // report meant "newer bytes ARE on disk". That
3242            // assumption fails for the eviction/loss-queue path:
3243            // the loss queue records only PV names, so a loss
3244            // event recorded between snapshot and flush-return
3245            // has no way to communicate which writer generation
3246            // (or timestamp horizon) it applies to. A
3247            // post-snapshot pending entry could refer to bytes
3248            // that ARE on disk (fresh writer after eviction) OR
3249            // bytes that were also lost — we can't tell.
3250            //
3251            // Conservative drop is the safe choice: never
3252            // commit a `last_event` for a PV the storage just
3253            // told us had lost bytes, even at the cost of an
3254            // occasional under-commit that the next sample
3255            // resolves.
3256            if !failed.is_empty() {
3257                pending.remove_failed(&failed);
3258                error!(
3259                    "STS flush dropped {} PV(s) from timestamp commit \
3260                     (bytes never reached disk): {:?}",
3261                    failed.len(),
3262                    failed
3263                );
3264            }
3265            // Deferred PVs: writer slot busy, bytes still buffered.
3266            // We do NOT remove them from `pending` (next cycle
3267            // will catch them) and we EXCLUDE them from this
3268            // cycle's commit batch.
3269            if !deferred.is_empty() {
3270                debug!(
3271                    "STS flush deferred {} PV(s); keeping in pending \
3272                     for next cycle: {:?}",
3273                    deferred.len(),
3274                    deferred
3275                );
3276            }
3277            // Build commit batch from the snapshot, excluding
3278            // failed (bytes lost) and deferred (bytes not on disk
3279            // yet).
3280            let failed_set: std::collections::HashSet<&str> =
3281                failed.iter().map(|s| s.as_str()).collect();
3282            let deferred_set: std::collections::HashSet<&str> =
3283                deferred.iter().map(|s| s.as_str()).collect();
3284            let to_commit: Vec<(&str, SystemTime)> = snapshot
3285                .iter()
3286                .filter(|(pv, _)| {
3287                    !failed_set.contains(pv.as_str()) && !deferred_set.contains(pv.as_str())
3288                })
3289                .map(|(pv, ts)| (pv.as_str(), *ts))
3290                .collect();
3291            if to_commit.is_empty() {
3292                return true;
3293            }
3294            match registry.batch_update_timestamps(&to_commit) {
3295                Ok(()) => {
3296                    // Remove committed entries from the shared
3297                    // map — but only if their current value still
3298                    // matches the snapshot's. This preserves
3299                    // concurrent updates that arrived between
3300                    // snapshot and remove.
3301                    let committed_map: std::collections::HashMap<String, SystemTime> = to_commit
3302                        .iter()
3303                        .map(|(pv, ts)| ((*pv).to_string(), *ts))
3304                        .collect();
3305                    pending.remove_committed(&committed_map);
3306                }
3307                Err(e) => {
3308                    // Don't remove — retry on next cycle.
3309                    // Otherwise a transient SQLite error orphans
3310                    // timestamps the disk has but the registry
3311                    // doesn't know about, with no retry path.
3312                    error!(
3313                        "Registry timestamp commit failed; \
3314                         keeping {} pending for retry: {e}",
3315                        snapshot.len()
3316                    );
3317                }
3318            }
3319        }
3320        Ok(Ok(Err(e))) => {
3321            error!(
3322                "STS ingest flush errored; deferring all {} \
3323                 pending timestamp commits: {e}",
3324                snapshot.len()
3325            );
3326        }
3327        Ok(Err(join_err)) => {
3328            error!(
3329                "STS ingest flush task panicked; deferring all {} \
3330                 pending timestamp commits: {join_err}",
3331                snapshot.len()
3332            );
3333        }
3334        Err(_) => {
3335            // Flush wedged. Conservative drop: the spawn_blocking
3336            // task may still be running and may even fail late
3337            // (which would remove a PV's writer from the cache,
3338            // making the NEXT flush return "clean" and tricking
3339            // the owner into committing a stale value for a PV
3340            // whose old bytes are gone). Drop the snapshot's
3341            // entries from `pending` (only if unchanged), so the
3342            // owner doesn't trust them; future samples rebuild
3343            // pending and the next clean flush catches the
3344            // registry up.
3345            //
3346            // The `in_flight` flag stays TRUE until the
3347            // spawn_blocking task naturally finishes — this
3348            // throttles us from queueing yet another flush onto
3349            // the wedged FS until it clears.
3350            metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
3351            let dropped = snapshot.len();
3352            pending.remove_committed(&snapshot);
3353            error!(
3354                "STS ingest flush timed out after {flush_timeout:?}; \
3355                 conservatively dropped {dropped} pending timestamp \
3356                 commit(s) (task remains on blocking pool; \
3357                 timestamps will be rebuilt from subsequent samples)"
3358            );
3359        }
3360    }
3361    true
3362}
3363
3364/// Coalescing per-PV pending-timestamp map shared between every
3365/// shard worker (incl. their spawn_blocking late-success closures)
3366/// and the single global flush owner.
3367///
3368/// **Why a shared map instead of an mpsc channel?** With a channel,
3369/// a slow flush owner causes the buffer to fill and `try_send`
3370/// drops happen — for a PV that goes silent right after a dropped
3371/// report, the registry's `last_event` is permanently
3372/// under-committed. With a coalescing map keyed by PV, every
3373/// shard call is an `entry().and_modify().or_insert()` that
3374/// always succeeds; concurrent updates from different shards
3375/// never lose data because the map is bounded by **PV count**
3376/// (typical: thousands), not by sample rate (typical: 100k/s).
3377///
3378/// **Coalescing semantics.** A successful append for `(pv, ts)`
3379/// upserts the entry to `max(current, ts)`. A stale late-success
3380/// report (e.g. from a spawn_blocking task that finished after
3381/// shard already wrote a newer sample) does NOT clobber a newer
3382/// committed value.
3383#[derive(Default)]
3384pub struct PendingReports {
3385    inner: dashmap::DashMap<String, SystemTime>,
3386}
3387
3388impl PendingReports {
3389    pub fn new() -> Self {
3390        Self::default()
3391    }
3392
3393    /// Coalescing report. If `pv` already has a newer timestamp,
3394    /// no-op. Always succeeds — this is the channel-replacement
3395    /// invariant that makes silent-PV under-commit impossible.
3396    pub fn report(&self, pv: &str, ts: SystemTime) {
3397        // DashMap's entry() locks one shard internally. Fast-path
3398        // updates use `and_modify`; brand-new PVs go through
3399        // `or_insert`. No external lock contention across shards.
3400        self.inner
3401            .entry(pv.to_string())
3402            .and_modify(|cur| {
3403                if *cur < ts {
3404                    *cur = ts;
3405                }
3406            })
3407            .or_insert(ts);
3408    }
3409
3410    /// Snapshot the entire map for a flush cycle. Returns a
3411    /// owned HashMap; the shared map stays intact so concurrent
3412    /// shards can keep reporting during the flush.
3413    pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3414        self.inner
3415            .iter()
3416            .map(|kv| (kv.key().clone(), *kv.value()))
3417            .collect()
3418    }
3419
3420    /// Remove entries whose CURRENT value still equals what we
3421    /// committed. If a concurrent shard advanced an entry to a
3422    /// newer ts after the snapshot, leave it alone — the next
3423    /// flush will pick it up.
3424    pub fn remove_committed(&self, committed: &std::collections::HashMap<String, SystemTime>) {
3425        for (pv, &committed_ts) in committed {
3426            // remove_if applies the predicate atomically inside
3427            // the DashMap shard lock.
3428            let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3429        }
3430    }
3431
3432    /// Unconditionally remove every failed PV's entry from the
3433    /// pending map, regardless of its current timestamp.
3434    ///
3435    /// **Why unconditional.** The storage's loss queue carries
3436    /// only PV names — it cannot tell us whether a concurrent
3437    /// shard's post-snapshot report was for bytes that ARE on
3438    /// disk (e.g., a fresh writer opened after eviction) versus
3439    /// bytes that are also gone. The matching `remove_committed`
3440    /// path can leave a post-snapshot entry behind, and the next
3441    /// clean flush would commit it — turning a "PV's bytes were
3442    /// lost" report into a `last_event` lie.
3443    ///
3444    /// The trade-off: a brand-new sample whose bytes truly DID
3445    /// reach disk after the loss event gets dropped from this
3446    /// commit cycle. Future samples re-populate `pending` and the
3447    /// registry catches up. Under-commit is the safe direction;
3448    /// over-commit is never acceptable.
3449    pub fn remove_failed(&self, failed: &[String]) {
3450        for pv in failed {
3451            let _ = self.inner.remove(pv);
3452        }
3453    }
3454
3455    pub fn is_empty(&self) -> bool {
3456        self.inner.is_empty()
3457    }
3458
3459    pub fn len(&self) -> usize {
3460        self.inner.len()
3461    }
3462}
3463
3464/// Same as [`write_loop`] but accepts the full [`WriteLoopConfig`].
3465///
3466/// Thin wrapper that runs a 1-shard sharded pool — kept on the
3467/// public surface for tests and any direct callers. The actual
3468/// work happens in [`run_sharded_write_pool`].
3469pub async fn write_loop_with_config(
3470    storage: Arc<dyn StoragePlugin>,
3471    registry: Arc<PvRegistry>,
3472    rx: mpsc::Receiver<PvSample>,
3473    shutdown: tokio::sync::watch::Receiver<bool>,
3474    cfg: WriteLoopConfig,
3475) {
3476    let pool_cfg = ShardedWritePoolConfig {
3477        shards: 1,
3478        // unused on the 1-shard fast path (rx is forwarded directly)
3479        per_shard_buffer: 4096,
3480        write_loop: cfg,
3481    };
3482    run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3483}
3484
3485/// Per-shard append worker. Drains samples from `sample_rx`,
3486/// drops out-of-order or type-changed samples, then runs each
3487/// `append_event_with_meta` on the blocking pool with a bounded
3488/// timeout. On every success — including the late-success path
3489/// where write_loop's outer timeout already abandoned the
3490/// JoinHandle — coalesces `(pv, ts)` into the shared
3491/// [`PendingReports`] map so the global flush owner sees it on
3492/// the next flush cycle.
3493///
3494/// The shard worker holds NO ts_updates / flush state of its own.
3495/// It tracks `last_ts` and `last_dbr_type` only for the
3496/// per-PV ordering / type-change drop checks; those are local
3497/// to the shard because the dispatcher's consistent hash pins each
3498/// PV to one shard.
3499async fn shard_append_loop(
3500    shard_idx: usize,
3501    storage: Arc<dyn StoragePlugin>,
3502    mut sample_rx: mpsc::Receiver<PvSample>,
3503    pending: Arc<PendingReports>,
3504    mut shutdown: tokio::sync::watch::Receiver<bool>,
3505    cfg: WriteLoopConfig,
3506) {
3507    let append_timeout = cfg.append_timeout;
3508    info!(shard = shard_idx, "Shard append loop started");
3509    // Per-PV state for the in-shard sanity drops. The dispatcher's
3510    // consistent hash keeps each PV in one shard, so these maps
3511    // never see cross-shard PVs.
3512    let mut last_ts: std::collections::HashMap<String, SystemTime> =
3513        std::collections::HashMap::new();
3514    let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
3515        std::collections::HashMap::new();
3516
3517    loop {
3518        tokio::select! {
3519            Some(pv_sample) = sample_rx.recv() => {
3520                shard_handle_sample(
3521                    shard_idx,
3522                    &storage,
3523                    &pending,
3524                    pv_sample,
3525                    &mut last_ts,
3526                    &mut last_dbr_type,
3527                    append_timeout,
3528                )
3529                .await;
3530            }
3531            _ = shutdown.changed() => {
3532                shard_drain_on_shutdown(
3533                    shard_idx,
3534                    &storage,
3535                    &pending,
3536                    &mut sample_rx,
3537                    &mut last_ts,
3538                    &mut last_dbr_type,
3539                    &cfg,
3540                )
3541                .await;
3542                break;
3543            }
3544        }
3545    }
3546    info!(shard = shard_idx, "Shard append loop exited");
3547}
3548
3549/// Process one sample on the shard's hot path. Extracted so the
3550/// steady-state and shutdown-drain branches can share the same
3551/// "drop checks → spawn_blocking append → report on success"
3552/// recipe.
3553async fn shard_handle_sample(
3554    shard_idx: usize,
3555    storage: &Arc<dyn StoragePlugin>,
3556    pending: &Arc<PendingReports>,
3557    pv_sample: PvSample,
3558    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3559    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3560    append_timeout: Duration,
3561) {
3562    let ts = pv_sample.sample.timestamp;
3563
3564    // Out-of-order timestamp drop. Storage requires monotonic
3565    // appends per PV; an older timestamp would produce a corrupt
3566    // partition. Tracked via shard-local `last_ts` so the check
3567    // survives flush cycles (the legacy ts_updates-based check
3568    // only caught within-cycle reorderings).
3569    if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
3570        && ts < *prev_ts
3571    {
3572        if let Some(ref c) = pv_sample.counters {
3573            c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
3574        }
3575        debug!(
3576            shard = shard_idx,
3577            pv = pv_sample.pv_name,
3578            ?ts,
3579            ?prev_ts,
3580            "Dropping out-of-order sample"
3581        );
3582        return;
3583    }
3584
3585    // Type-change drop. The first sample defines the PV's wire
3586    // type; later samples with a different DBR get dropped
3587    // (operator must changeTypeForPV first).
3588    let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
3589    if let Some(prev) = prev_type
3590        && prev != pv_sample.dbr_type
3591    {
3592        if let Some(ref c) = pv_sample.counters {
3593            c.type_change_drops.fetch_add(1, Ordering::Relaxed);
3594            // Java parity (9f2234f): record the latest observed
3595            // DBR so the dropped-events report can show what the
3596            // IOC is now sending vs the archived type.
3597            c.latest_observed_dbr
3598                .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
3599        }
3600        debug!(
3601            shard = shard_idx,
3602            pv = pv_sample.pv_name,
3603            ?prev,
3604            new = ?pv_sample.dbr_type,
3605            "Dropping type-changed sample"
3606        );
3607        // Restore prev_type so a single mismatched sample doesn't
3608        // permanently flip our recorded type.
3609        last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
3610        return;
3611    }
3612
3613    let meta = AppendMeta {
3614        element_count: pv_sample.element_count,
3615        ..Default::default()
3616    };
3617    let pv_name_for_post = pv_sample.pv_name.clone();
3618    let counters_for_post = pv_sample.counters.clone();
3619    let counters_in_task = pv_sample.counters.clone();
3620    let storage_for_task = storage.clone();
3621    let pending_for_task = pending.clone();
3622
3623    // Bound each append + isolate sync syscall hangs.
3624    // `spawn_blocking` moves the actual I/O to the blocking
3625    // thread pool, so a stuck NFS write parks a blocking-pool
3626    // thread instead of a runtime worker. The
3627    // `tokio::time::timeout` then bounds how long the shard waits
3628    // for that join — a stuck task is abandoned and the shard
3629    // continues.
3630    //
3631    // Late-success path: a timed-out task may still complete its
3632    // write later. The closure ALWAYS reports into the shared
3633    // `PendingReports` map on Ok (whether or not the shard
3634    // already moved on), so the global flush owner picks up late
3635    // successes too. Per-PV serialization inside PlainPB keeps
3636    // any late completion ordered behind subsequent same-PV
3637    // samples. The map's coalescing semantics ensure a stale late
3638    // success does NOT clobber a newer hot-path commit.
3639    let join = tokio::task::spawn_blocking(move || {
3640        let rt = tokio::runtime::Handle::current();
3641        let res = rt.block_on(storage_for_task.append_event_with_meta(
3642            &pv_sample.pv_name,
3643            pv_sample.dbr_type,
3644            &pv_sample.sample,
3645            &meta,
3646        ));
3647        if res.is_ok() {
3648            metrics::counter!("archiver_events_stored_total").increment(1);
3649            if let Some(c) = counters_in_task.as_ref() {
3650                c.events_stored.fetch_add(1, Ordering::Relaxed);
3651            }
3652            // Coalesce into the shared map. Always succeeds —
3653            // unlike the previous mpsc try_send, a saturated
3654            // flush owner cannot cause this report to drop, so
3655            // a PV that goes silent right after a successful
3656            // append still gets its `last_event` committed once
3657            // the owner gets to its next flush cycle.
3658            pending_for_task.report(&pv_sample.pv_name, ts);
3659        }
3660        res
3661    });
3662    let res = tokio::time::timeout(append_timeout, join).await;
3663    // Conservative ordering high-water (principle 5). Bump
3664    // `last_ts` on EVERY storage-layer return path — success,
3665    // error, panic, timeout — not just on Ok and timeout.
3666    //
3667    // Why all four:
3668    //   * Ok       — sample on disk; obvious bump.
3669    //   * Timeout  — sample MAY land on disk via late success;
3670    //                bump to keep on-disk order monotonic.
3671    //   * Error    — current storage contract is binary, but a
3672    //                future partial-success-with-error variant
3673    //                could leave bytes on disk. Bumping now
3674    //                hedges against that.
3675    //   * Panic    — closure aborted mid-write; storage state
3676    //                is undefined. Bumping is the safe choice.
3677    //
3678    // The pre-check earlier in this function already drops
3679    // out-of-order and type-changed samples BEFORE we get here,
3680    // so this bump only ever sets the high-water to a ts the
3681    // shard explicitly accepted into the storage layer.
3682    last_ts.insert(pv_name_for_post.clone(), ts);
3683    match res {
3684        Ok(Ok(Ok(()))) => {
3685            // Report already went out from inside the closure.
3686        }
3687        Ok(Ok(Err(e))) => {
3688            error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
3689        }
3690        Ok(Err(join_err)) => {
3691            error!(
3692                shard = shard_idx,
3693                pv = pv_name_for_post,
3694                "Storage write task panicked: {join_err}"
3695            );
3696        }
3697        Err(_) => {
3698            // Sample MAY still land on disk later (per-PV
3699            // serialization keeps order). The closure will report
3700            // into the shared pending map whenever it eventually
3701            // completes, so registry's `last_event` catches up
3702            // via the late path even though we move on now.
3703            error!(
3704                shard = shard_idx,
3705                pv = pv_name_for_post,
3706                "Storage append timed out after {append_timeout:?}; \
3707                 shard abandoning (task remains on blocking pool, \
3708                 may still succeed late)"
3709            );
3710            if let Some(ref c) = counters_for_post {
3711                c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
3712            }
3713        }
3714    }
3715}
3716
3717/// Drain the shard's remaining buffered samples on shutdown,
3718/// applying the same bounded-timeout discipline as the hot path.
3719async fn shard_drain_on_shutdown(
3720    shard_idx: usize,
3721    storage: &Arc<dyn StoragePlugin>,
3722    pending: &Arc<PendingReports>,
3723    sample_rx: &mut mpsc::Receiver<PvSample>,
3724    last_ts: &mut std::collections::HashMap<String, SystemTime>,
3725    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3726    cfg: &WriteLoopConfig,
3727) {
3728    let drain_per_sample_timeout = cfg.drain_per_sample_timeout;
3729    let drain_total_budget = cfg.drain_total_budget;
3730    let drain_start = std::time::Instant::now();
3731    let mut drained_skipped = 0usize;
3732    while let Ok(pv_sample) = sample_rx.try_recv() {
3733        if drain_start.elapsed() > drain_total_budget {
3734            drained_skipped += 1;
3735            continue;
3736        }
3737        // Reuse the hot-path handler with a tighter timeout.
3738        // Late-success reports still flow into `pending` because
3739        // the closure clones the Arc independently of this
3740        // shard's lifecycle.
3741        shard_handle_sample(
3742            shard_idx,
3743            storage,
3744            pending,
3745            pv_sample,
3746            last_ts,
3747            last_dbr_type,
3748            drain_per_sample_timeout,
3749        )
3750        .await;
3751    }
3752    if drained_skipped > 0 {
3753        warn!(
3754            shard = shard_idx,
3755            "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3756             {drained_skipped} buffered sample(s) abandoned without \
3757             attempting append"
3758        );
3759    }
3760}
3761
3762/// Single global flush owner. Reads from the shared
3763/// [`PendingReports`] map (which all shards coalesce into),
3764/// drives the periodic `flush_ingest_writes`, and commits to the
3765/// registry. Because ALL shards report into this one map and the
3766/// owner is the only consumer, the flush result and the
3767/// pending-commit data live in the same task — no shard 0 vs
3768/// shard 1 attribution skew.
3769async fn flush_owner_loop(
3770    storage: Arc<dyn StoragePlugin>,
3771    registry: Arc<PvRegistry>,
3772    pending: Arc<PendingReports>,
3773    mut shutdown: tokio::sync::watch::Receiver<bool>,
3774    cfg: WriteLoopConfig,
3775) {
3776    let flush_period = cfg.flush_period;
3777    let flush_timeout = cfg.flush_timeout;
3778    let shutdown_flush_timeout = cfg.shutdown_flush_timeout;
3779    let drain_total_budget = cfg.drain_total_budget;
3780    // tokio::time::interval(Duration::ZERO) panics. Config-side
3781    // validation rejects 0 at load time, but defending in depth
3782    // here keeps tests and direct callers (which build configs by
3783    // hand) from crashing the engine on a misconfigured value.
3784    let flush_period = if flush_period.is_zero() {
3785        warn!(
3786            "flush_owner: flush_period was Duration::ZERO; \
3787             clamping to 1s to avoid tokio::time::interval panic"
3788        );
3789        Duration::from_secs(1)
3790    } else {
3791        flush_period
3792    };
3793    info!("Flush owner started");
3794
3795    let mut flush_ticker = tokio::time::interval(flush_period);
3796    flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3797    let _ = flush_ticker.tick().await;
3798
3799    let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3800
3801    // Phase 1: steady state. Ticker-driven flush only. Reports
3802    // accumulate in the shared `PendingReports` map continuously
3803    // (no recv loop — shards write directly), so we don't need
3804    // to multiplex report consumption against the flush.
3805    loop {
3806        tokio::select! {
3807            biased;
3808            _ = shutdown.changed() => {
3809                if *shutdown.borrow() {
3810                    break;
3811                }
3812            }
3813            _ = flush_ticker.tick() => {
3814                if !pending.is_empty() {
3815                    run_flush_and_commit(
3816                        &storage,
3817                        &registry,
3818                        &pending,
3819                        flush_timeout,
3820                        &flush_in_flight,
3821                    )
3822                    .await;
3823                }
3824            }
3825        }
3826    }
3827
3828    // Phase 2: shutdown grace. Two windows in one budget:
3829    //
3830    //   1. A brief minimum sleep so any in-flight spawn_blocking
3831    //      late-success closures can coalesce their reports into
3832    //      `pending` before the final flush snapshots it.
3833    //
3834    //   2. Wait for any still-running ticker flush to drain
3835    //      (`flush_in_flight` cleared by the spawn_blocking task's
3836    //      RAII guard). Without this, the final
3837    //      `run_flush_and_commit` would see `in_flight == true`,
3838    //      short-circuit, and skip every entry in `pending` —
3839    //      data on disk but no registry commit.
3840    //
3841    // Both windows fit inside the operator-configured
3842    // `drain_total_budget`. If a flush is genuinely wedged past
3843    // the budget, we proceed to final flush; that call will see
3844    // `in_flight` still set and short-circuit, but at least we
3845    // didn't pin the process forever. Future samples on restart
3846    // will re-populate `pending` and the registry catches up.
3847    let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3848    let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3849    if !min_grace.is_zero() {
3850        tokio::time::sleep(min_grace).await;
3851    }
3852    while flush_in_flight.load(Ordering::Acquire) && std::time::Instant::now() < phase2_deadline {
3853        tokio::time::sleep(Duration::from_millis(50)).await;
3854    }
3855    if flush_in_flight.load(Ordering::Acquire) {
3856        warn!(
3857            "Shutdown grace exhausted while a flush is still in flight; \
3858             final flush will be skipped (pending entries left for next \
3859             process restart to rebuild from re-archived samples)"
3860        );
3861    }
3862
3863    // Final flush + commit. Use shutdown_flush_timeout (typically
3864    // shorter than the steady-state flush_timeout) so a wedged FS
3865    // doesn't block shutdown indefinitely.
3866    info!(
3867        pending_at_shutdown = pending.len(),
3868        "Flush owner running final flush + commit"
3869    );
3870    if !pending.is_empty() {
3871        run_flush_and_commit(
3872            &storage,
3873            &registry,
3874            &pending,
3875            shutdown_flush_timeout,
3876            &flush_in_flight,
3877        )
3878        .await;
3879    }
3880    info!("Flush owner exiting");
3881}
3882
3883#[cfg(test)]
3884mod pva_mapping_tests {
3885    use super::*;
3886    use epics_rs::pva::pvdata::PvStructure;
3887    use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
3888    use std::io::Cursor;
3889
3890    /// Build a synthetic NTTable with two double columns. Mirrors what
3891    /// a live IOC publishes for a typical waveform-table channel.
3892    fn make_nttable() -> PvField {
3893        let mut table = PvStructure::new("epics:nt/NTTable:1.0");
3894        let labels =
3895            TypedScalarArray::String(vec!["x".to_string(), "y".to_string()].into_iter().collect());
3896        table
3897            .fields
3898            .push(("labels".into(), PvField::ScalarArrayTyped(labels)));
3899        let mut value_inner = PvStructure::new("");
3900        value_inner.fields.push((
3901            "x".into(),
3902            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.0, 2.0, 3.0].into())),
3903        ));
3904        value_inner.fields.push((
3905            "y".into(),
3906            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![4.0, 5.0, 6.0].into())),
3907        ));
3908        table
3909            .fields
3910            .push(("value".into(), PvField::Structure(value_inner)));
3911        PvField::Structure(table)
3912    }
3913
3914    #[test]
3915    fn nttable_classifies_as_v4_generic_bytes() {
3916        let pv = make_nttable();
3917        let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
3918        assert_eq!(db, ArchDbType::V4GenericBytes);
3919        assert_eq!(ec, 1);
3920    }
3921
3922    #[test]
3923    fn nttable_round_trips_through_self_describing_bytes() {
3924        let pv = make_nttable();
3925        let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
3926        let bytes = match av {
3927            ArchiverValue::V4GenericBytes(b) => b,
3928            other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
3929        };
3930        let mut cur = Cursor::new(bytes.as_slice());
3931        let desc = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc decode");
3932        let decoded = decode_pv_field(&desc, &mut cur, ByteOrder::Big).expect("value decode");
3933        let PvField::Structure(s) = decoded else {
3934            panic!("expected Structure after decode");
3935        };
3936        assert_eq!(s.struct_id, "epics:nt/NTTable:1.0");
3937        // labels column survives + value substructure has 2 columns.
3938        let labels = s
3939            .fields
3940            .iter()
3941            .find_map(|(n, f)| if n == "labels" { Some(f) } else { None })
3942            .expect("labels");
3943        let Some(PvField::ScalarArrayTyped(TypedScalarArray::String(arr))) = Some(labels) else {
3944            panic!("labels not a string array");
3945        };
3946        assert_eq!(arr.len(), 2);
3947        let value = s
3948            .fields
3949            .iter()
3950            .find_map(|(n, f)| {
3951                if n == "value" {
3952                    if let PvField::Structure(v) = f {
3953                        Some(v)
3954                    } else {
3955                        None
3956                    }
3957                } else {
3958                    None
3959                }
3960            })
3961            .expect("value substruct");
3962        assert_eq!(value.fields.len(), 2);
3963    }
3964
3965    /// NTScalarArray of doubles should classify as WaveformDouble and
3966    /// convert to VectorDouble — this is the pre-existing PVA waveform
3967    /// path that was silently broken by the missing ScalarArrayTyped
3968    /// branch.
3969    #[test]
3970    fn nt_scalar_array_double_classifies_as_waveform() {
3971        let mut wrapper = PvStructure::new("epics:nt/NTScalarArray:1.0");
3972        wrapper.fields.push((
3973            "value".into(),
3974            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.5, 2.5, 3.5].into())),
3975        ));
3976        let pv = PvField::Structure(wrapper);
3977
3978        let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
3979        assert_eq!(db, ArchDbType::WaveformDouble);
3980        assert_eq!(ec, 3);
3981        let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
3982        match av {
3983            ArchiverValue::VectorDouble(v) => assert_eq!(v, vec![1.5, 2.5, 3.5]),
3984            other => panic!("expected VectorDouble, got {:?}", other.db_type()),
3985        }
3986    }
3987
3988    /// NTEnum: classification short-circuits to ScalarEnum (Java
3989    /// archiver parity — store the index, not the whole enum_t
3990    /// structure as opaque V4 bytes).
3991    #[test]
3992    fn nt_enum_classifies_as_scalar_enum() {
3993        let pv = make_nt_enum(2, &["Zero", "One", "Two"]);
3994        let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
3995        assert_eq!(db, ArchDbType::ScalarEnum);
3996        assert_eq!(ec, 1);
3997    }
3998
3999    /// NTEnum: value extraction yields ScalarEnum(index).
4000    #[test]
4001    fn nt_enum_value_is_index() {
4002        let pv = make_nt_enum(2, &["Off", "On"]);
4003        let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
4004        match av {
4005            ArchiverValue::ScalarEnum(i) => assert_eq!(i, 2),
4006            other => panic!("expected ScalarEnum, got {:?}", other.db_type()),
4007        }
4008    }
4009
4010    /// NTEnum: monitor callback feeds choices into the extras cache
4011    /// under the `enum_strs` key as a JSON-encoded string array.
4012    /// Downstream `attach_extras` then mirrors it into the sample's
4013    /// `field_values`, which the retrieval JSON surfaces under `meta`.
4014    #[test]
4015    fn nt_enum_extras_carries_json_choices() {
4016        let pv = make_nt_enum(1, &["Off", "On", "Trip"]);
4017        let extras: ExtraFieldsCache = DashMap::new();
4018        refresh_nt_enum_extras(&pv, &extras);
4019        let stored = extras
4020            .get("enum_strs")
4021            .map(|s| s.value().clone())
4022            .expect("enum_strs cached");
4023        assert_eq!(stored, r#"["Off","On","Trip"]"#);
4024    }
4025
4026    /// Top-level `UnionArray` PV: with the canonical channel
4027    /// descriptor plumbed through (per epics-rs `descriptor()` doc
4028    /// guidance), wire encoding preserves the variants list and the
4029    /// values round-trip. The archiver invariant requires the
4030    /// canonical descriptor at every callsite — there is no
4031    /// value-recovery fallback path to contrast against any more.
4032    #[test]
4033    fn union_array_roundtrips_with_canonical_descriptor() {
4034        use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
4035        use epics_rs::pva::pvdata::{FieldDesc, UnionItem};
4036        use std::io::Cursor;
4037
4038        let variants_desc = vec![
4039            ("intVal".to_string(), FieldDesc::Scalar(ScalarType::Int)),
4040            ("dblVal".to_string(), FieldDesc::Scalar(ScalarType::Double)),
4041        ];
4042        let canonical = FieldDesc::UnionArray {
4043            struct_id: String::new(),
4044            variants: variants_desc.clone(),
4045        };
4046        let items = vec![
4047            UnionItem {
4048                selector: 0,
4049                variant_name: "intVal".into(),
4050                value: PvField::Scalar(ScalarValue::Int(42)),
4051            },
4052            UnionItem {
4053                selector: 1,
4054                variant_name: "dblVal".into(),
4055                value: PvField::Scalar(ScalarValue::Double(1.5)),
4056            },
4057        ];
4058        let field = PvField::UnionArray(items);
4059
4060        let wire = encode_pv_field_self_describing(&field, &canonical);
4061        let mut cur = Cursor::new(wire.as_slice());
4062        let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
4063        let val_back = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
4064        let PvField::UnionArray(items_back) = val_back else {
4065            panic!("expected UnionArray, got {val_back:?}");
4066        };
4067        assert_eq!(items_back.len(), 2);
4068        assert_eq!(items_back[0].selector, 0);
4069        assert!(matches!(
4070            items_back[0].value,
4071            PvField::Scalar(ScalarValue::Int(42))
4072        ));
4073        assert_eq!(items_back[1].selector, 1);
4074        match &items_back[1].value {
4075            PvField::Scalar(ScalarValue::Double(d)) => assert!((*d - 1.5).abs() < 1e-9),
4076            other => panic!("variant 1 not Double, got {other:?}"),
4077        }
4078
4079        // Descriptor's variants list survives the wire round-trip
4080        // (this is what was lost in the pre-plumb-through behaviour).
4081        match desc_back {
4082            FieldDesc::UnionArray { variants, .. } => {
4083                assert_eq!(variants.len(), 2);
4084                assert_eq!(variants[0].0, "intVal");
4085                assert!(matches!(variants[0].1, FieldDesc::Scalar(ScalarType::Int)));
4086                assert_eq!(variants[1].0, "dblVal");
4087                assert!(matches!(
4088                    variants[1].1,
4089                    FieldDesc::Scalar(ScalarType::Double)
4090                ));
4091            }
4092            other => panic!("expected UnionArray descriptor, got {other:?}"),
4093        }
4094    }
4095
4096    /// NTNDArray-style root: `value` is a `Union` over ten POD scalar
4097    /// array variants (one for each NT-spec POD type). Live PVA only
4098    /// ever ships ONE variant per sample, but the channel-INIT
4099    /// descriptor advertises all ten — and the archiver invariant
4100    /// requires every V4GenericBytes sample to carry the channel-INIT
4101    /// descriptor verbatim. This test pushes a `PvField` shaped like
4102    /// a live monitor event through `pv_field_scalar_to_archiver`
4103    /// using a multi-variant canonical descriptor, then decodes the
4104    /// resulting wire bytes and confirms ALL ten variant slots
4105    /// survive on disk. Previously (descriptor-recovery fallback)
4106    /// only the active variant would round-trip — regression guard
4107    /// for that bug.
4108    #[test]
4109    fn nt_nd_array_union_preserves_all_variants_via_canonical_desc() {
4110        use epics_rs::pva::pvdata::FieldDesc;
4111        use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
4112        use std::io::Cursor;
4113
4114        // The ten NT-spec POD variants for NTNDArray::value.
4115        let variant_specs: &[(&str, ScalarType)] = &[
4116            ("booleanValue", ScalarType::Boolean),
4117            ("byteValue", ScalarType::Byte),
4118            ("shortValue", ScalarType::Short),
4119            ("intValue", ScalarType::Int),
4120            ("longValue", ScalarType::Long),
4121            ("ubyteValue", ScalarType::UByte),
4122            ("ushortValue", ScalarType::UShort),
4123            ("uintValue", ScalarType::UInt),
4124            ("ulongValue", ScalarType::ULong),
4125            ("floatValue", ScalarType::Float),
4126            ("doubleValue", ScalarType::Double),
4127        ];
4128        // Build the inner union descriptor with all eleven variants.
4129        let union_variants: Vec<(String, FieldDesc)> = variant_specs
4130            .iter()
4131            .map(|(n, st)| (n.to_string(), FieldDesc::ScalarArray(*st)))
4132            .collect();
4133        let canonical = FieldDesc::Structure {
4134            struct_id: "epics:nt/NTNDArray:1.0".into(),
4135            fields: vec![(
4136                "value".into(),
4137                FieldDesc::Union {
4138                    struct_id: String::new(),
4139                    variants: union_variants.clone(),
4140                },
4141            )],
4142        };
4143
4144        // Build the runtime value: union currently holds the
4145        // `doubleValue` variant (selector index 10) — a typical live
4146        // sample shape.
4147        let mut root = PvStructure::new("epics:nt/NTNDArray:1.0");
4148        root.fields.push((
4149            "value".into(),
4150            PvField::Union {
4151                selector: 10,
4152                variant_name: "doubleValue".into(),
4153                value: Box::new(PvField::ScalarArrayTyped(TypedScalarArray::Double(
4154                    vec![1.0, 2.0, 3.0].into(),
4155                ))),
4156            },
4157        ));
4158        let pv = PvField::Structure(root);
4159
4160        // Push through the V4 archiver pipeline.
4161        let av = pv_field_scalar_to_archiver(&pv, &canonical).expect("converted");
4162        let bytes = match av {
4163            ArchiverValue::V4GenericBytes(b) => b,
4164            other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
4165        };
4166
4167        // Decode and confirm:
4168        //   (a) the active variant carries the live value;
4169        //   (b) the descriptor advertises all eleven variants (the
4170        //       sibling variants that descriptor-recovery would lose).
4171        let mut cur = Cursor::new(bytes.as_slice());
4172        let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
4173        let decoded = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
4174        let PvField::Structure(s) = decoded else {
4175            panic!("expected Structure root");
4176        };
4177        let value = s
4178            .fields
4179            .iter()
4180            .find_map(|(n, f)| if n == "value" { Some(f) } else { None })
4181            .expect("value");
4182        let PvField::Union {
4183            selector,
4184            variant_name,
4185            value: inner,
4186        } = value
4187        else {
4188            panic!("expected Union, got {value:?}");
4189        };
4190        assert_eq!(*selector, 10);
4191        assert_eq!(variant_name, "doubleValue");
4192        let PvField::ScalarArrayTyped(TypedScalarArray::Double(arr)) = inner.as_ref() else {
4193            panic!("expected Double[]");
4194        };
4195        assert_eq!(arr.as_ref(), &[1.0, 2.0, 3.0]);
4196
4197        // The decoded descriptor's union variants list survived.
4198        let value_field_desc = match &desc_back {
4199            FieldDesc::Structure { fields, .. } => fields
4200                .iter()
4201                .find_map(|(n, f)| if n == "value" { Some(f) } else { None })
4202                .expect("value field in desc"),
4203            other => panic!("expected Structure descriptor, got {other:?}"),
4204        };
4205        let FieldDesc::Union {
4206            variants: v_back, ..
4207        } = value_field_desc
4208        else {
4209            panic!("expected Union descriptor for value");
4210        };
4211        assert_eq!(
4212            v_back.len(),
4213            variant_specs.len(),
4214            "all sibling Union variants must survive the wire round-trip — \
4215             this is what value-recovery descriptor would have dropped"
4216        );
4217        for (i, (expected_name, expected_st)) in variant_specs.iter().enumerate() {
4218            assert_eq!(&v_back[i].0, expected_name);
4219            assert!(matches!(v_back[i].1, FieldDesc::ScalarArray(st) if st == *expected_st));
4220        }
4221    }
4222
4223    /// User-defined Structure that LOOKS like NTEnum but lacks the
4224    /// `epics:nt/NTEnum` struct_id is NOT treated as one — falls
4225    /// through to V4GenericBytes. Guards against false positives on
4226    /// custom structs that happen to use `index`/`choices` field
4227    /// names.
4228    #[test]
4229    fn structure_without_nt_enum_id_is_v4_bytes() {
4230        let mut inner = PvStructure::new("");
4231        inner
4232            .fields
4233            .push(("index".into(), PvField::Scalar(ScalarValue::Int(1))));
4234        inner.fields.push((
4235            "choices".into(),
4236            PvField::ScalarArrayTyped(TypedScalarArray::String(
4237                vec!["a".to_string(), "b".to_string()].into_iter().collect(),
4238            )),
4239        ));
4240        let mut root = PvStructure::new("my:custom/Thing:1.0");
4241        root.fields
4242            .push(("value".into(), PvField::Structure(inner)));
4243        let pv = PvField::Structure(root);
4244        let (db, _) = pv_field_to_arch_db_type(&pv).expect("classified");
4245        assert_eq!(db, ArchDbType::V4GenericBytes);
4246    }
4247
4248    fn make_nt_enum(index: i32, choices: &[&str]) -> PvField {
4249        let mut inner = PvStructure::new("enum_t");
4250        inner
4251            .fields
4252            .push(("index".into(), PvField::Scalar(ScalarValue::Int(index))));
4253        let arr = TypedScalarArray::String(
4254            choices
4255                .iter()
4256                .map(|s| s.to_string())
4257                .collect::<Vec<_>>()
4258                .into(),
4259        );
4260        inner
4261            .fields
4262            .push(("choices".into(), PvField::ScalarArrayTyped(arr)));
4263        let mut root = PvStructure::new("epics:nt/NTEnum:1.0");
4264        root.fields
4265            .push(("value".into(), PvField::Structure(inner)));
4266        PvField::Structure(root)
4267    }
4268
4269    /// Bare NTScalar of doubles still classifies as ScalarDouble and
4270    /// converts to ScalarDouble — regression guard for the pre-existing
4271    /// path that earlier callers depended on.
4272    #[test]
4273    fn nt_scalar_double_still_scalar() {
4274        let mut wrapper = PvStructure::new("epics:nt/NTScalar:1.0");
4275        wrapper
4276            .fields
4277            .push(("value".into(), PvField::Scalar(ScalarValue::Double(42.5))));
4278        let pv = PvField::Structure(wrapper);
4279        let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
4280        assert_eq!(db, ArchDbType::ScalarDouble);
4281        assert_eq!(ec, 1);
4282        match pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted") {
4283            ArchiverValue::ScalarDouble(v) => assert!((v - 42.5).abs() < 1e-9),
4284            other => panic!("expected ScalarDouble, got {:?}", other.db_type()),
4285        }
4286    }
4287}