Skip to main content

archiver_engine/
channel_manager.rs

1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::types::{DbFieldType, EpicsValue};
7use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10use tracing::{debug, error, info, warn};
11
12use archiver_core::registry::{PvRecord, PvRegistry, PvStatus, SampleMode};
13use archiver_core::storage::traits::{AppendMeta, StoragePlugin};
14use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
15
16use crate::policy::PolicyConfig;
17
/// Upper bound (10 s) on the initial CA channel connection; passed to
/// `wait_connected` when a PV is first archived.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound (30 s) on a CA reconnection attempt in the monitor loop.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Pause (5 s) before retrying a failed CA subscription.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
24
/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
/// stale uninitialised IOC clock or a sentinel.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC

/// Check a freshly-received sample timestamp against the floor and the
/// wall clock.
///
/// Returns `true` when `ts` is accepted and `false` when the sample should
/// be dropped (caller bumps `timestamp_drops`). A timestamp is accepted when
/// it is not earlier than [`PAST_CUTOFF_UNIX_SECS`] and lies within
/// `±drift_secs` (whole seconds, inclusive) of `now`.
///
/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
/// 6538631), so per-site tuning doesn't require recompiling. `now` is
/// passed in (rather than calling `SystemTime::now()` here) so the test
/// suite can pin time deterministically.
///
/// Timestamps before the Unix epoch, or too large to fit in an `i64`
/// second count, are rejected. A `now` that cannot be converted is treated
/// as the epoch itself.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // Whole seconds since the epoch; `None` for pre-epoch or out-of-range
    // values instead of letting an `as` cast wrap silently.
    let to_unix_secs = |t: SystemTime| -> Option<i64> {
        t.duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .and_then(|d| i64::try_from(d.as_secs()).ok())
    };

    let Some(unix) = to_unix_secs(ts) else {
        return false;
    };
    if unix < PAST_CUTOFF_UNIX_SECS {
        return false;
    }

    // Within ±drift_secs of `now`? `abs_diff` cannot overflow, unlike a
    // plain subtraction followed by `unsigned_abs`.
    let now_unix = to_unix_secs(now).unwrap_or(0);
    unix.abs_diff(now_unix) <= drift_secs
}
54
/// Coarse connection lifecycle of a PV as surfaced by `getPVDetails`
/// (Java parity dea7acb). Separates "never connected", "connecting" and
/// "was connected, now down" so the three cases are not conflated.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// No connection attempt has started yet, or none has progressed past
    /// `wait_connected`. This is the default state.
    #[default]
    Idle,
    /// The first `wait_connected` on this channel handle is in flight.
    Connecting,
    /// The channel has reported a successful connect; samples are flowing
    /// or the channel is otherwise live.
    Connected,
    /// The channel connected at least once but the monitor loop has since
    /// dropped. Kept apart from `Idle` so operators can tell a regression
    /// from a name that never resolved.
    Disconnected,
}

impl PvConnectionState {
    /// Stable, human-readable name of the state (identical to the variant
    /// name).
    pub fn as_str(self) -> &'static str {
        use PvConnectionState::{Connected, Connecting, Disconnected, Idle};

        let label: &'static str = match self {
            Idle => "Idle",
            Connecting => "Connecting",
            Connected => "Connected",
            Disconnected => "Disconnected",
        };
        label
    }
}
85
/// Connection state tracked per PV.
///
/// `Default` yields "no timestamps, not connected, `PvConnectionState::Idle`".
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// `None` until a connection has been recorded;
    /// `get_never_connected_pvs` reports PVs for which this is still `None`.
    pub connected_since: Option<SystemTime>,
    /// Presumably the wall-clock time of the most recent event seen for
    /// this PV — the writer is outside this view, TODO confirm.
    pub last_event_time: Option<SystemTime>,
    /// Boolean connected flag, kept alongside the finer-grained `state`.
    pub is_connected: bool,
    /// Java parity (dea7acb): discrete connection state for the
    /// PVDetails report. Operators can distinguish never-connected,
    /// currently-connecting, and previously-connected-now-down.
    pub state: PvConnectionState,
}
97
/// Diagnostic counters kept per PV and surfaced by the BPL drop / rate /
/// connection reports. Every count is monotonic for the lifetime of the
/// PV; rate handlers derive deltas relative to `first_event_unix_secs`.
///
/// These live outside `ConnectionInfo` so that they survive transient
/// disconnects and can be read without taking a lock.
#[derive(Debug)]
pub struct PvCounters {
    /// Events produced by monitor/scan, including any that were dropped
    /// before reaching storage.
    pub events_received: AtomicU64,
    /// Events that were successfully written to storage.
    pub events_stored: AtomicU64,
    /// Unix-epoch seconds of the very first event seen for this PV.
    /// `0` stands for "no event yet" (there is no atomic `Option`).
    pub first_event_unix_secs: AtomicI64,
    /// Events rejected by the bounded sample channel because the write
    /// loop fell behind the producer. Feeds
    /// `getDroppedEventsBufferOverflowReport`.
    pub buffer_overflow_drops: AtomicU64,
    /// Events whose timestamp went backwards relative to the previously
    /// stored event (Java archiver's `DroppedEventsTimestampReport`).
    pub timestamp_drops: AtomicU64,
    /// Events whose runtime DBR type differed from the type stored in the
    /// PvRecord. They are dropped because mixing types within one PB
    /// partition would corrupt downstream readers.
    pub type_change_drops: AtomicU64,
    /// Disconnect transitions observed on this PV's CA channel
    /// (`LostConnectionsReport`).
    pub disconnect_count: AtomicU64,
    /// Unix-epoch seconds of the most recent disconnect transition;
    /// `0` means none has been recorded.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Transient subscribe / monitor-recv / scan-read errors (Java parity
    /// 8fe73eb). Unlike `disconnect_count`, these are recoverable
    /// per-attempt failures rather than confirmed link drops.
    pub transient_error_count: AtomicU64,
    /// Most recent DBR type reported by CA that did not match the type
    /// recorded at archive time (Java parity 9f2234f). `-1` means no
    /// mismatch has ever been seen.
    pub latest_observed_dbr: AtomicI32,
}

impl Default for PvCounters {
    /// All counters and timestamps start at zero; `latest_observed_dbr`
    /// starts at its `-1` "never observed" sentinel.
    fn default() -> Self {
        let zero_count = || AtomicU64::new(0);
        let unset_secs = || AtomicI64::new(0);

        PvCounters {
            events_received: zero_count(),
            events_stored: zero_count(),
            first_event_unix_secs: unset_secs(),
            buffer_overflow_drops: zero_count(),
            timestamp_drops: zero_count(),
            type_change_drops: zero_count(),
            disconnect_count: zero_count(),
            last_disconnect_unix_secs: unset_secs(),
            transient_error_count: zero_count(),
            latest_observed_dbr: AtomicI32::new(-1),
        }
    }
}
159
160/// Read-only snapshot of `PvCounters` — owned values so callers can
161/// move them across threads / serialise without reaching into Atomics.
162#[derive(Debug, Clone)]
163pub struct PvCountersSnapshot {
164    pub events_received: u64,
165    pub events_stored: u64,
166    pub first_event_unix_secs: Option<i64>,
167    pub buffer_overflow_drops: u64,
168    pub timestamp_drops: u64,
169    pub type_change_drops: u64,
170    pub disconnect_count: u64,
171    pub last_disconnect_unix_secs: Option<i64>,
172    pub transient_error_count: u64,
173    /// `Some(dbr_type as i32)` if a type-change mismatch has been
174    /// observed; `None` otherwise (Java parity 9f2234f).
175    pub latest_observed_dbr: Option<i32>,
176}
177
178impl From<&PvCounters> for PvCountersSnapshot {
179    fn from(c: &PvCounters) -> Self {
180        let first = c.first_event_unix_secs.load(Ordering::Relaxed);
181        let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
182        Self {
183            events_received: c.events_received.load(Ordering::Relaxed),
184            events_stored: c.events_stored.load(Ordering::Relaxed),
185            first_event_unix_secs: if first == 0 { None } else { Some(first) },
186            buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
187            timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
188            type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
189            disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
190            last_disconnect_unix_secs: if last_disc == 0 {
191                None
192            } else {
193                Some(last_disc)
194            },
195            transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
196            latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
197                -1 => None,
198                v => Some(v),
199            },
200        }
201    }
202}
203
/// Handle for a running PV archiving task.
///
/// One handle is stored in `ChannelManager::channels` per active PV.
/// Removing the handle and cancelling `cancel_token` is how
/// pause/stop/destroy shut the PV down.
struct PvHandle {
    // Not read through the handle in the code visible here; presumably held
    // so the channel stays alive as long as the handle does — TODO confirm.
    #[allow(dead_code)]
    channel: CaChannel,
    /// Root cancellation token for this PV; the per-field tokens in
    /// `field_tokens` are children of it.
    cancel_token: CancellationToken,
    // Recorded at start time; not read through the handle in the code
    // visible here.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Shared connection state; read by `get_connection_info` and
    /// `get_never_connected_pvs`.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
    /// every sample emitted for this PV. Populated by per-field monitor tasks
    /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
    /// so stopping the PV stops all of them.
    extras: Arc<ExtraFieldsCache>,
    /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
    /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
    /// field's task without disturbing the others or the main PV.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises concurrent `update_archive_fields` calls for this PV so
    /// add/remove/respawn never race with itself.
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Diagnostic counters surfaced through the BPL drop / rate /
    /// connection reports. Lock-free reads; updates from the producer
    /// (monitor/scan) and the writer happen on different threads.
    counters: Arc<PvCounters>,
}
229
/// Thread-safe cache of latest extra-field values for one PV.
/// Each map entry is `(field_name, stringified_value)`.
type ExtraFieldsCache = DashMap<String, String>;

/// Default capacity for the bounded sample channel.
/// This limits memory usage when producers outpace the storage writer.
/// At ~200 bytes per sample, 500K entries ≈ 100 MB worst-case
/// (500_000 × 200 B = 100_000_000 B).
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
238
239/// RAII guard that removes a key from `pending_archives` on drop,
240/// ensuring cleanup even if the owning future is cancelled.
241struct PendingGuard<'a> {
242    map: &'a DashMap<String, ()>,
243    key: String,
244}
245
246impl Drop for PendingGuard<'_> {
247    fn drop(&mut self) {
248        self.map.remove(&self.key);
249    }
250}
251
/// Manages EPICS Channel Access connections and dispatches archived samples to storage.
///
/// Built by `new` / `new_with_drift`, which also return the receiving end
/// of the bounded sample channel that `sample_tx` feeds.
pub struct ChannelManager {
    /// The CA client context.
    ca_client: CaClient,
    /// Active channels: PV name → handle with cancellation.
    channels: DashMap<String, PvHandle>,
    /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
    /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
    pending_archives: DashMap<String, ()>,
    /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
    /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
    /// the registry status and the channel map disagreeing.
    /// Entries are never removed (see the note in `destroy_pv`).
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend.
    // Stored but not read by any method visible in this part of the file.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// PV metadata registry.
    registry: Arc<PvRegistry>,
    /// Sample sender for the write thread.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration; when a policy matches a PV name it
    /// overrides the caller-supplied sample mode in `archive_pv`.
    policy: Option<PolicyConfig>,
    /// Per-site IOC drift bound (Java parity 6538631), in seconds.
    server_ioc_drift_secs: u64,
}
277
/// A sample ready to be written to storage.
///
/// This is the message type carried by the bounded sample channel created
/// in `ChannelManager::new_with_drift`.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archive type associated with the sample.
    pub dbr_type: ArchDbType,
    /// The sample payload itself.
    pub sample: ArchiverSample,
    /// Element count for the PV, when known.
    pub element_count: Option<i32>,
    /// Counter handle used by the write loop to record timestamp /
    /// type-change drops. None on samples produced before counter
    /// support was wired up — write_loop tolerates the absence.
    pub counters: Option<Arc<PvCounters>>,
}
289
290impl ChannelManager {
291    pub async fn new(
292        storage: Arc<dyn StoragePlugin>,
293        registry: Arc<PvRegistry>,
294        policy: Option<PolicyConfig>,
295    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
296        Self::new_with_drift(storage, registry, policy, 30 * 60).await
297    }
298
299    /// Construct with an explicit IOC drift bound. Java parity (6538631):
300    /// keeps tests + sites that don't surface `EngineConfig` on the
301    /// existing default while letting the daemon plumb a configured value.
302    pub async fn new_with_drift(
303        storage: Arc<dyn StoragePlugin>,
304        registry: Arc<PvRegistry>,
305        policy: Option<PolicyConfig>,
306        server_ioc_drift_secs: u64,
307    ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
308        let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
309        let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
310
311        let mgr = Self {
312            ca_client,
313            channels: DashMap::new(),
314            pending_archives: DashMap::new(),
315            op_locks: DashMap::new(),
316            storage,
317            registry,
318            sample_tx: tx,
319            policy,
320            server_ioc_drift_secs,
321        };
322
323        Ok((mgr, rx))
324    }
325
326    /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
327    /// is what callers should `.lock().await` on; holding the entry guard
328    /// (via `entry().or_insert_with`) across the await would deadlock the
329    /// DashMap shard.
330    fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
331        if let Some(existing) = self.op_locks.get(pv_name) {
332            return existing.clone();
333        }
334        self.op_locks
335            .entry(pv_name.to_string())
336            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
337            .clone()
338    }
339
340    /// Restore all active PVs from the registry (called on startup).
341    ///
342    /// `pvs_by_status(Active)` already filters out alias rows (they carry
343    /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
344    /// future schema change can't silently re-introduce double-archiving
345    /// of the underlying IOC PV.
346    pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
347        let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
348        let total = active_pvs.len() as u64;
349        info!(total, "Restoring PVs from registry");
350
351        let mut restored = 0u64;
352        for record in active_pvs {
353            if record.alias_for.is_some() {
354                warn!(
355                    pv = record.pv_name,
356                    target = record.alias_for.as_deref(),
357                    "Skipping alias row in restore; aliases are routed, not archived"
358                );
359                continue;
360            }
361            if let Err(e) = self.start_archiving_internal(&record).await {
362                warn!(pv = record.pv_name, "Failed to restore PV: {e}");
363                self.registry.set_status(&record.pv_name, PvStatus::Error)?;
364            } else {
365                restored += 1;
366            }
367        }
368        metrics::gauge!("archiver_pvs_active").set(restored as f64);
369        if restored < total {
370            warn!(
371                restored,
372                failed = total - restored,
373                "Some PVs failed to restore"
374            );
375        }
376
377        Ok(restored)
378    }
379
380    /// Start archiving a new PV.
381    pub async fn archive_pv(&self, pv_name: &str, sample_mode: &SampleMode) -> anyhow::Result<()> {
382        // Serialise with pause/resume/stop/destroy on the same PV.
383        let lock = self.op_lock(pv_name);
384        let _g = lock.lock().await;
385
386        if self.channels.contains_key(pv_name) {
387            anyhow::bail!("PV {pv_name} is already being archived");
388        }
389
390        // Atomically claim the PV to prevent concurrent archive_pv races.
391        // The guard ensures cleanup even if this future is cancelled.
392        if self
393            .pending_archives
394            .insert(pv_name.to_string(), ())
395            .is_some()
396        {
397            anyhow::bail!("PV {pv_name} archive operation already in progress");
398        }
399        let _guard = PendingGuard {
400            map: &self.pending_archives,
401            key: pv_name.to_string(),
402        };
403
404        self.archive_pv_inner(pv_name, sample_mode).await
405    }
406
407    /// Inner implementation of archive_pv, separated for cleanup safety.
408    async fn archive_pv_inner(
409        &self,
410        pv_name: &str,
411        sample_mode: &SampleMode,
412    ) -> anyhow::Result<()> {
413        // Re-check after acquiring the pending slot (another task may have completed).
414        if self.channels.contains_key(pv_name) {
415            anyhow::bail!("PV {pv_name} is already being archived");
416        }
417
418        // Check policy override.
419        let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
420            if let Some(p) = policy.find_policy(pv_name) {
421                (p.to_sample_mode(), Some(p.policy_name().to_string()))
422            } else {
423                (sample_mode.clone(), None)
424            }
425        } else {
426            (sample_mode.clone(), None)
427        };
428
429        // Connect to discover the native type.
430        let channel = self.ca_client.create_channel(pv_name);
431        channel
432            .wait_connected(CA_CONNECT_TIMEOUT)
433            .await
434            .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
435
436        let info = self
437            .ca_client
438            .cainfo(pv_name)
439            .await
440            .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
441
442        let dbr_type = dbr_field_to_arch_type(info.native_type);
443        let element_count = info.element_count as i32;
444
445        // Register in SQLite.
446        self.registry
447            .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
448        // Java parity (b30f1a6): persist the matched policy's stable name so
449        // audit / metrics paths know which policy governed this archive.
450        if let Some(ref name) = matched_policy_name {
451            self.registry.update_policy_name(pv_name, Some(name))?;
452        }
453
454        let record = self
455            .registry
456            .get_pv(pv_name)?
457            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
458        self.start_archiving_internal(&record).await?;
459
460        metrics::gauge!("archiver_pvs_active").increment(1.0);
461        info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
462        Ok(())
463    }
464
    /// Internal: start CA subscription for a PV record.
    ///
    /// Creates a fresh channel and per-PV state (cancel token, connection
    /// info, extras cache, field-token map, update lock, counters), stores
    /// them in `self.channels` as a `PvHandle`, spawns one extra-field
    /// monitor per entry in `record.archive_fields`, and finally spawns the
    /// main task: `monitor_loop` for `SampleMode::Monitor`, `scan_loop` for
    /// `SampleMode::Scan`.
    ///
    /// Does not touch the registry status or the `archiver_pvs_active`
    /// gauge — callers do that. In the code visible here no step is
    /// fallible, so the function always returns `Ok(())`.
    async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
        let pv_name = record.pv_name.clone();
        let dbr_type = record.dbr_type;
        let element_count = record.element_count;
        let channel = self.ca_client.create_channel(&pv_name);
        let cancel_token = CancellationToken::new();
        let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
        let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
        let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
        let update_lock = Arc::new(tokio::sync::Mutex::new(()));
        let counters = Arc::new(PvCounters::default());

        // Hold update_lock around the whole insert+spawn block so a
        // concurrent update_archive_fields can't observe the empty
        // field_tokens map and spawn its own copies of the same fields
        // before we get to the spawn loop. update_archive_fields acquires
        // the same update_lock before mutating tokens.
        let _guard = update_lock.lock().await;

        // NOTE(review): `insert` replaces any handle already stored under
        // this name without cancelling it; the callers visible here remove
        // or rule out an existing handle first.
        self.channels.insert(
            pv_name.clone(),
            PvHandle {
                channel: channel.clone(),
                cancel_token: cancel_token.clone(),
                dbr_type,
                conn_info: conn_info.clone(),
                extras: extras.clone(),
                field_tokens: field_tokens.clone(),
                update_lock: update_lock.clone(),
                counters: counters.clone(),
            },
        );

        // Start one monitor task per archive_field with a child cancel token,
        // tracked so update_archive_fields can stop individual fields.
        for field in &record.archive_fields {
            let child = cancel_token.child_token();
            field_tokens.insert(field.clone(), child.clone());
            spawn_extra_field_monitor(
                &self.ca_client,
                &pv_name,
                field,
                extras.clone(),
                child,
                counters.clone(),
            );
        }
        metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
        // Release the update lock explicitly: field bookkeeping is complete
        // and the rest of the function only spawns the main task.
        drop(_guard);

        // Clones moved into the spawned task below.
        let tx = self.sample_tx.clone();
        let token = cancel_token.clone();
        let ci = conn_info.clone();
        let extras_for_loop = extras.clone();
        let counters_for_loop = counters.clone();

        let drift = self.server_ioc_drift_secs;
        match &record.sample_mode {
            SampleMode::Monitor => {
                tokio::spawn(async move {
                    monitor_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        ci,
                        extras_for_loop,
                        counters_for_loop,
                        drift,
                    )
                    .await;
                });
            }
            // NOTE(review): unlike monitor_loop, scan_loop is not handed
            // `drift` — confirm that scan-mode samples are meant to bypass
            // the drift bound.
            SampleMode::Scan { period_secs } => {
                let period = *period_secs;
                tokio::spawn(async move {
                    scan_loop(
                        pv_name,
                        dbr_type,
                        element_count,
                        channel,
                        tx,
                        token,
                        period,
                        ci,
                        extras_for_loop,
                        counters_for_loop,
                    )
                    .await;
                });
            }
        }

        Ok(())
    }
563
    /// Replace the archive_fields list for a running PV. Cancels per-field
    /// monitor tasks for fields that left the set, spawns fresh ones for
    /// fields that joined, and leaves unchanged fields running. Serialised
    /// per-PV by an async mutex so concurrent callers can't double-spawn.
    /// The main PV keeps running.
    ///
    /// The new list is written to the registry first. If the PV has no live
    /// channel the function returns right after that.
    ///
    /// # Errors
    /// Fails only if the registry update fails.
    pub async fn update_archive_fields(
        &self,
        pv_name: &str,
        fields: &[String],
    ) -> anyhow::Result<()> {
        // Persist first so a restart sees the new set even if the engine
        // half of the update fails partway through.
        self.registry.update_archive_fields(pv_name, fields)?;

        // If the PV isn't currently active there's nothing more to do —
        // start_archiving_internal will pick up the new fields on resume.
        // The handle's shared state is cloned out inside this block so the
        // DashMap guard is released before the `.await` below.
        let (parent_token, extras, field_tokens, update_lock, counters) = {
            let Some(handle) = self.channels.get(pv_name) else {
                return Ok(());
            };
            (
                handle.cancel_token.clone(),
                handle.extras.clone(),
                handle.field_tokens.clone(),
                handle.update_lock.clone(),
                handle.counters.clone(),
            )
        };

        // Serialise so two concurrent updates can't both decide the same
        // field is missing and spawn it twice.
        let _guard = update_lock.lock().await;

        let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();

        // Cancel + drop tasks for fields that left the set. Removing the
        // entry from `field_tokens` also drops our handle on the child
        // token; the spawned task observes `cancelled()` and exits.
        // The keys are collected first so no entry is removed while the map
        // is being iterated.
        let to_remove: Vec<String> = field_tokens
            .iter()
            .filter(|e| !wanted.contains(e.key().as_str()))
            .map(|e| e.key().clone())
            .collect();
        let removed_count = to_remove.len();
        for key in to_remove {
            if let Some((_, token)) = field_tokens.remove(&key) {
                token.cancel();
            }
            // Also forget the field's last cached value.
            extras.remove(&key);
        }

        // Spawn tasks for fields newly added. Existing fields keep their
        // task and their last cached value.
        // A name repeated in `fields` is spawned once: the second occurrence
        // already finds its token in `field_tokens`.
        let mut added_count = 0usize;
        for f in fields {
            if !field_tokens.contains_key(f) {
                let child = parent_token.child_token();
                field_tokens.insert(f.clone(), child.clone());
                spawn_extra_field_monitor(
                    &self.ca_client,
                    pv_name,
                    f,
                    extras.clone(),
                    child,
                    counters.clone(),
                );
                added_count += 1;
            }
        }
        // Apply the net change in one gauge update; a negative `net` lowers
        // the gauge.
        let net = added_count as i64 - removed_count as i64;
        if net != 0 {
            metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
        }
        Ok(())
    }
639
640    /// Pause archiving for a PV.
641    pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
642        let lock = self.op_lock(pv_name);
643        let _g = lock.lock().await;
644        if let Some((_key, handle)) = self.channels.remove(pv_name) {
645            let extra_count = handle.field_tokens.len() as f64;
646            handle.cancel_token.cancel();
647            metrics::gauge!("archiver_pvs_active").decrement(1.0);
648            if extra_count > 0.0 {
649                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
650            }
651        }
652        self.registry.set_status(pv_name, PvStatus::Paused)?;
653        info!(pv = pv_name, "Paused archiving");
654        Ok(())
655    }
656
657    /// Resume a paused PV. Only paused or error PVs can be resumed;
658    /// calling resume on an already-active PV is a no-op (returns Ok).
659    pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
660        let lock = self.op_lock(pv_name);
661        let _g = lock.lock().await;
662
663        let record = self
664            .registry
665            .get_pv(pv_name)?
666            .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
667
668        // Guard: if already active with a live task, nothing to do.
669        if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
670            info!(
671                pv = pv_name,
672                "PV is already actively archived, skipping resume"
673            );
674            return Ok(());
675        }
676
677        // Cancel any orphaned task to prevent duplicate subscriptions.
678        if let Some((_key, handle)) = self.channels.remove(pv_name) {
679            let extra_count = handle.field_tokens.len() as f64;
680            handle.cancel_token.cancel();
681            if extra_count > 0.0 {
682                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
683            }
684        }
685
686        // Start the task first; only mark Active if it succeeds.
687        self.start_archiving_internal(&record).await?;
688        self.registry.set_status(pv_name, PvStatus::Active)?;
689        metrics::gauge!("archiver_pvs_active").increment(1.0);
690        info!(pv = pv_name, "Resumed archiving");
691        Ok(())
692    }
693
694    /// Stop archiving a PV without removing it from the registry.
695    /// Sets the PV status to Inactive (data retained, monitoring stopped).
696    pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
697        let lock = self.op_lock(pv_name);
698        let _g = lock.lock().await;
699        if let Some((_key, handle)) = self.channels.remove(pv_name) {
700            let extra_count = handle.field_tokens.len() as f64;
701            handle.cancel_token.cancel();
702            metrics::gauge!("archiver_pvs_active").decrement(1.0);
703            if extra_count > 0.0 {
704                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
705            }
706        }
707        self.registry.set_status(pv_name, PvStatus::Inactive)?;
708        info!(pv = pv_name, "Stopped archiving (inactive)");
709        Ok(())
710    }
711
712    /// Remove a PV from archiving entirely.
713    pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
714        let lock = self.op_lock(pv_name);
715        let _g = lock.lock().await;
716        if let Some((_key, handle)) = self.channels.remove(pv_name) {
717            let extra_count = handle.field_tokens.len() as f64;
718            handle.cancel_token.cancel();
719            metrics::gauge!("archiver_pvs_active").decrement(1.0);
720            if extra_count > 0.0 {
721                metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
722            }
723        }
724        self.registry.remove_pv(pv_name)?;
725        // Don't remove the op_locks entry here: a concurrent caller may have
726        // already taken a clone of this Arc and be queued on it; removing
727        // would let a fresh caller obtain a new mutex and the queued one
728        // would race them. The map is bounded by the lifetime universe of
729        // PV names, which is acceptable.
730        info!(pv = pv_name, "Destroyed archiving channel");
731        Ok(())
732    }
733
734    /// List all currently archived PV names (from registry).
735    pub fn list_pvs(&self) -> Vec<String> {
736        self.registry.all_pv_names().unwrap_or_else(|e| {
737            warn!("Failed to list PVs: {e}");
738            Vec::new()
739        })
740    }
741
742    /// Match PVs by glob pattern (from registry).
743    pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
744        self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
745            warn!("Failed to match PVs: {e}");
746            Vec::new()
747        })
748    }
749
750    /// Get the registry reference.
751    pub fn registry(&self) -> &Arc<PvRegistry> {
752        &self.registry
753    }
754
755    /// Get connection info for a PV.
756    pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
757        self.channels.get(pv).map(|h| {
758            h.conn_info
759                .lock()
760                .unwrap_or_else(|e| e.into_inner())
761                .clone()
762        })
763    }
764
765    /// Get PV names that have never received any event (connected_since == None).
766    pub fn get_never_connected_pvs(&self) -> Vec<String> {
767        self.channels
768            .iter()
769            .filter(|entry| {
770                let ci = entry
771                    .value()
772                    .conn_info
773                    .lock()
774                    .unwrap_or_else(|e| e.into_inner());
775                ci.connected_since.is_none()
776            })
777            .map(|entry| entry.key().clone())
778            .collect()
779    }
780
781    /// Snapshot the diagnostic counters for one PV. Returns None if the
782    /// PV isn't actively archived. The returned Arc is the live counter
783    /// — callers read with `Ordering::Relaxed`.
784    pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
785        self.channels.get(pv_name).map(|h| h.counters.clone())
786    }
787
788    /// Snapshot every active PV's counters. Returns `(pv_name,
789    /// PvCountersSnapshot)` so callers don't have to handle Arc.
790    pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
791        self.channels
792            .iter()
793            .map(|e| {
794                (
795                    e.key().clone(),
796                    PvCountersSnapshot::from(&*e.value().counters),
797                )
798            })
799            .collect()
800    }
801
802    /// One-shot CA `get` against the running channel for `pv`. Returns
803    /// `None` if the PV isn't actively archived. The timeout caps how
804    /// long the caller will wait for a value when the IOC is slow.
805    /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
806    pub async fn live_value(
807        &self,
808        pv_name: &str,
809        timeout: Duration,
810    ) -> Option<anyhow::Result<ArchiverValue>> {
811        let channel = self.channels.get(pv_name)?.channel.clone();
812        // Wait briefly for connection — channel.get on a disconnected
813        // channel would otherwise return an error from deep in the CA
814        // stack. Capped by the same timeout the caller chose.
815        if channel.wait_connected(timeout).await.is_err() {
816            return Some(Err(anyhow::anyhow!(
817                "channel not connected within {timeout:?}"
818            )));
819        }
820        match tokio::time::timeout(timeout, channel.get()).await {
821            Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
822            Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
823            Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
824        }
825    }
826
827    /// Snapshot the latest cached extra-field values for `pv` —
828    /// `(field_name, stringified_value)` pairs. Empty map when the PV
829    /// isn't archived or has no archive_fields configured.
830    pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
831        match self.channels.get(pv_name) {
832            Some(handle) => handle
833                .extras
834                .iter()
835                .map(|e| (e.key().clone(), e.value().clone()))
836                .collect(),
837            None => std::collections::HashMap::new(),
838        }
839    }
840
841    /// Get PV names that are currently disconnected (is_connected == false).
842    pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
843        self.channels
844            .iter()
845            .filter(|entry| {
846                let ci = entry
847                    .value()
848                    .conn_info
849                    .lock()
850                    .unwrap_or_else(|e| e.into_inner());
851                !ci.is_connected
852            })
853            .map(|entry| entry.key().clone())
854            .collect()
855    }
856}
857
858/// Monitor loop: subscribe to a channel and forward values.
859#[allow(clippy::too_many_arguments)]
860async fn monitor_loop(
861    pv_name: String,
862    dbr_type: ArchDbType,
863    element_count: i32,
864    channel: CaChannel,
865    tx: mpsc::Sender<PvSample>,
866    cancel_token: CancellationToken,
867    conn_info: Arc<Mutex<ConnectionInfo>>,
868    extras: Arc<ExtraFieldsCache>,
869    counters: Arc<PvCounters>,
870    server_ioc_drift_secs: u64,
871) {
872    loop {
873        // Wait for connection, respecting cancellation.
874        tokio::select! {
875            _ = cancel_token.cancelled() => return,
876            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
877                if result.is_err() {
878                    let was_connected = {
879                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
880                        let prev_connected = ci.is_connected;
881                        ci.is_connected = false;
882                        ci.last_event_time = None;
883                        // Connecting if we've never seen a connect, otherwise
884                        // demote from Connected to Disconnected on a real loss.
885                        ci.state = match ci.state {
886                            PvConnectionState::Connected => PvConnectionState::Disconnected,
887                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
888                            _ => PvConnectionState::Connecting,
889                        };
890                        prev_connected
891                    };
892                    // A search-failure timeout that demotes us out of
893                    // Connected counts as a fresh disconnect — without
894                    // this, only the post-monitor `break` path bumps the
895                    // counters and a flapping PV silently under-reports
896                    // its drops while subsequent `attach_cnx_lost_headers`
897                    // calls carry a stale `cnxlostepsecs`.
898                    if was_connected {
899                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
900                        counters
901                            .last_disconnect_unix_secs
902                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
903                    }
904                    // Subscribe BEFORE re-checking the current state. If we
905                    // subscribed after a state check, a Connected event
906                    // firing in between would be lost forever, leaving this
907                    // loop hung until the next disconnect/reconnect cycle.
908                    let mut conn_rx = channel.connection_events();
909
910                    // Close the remaining race: the channel may have become
911                    // connected between the outer `wait_connected` timeout
912                    // and our `subscribe()` above, in which case no new event
913                    // will arrive. A short re-probe catches that case.
914                    if channel
915                        .wait_connected(Duration::from_millis(100))
916                        .await
917                        .is_err()
918                    {
919                        loop {
920                            tokio::select! {
921                                _ = cancel_token.cancelled() => return,
922                                event = conn_rx.recv() => {
923                                    use tokio::sync::broadcast::error::RecvError;
924                                    match event {
925                                        Ok(ConnectionEvent::Connected) => break,
926                                        Ok(_) => continue,
927                                        // Lagged: we missed some events but
928                                        // the channel is still live. Re-probe
929                                        // state and otherwise keep waiting.
930                                        Err(RecvError::Lagged(_)) => {
931                                            if channel
932                                                .wait_connected(Duration::from_millis(100))
933                                                .await
934                                                .is_ok()
935                                            {
936                                                break;
937                                            }
938                                            continue;
939                                        }
940                                        Err(RecvError::Closed) => return,
941                                    }
942                                }
943                            }
944                        }
945                    }
946                }
947            }
948        }
949
950        // Subscribe.
951        let mut monitor = match channel.subscribe().await {
952            Ok(m) => m,
953            Err(e) => {
954                counters
955                    .transient_error_count
956                    .fetch_add(1, Ordering::Relaxed);
957                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
958                tokio::select! {
959                    _ = cancel_token.cancelled() => return,
960                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
961                }
962            }
963        };
964
965        debug!(pv = pv_name, "Monitor subscription active");
966
967        // Receive values with cancellation.
968        loop {
969            tokio::select! {
970                _ = cancel_token.cancelled() => return,
971                result = monitor.recv() => {
972                    match result {
973                        Some(Ok(snapshot)) => {
974                            let now = SystemTime::now();
975                            // Java parity (11e554d0): use the IOC-reported
976                            // timestamp, not receive-time, so latency
977                            // doesn't smear sample times. First sample
978                            // after connect is accepted unconditionally
979                            // — legitimate backfill on reconnect can
980                            // include older timestamps. Subsequent
981                            // samples whose IOC clock is more than
982                            // SERVER_IOC_DRIFT_SECS away from wall clock,
983                            // or earlier than the 1991 floor, are
984                            // dropped + counted.
985                            let first_after_connect = {
986                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
987                                let first = ci.last_event_time.is_none();
988                                if ci.connected_since.is_none() {
989                                    ci.connected_since = Some(now);
990                                }
991                                ci.is_connected = true;
992                                ci.last_event_time = Some(now);
993                                ci.state = PvConnectionState::Connected;
994                                first
995                            };
996                            if !first_after_connect
997                                && !ioc_timestamp_in_window(
998                                    snapshot.timestamp,
999                                    now,
1000                                    server_ioc_drift_secs,
1001                                )
1002                            {
1003                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1004                                debug!(
1005                                    pv = pv_name,
1006                                    ?snapshot.timestamp,
1007                                    "Dropping sample with out-of-window IOC timestamp"
1008                                );
1009                                continue;
1010                            }
1011                            counters.events_received.fetch_add(1, Ordering::Relaxed);
1012                            // CAS the first-event timestamp once. 0 sentinel
1013                            // means "no event yet"; replace with now.
1014                            let now_secs = unix_secs(now);
1015                            let _ = counters.first_event_unix_secs.compare_exchange(
1016                                0,
1017                                now_secs,
1018                                Ordering::Relaxed,
1019                                Ordering::Relaxed,
1020                            );
1021                            let archiver_val = epics_value_to_archiver(&snapshot.value);
1022                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
1023                            attach_extras(&extras, &mut sample);
1024                            if first_after_connect {
1025                                let lost_secs = counters
1026                                    .last_disconnect_unix_secs
1027                                    .load(Ordering::Relaxed);
1028                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1029                            }
1030                            let pv_sample = PvSample {
1031                                pv_name: pv_name.clone(),
1032                                dbr_type,
1033                                sample,
1034                                element_count: Some(element_count),
1035                                counters: Some(counters.clone()),
1036                            };
1037                            if let Err(pv_sample) = try_send_with_overflow_count(
1038                                &tx,
1039                                pv_sample,
1040                                &counters,
1041                            )
1042                            .await
1043                            {
1044                                let _ = pv_sample;
1045                                return; // Write loop shut down
1046                            }
1047                        }
1048                        Some(Err(e)) => {
1049                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1050                            warn!(pv = pv_name, "Monitor error: {e}");
1051                        }
1052                        None => break, // Monitor ended, reconnect
1053                    }
1054                }
1055            }
1056        }
1057
1058        // Monitor ended (disconnect) — loop back to reconnect. Reset
1059        // `last_event_time` so the first sample after reconnect is
1060        // treated as `first_after_connect` and bypasses the drift
1061        // filter; without this, an IOC that comes back with a
1062        // legitimate backfill timestamp older than the 30 min window
1063        // would be silently dropped.
1064        {
1065            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1066            ci.is_connected = false;
1067            ci.last_event_time = None;
1068            ci.state = PvConnectionState::Disconnected;
1069        }
1070        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1071        counters
1072            .last_disconnect_unix_secs
1073            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1074        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
1075    }
1076}
1077
/// Whole seconds between the Unix epoch and `t`.
///
/// Sub-second precision is truncated. A time before the epoch yields `0`
/// rather than an error.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
1083
1084/// Send a sample and count buffer-overflow events. Tries non-blocking
1085/// first; if the bounded channel is full we increment the counter and
1086/// fall back to a blocking send so backpressure still works.
1087async fn try_send_with_overflow_count(
1088    tx: &mpsc::Sender<PvSample>,
1089    pv_sample: PvSample,
1090    counters: &PvCounters,
1091) -> Result<(), PvSample> {
1092    match tx.try_send(pv_sample) {
1093        Ok(()) => Ok(()),
1094        Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1095            counters
1096                .buffer_overflow_drops
1097                .fetch_add(1, Ordering::Relaxed);
1098            // Backpressure to the producer; this awaits until the writer
1099            // drains some space. We count the saturation event but don't
1100            // actually drop the sample.
1101            tx.send(pv_sample).await.map_err(|e| e.0)
1102        }
1103        Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1104    }
1105}
1106
1107/// Scan loop: periodically read a channel value.
1108#[allow(clippy::too_many_arguments)]
1109async fn scan_loop(
1110    pv_name: String,
1111    dbr_type: ArchDbType,
1112    element_count: i32,
1113    channel: CaChannel,
1114    tx: mpsc::Sender<PvSample>,
1115    cancel_token: CancellationToken,
1116    period_secs: f64,
1117    conn_info: Arc<Mutex<ConnectionInfo>>,
1118    extras: Arc<ExtraFieldsCache>,
1119    counters: Arc<PvCounters>,
1120) {
1121    let period = Duration::from_secs_f64(period_secs);
1122    let mut interval = tokio::time::interval(period);
1123
1124    loop {
1125        tokio::select! {
1126            _ = cancel_token.cancelled() => return,
1127            _ = interval.tick() => {}
1128        }
1129
1130        if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
1131            let was_connected = {
1132                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1133                let prev = ci.is_connected;
1134                ci.is_connected = false;
1135                ci.last_event_time = None;
1136                ci.state = match ci.state {
1137                    PvConnectionState::Connected => PvConnectionState::Disconnected,
1138                    PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1139                    _ => PvConnectionState::Connecting,
1140                };
1141                prev
1142            };
1143            if was_connected {
1144                counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1145                counters
1146                    .last_disconnect_unix_secs
1147                    .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1148            }
1149            continue;
1150        }
1151
1152        match channel.get().await {
1153            Ok((_dbr_type, epics_val)) => {
1154                let now = SystemTime::now();
1155                let first_after_connect = {
1156                    let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1157                    let first = ci.last_event_time.is_none();
1158                    if ci.connected_since.is_none() {
1159                        ci.connected_since = Some(now);
1160                    }
1161                    ci.is_connected = true;
1162                    ci.last_event_time = Some(now);
1163                    ci.state = PvConnectionState::Connected;
1164                    first
1165                };
1166                counters.events_received.fetch_add(1, Ordering::Relaxed);
1167                let now_secs = unix_secs(now);
1168                let _ = counters.first_event_unix_secs.compare_exchange(
1169                    0,
1170                    now_secs,
1171                    Ordering::Relaxed,
1172                    Ordering::Relaxed,
1173                );
1174                let archiver_val = epics_value_to_archiver(&epics_val);
1175                let mut sample = ArchiverSample::new(now, archiver_val);
1176                attach_extras(&extras, &mut sample);
1177                if first_after_connect {
1178                    let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
1179                    attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1180                }
1181                let pv_sample = PvSample {
1182                    pv_name: pv_name.clone(),
1183                    dbr_type,
1184                    sample,
1185                    element_count: Some(element_count),
1186                    counters: Some(counters.clone()),
1187                };
1188                if try_send_with_overflow_count(&tx, pv_sample, &counters)
1189                    .await
1190                    .is_err()
1191                {
1192                    return;
1193                }
1194            }
1195            Err(e) => {
1196                counters
1197                    .transient_error_count
1198                    .fetch_add(1, Ordering::Relaxed);
1199                debug!(pv = pv_name, "Scan read error: {e}");
1200            }
1201        }
1202    }
1203}
1204
1205/// Java parity (ed07feb): tag the first sample after (re)connect with
1206/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
1207/// detect archiver restarts and PV resumes. Unconditional emission —
1208/// on a clean startup `lost_secs` is 0, which is itself a valid value
1209/// for downstream gap detection.
1210fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
1211    sample
1212        .field_values
1213        .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
1214    sample
1215        .field_values
1216        .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
1217    sample
1218        .field_values
1219        .push(("startup".to_string(), "true".to_string()));
1220}
1221
1222/// Snapshot the extras cache into `sample.field_values`. We sort for stable
1223/// PB output across runs (the protobuf field is repeated; consumers that
1224/// diff/compare files appreciate determinism).
1225fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
1226    if extras.is_empty() {
1227        return;
1228    }
1229    let mut entries: Vec<(String, String)> = extras
1230        .iter()
1231        .map(|e| (e.key().clone(), e.value().clone()))
1232        .collect();
1233    entries.sort_by(|a, b| a.0.cmp(&b.0));
1234    sample.field_values = entries;
1235}
1236
1237/// Render an EpicsValue as the string we'll persist in the metadata field
1238/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
1239/// fall back to JSON-ish bracket notation. Stays in sync with what Java
1240/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
1241fn epics_value_to_field_string(val: &EpicsValue) -> String {
1242    match val {
1243        EpicsValue::String(s) => s.clone(),
1244        EpicsValue::Short(v) => v.to_string(),
1245        EpicsValue::Float(v) => v.to_string(),
1246        EpicsValue::Enum(v) => v.to_string(),
1247        EpicsValue::Char(v) => v.to_string(),
1248        EpicsValue::Long(v) => v.to_string(),
1249        EpicsValue::Double(v) => v.to_string(),
1250        EpicsValue::ShortArray(v) => format!("{v:?}"),
1251        EpicsValue::FloatArray(v) => format!("{v:?}"),
1252        EpicsValue::EnumArray(v) => format!("{v:?}"),
1253        EpicsValue::DoubleArray(v) => format!("{v:?}"),
1254        EpicsValue::LongArray(v) => format!("{v:?}"),
1255        EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
1256        EpicsValue::StringArray(v) => format!("{v:?}"),
1257    }
1258}
1259
1260/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
1261/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
1262/// it up alongside the main PV.
1263fn spawn_extra_field_monitor(
1264    ca_client: &CaClient,
1265    pv_name: &str,
1266    field: &str,
1267    extras: Arc<ExtraFieldsCache>,
1268    parent_token: CancellationToken,
1269    counters: Arc<PvCounters>,
1270) {
1271    let full_name = format!("{pv_name}.{field}");
1272    let channel = ca_client.create_channel(&full_name);
1273    let field_owned = field.to_string();
1274    let pv_owned = pv_name.to_string();
1275
1276    // Catch-unwind boundary: a panic from the CA client (e.g. malformed
1277    // wire frame, allocation failure inside epics_rs) would propagate to
1278    // the runtime and abort sibling tasks of this worker thread. Trap it
1279    // here, log with PV+field context, and return normally so the runtime
1280    // remains healthy.
1281    let panic_pv = pv_owned.clone();
1282    let panic_field = field_owned.clone();
1283    tokio::spawn(async move {
1284        let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
1285            channel,
1286            pv_owned,
1287            field_owned,
1288            extras,
1289            parent_token,
1290            counters,
1291        ));
1292        if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
1293            let msg = panic_payload_msg(&payload);
1294            error!(
1295                pv = panic_pv,
1296                field = panic_field,
1297                "Extra-field monitor panicked: {msg}"
1298            );
1299        }
1300    });
1301}
1302
1303/// Body of the spawned extra-field monitor. Split out so the spawn site
1304/// can wrap it in `catch_unwind`.
1305async fn extra_field_monitor_body(
1306    channel: CaChannel,
1307    pv_owned: String,
1308    field_owned: String,
1309    extras: Arc<ExtraFieldsCache>,
1310    parent_token: CancellationToken,
1311    counters: Arc<PvCounters>,
1312) {
1313    // Initial connect attempt — failure here is non-fatal (the field may
1314    // not exist on every IOC; we just leave the cache empty).
1315    if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
1316        debug!(
1317            pv = pv_owned,
1318            field = field_owned,
1319            "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
1320        );
1321    }
1322
1323    // Exponential backoff for misconfigured fields (e.g. operator
1324    // listed `.HIHI` on a PV that doesn't expose it). Without this
1325    // we'd retry every 5s forever, churning CA search packets and
1326    // file descriptors. The cap is 60s; one warn at the cap so
1327    // ops know to fix archive_fields.
1328    let mut backoff = CA_RETRY_DELAY;
1329    let max_backoff = Duration::from_secs(60);
1330    let mut warned_at_cap = false;
1331
1332    loop {
1333        // Cancel-aware subscribe attempt.
1334        tokio::select! {
1335            _ = parent_token.cancelled() => return,
1336            sub = channel.subscribe() => {
1337                let mut monitor = match sub {
1338                    Ok(m) => m,
1339                    Err(e) => {
1340                        // Java parity (8fe73eb): bump the transient
1341                        // counter so a misconfigured `.HIHI` field
1342                        // shows up in the rate / drop reports.
1343                        counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1344                        debug!(
1345                            pv = pv_owned,
1346                            field = field_owned,
1347                            ?backoff,
1348                            "Extra-field subscribe failed: {e}; retrying"
1349                        );
1350                        if backoff >= max_backoff && !warned_at_cap {
1351                            warn!(
1352                                pv = pv_owned,
1353                                field = field_owned,
1354                                "Extra-field repeatedly fails to subscribe; \
1355                                 check archive_fields config (now retrying every 60s)"
1356                            );
1357                            warned_at_cap = true;
1358                        }
1359                        let sleep_for = backoff;
1360                        backoff = (backoff * 2).min(max_backoff);
1361                        tokio::select! {
1362                            _ = parent_token.cancelled() => return,
1363                            _ = tokio::time::sleep(sleep_for) => continue,
1364                        }
1365                    }
1366                };
1367                // Subscribe succeeded — reset backoff so the next
1368                // failure starts at the short delay again.
1369                backoff = CA_RETRY_DELAY;
1370                warned_at_cap = false;
1371                loop {
1372                    tokio::select! {
1373                        _ = parent_token.cancelled() => return,
1374                        ev = monitor.recv() => match ev {
1375                            Some(Ok(snapshot)) => {
1376                                extras.insert(
1377                                    field_owned.clone(),
1378                                    epics_value_to_field_string(&snapshot.value),
1379                                );
1380                            }
1381                            Some(Err(e)) => {
1382                                counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1383                                debug!(
1384                                    pv = pv_owned,
1385                                    field = field_owned,
1386                                    "Extra-field monitor error: {e}"
1387                                );
1388                            }
1389                            None => break, // resubscribe
1390                        }
1391                    }
1392                }
1393            }
1394        }
1395    }
1396}
1397
/// Extract a printable message from a panic payload (`Box<dyn Any>`).
///
/// Handles the two payload types `panic!` normally produces, `&'static str`
/// and `String`. Anything else yields a fixed placeholder.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
1409
1410/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
1411fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
1412    match field_type {
1413        DbFieldType::String => ArchDbType::ScalarString,
1414        DbFieldType::Short => ArchDbType::ScalarShort,
1415        DbFieldType::Float => ArchDbType::ScalarFloat,
1416        DbFieldType::Enum => ArchDbType::ScalarEnum,
1417        DbFieldType::Char => ArchDbType::ScalarByte,
1418        DbFieldType::Long => ArchDbType::ScalarInt,
1419        DbFieldType::Double => ArchDbType::ScalarDouble,
1420    }
1421}
1422
1423/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
1424fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
1425    match val {
1426        EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
1427        EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
1428        EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
1429        EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
1430        EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
1431        EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
1432        EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
1433        EpicsValue::ShortArray(v) => {
1434            ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
1435        }
1436        EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
1437        EpicsValue::EnumArray(v) => {
1438            ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
1439        }
1440        EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
1441        EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
1442        EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
1443        EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
1444    }
1445}
1446
1447/// Background writer task — drains samples and writes to storage.
1448pub async fn write_loop(
1449    storage: Arc<dyn StoragePlugin>,
1450    registry: Arc<PvRegistry>,
1451    mut rx: mpsc::Receiver<PvSample>,
1452    mut shutdown: tokio::sync::watch::Receiver<bool>,
1453    flush_period: Duration,
1454) {
1455    info!("Write loop started");
1456    // HashMap keyed by PV name → latest timestamp. Avoids duplicate entries
1457    // when the same PV receives many samples between flushes, and feeds
1458    // the per-PV out-of-order detector below.
1459    let mut ts_updates: std::collections::HashMap<String, SystemTime> =
1460        std::collections::HashMap::new();
1461    // Last-observed (registry-record) DBR type per PV, used to detect
1462    // mid-stream type changes that we must drop to avoid corrupting the
1463    // PB partition.
1464    let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
1465        std::collections::HashMap::new();
1466    let mut last_flush = std::time::Instant::now();
1467
1468    loop {
1469        tokio::select! {
1470            Some(pv_sample) = rx.recv() => {
1471                let ts = pv_sample.sample.timestamp;
1472
1473                // Out-of-order timestamp drop. Storage requires monotonic
1474                // appends per PV; an older timestamp would produce a
1475                // corrupt partition.
1476                if let Some(prev_ts) = ts_updates.get(&pv_sample.pv_name)
1477                    && ts < *prev_ts
1478                {
1479                    if let Some(ref c) = pv_sample.counters {
1480                        c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1481                    }
1482                    debug!(
1483                        pv = pv_sample.pv_name,
1484                        ?ts,
1485                        ?prev_ts,
1486                        "Dropping out-of-order sample"
1487                    );
1488                    continue;
1489                }
1490
1491                // Type-change drop. The first sample defines the PV's
1492                // wire type; later samples with a different DBR get
1493                // dropped (operator must changeTypeForPV first).
1494                let prev_type = last_dbr_type
1495                    .insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
1496                if let Some(prev) = prev_type
1497                    && prev != pv_sample.dbr_type
1498                {
1499                    if let Some(ref c) = pv_sample.counters {
1500                        c.type_change_drops.fetch_add(1, Ordering::Relaxed);
1501                        // Java parity (9f2234f): record the latest observed
1502                        // DBR so the dropped-events report can show what
1503                        // the IOC is now sending vs the archived type.
1504                        c.latest_observed_dbr
1505                            .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
1506                    }
1507                    debug!(
1508                        pv = pv_sample.pv_name,
1509                        ?prev,
1510                        new = ?pv_sample.dbr_type,
1511                        "Dropping type-changed sample"
1512                    );
1513                    // Restore prev_type in the map so a single
1514                    // mismatched sample doesn't permanently flip our
1515                    // recorded type.
1516                    last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
1517                    continue;
1518                }
1519
1520                let meta = AppendMeta {
1521                    element_count: pv_sample.element_count,
1522                    ..Default::default()
1523                };
1524                if let Err(e) = storage
1525                    .append_event_with_meta(
1526                        &pv_sample.pv_name,
1527                        pv_sample.dbr_type,
1528                        &pv_sample.sample,
1529                        &meta,
1530                    )
1531                    .await
1532                {
1533                    error!(pv = pv_sample.pv_name, "Write error: {e}");
1534                } else {
1535                    metrics::counter!("archiver_events_stored_total").increment(1);
1536                    if let Some(ref c) = pv_sample.counters {
1537                        c.events_stored.fetch_add(1, Ordering::Relaxed);
1538                    }
1539                    ts_updates.insert(pv_sample.pv_name, ts);
1540                }
1541
1542                // Periodic flush: timestamps to SQLite + buffered writes to disk.
1543                if last_flush.elapsed() > flush_period && !ts_updates.is_empty() {
1544                    let refs: Vec<(&str, SystemTime)> = ts_updates
1545                        .iter()
1546                        .map(|(name, ts)| (name.as_str(), *ts))
1547                        .collect();
1548                    if let Err(e) = registry.batch_update_timestamps(&refs) {
1549                        error!("Failed to flush timestamps: {e}");
1550                    }
1551                    if let Err(e) = storage.flush_writes().await {
1552                        error!("Failed to flush storage writes: {e}");
1553                    }
1554                    ts_updates.clear();
1555                    last_flush = std::time::Instant::now();
1556                }
1557            }
1558            _ = shutdown.changed() => {
1559                // Drain remaining samples.
1560                while let Ok(pv_sample) = rx.try_recv() {
1561                    let meta = AppendMeta {
1562                        element_count: pv_sample.element_count,
1563                        ..Default::default()
1564                    };
1565                    if let Err(e) = storage
1566                        .append_event_with_meta(
1567                            &pv_sample.pv_name,
1568                            pv_sample.dbr_type,
1569                            &pv_sample.sample,
1570                            &meta,
1571                        )
1572                        .await
1573                    {
1574                        warn!(pv = pv_sample.pv_name, "Shutdown drain write error: {e}");
1575                    }
1576                }
1577                // Final flush: timestamps + buffered writes.
1578                if !ts_updates.is_empty() {
1579                    let refs: Vec<(&str, SystemTime)> = ts_updates
1580                        .iter()
1581                        .map(|(name, ts)| (name.as_str(), *ts))
1582                        .collect();
1583                    if let Err(e) = registry.batch_update_timestamps(&refs) {
1584                        warn!("Shutdown timestamp flush failed: {e}");
1585                    }
1586                }
1587                if let Err(e) = storage.flush_writes().await {
1588                    warn!("Shutdown storage flush failed: {e}");
1589                }
1590                info!("Write loop shutting down");
1591                break;
1592            }
1593        }
1594    }
1595}