archiver_engine/channel_manager.rs
1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::pvdata::{PvField, ScalarValue};
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info, warn};
14
15use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
16use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
17use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
18
19use crate::policy::PolicyConfig;
20
21/// Timeout for initial CA channel connection.
22const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
23/// Timeout for CA reconnection attempts in the monitor loop.
24const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
25/// Delay before retrying a failed CA subscription.
26const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
27
28/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
29/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
30/// stale uninitialised IOC clock or a sentinel.
31const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC
32
33/// Filter a freshly-received sample timestamp against the wall clock and
34/// the floor. Returns `Some(ts)` if accepted, `None` if it should be
35/// dropped (caller bumps `timestamp_drops`).
36///
37/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
38/// 6538631), so per-site tuning doesn't require recompiling. `now` is
39/// passed in (rather than calling `SystemTime::now()` here) so the test
40/// suite can pin time deterministically.
41fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
42 let unix = ts
43 .duration_since(SystemTime::UNIX_EPOCH)
44 .map(|d| d.as_secs() as i64)
45 .unwrap_or(i64::MIN);
46 if unix < PAST_CUTOFF_UNIX_SECS {
47 return false;
48 }
49 // Within ±drift_secs of `now`?
50 let now_unix = now
51 .duration_since(SystemTime::UNIX_EPOCH)
52 .map(|d| d.as_secs() as i64)
53 .unwrap_or(0);
54 let delta = (unix - now_unix).unsigned_abs();
55 delta <= drift_secs
56}
57
58/// Discrete connection state for `getPVDetails` (Java parity dea7acb).
59/// Distinguishes never-connected from connecting from confirmed-down.
60#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
61pub enum PvConnectionState {
62 /// No connection attempt has been made (or none has progressed
63 /// past `wait_connected` yet).
64 #[default]
65 Idle,
66 /// Currently waiting on `wait_connected` for the first time on
67 /// this channel handle.
68 Connecting,
69 /// Channel has reported a successful connect; samples are flowing
70 /// or the channel is otherwise live.
71 Connected,
72 /// Channel reported connect at least once but the monitor loop has
73 /// since dropped — distinct from `Idle` so operators can spot a
74 /// regression vs a never-resolved name.
75 Disconnected,
76}
77
78impl PvConnectionState {
79 pub fn as_str(self) -> &'static str {
80 match self {
81 Self::Idle => "Idle",
82 Self::Connecting => "Connecting",
83 Self::Connected => "Connected",
84 Self::Disconnected => "Disconnected",
85 }
86 }
87}
88
89/// Connection state tracked per PV.
90#[derive(Debug, Clone, Default)]
91pub struct ConnectionInfo {
92 pub connected_since: Option<SystemTime>,
93 pub last_event_time: Option<SystemTime>,
94 pub is_connected: bool,
95 /// Java parity (dea7acb): discrete connection state for the
96 /// PVDetails report. Operators can distinguish never-connected,
97 /// currently-connecting, and previously-connected-now-down.
98 pub state: PvConnectionState,
99}
100
101/// Per-PV diagnostic counters for the BPL drop / rate / connection
102/// reports. All counts are monotonic across the PV's lifetime; the
103/// rate handlers compute deltas against `first_event_unix_secs`.
104///
105/// Tracked here (not in ConnectionInfo) so they survive transient
106/// disconnects and are read lock-free from the report endpoints.
107#[derive(Debug)]
108pub struct PvCounters {
109 /// Total events produced by monitor/scan, including those that
110 /// later got dropped before write.
111 pub events_received: AtomicU64,
112 /// Total events successfully written to storage.
113 pub events_stored: AtomicU64,
114 /// Unix-epoch seconds of the first event we ever saw for this PV.
115 /// 0 = "no event yet" (i64 because Atomic<Option<...>> doesn't exist).
116 pub first_event_unix_secs: AtomicI64,
117 /// Number of events the bounded sample channel rejected (write
118 /// loop falling behind producer). Surfaces as
119 /// `getDroppedEventsBufferOverflowReport`.
120 pub buffer_overflow_drops: AtomicU64,
121 /// Number of events whose timestamp went backwards relative to the
122 /// previously-stored event. Java archiver's
123 /// `DroppedEventsTimestampReport`.
124 pub timestamp_drops: AtomicU64,
125 /// Number of events whose runtime DBR type didn't match the
126 /// PvRecord's stored type — the engine drops these because mixing
127 /// types in one PB partition would corrupt downstream readers.
128 pub type_change_drops: AtomicU64,
129 /// Number of disconnect transitions seen on this PV's CA channel.
130 /// `LostConnectionsReport`.
131 pub disconnect_count: AtomicU64,
132 /// Last unix-epoch seconds of disconnect transition.
133 pub last_disconnect_unix_secs: AtomicI64,
134 /// Number of transient subscribe / monitor-recv / scan-read errors
135 /// (Java parity 8fe73eb). Distinct from `disconnect_count` —
136 /// these are recoverable per-attempt failures rather than confirmed
137 /// link drops.
138 pub transient_error_count: AtomicU64,
139 /// Latest DBR type observed from CA that did not match the
140 /// archive-time recorded type (Java parity 9f2234f). `-1` = no
141 /// mismatch ever seen.
142 pub latest_observed_dbr: AtomicI32,
143 /// Number of DBR_CTRL metadata refresh attempts that failed (timeout,
144 /// transport error, missing display info). Operators looking at PVs
145 /// with empty PREC/EGU should check this.
146 pub metadata_fetch_failures: AtomicU64,
147 /// Number of `storage.append_event_with_meta` calls that exceeded
148 /// the per-sample storage timeout. Distinct from
149 /// `buffer_overflow_drops` (mpsc channel saturation) so an
150 /// operator can tell "the storage tier is wedged" apart from
151 /// "the writer can't keep up with the producer".
152 pub storage_append_timeouts: AtomicU64,
153}
154
155impl Default for PvCounters {
156 fn default() -> Self {
157 Self {
158 events_received: AtomicU64::new(0),
159 events_stored: AtomicU64::new(0),
160 first_event_unix_secs: AtomicI64::new(0),
161 buffer_overflow_drops: AtomicU64::new(0),
162 timestamp_drops: AtomicU64::new(0),
163 type_change_drops: AtomicU64::new(0),
164 disconnect_count: AtomicU64::new(0),
165 last_disconnect_unix_secs: AtomicI64::new(0),
166 transient_error_count: AtomicU64::new(0),
167 // -1 sentinel = no type mismatch ever observed.
168 latest_observed_dbr: AtomicI32::new(-1),
169 metadata_fetch_failures: AtomicU64::new(0),
170 storage_append_timeouts: AtomicU64::new(0),
171 }
172 }
173}
174
175/// Read-only snapshot of `PvCounters` — owned values so callers can
176/// move them across threads / serialise without reaching into Atomics.
177#[derive(Debug, Clone)]
178pub struct PvCountersSnapshot {
179 pub events_received: u64,
180 pub events_stored: u64,
181 pub first_event_unix_secs: Option<i64>,
182 pub buffer_overflow_drops: u64,
183 pub timestamp_drops: u64,
184 pub type_change_drops: u64,
185 pub disconnect_count: u64,
186 pub last_disconnect_unix_secs: Option<i64>,
187 pub transient_error_count: u64,
188 /// `Some(dbr_type as i32)` if a type-change mismatch has been
189 /// observed; `None` otherwise (Java parity 9f2234f).
190 pub latest_observed_dbr: Option<i32>,
191 pub metadata_fetch_failures: u64,
192 pub storage_append_timeouts: u64,
193}
194
195impl From<&PvCounters> for PvCountersSnapshot {
196 fn from(c: &PvCounters) -> Self {
197 let first = c.first_event_unix_secs.load(Ordering::Relaxed);
198 let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
199 Self {
200 events_received: c.events_received.load(Ordering::Relaxed),
201 events_stored: c.events_stored.load(Ordering::Relaxed),
202 first_event_unix_secs: if first == 0 { None } else { Some(first) },
203 buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
204 timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
205 type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
206 disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
207 last_disconnect_unix_secs: if last_disc == 0 {
208 None
209 } else {
210 Some(last_disc)
211 },
212 transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
213 latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
214 -1 => None,
215 v => Some(v),
216 },
217 metadata_fetch_failures: c.metadata_fetch_failures.load(Ordering::Relaxed),
218 storage_append_timeouts: c.storage_append_timeouts.load(Ordering::Relaxed),
219 }
220 }
221}
222
223/// Handle for a running PV archiving task.
224struct PvHandle {
225 /// `Some` for CA-acquired PVs (used by `try_get_value` for the
226 /// live-value RPC); `None` for PVA-acquired PVs since PVA exposes
227 /// no Clone-able channel handle — live_value for those routes
228 /// through `pva_client` directly.
229 channel: Option<CaChannel>,
230 cancel_token: CancellationToken,
231 #[allow(dead_code)]
232 dbr_type: ArchDbType,
233 conn_info: Arc<Mutex<ConnectionInfo>>,
234 /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
235 /// every sample emitted for this PV. Populated by per-field monitor tasks
236 /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
237 /// so stopping the PV stops all of them.
238 extras: Arc<ExtraFieldsCache>,
239 /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
240 /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
241 /// field's task without disturbing the others or the main PV.
242 field_tokens: Arc<DashMap<String, CancellationToken>>,
243 /// Serialises concurrent `update_archive_fields` calls for this PV so
244 /// add/remove/respawn never race with itself.
245 update_lock: Arc<tokio::sync::Mutex<()>>,
246 /// Diagnostic counters surfaced through the BPL drop / rate /
247 /// connection reports. Lock-free reads; updates from the producer
248 /// (monitor/scan) and the writer happen on different threads.
249 counters: Arc<PvCounters>,
250}
251
252/// Thread-safe cache of latest extra-field values for one PV.
253/// Each map entry is `(field_name, stringified_value)`.
254type ExtraFieldsCache = DashMap<String, String>;
255
256/// Default capacity for the bounded sample channel.
257/// This limits memory usage when producers outpace the storage writer.
258/// At ~200 bytes per sample, 500K entries ≈ 100 MB worst-case.
259const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
260
261/// RAII guard that removes a key from `pending_archives` on drop,
262/// ensuring cleanup even if the owning future is cancelled.
263struct PendingGuard<'a> {
264 map: &'a DashMap<String, ()>,
265 key: String,
266}
267
268impl Drop for PendingGuard<'_> {
269 fn drop(&mut self) {
270 self.map.remove(&self.key);
271 }
272}
273
274/// Manages EPICS Channel Access + pvAccess connections and dispatches
275/// archived samples to storage.
276pub struct ChannelManager {
277 /// The CA client context.
278 ca_client: CaClient,
279 /// The PVA client context (lazily used only when a PV's registry
280 /// row carries `Protocol::Pva`).
281 pva_client: PvaClient,
282 /// Active channels: PV name → handle with cancellation.
283 channels: DashMap<String, PvHandle>,
284 /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
285 /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
286 pending_archives: DashMap<String, ()>,
287 /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
288 /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
289 /// the registry status and the channel map disagreeing.
290 op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
291 /// Storage backend.
292 #[allow(dead_code)]
293 storage: Arc<dyn StoragePlugin>,
294 /// PV metadata registry.
295 registry: Arc<PvRegistry>,
296 /// Sample sender for the write thread.
297 sample_tx: mpsc::Sender<PvSample>,
298 /// Optional policy configuration.
299 policy: Option<PolicyConfig>,
300 /// Per-site IOC drift bound (Java parity 6538631).
301 server_ioc_drift_secs: u64,
302}
303
304/// A sample ready to be written to storage.
305pub struct PvSample {
306 pub pv_name: String,
307 pub dbr_type: ArchDbType,
308 pub sample: ArchiverSample,
309 pub element_count: Option<i32>,
310 /// Counter handle used by the write loop to record timestamp /
311 /// type-change drops. None on samples produced before counter
312 /// support was wired up — write_loop tolerates the absence.
313 pub counters: Option<Arc<PvCounters>>,
314}
315
316impl ChannelManager {
317 pub async fn new(
318 storage: Arc<dyn StoragePlugin>,
319 registry: Arc<PvRegistry>,
320 policy: Option<PolicyConfig>,
321 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
322 Self::new_with_drift(storage, registry, policy, 30 * 60).await
323 }
324
325 /// Construct with an explicit IOC drift bound. Java parity (6538631):
326 /// keeps tests + sites that don't surface `EngineConfig` on the
327 /// existing default while letting the daemon plumb a configured value.
328 pub async fn new_with_drift(
329 storage: Arc<dyn StoragePlugin>,
330 registry: Arc<PvRegistry>,
331 policy: Option<PolicyConfig>,
332 server_ioc_drift_secs: u64,
333 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
334 let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
335 let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
336 let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
337
338 let mgr = Self {
339 ca_client,
340 pva_client,
341 channels: DashMap::new(),
342 pending_archives: DashMap::new(),
343 op_locks: DashMap::new(),
344 storage,
345 registry,
346 sample_tx: tx,
347 policy,
348 server_ioc_drift_secs,
349 };
350
351 Ok((mgr, rx))
352 }
353
354 /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
355 /// is what callers should `.lock().await` on; holding the entry guard
356 /// (via `entry().or_insert_with`) across the await would deadlock the
357 /// DashMap shard.
358 fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
359 if let Some(existing) = self.op_locks.get(pv_name) {
360 return existing.clone();
361 }
362 self.op_locks
363 .entry(pv_name.to_string())
364 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
365 .clone()
366 }
367
368 /// Restore all active PVs from the registry (called on startup).
369 ///
370 /// `pvs_by_status(Active)` already filters out alias rows (they carry
371 /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
372 /// future schema change can't silently re-introduce double-archiving
373 /// of the underlying IOC PV.
374 pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
375 let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
376 let total = active_pvs.len() as u64;
377 info!(total, "Restoring PVs from registry");
378
379 let mut restored = 0u64;
380 for record in active_pvs {
381 if record.alias_for.is_some() {
382 warn!(
383 pv = record.pv_name,
384 target = record.alias_for.as_deref(),
385 "Skipping alias row in restore; aliases are routed, not archived"
386 );
387 continue;
388 }
389 if let Err(e) = self.start_archiving_internal(&record).await {
390 warn!(pv = record.pv_name, "Failed to restore PV: {e}");
391 self.registry.set_status(&record.pv_name, PvStatus::Error)?;
392 } else {
393 restored += 1;
394 }
395 }
396 metrics::gauge!("archiver_pvs_active").set(restored as f64);
397 if restored < total {
398 warn!(
399 restored,
400 failed = total - restored,
401 "Some PVs failed to restore"
402 );
403 }
404
405 Ok(restored)
406 }
407
408 /// Start archiving a new PV.
409 pub async fn archive_pv(
410 &self,
411 pv_name: &str,
412 sample_mode: &SampleMode,
413 protocol: Protocol,
414 ) -> anyhow::Result<()> {
415 // Serialise with pause/resume/stop/destroy on the same PV.
416 let lock = self.op_lock(pv_name);
417 let _g = lock.lock().await;
418
419 if self.channels.contains_key(pv_name) {
420 anyhow::bail!("PV {pv_name} is already being archived");
421 }
422
423 // Atomically claim the PV to prevent concurrent archive_pv races.
424 // The guard ensures cleanup even if this future is cancelled.
425 if self
426 .pending_archives
427 .insert(pv_name.to_string(), ())
428 .is_some()
429 {
430 anyhow::bail!("PV {pv_name} archive operation already in progress");
431 }
432 let _guard = PendingGuard {
433 map: &self.pending_archives,
434 key: pv_name.to_string(),
435 };
436
437 match protocol {
438 Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
439 Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
440 }
441 }
442
443 /// Inner implementation of archive_pv, separated for cleanup safety.
444 async fn archive_pv_inner(
445 &self,
446 pv_name: &str,
447 sample_mode: &SampleMode,
448 ) -> anyhow::Result<()> {
449 // Re-check after acquiring the pending slot (another task may have completed).
450 if self.channels.contains_key(pv_name) {
451 anyhow::bail!("PV {pv_name} is already being archived");
452 }
453
454 // Check policy override.
455 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
456 if let Some(p) = policy.find_policy(pv_name) {
457 (p.to_sample_mode(), Some(p.policy_name().to_string()))
458 } else {
459 (sample_mode.clone(), None)
460 }
461 } else {
462 (sample_mode.clone(), None)
463 };
464
465 // Connect to discover the native type.
466 let channel = self.ca_client.create_channel(pv_name);
467 channel
468 .wait_connected(CA_CONNECT_TIMEOUT)
469 .await
470 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
471
472 let info = self
473 .ca_client
474 .cainfo(pv_name)
475 .await
476 .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
477
478 let dbr_type = dbr_field_to_arch_type(info.native_type);
479 let element_count = info.element_count as i32;
480
481 // Register in SQLite. (CA path — PVA acquisition uses a separate
482 // entry point, so this branch is always Protocol::Ca.)
483 self.registry
484 .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
485 // Java parity (b30f1a6): persist the matched policy's stable name so
486 // audit / metrics paths know which policy governed this archive.
487 if let Some(ref name) = matched_policy_name {
488 self.registry.update_policy_name(pv_name, Some(name))?;
489 }
490
491 let record = self
492 .registry
493 .get_pv(pv_name)?
494 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
495 self.start_archiving_internal(&record).await?;
496
497 metrics::gauge!("archiver_pvs_active").increment(1.0);
498 info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
499 Ok(())
500 }
501
502 /// PVA equivalent of [`Self::archive_pv_inner`]. Connects via the
503 /// pvAccess client, infers archive type from a one-shot pvget, then
504 /// hands off to a `monitor_loop_pva` that mirrors the CA monitor
505 /// loop's lifecycle (samples → write_loop, cancel_token shutdown).
506 async fn archive_pv_inner_pva(
507 &self,
508 pv_name: &str,
509 sample_mode: &SampleMode,
510 ) -> anyhow::Result<()> {
511 if self.channels.contains_key(pv_name) {
512 anyhow::bail!("PV {pv_name} is already being archived");
513 }
514
515 // Apply policy override (same surface as CA path).
516 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
517 if let Some(p) = policy.find_policy(pv_name) {
518 (p.to_sample_mode(), Some(p.policy_name().to_string()))
519 } else {
520 (sample_mode.clone(), None)
521 }
522 } else {
523 (sample_mode.clone(), None)
524 };
525
526 // Connect + introspect: a one-shot pvget tells us the value
527 // shape. We rely on it instead of a `cainfo` analogue because
528 // PVA channels carry their type only via fetched data.
529 let connect = tokio::time::timeout(
530 CA_CONNECT_TIMEOUT,
531 self.pva_client.pvconnect(pv_name),
532 )
533 .await
534 .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
535 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
536 debug!(pv = pv_name, server = %connect, "PVA channel connected");
537
538 let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget(pv_name))
539 .await
540 .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
541 .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
542 let (dbr_type, element_count) = pv_field_to_arch_db_type(&initial).ok_or_else(|| {
543 anyhow::anyhow!(
544 "PV {pv_name}: PVA value is not a scalar/scalar-array; structured types not yet supported"
545 )
546 })?;
547
548 // Register with Pva flag.
549 self.registry.register_pv_with_protocol(
550 pv_name,
551 dbr_type,
552 &effective_mode,
553 element_count,
554 Protocol::Pva,
555 )?;
556 if let Some(ref name) = matched_policy_name {
557 self.registry.update_policy_name(pv_name, Some(name))?;
558 }
559 // Persist PREC/EGU extracted from the same pvget — equivalent
560 // to the CA path's DBR_CTRL refresh. Non-fatal on error.
561 let (prec, egu) = pv_field_extract_display(&initial);
562 if (prec.is_some() || egu.is_some())
563 && let Err(e) =
564 self.registry
565 .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
566 {
567 debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
568 }
569
570 let record = self
571 .registry
572 .get_pv(pv_name)?
573 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
574 self.start_archiving_internal(&record).await?;
575
576 metrics::gauge!("archiver_pvs_active").increment(1.0);
577 info!(
578 pv = pv_name,
579 ?dbr_type,
580 element_count,
581 protocol = "pva",
582 "Started archiving"
583 );
584 Ok(())
585 }
586
587 /// Internal: start a subscription for a PV record. Routes by
588 /// `record.protocol`; the CA branch keeps the historical behaviour,
589 /// the PVA branch goes through `start_archiving_internal_pva`.
590 async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
591 if record.protocol == Protocol::Pva {
592 return self.start_archiving_internal_pva(record).await;
593 }
594 let pv_name = record.pv_name.clone();
595 let dbr_type = record.dbr_type;
596 let element_count = record.element_count;
597 let channel = self.ca_client.create_channel(&pv_name);
598 let cancel_token = CancellationToken::new();
599 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
600 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
601 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
602 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
603 let counters = Arc::new(PvCounters::default());
604
605 // Hold update_lock around the whole insert+spawn block so a
606 // concurrent update_archive_fields can't observe the empty
607 // field_tokens map and spawn its own copies of the same fields
608 // before we get to the spawn loop. update_archive_fields acquires
609 // the same update_lock before mutating tokens.
610 let _guard = update_lock.lock().await;
611
612 self.channels.insert(
613 pv_name.clone(),
614 PvHandle {
615 channel: Some(channel.clone()),
616 cancel_token: cancel_token.clone(),
617 dbr_type,
618 conn_info: conn_info.clone(),
619 extras: extras.clone(),
620 field_tokens: field_tokens.clone(),
621 update_lock: update_lock.clone(),
622 counters: counters.clone(),
623 },
624 );
625
626 // Start one monitor task per archive_field with a child cancel token,
627 // tracked so update_archive_fields can stop individual fields.
628 for field in &record.archive_fields {
629 let child = cancel_token.child_token();
630 field_tokens.insert(field.clone(), child.clone());
631 spawn_extra_field_monitor(
632 &self.ca_client,
633 &pv_name,
634 field,
635 extras.clone(),
636 child,
637 counters.clone(),
638 );
639 }
640 metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
641 drop(_guard);
642
643 let tx = self.sample_tx.clone();
644 let token = cancel_token.clone();
645 let ci = conn_info.clone();
646 let extras_for_loop = extras.clone();
647 let counters_for_loop = counters.clone();
648 let registry_for_loop = self.registry.clone();
649
650 let drift = self.server_ioc_drift_secs;
651 match &record.sample_mode {
652 SampleMode::Monitor => {
653 tokio::spawn(async move {
654 monitor_loop(
655 pv_name,
656 dbr_type,
657 element_count,
658 channel,
659 tx,
660 token,
661 ci,
662 registry_for_loop,
663 extras_for_loop,
664 counters_for_loop,
665 drift,
666 )
667 .await;
668 });
669 }
670 SampleMode::Scan { period_secs } => {
671 let period = *period_secs;
672 tokio::spawn(async move {
673 scan_loop(
674 pv_name,
675 dbr_type,
676 element_count,
677 channel,
678 tx,
679 token,
680 period,
681 ci,
682 registry_for_loop,
683 extras_for_loop,
684 counters_for_loop,
685 )
686 .await;
687 });
688 }
689 }
690
691 Ok(())
692 }
693
694 /// PVA equivalent of `start_archiving_internal`. Spawns a single
695 /// callback-driven monitor task; pvAccess auto-reconnect inside
696 /// `pvmonitor_handle` removes the explicit reconnect loop the CA
697 /// path needs. PVA records use `channel: None` on the [`PvHandle`]
698 /// and skip per-field "extras" subscriptions (`.HIHI`/`.LOLO`/…)
699 /// since pvAccess wraps those in the NTScalar value structure
700 /// rather than separate channels.
701 async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
702 let pv_name = record.pv_name.clone();
703 let dbr_type = record.dbr_type;
704 let element_count = record.element_count;
705 let cancel_token = CancellationToken::new();
706 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
707 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
708 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
709 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
710 let counters = Arc::new(PvCounters::default());
711
712 self.channels.insert(
713 pv_name.clone(),
714 PvHandle {
715 channel: None,
716 cancel_token: cancel_token.clone(),
717 dbr_type,
718 conn_info: conn_info.clone(),
719 extras: extras.clone(),
720 field_tokens: field_tokens.clone(),
721 update_lock,
722 counters: counters.clone(),
723 },
724 );
725
726 let tx = self.sample_tx.clone();
727 let pva_client = self.pva_client.clone();
728 let token = cancel_token.clone();
729 let ci = conn_info.clone();
730 let counters_for_loop = counters.clone();
731 let drift = self.server_ioc_drift_secs;
732
733 match &record.sample_mode {
734 SampleMode::Monitor => {
735 let pv_name_loop = pv_name.clone();
736 let archive_fields_loop = record.archive_fields.clone();
737 let extras_for_loop = extras.clone();
738 tokio::spawn(async move {
739 monitor_loop_pva(
740 pv_name_loop,
741 dbr_type,
742 element_count,
743 pva_client,
744 tx,
745 token,
746 ci,
747 counters_for_loop,
748 drift,
749 archive_fields_loop,
750 extras_for_loop,
751 )
752 .await;
753 });
754 }
755 SampleMode::Scan { period_secs } => {
756 let period = *period_secs;
757 let pv_name_loop = pv_name.clone();
758 let archive_fields_loop = record.archive_fields.clone();
759 let extras_for_loop = extras.clone();
760 tokio::spawn(async move {
761 scan_loop_pva(
762 pv_name_loop,
763 dbr_type,
764 pva_client,
765 tx,
766 token,
767 period,
768 ci,
769 counters_for_loop,
770 drift,
771 archive_fields_loop,
772 extras_for_loop,
773 )
774 .await;
775 });
776 }
777 }
778
779 // Auxiliary periodic refreshers: keep PREC/EGU current after
780 // IOC restarts, and approximate disconnect events from a long
781 // sample gap. Both bind to the same cancel_token so a stop /
782 // delete reaps every PVA-side task in one go.
783 {
784 let pv_name = pv_name.clone();
785 let pva_client = self.pva_client.clone();
786 let registry = self.registry.clone();
787 let cancel = cancel_token.clone();
788 let counters_for_refresh = counters.clone();
789 tokio::spawn(async move {
790 pva_metadata_refresh_loop(
791 pv_name,
792 pva_client,
793 registry,
794 cancel,
795 counters_for_refresh,
796 )
797 .await;
798 });
799 }
800 {
801 let pv_name = pv_name.clone();
802 let conn_info = conn_info.clone();
803 let counters_for_watch = counters.clone();
804 let cancel = cancel_token.clone();
805 let sample_mode = record.sample_mode.clone();
806 tokio::spawn(async move {
807 pva_state_watchdog(
808 pv_name,
809 conn_info,
810 counters_for_watch,
811 cancel,
812 sample_mode,
813 )
814 .await;
815 });
816 }
817
818 Ok(())
819 }
820
821 /// Replace the archive_fields list for a running PV. Cancels per-field
822 /// monitor tasks for fields that left the set, spawns fresh ones for
823 /// fields that joined, and leaves unchanged fields running. Serialised
824 /// per-PV by an async mutex so concurrent callers can't double-spawn.
825 /// The main PV keeps running.
826 pub async fn update_archive_fields(
827 &self,
828 pv_name: &str,
829 fields: &[String],
830 ) -> anyhow::Result<()> {
831 // Persist first so a restart sees the new set even if the engine
832 // half of the update fails partway through.
833 self.registry.update_archive_fields(pv_name, fields)?;
834
835 // If the PV isn't currently active there's nothing more to do —
836 // start_archiving_internal will pick up the new fields on resume.
837 let (parent_token, extras, field_tokens, update_lock, counters) = {
838 let Some(handle) = self.channels.get(pv_name) else {
839 return Ok(());
840 };
841 (
842 handle.cancel_token.clone(),
843 handle.extras.clone(),
844 handle.field_tokens.clone(),
845 handle.update_lock.clone(),
846 handle.counters.clone(),
847 )
848 };
849
850 // Serialise so two concurrent updates can't both decide the same
851 // field is missing and spawn it twice.
852 let _guard = update_lock.lock().await;
853
854 let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
855
856 // Cancel + drop tasks for fields that left the set. Removing the
857 // entry from `field_tokens` also drops our handle on the child
858 // token; the spawned task observes `cancelled()` and exits.
859 let to_remove: Vec<String> = field_tokens
860 .iter()
861 .filter(|e| !wanted.contains(e.key().as_str()))
862 .map(|e| e.key().clone())
863 .collect();
864 let removed_count = to_remove.len();
865 for key in to_remove {
866 if let Some((_, token)) = field_tokens.remove(&key) {
867 token.cancel();
868 }
869 extras.remove(&key);
870 }
871
872 // Spawn tasks for fields newly added. Existing fields keep their
873 // task and their last cached value.
874 let mut added_count = 0usize;
875 for f in fields {
876 if !field_tokens.contains_key(f) {
877 let child = parent_token.child_token();
878 field_tokens.insert(f.clone(), child.clone());
879 spawn_extra_field_monitor(
880 &self.ca_client,
881 pv_name,
882 f,
883 extras.clone(),
884 child,
885 counters.clone(),
886 );
887 added_count += 1;
888 }
889 }
890 let net = added_count as i64 - removed_count as i64;
891 if net != 0 {
892 metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
893 }
894 Ok(())
895 }
896
897 /// Pause archiving for a PV.
898 pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
899 let lock = self.op_lock(pv_name);
900 let _g = lock.lock().await;
901 if let Some((_key, handle)) = self.channels.remove(pv_name) {
902 let extra_count = handle.field_tokens.len() as f64;
903 handle.cancel_token.cancel();
904 metrics::gauge!("archiver_pvs_active").decrement(1.0);
905 if extra_count > 0.0 {
906 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
907 }
908 }
909 self.registry.set_status(pv_name, PvStatus::Paused)?;
910 info!(pv = pv_name, "Paused archiving");
911 Ok(())
912 }
913
914 /// Resume a paused PV. Only paused or error PVs can be resumed;
915 /// calling resume on an already-active PV is a no-op (returns Ok).
916 pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
917 let lock = self.op_lock(pv_name);
918 let _g = lock.lock().await;
919
920 let record = self
921 .registry
922 .get_pv(pv_name)?
923 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
924
925 // Guard: if already active with a live task, nothing to do.
926 if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
927 info!(
928 pv = pv_name,
929 "PV is already actively archived, skipping resume"
930 );
931 return Ok(());
932 }
933
934 // Cancel any orphaned task to prevent duplicate subscriptions.
935 if let Some((_key, handle)) = self.channels.remove(pv_name) {
936 let extra_count = handle.field_tokens.len() as f64;
937 handle.cancel_token.cancel();
938 if extra_count > 0.0 {
939 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
940 }
941 }
942
943 // Start the task first; only mark Active if it succeeds.
944 self.start_archiving_internal(&record).await?;
945 self.registry.set_status(pv_name, PvStatus::Active)?;
946 metrics::gauge!("archiver_pvs_active").increment(1.0);
947 info!(pv = pv_name, "Resumed archiving");
948 Ok(())
949 }
950
951 /// Stop archiving a PV without removing it from the registry.
952 /// Sets the PV status to Inactive (data retained, monitoring stopped).
953 pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
954 let lock = self.op_lock(pv_name);
955 let _g = lock.lock().await;
956 if let Some((_key, handle)) = self.channels.remove(pv_name) {
957 let extra_count = handle.field_tokens.len() as f64;
958 handle.cancel_token.cancel();
959 metrics::gauge!("archiver_pvs_active").decrement(1.0);
960 if extra_count > 0.0 {
961 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
962 }
963 }
964 self.registry.set_status(pv_name, PvStatus::Inactive)?;
965 info!(pv = pv_name, "Stopped archiving (inactive)");
966 Ok(())
967 }
968
969 /// Remove a PV from archiving entirely.
970 pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
971 let lock = self.op_lock(pv_name);
972 let _g = lock.lock().await;
973 if let Some((_key, handle)) = self.channels.remove(pv_name) {
974 let extra_count = handle.field_tokens.len() as f64;
975 handle.cancel_token.cancel();
976 metrics::gauge!("archiver_pvs_active").decrement(1.0);
977 if extra_count > 0.0 {
978 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
979 }
980 }
981 self.registry.remove_pv(pv_name)?;
982 // Don't remove the op_locks entry here: a concurrent caller may have
983 // already taken a clone of this Arc and be queued on it; removing
984 // would let a fresh caller obtain a new mutex and the queued one
985 // would race them. The map is bounded by the lifetime universe of
986 // PV names, which is acceptable.
987 info!(pv = pv_name, "Destroyed archiving channel");
988 Ok(())
989 }
990
991 /// List all currently archived PV names (from registry).
992 pub fn list_pvs(&self) -> Vec<String> {
993 self.registry.all_pv_names().unwrap_or_else(|e| {
994 warn!("Failed to list PVs: {e}");
995 Vec::new()
996 })
997 }
998
999 /// Match PVs by glob pattern (from registry).
1000 pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
1001 self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
1002 warn!("Failed to match PVs: {e}");
1003 Vec::new()
1004 })
1005 }
1006
1007 /// Get the registry reference.
1008 pub fn registry(&self) -> &Arc<PvRegistry> {
1009 &self.registry
1010 }
1011
1012 /// Get connection info for a PV.
1013 pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1014 self.channels.get(pv).map(|h| {
1015 h.conn_info
1016 .lock()
1017 .unwrap_or_else(|e| e.into_inner())
1018 .clone()
1019 })
1020 }
1021
1022 /// Get PV names that have never received any event (connected_since == None).
1023 pub fn get_never_connected_pvs(&self) -> Vec<String> {
1024 self.channels
1025 .iter()
1026 .filter(|entry| {
1027 let ci = entry
1028 .value()
1029 .conn_info
1030 .lock()
1031 .unwrap_or_else(|e| e.into_inner());
1032 ci.connected_since.is_none()
1033 })
1034 .map(|entry| entry.key().clone())
1035 .collect()
1036 }
1037
1038 /// Snapshot the diagnostic counters for one PV. Returns None if the
1039 /// PV isn't actively archived. The returned Arc is the live counter
1040 /// — callers read with `Ordering::Relaxed`.
1041 pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1042 self.channels.get(pv_name).map(|h| h.counters.clone())
1043 }
1044
1045 /// Snapshot every active PV's counters. Returns `(pv_name,
1046 /// PvCountersSnapshot)` so callers don't have to handle Arc.
1047 pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1048 self.channels
1049 .iter()
1050 .map(|e| {
1051 (
1052 e.key().clone(),
1053 PvCountersSnapshot::from(&*e.value().counters),
1054 )
1055 })
1056 .collect()
1057 }
1058
1059 /// One-shot CA `get` against the running channel for `pv`. Returns
1060 /// `None` if the PV isn't actively archived. The timeout caps how
1061 /// long the caller will wait for a value when the IOC is slow.
1062 /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
1063 pub async fn live_value(
1064 &self,
1065 pv_name: &str,
1066 timeout: Duration,
1067 ) -> Option<anyhow::Result<ArchiverValue>> {
1068 let channel_opt = self.channels.get(pv_name)?.channel.clone();
1069 let Some(channel) = channel_opt else {
1070 // PVA-acquired PV — live_value via pva_client.pvget.
1071 let pva = self.pva_client.clone();
1072 let name = pv_name.to_string();
1073 let res = tokio::time::timeout(timeout, pva.pvget(&name)).await;
1074 return Some(match res {
1075 Ok(Ok(field)) => pv_field_scalar_to_archiver(&field)
1076 .ok_or_else(|| anyhow::anyhow!("PVA value not a scalar")),
1077 Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1078 Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1079 });
1080 };
1081 // Wait briefly for connection — channel.get on a disconnected
1082 // channel would otherwise return an error from deep in the CA
1083 // stack. Capped by the same timeout the caller chose.
1084 if channel.wait_connected(timeout).await.is_err() {
1085 return Some(Err(anyhow::anyhow!(
1086 "channel not connected within {timeout:?}"
1087 )));
1088 }
1089 match tokio::time::timeout(timeout, channel.get()).await {
1090 Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1091 Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1092 Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1093 }
1094 }
1095
1096 /// Snapshot the latest cached extra-field values for `pv` —
1097 /// `(field_name, stringified_value)` pairs. Empty map when the PV
1098 /// isn't archived or has no archive_fields configured.
1099 pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1100 match self.channels.get(pv_name) {
1101 Some(handle) => handle
1102 .extras
1103 .iter()
1104 .map(|e| (e.key().clone(), e.value().clone()))
1105 .collect(),
1106 None => std::collections::HashMap::new(),
1107 }
1108 }
1109
1110 /// Get PV names that are currently disconnected (is_connected == false).
1111 pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1112 self.channels
1113 .iter()
1114 .filter(|entry| {
1115 let ci = entry
1116 .value()
1117 .conn_info
1118 .lock()
1119 .unwrap_or_else(|e| e.into_inner());
1120 !ci.is_connected
1121 })
1122 .map(|entry| entry.key().clone())
1123 .collect()
1124 }
1125}
1126
1127/// Read DBR_CTRL metadata from the channel and persist `precision`/`units`
1128/// to the registry when they differ from stored values. Best-effort: any
1129/// failure (timeout, transport error, missing display info, non-numeric
1130/// type) is logged at debug and silently ignored. Skips the SQL write
1131/// entirely when the in-memory record already matches, so reconnect
1132/// storms don't churn the database.
1133async fn refresh_ctrl_metadata(
1134 channel: &CaChannel,
1135 registry: &PvRegistry,
1136 pv_name: &str,
1137 counters: &PvCounters,
1138) {
1139 // 15s — long enough for slow-network IOCs (Java's default `epicsTimeout`
1140 // is 30s; 15s splits the difference). Failures count toward
1141 // `metadata_fetch_failures` so operators can spot PVs that never get
1142 // PREC/EGU populated.
1143 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1144 let snapshot = match tokio::time::timeout(
1145 FETCH_TIMEOUT,
1146 channel.get_with_metadata(DbrClass::Ctrl),
1147 )
1148 .await
1149 {
1150 Ok(Ok(s)) => s,
1151 Ok(Err(e)) => {
1152 counters
1153 .metadata_fetch_failures
1154 .fetch_add(1, Ordering::Relaxed);
1155 debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1156 return;
1157 }
1158 Err(_) => {
1159 counters
1160 .metadata_fetch_failures
1161 .fetch_add(1, Ordering::Relaxed);
1162 debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1163 return;
1164 }
1165 };
1166 let Some(display) = snapshot.display else {
1167 // Non-numeric type (string, enum, …) — DBR_CTRL has no
1168 // DisplayInfo. Not a failure; just nothing to persist.
1169 return;
1170 };
1171
1172 // Negative precision is the Java EAA "no precision info" sentinel —
1173 // persisting "-1" as the PREC string would surface as a literal -1
1174 // in the UI / API. Treat it the same as missing.
1175 let new_prec_opt: Option<String> = if display.precision < 0 {
1176 None
1177 } else {
1178 Some(display.precision.to_string())
1179 };
1180 let new_egu_trimmed = display.units.trim();
1181 let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1182 None
1183 } else {
1184 Some(new_egu_trimmed)
1185 };
1186
1187 let stored = match registry.get_pv(pv_name) {
1188 Ok(Some(r)) => r,
1189 Ok(None) => return,
1190 Err(e) => {
1191 debug!(pv = pv_name, "Registry read for metadata compare failed: {e}");
1192 return;
1193 }
1194 };
1195
1196 let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1197 (Some(s), Some(n)) => s != n,
1198 (None, Some(_)) => true,
1199 // Don't overwrite a populated PREC with the "no info" sentinel.
1200 _ => false,
1201 };
1202 let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1203 (Some(s), Some(n)) => s != n,
1204 (None, Some(_)) => true,
1205 // Don't overwrite a populated EGU with empty, and don't
1206 // bother writing None over None.
1207 _ => false,
1208 };
1209 if !prec_changed && !egu_changed {
1210 return;
1211 }
1212
1213 let prec_arg = if prec_changed {
1214 new_prec_opt.as_deref()
1215 } else {
1216 None
1217 };
1218 let egu_arg = if egu_changed { new_egu_opt } else { None };
1219 if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1220 warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1221 } else {
1222 debug!(
1223 pv = pv_name,
1224 prec = ?prec_arg, egu = ?egu_arg,
1225 "Refreshed PREC/EGU from DBR_CTRL"
1226 );
1227 }
1228}
1229
1230/// Body shared by the two PVA monitor callback variants
1231/// (`pvmonitor_handle` takes `(FieldDesc, PvField)`,
1232/// `pvmonitor_with_request` takes `(PvField)`). Lifted out so we
1233/// can write the once-per-event logic in one place.
1234#[allow(clippy::too_many_arguments)]
1235fn pva_handle_event(
1236 field: &PvField,
1237 pv_name: &str,
1238 dbr_type: ArchDbType,
1239 tx: &mpsc::Sender<PvSample>,
1240 conn_info: &Mutex<ConnectionInfo>,
1241 counters: &Arc<PvCounters>,
1242 extras: &ExtraFieldsCache,
1243 extras_paths: &[(String, &'static str)],
1244 drift_secs: u64,
1245) {
1246 let now = SystemTime::now();
1247 let first_after_connect = {
1248 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1249 let first = ci.last_event_time.is_none();
1250 if ci.connected_since.is_none() {
1251 ci.connected_since = Some(now);
1252 }
1253 ci.is_connected = true;
1254 ci.last_event_time = Some(now);
1255 ci.state = PvConnectionState::Connected;
1256 first
1257 };
1258 if first_after_connect {
1259 counters
1260 .first_event_unix_secs
1261 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1262 .ok();
1263 }
1264 counters.events_received.fetch_add(1, Ordering::Relaxed);
1265
1266 let Some(value) = pv_field_scalar_to_archiver(field) else {
1267 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1268 debug!(pv = pv_name, "PVA event has non-scalar value; dropping");
1269 return;
1270 };
1271
1272 let ts = pv_field_extract_timestamp(field);
1273 if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1274 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1275 debug!(pv = pv_name, ?ts, "Dropping PVA sample with out-of-window timestamp");
1276 return;
1277 }
1278 let elem_count = match pv_field_element_count(field) {
1279 0 => 1,
1280 n => n,
1281 };
1282 for (field_name, path) in extras_paths {
1283 if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1284 extras.insert(field_name.clone(), scalar_value_to_string(s));
1285 }
1286 }
1287 let mut sample = ArchiverSample::new(ts, value);
1288 attach_extras(extras, &mut sample);
1289 let pv_sample = PvSample {
1290 pv_name: pv_name.to_string(),
1291 dbr_type,
1292 sample,
1293 element_count: Some(elem_count),
1294 counters: Some(counters.clone()),
1295 };
1296 if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1297 counters.buffer_overflow_drops.fetch_add(1, Ordering::Relaxed);
1298 }
1299}
1300
1301/// PVA monitor loop: subscribes once and parks until cancellation.
1302/// Each fan-in event is decoded inline, packaged as a [`PvSample`],
1303/// and non-blocking-pushed into the storage write_loop's channel —
1304/// blocking would stall the pvAccess reactor thread.
1305///
1306/// When `archive_fields` is non-empty, the subscription requests an
1307/// explicit pvRequest with the mapped sub-field paths so the IOC
1308/// includes them in every monitor event; the callback then mirrors
1309/// the CA path's "extras" cache by stringifying each requested field.
1310#[allow(clippy::too_many_arguments)]
1311async fn monitor_loop_pva(
1312 pv_name: String,
1313 dbr_type: ArchDbType,
1314 element_count: i32,
1315 pva_client: PvaClient,
1316 tx: mpsc::Sender<PvSample>,
1317 cancel_token: CancellationToken,
1318 conn_info: Arc<Mutex<ConnectionInfo>>,
1319 counters: Arc<PvCounters>,
1320 server_ioc_drift_secs: u64,
1321 archive_fields: Vec<String>,
1322 extras: Arc<ExtraFieldsCache>,
1323) {
1324 let drift_secs = server_ioc_drift_secs;
1325 // Static element_count is unused — each callback derives the
1326 // current array length from the live PvField, so an NTScalarArray
1327 // whose size changes between events still gets tagged correctly.
1328 let _ = element_count;
1329
1330 // Build the (CA name, PVA path) pairs once. Skip CA fields with
1331 // no PVA equivalent so a misconfigured archive_fields list
1332 // doesn't poison the pvRequest.
1333 let extras_paths: Vec<(String, &'static str)> = archive_fields
1334 .iter()
1335 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1336 .collect();
1337 let request_expr = if extras_paths.is_empty() {
1338 None
1339 } else {
1340 let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1341 .field("value")
1342 .field("alarm")
1343 .field("timeStamp");
1344 for (_, path) in &extras_paths {
1345 builder = builder.field(*path);
1346 }
1347 Some(builder.build())
1348 };
1349
1350 // Subscribe loop with retry. The custom-request path uses
1351 // `pvmonitor_with_request` (no SubscriptionHandle) inside a
1352 // tokio::select! so cancellation drops the future cleanly; the
1353 // empty-request path keeps the original SubscriptionHandle form
1354 // since that's the simpler path for the common case.
1355 loop {
1356 if let Some(ref req) = request_expr {
1357 // Custom-request path: 1-arg callback. pvmonitor_with_request
1358 // returns a future that runs to completion; race it against
1359 // the cancel signal.
1360 let pv_name_cb = pv_name.clone();
1361 let tx_cb = tx.clone();
1362 let conn_info_cb = conn_info.clone();
1363 let counters_cb = counters.clone();
1364 let extras_cb = extras.clone();
1365 let extras_paths_cb = extras_paths.clone();
1366 let cb = move |field: &PvField| {
1367 pva_handle_event(
1368 field,
1369 &pv_name_cb,
1370 dbr_type,
1371 &tx_cb,
1372 &conn_info_cb,
1373 &counters_cb,
1374 &extras_cb,
1375 &extras_paths_cb,
1376 drift_secs,
1377 );
1378 };
1379 tokio::select! {
1380 _ = cancel_token.cancelled() => {
1381 debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1382 return;
1383 }
1384 res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1385 match res {
1386 Ok(()) => {
1387 debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1388 }
1389 Err(e) => {
1390 counters
1391 .transient_error_count
1392 .fetch_add(1, Ordering::Relaxed);
1393 warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1394 }
1395 }
1396 tokio::select! {
1397 _ = cancel_token.cancelled() => return,
1398 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1399 }
1400 }
1401 }
1402 } else {
1403 // Empty-request fast path: 2-arg callback. Keep the
1404 // SubscriptionHandle so the inner reactor task lives
1405 // until our explicit drop.
1406 let pv_name_cb = pv_name.clone();
1407 let tx_cb = tx.clone();
1408 let conn_info_cb = conn_info.clone();
1409 let counters_cb = counters.clone();
1410 let extras_cb = extras.clone();
1411 let extras_paths_cb = extras_paths.clone();
1412 let cb = move |_desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1413 pva_handle_event(
1414 field,
1415 &pv_name_cb,
1416 dbr_type,
1417 &tx_cb,
1418 &conn_info_cb,
1419 &counters_cb,
1420 &extras_cb,
1421 &extras_paths_cb,
1422 drift_secs,
1423 );
1424 };
1425 let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1426 Ok(h) => h,
1427 Err(e) => {
1428 counters
1429 .transient_error_count
1430 .fetch_add(1, Ordering::Relaxed);
1431 warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1432 tokio::select! {
1433 _ = cancel_token.cancelled() => return,
1434 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1435 }
1436 }
1437 };
1438 debug!(pv = pv_name, "PVA monitor active");
1439 cancel_token.cancelled().await;
1440 debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1441 drop(handle);
1442 return;
1443 }
1444 }
1445}
1446
1447/// PVA Scan loop: periodic `pvget` instead of streaming monitor.
1448/// Mirrors the CA scan_loop's per-tick connection check + sample emit.
1449#[allow(clippy::too_many_arguments)]
1450async fn scan_loop_pva(
1451 pv_name: String,
1452 dbr_type: ArchDbType,
1453 pva_client: PvaClient,
1454 tx: mpsc::Sender<PvSample>,
1455 cancel_token: CancellationToken,
1456 period_secs: f64,
1457 conn_info: Arc<Mutex<ConnectionInfo>>,
1458 counters: Arc<PvCounters>,
1459 server_ioc_drift_secs: u64,
1460 archive_fields: Vec<String>,
1461 extras: Arc<ExtraFieldsCache>,
1462) {
1463 let extras_paths: Vec<(String, &'static str)> = archive_fields
1464 .iter()
1465 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1466 .collect();
1467 let period = Duration::from_secs_f64(period_secs);
1468 let mut interval = tokio::time::interval(period);
1469 let drift_secs = server_ioc_drift_secs;
1470 // Hard timeout per pvget — at most one tick of slack so a stuck
1471 // PVA server can't accumulate pending requests.
1472 let pvget_timeout = period.max(Duration::from_secs(5));
1473
1474 loop {
1475 tokio::select! {
1476 _ = cancel_token.cancelled() => return,
1477 _ = interval.tick() => {}
1478 }
1479
1480 let res = tokio::time::timeout(pvget_timeout, pva_client.pvget(&pv_name)).await;
1481 let field = match res {
1482 Ok(Ok(f)) => f,
1483 Ok(Err(e)) => {
1484 let was_connected = {
1485 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1486 let prev = ci.is_connected;
1487 ci.is_connected = false;
1488 ci.last_event_time = None;
1489 ci.state = match ci.state {
1490 PvConnectionState::Connected => PvConnectionState::Disconnected,
1491 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1492 _ => PvConnectionState::Connecting,
1493 };
1494 prev
1495 };
1496 if was_connected {
1497 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1498 counters
1499 .last_disconnect_unix_secs
1500 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1501 }
1502 counters
1503 .transient_error_count
1504 .fetch_add(1, Ordering::Relaxed);
1505 debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1506 continue;
1507 }
1508 Err(_) => {
1509 counters
1510 .transient_error_count
1511 .fetch_add(1, Ordering::Relaxed);
1512 debug!(pv = pv_name, "PVA scan pvget timed out");
1513 continue;
1514 }
1515 };
1516
1517 let now = SystemTime::now();
1518 let first_after_connect = {
1519 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1520 let first = ci.last_event_time.is_none();
1521 if ci.connected_since.is_none() {
1522 ci.connected_since = Some(now);
1523 }
1524 ci.is_connected = true;
1525 ci.last_event_time = Some(now);
1526 ci.state = PvConnectionState::Connected;
1527 first
1528 };
1529 counters.events_received.fetch_add(1, Ordering::Relaxed);
1530 if first_after_connect {
1531 counters
1532 .first_event_unix_secs
1533 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1534 .ok();
1535 }
1536
1537 let Some(value) = pv_field_scalar_to_archiver(&field) else {
1538 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1539 debug!(pv = pv_name, "PVA scan returned non-scalar value; dropping");
1540 continue;
1541 };
1542 // Scan mode: timestamp the sample at receive time (no IOC
1543 // timestamp available reliably for periodic pvget, mirroring
1544 // CA scan_loop's `now` policy).
1545 let _ = drift_secs; // referenced for symmetry; not used here
1546 let elem_count = match pv_field_element_count(&field) {
1547 0 => 1,
1548 n => n,
1549 };
1550 // Refresh extras from this scan's pvget result (the pvget
1551 // returns the full NTScalar so all sub-fields are available
1552 // for free — no extra channel spawn needed).
1553 for (field_name, path) in &extras_paths {
1554 if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1555 extras.insert(field_name.clone(), scalar_value_to_string(s));
1556 }
1557 }
1558 let mut sample = ArchiverSample::new(now, value);
1559 attach_extras(&extras, &mut sample);
1560 let pv_sample = PvSample {
1561 pv_name: pv_name.clone(),
1562 dbr_type,
1563 sample,
1564 element_count: Some(elem_count),
1565 counters: Some(counters.clone()),
1566 };
1567 if let Err(rejected) =
1568 try_send_with_overflow_count(&tx, pv_sample, &counters).await
1569 {
1570 // Channel closed (write_loop down). Cooperative shutdown.
1571 let _ = rejected;
1572 return;
1573 }
1574 }
1575}
1576
1577/// Periodic refresh task for PVA-acquired PVs: re-fetches DBR_CTRL-
1578/// equivalent metadata (`display.units`, `display.precision`) every
1579/// few minutes and persists when changed. PVA's monitor callback API
1580/// doesn't surface explicit reconnect events the way CA's
1581/// `ConnectionEvent` does, so a dedicated periodic poll is the
1582/// simplest way to keep PREC/EGU fresh after IOC restarts.
1583async fn pva_metadata_refresh_loop(
1584 pv_name: String,
1585 pva_client: PvaClient,
1586 registry: Arc<PvRegistry>,
1587 cancel_token: CancellationToken,
1588 counters: Arc<PvCounters>,
1589) {
1590 // 5 minutes — operators almost never change PREC/EGU at runtime;
1591 // shorter cadence wastes registry/network and longer leaves UI
1592 // stale across IOC restarts. Same magic number Java EAA uses
1593 // for its `metaFieldsRefresh` cron.
1594 const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1595 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1596 let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1597 // Skip the immediate first tick — initial pvget already populated
1598 // PREC/EGU at archive_pv time.
1599 tick.tick().await;
1600 loop {
1601 tokio::select! {
1602 _ = cancel_token.cancelled() => return,
1603 _ = tick.tick() => {}
1604 }
1605
1606 let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1607 let field = match res {
1608 Ok(Ok(f)) => f,
1609 _ => {
1610 counters
1611 .metadata_fetch_failures
1612 .fetch_add(1, Ordering::Relaxed);
1613 continue;
1614 }
1615 };
1616 let (new_prec, new_egu) = pv_field_extract_display(&field);
1617 let stored = match registry.get_pv(&pv_name) {
1618 Ok(Some(r)) => r,
1619 _ => continue,
1620 };
1621 let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1622 (Some(s), Some(n)) => s != n,
1623 (None, Some(_)) => true,
1624 _ => false,
1625 };
1626 let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1627 (Some(s), Some(n)) => s != n,
1628 (None, Some(_)) => true,
1629 _ => false,
1630 };
1631 if !prec_changed && !egu_changed {
1632 continue;
1633 }
1634 let prec_arg = if prec_changed { new_prec.as_deref() } else { None };
1635 let egu_arg = if egu_changed { new_egu.as_deref() } else { None };
1636 if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1637 warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1638 } else {
1639 debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1640 }
1641 }
1642}
1643
1644/// State watchdog for PVA-acquired PVs: a dedicated task that flips
1645/// `conn_info.state` to `Disconnected` when no event has arrived
1646/// within `STALE_THRESHOLD`. PVA's callback API doesn't surface
1647/// explicit channel-state changes — we approximate by treating a
1648/// long event gap as a disconnect. Trade-off: a genuinely silent PV
1649/// (alarm-only, low-rate scan) appears disconnected. Operators can
1650/// raise the threshold by env var if their PV cadence is slower.
1651async fn pva_state_watchdog(
1652 pv_name: String,
1653 conn_info: Arc<Mutex<ConnectionInfo>>,
1654 counters: Arc<PvCounters>,
1655 cancel_token: CancellationToken,
1656 sample_mode: SampleMode,
1657) {
1658 const POLL_INTERVAL: Duration = Duration::from_secs(5);
1659 // Threshold scales with the expected event cadence so a slow Scan
1660 // PV (period=300 s) doesn't flap on every tick. For Monitor mode
1661 // the floor is 60 s — most production PVs change at least that
1662 // often. Operators who archive truly silent PVs (alarm-only) will
1663 // see stale disconnected status; that's a known limitation.
1664 let stale_threshold = match sample_mode {
1665 SampleMode::Monitor => Duration::from_secs(60),
1666 SampleMode::Scan { period_secs } => {
1667 Duration::from_secs_f64((period_secs * 3.0).max(60.0))
1668 }
1669 };
1670 let mut interval = tokio::time::interval(POLL_INTERVAL);
1671 interval.tick().await;
1672 loop {
1673 tokio::select! {
1674 _ = cancel_token.cancelled() => return,
1675 _ = interval.tick() => {}
1676 }
1677 let stale_now = {
1678 let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1679 if !ci.is_connected {
1680 continue;
1681 }
1682 match ci.last_event_time {
1683 Some(t) => SystemTime::now()
1684 .duration_since(t)
1685 .map(|d| d > stale_threshold)
1686 .unwrap_or(false),
1687 None => false,
1688 }
1689 };
1690 if stale_now {
1691 let was_connected = {
1692 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1693 let prev = ci.is_connected;
1694 ci.is_connected = false;
1695 ci.state = PvConnectionState::Disconnected;
1696 prev
1697 };
1698 if was_connected {
1699 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1700 counters
1701 .last_disconnect_unix_secs
1702 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1703 debug!(
1704 pv = pv_name,
1705 "PVA watchdog: marking disconnected after stale heartbeat"
1706 );
1707 }
1708 }
1709 }
1710}
1711
1712/// Monitor loop: subscribe to a channel and forward values.
1713#[allow(clippy::too_many_arguments)]
1714async fn monitor_loop(
1715 pv_name: String,
1716 dbr_type: ArchDbType,
1717 element_count: i32,
1718 channel: CaChannel,
1719 tx: mpsc::Sender<PvSample>,
1720 cancel_token: CancellationToken,
1721 conn_info: Arc<Mutex<ConnectionInfo>>,
1722 registry: Arc<PvRegistry>,
1723 extras: Arc<ExtraFieldsCache>,
1724 counters: Arc<PvCounters>,
1725 server_ioc_drift_secs: u64,
1726) {
1727 loop {
1728 // Wait for connection, respecting cancellation.
1729 tokio::select! {
1730 _ = cancel_token.cancelled() => return,
1731 result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
1732 if result.is_err() {
1733 let was_connected = {
1734 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1735 let prev_connected = ci.is_connected;
1736 ci.is_connected = false;
1737 ci.last_event_time = None;
1738 // Connecting if we've never seen a connect, otherwise
1739 // demote from Connected to Disconnected on a real loss.
1740 ci.state = match ci.state {
1741 PvConnectionState::Connected => PvConnectionState::Disconnected,
1742 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1743 _ => PvConnectionState::Connecting,
1744 };
1745 prev_connected
1746 };
1747 // A search-failure timeout that demotes us out of
1748 // Connected counts as a fresh disconnect — without
1749 // this, only the post-monitor `break` path bumps the
1750 // counters and a flapping PV silently under-reports
1751 // its drops while subsequent `attach_cnx_lost_headers`
1752 // calls carry a stale `cnxlostepsecs`.
1753 if was_connected {
1754 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1755 counters
1756 .last_disconnect_unix_secs
1757 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1758 }
1759 // Subscribe BEFORE re-checking the current state. If we
1760 // subscribed after a state check, a Connected event
1761 // firing in between would be lost forever, leaving this
1762 // loop hung until the next disconnect/reconnect cycle.
1763 let mut conn_rx = channel.connection_events();
1764
1765 // Close the remaining race: the channel may have become
1766 // connected between the outer `wait_connected` timeout
1767 // and our `subscribe()` above, in which case no new event
1768 // will arrive. A short re-probe catches that case.
1769 if channel
1770 .wait_connected(Duration::from_millis(100))
1771 .await
1772 .is_err()
1773 {
1774 loop {
1775 tokio::select! {
1776 _ = cancel_token.cancelled() => return,
1777 event = conn_rx.recv() => {
1778 use tokio::sync::broadcast::error::RecvError;
1779 match event {
1780 Ok(ConnectionEvent::Connected) => break,
1781 Ok(_) => continue,
1782 // Lagged: we missed some events but
1783 // the channel is still live. Re-probe
1784 // state and otherwise keep waiting.
1785 Err(RecvError::Lagged(_)) => {
1786 if channel
1787 .wait_connected(Duration::from_millis(100))
1788 .await
1789 .is_ok()
1790 {
1791 break;
1792 }
1793 continue;
1794 }
1795 Err(RecvError::Closed) => return,
1796 }
1797 }
1798 }
1799 }
1800 }
1801 }
1802 }
1803 }
1804
1805 // Refresh DBR_CTRL metadata once per (re)connect so the registry's
1806 // PREC/EGU stay in sync with the IOC. Fire-and-forget so a slow
1807 // CA stack can't gate `subscribe()` (and therefore the first
1808 // sample) on a metadata round-trip. The spawned task is bound to
1809 // `cancel_token` so it terminates promptly when the PV is
1810 // stopped/deleted (otherwise a late metadata write could land on
1811 // a deleted-or-re-registered PV row).
1812 {
1813 let channel = channel.clone();
1814 let registry = registry.clone();
1815 let counters = counters.clone();
1816 let pv_name = pv_name.clone();
1817 let cancel = cancel_token.clone();
1818 tokio::spawn(async move {
1819 tokio::select! {
1820 _ = cancel.cancelled() => {}
1821 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
1822 }
1823 });
1824 }
1825
1826 // Subscribe.
1827 let mut monitor = match channel.subscribe().await {
1828 Ok(m) => m,
1829 Err(e) => {
1830 counters
1831 .transient_error_count
1832 .fetch_add(1, Ordering::Relaxed);
1833 warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
1834 tokio::select! {
1835 _ = cancel_token.cancelled() => return,
1836 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1837 }
1838 }
1839 };
1840
1841 debug!(pv = pv_name, "Monitor subscription active");
1842
1843 // Receive values with cancellation.
1844 loop {
1845 tokio::select! {
1846 _ = cancel_token.cancelled() => return,
1847 result = monitor.recv() => {
1848 match result {
1849 Some(Ok(snapshot)) => {
1850 let now = SystemTime::now();
1851 // Java parity (11e554d0): use the IOC-reported
1852 // timestamp, not receive-time, so latency
1853 // doesn't smear sample times. First sample
1854 // after connect is accepted unconditionally
1855 // — legitimate backfill on reconnect can
1856 // include older timestamps. Subsequent
1857 // samples whose IOC clock is more than
1858 // SERVER_IOC_DRIFT_SECS away from wall clock,
1859 // or earlier than the 1991 floor, are
1860 // dropped + counted.
1861 let first_after_connect = {
1862 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1863 let first = ci.last_event_time.is_none();
1864 if ci.connected_since.is_none() {
1865 ci.connected_since = Some(now);
1866 }
1867 ci.is_connected = true;
1868 ci.last_event_time = Some(now);
1869 ci.state = PvConnectionState::Connected;
1870 first
1871 };
1872 if !first_after_connect
1873 && !ioc_timestamp_in_window(
1874 snapshot.timestamp,
1875 now,
1876 server_ioc_drift_secs,
1877 )
1878 {
1879 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1880 debug!(
1881 pv = pv_name,
1882 ?snapshot.timestamp,
1883 "Dropping sample with out-of-window IOC timestamp"
1884 );
1885 continue;
1886 }
1887 counters.events_received.fetch_add(1, Ordering::Relaxed);
1888 // CAS the first-event timestamp once. 0 sentinel
1889 // means "no event yet"; replace with now.
1890 let now_secs = unix_secs(now);
1891 let _ = counters.first_event_unix_secs.compare_exchange(
1892 0,
1893 now_secs,
1894 Ordering::Relaxed,
1895 Ordering::Relaxed,
1896 );
1897 let archiver_val = epics_value_to_archiver(&snapshot.value);
1898 let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
1899 attach_extras(&extras, &mut sample);
1900 if first_after_connect {
1901 let lost_secs = counters
1902 .last_disconnect_unix_secs
1903 .load(Ordering::Relaxed);
1904 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1905 }
1906 let pv_sample = PvSample {
1907 pv_name: pv_name.clone(),
1908 dbr_type,
1909 sample,
1910 element_count: Some(element_count),
1911 counters: Some(counters.clone()),
1912 };
1913 if let Err(pv_sample) = try_send_with_overflow_count(
1914 &tx,
1915 pv_sample,
1916 &counters,
1917 )
1918 .await
1919 {
1920 let _ = pv_sample;
1921 return; // Write loop shut down
1922 }
1923 }
1924 Some(Err(e)) => {
1925 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1926 warn!(pv = pv_name, "Monitor error: {e}");
1927 }
1928 None => break, // Monitor ended, reconnect
1929 }
1930 }
1931 }
1932 }
1933
1934 // Monitor ended (disconnect) — loop back to reconnect. Reset
1935 // `last_event_time` so the first sample after reconnect is
1936 // treated as `first_after_connect` and bypasses the drift
1937 // filter; without this, an IOC that comes back with a
1938 // legitimate backfill timestamp older than the 30 min window
1939 // would be silently dropped.
1940 {
1941 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1942 ci.is_connected = false;
1943 ci.last_event_time = None;
1944 ci.state = PvConnectionState::Disconnected;
1945 }
1946 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1947 counters
1948 .last_disconnect_unix_secs
1949 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1950 debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
1951 }
1952}
1953
1954fn unix_secs(t: SystemTime) -> i64 {
1955 t.duration_since(SystemTime::UNIX_EPOCH)
1956 .map(|d| d.as_secs() as i64)
1957 .unwrap_or(0)
1958}
1959
1960/// Send a sample and count buffer-overflow events. Tries non-blocking
1961/// first; if the bounded channel is full we increment the counter and
1962/// fall back to a blocking send so backpressure still works.
1963async fn try_send_with_overflow_count(
1964 tx: &mpsc::Sender<PvSample>,
1965 pv_sample: PvSample,
1966 counters: &PvCounters,
1967) -> Result<(), PvSample> {
1968 match tx.try_send(pv_sample) {
1969 Ok(()) => Ok(()),
1970 Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1971 counters
1972 .buffer_overflow_drops
1973 .fetch_add(1, Ordering::Relaxed);
1974 // Backpressure to the producer; this awaits until the writer
1975 // drains some space. We count the saturation event but don't
1976 // actually drop the sample.
1977 tx.send(pv_sample).await.map_err(|e| e.0)
1978 }
1979 Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1980 }
1981}
1982
1983/// Scan loop: periodically read a channel value.
1984#[allow(clippy::too_many_arguments)]
1985async fn scan_loop(
1986 pv_name: String,
1987 dbr_type: ArchDbType,
1988 element_count: i32,
1989 channel: CaChannel,
1990 tx: mpsc::Sender<PvSample>,
1991 cancel_token: CancellationToken,
1992 period_secs: f64,
1993 conn_info: Arc<Mutex<ConnectionInfo>>,
1994 registry: Arc<PvRegistry>,
1995 extras: Arc<ExtraFieldsCache>,
1996 counters: Arc<PvCounters>,
1997) {
1998 let period = Duration::from_secs_f64(period_secs);
1999 let mut interval = tokio::time::interval(period);
2000 // Reset on every detected disconnect so we re-fetch metadata after
2001 // each reconnect (mirrors monitor_loop's once-per-(re)connect cadence).
2002 let mut metadata_done = false;
2003
2004 loop {
2005 tokio::select! {
2006 _ = cancel_token.cancelled() => return,
2007 _ = interval.tick() => {}
2008 }
2009
2010 if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2011 let was_connected = {
2012 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2013 let prev = ci.is_connected;
2014 ci.is_connected = false;
2015 ci.last_event_time = None;
2016 ci.state = match ci.state {
2017 PvConnectionState::Connected => PvConnectionState::Disconnected,
2018 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2019 _ => PvConnectionState::Connecting,
2020 };
2021 prev
2022 };
2023 if was_connected {
2024 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2025 counters
2026 .last_disconnect_unix_secs
2027 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2028 }
2029 metadata_done = false;
2030 continue;
2031 }
2032
2033 if !metadata_done {
2034 // Fire-and-forget; matches monitor_loop's once-per-(re)connect
2035 // semantics. The local `metadata_done` flag exists because
2036 // scan_loop's outer iteration is per-tick (vs monitor_loop's
2037 // per-reconnect), so we'd otherwise spawn a fetch every tick.
2038 // Bound to `cancel_token` so a stopped/deleted PV doesn't get
2039 // a stale metadata write from an in-flight task.
2040 let channel = channel.clone();
2041 let registry = registry.clone();
2042 let counters = counters.clone();
2043 let pv_name = pv_name.clone();
2044 let cancel = cancel_token.clone();
2045 tokio::spawn(async move {
2046 tokio::select! {
2047 _ = cancel.cancelled() => {}
2048 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
2049 }
2050 });
2051 metadata_done = true;
2052 }
2053
2054 match channel.get().await {
2055 Ok((_dbr_type, epics_val)) => {
2056 let now = SystemTime::now();
2057 let first_after_connect = {
2058 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2059 let first = ci.last_event_time.is_none();
2060 if ci.connected_since.is_none() {
2061 ci.connected_since = Some(now);
2062 }
2063 ci.is_connected = true;
2064 ci.last_event_time = Some(now);
2065 ci.state = PvConnectionState::Connected;
2066 first
2067 };
2068 counters.events_received.fetch_add(1, Ordering::Relaxed);
2069 let now_secs = unix_secs(now);
2070 let _ = counters.first_event_unix_secs.compare_exchange(
2071 0,
2072 now_secs,
2073 Ordering::Relaxed,
2074 Ordering::Relaxed,
2075 );
2076 let archiver_val = epics_value_to_archiver(&epics_val);
2077 let mut sample = ArchiverSample::new(now, archiver_val);
2078 attach_extras(&extras, &mut sample);
2079 if first_after_connect {
2080 let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2081 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2082 }
2083 let pv_sample = PvSample {
2084 pv_name: pv_name.clone(),
2085 dbr_type,
2086 sample,
2087 element_count: Some(element_count),
2088 counters: Some(counters.clone()),
2089 };
2090 if try_send_with_overflow_count(&tx, pv_sample, &counters)
2091 .await
2092 .is_err()
2093 {
2094 return;
2095 }
2096 }
2097 Err(e) => {
2098 counters
2099 .transient_error_count
2100 .fetch_add(1, Ordering::Relaxed);
2101 debug!(pv = pv_name, "Scan read error: {e}");
2102 }
2103 }
2104 }
2105}
2106
2107/// Java parity (ed07feb): tag the first sample after (re)connect with
2108/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
2109/// detect archiver restarts and PV resumes. Unconditional emission —
2110/// on a clean startup `lost_secs` is 0, which is itself a valid value
2111/// for downstream gap detection.
2112fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2113 sample
2114 .field_values
2115 .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2116 sample
2117 .field_values
2118 .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2119 sample
2120 .field_values
2121 .push(("startup".to_string(), "true".to_string()));
2122}
2123
2124/// Snapshot the extras cache into `sample.field_values`. We sort for stable
2125/// PB output across runs (the protobuf field is repeated; consumers that
2126/// diff/compare files appreciate determinism).
2127fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2128 if extras.is_empty() {
2129 return;
2130 }
2131 let mut entries: Vec<(String, String)> = extras
2132 .iter()
2133 .map(|e| (e.key().clone(), e.value().clone()))
2134 .collect();
2135 entries.sort_by(|a, b| a.0.cmp(&b.0));
2136 sample.field_values = entries;
2137}
2138
2139/// Render an EpicsValue as the string we'll persist in the metadata field
2140/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
2141/// fall back to JSON-ish bracket notation. Stays in sync with what Java
2142/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
2143fn epics_value_to_field_string(val: &EpicsValue) -> String {
2144 match val {
2145 EpicsValue::String(s) => s.clone(),
2146 EpicsValue::Short(v) => v.to_string(),
2147 EpicsValue::Float(v) => v.to_string(),
2148 EpicsValue::Enum(v) => v.to_string(),
2149 EpicsValue::Char(v) => v.to_string(),
2150 EpicsValue::Long(v) => v.to_string(),
2151 EpicsValue::Int64(v) => v.to_string(),
2152 EpicsValue::Double(v) => v.to_string(),
2153 EpicsValue::ShortArray(v) => format!("{v:?}"),
2154 EpicsValue::FloatArray(v) => format!("{v:?}"),
2155 EpicsValue::EnumArray(v) => format!("{v:?}"),
2156 EpicsValue::DoubleArray(v) => format!("{v:?}"),
2157 EpicsValue::LongArray(v) => format!("{v:?}"),
2158 EpicsValue::Int64Array(v) => format!("{v:?}"),
2159 EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2160 EpicsValue::StringArray(v) => format!("{v:?}"),
2161 }
2162}
2163
2164/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
2165/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
2166/// it up alongside the main PV.
2167fn spawn_extra_field_monitor(
2168 ca_client: &CaClient,
2169 pv_name: &str,
2170 field: &str,
2171 extras: Arc<ExtraFieldsCache>,
2172 parent_token: CancellationToken,
2173 counters: Arc<PvCounters>,
2174) {
2175 let full_name = format!("{pv_name}.{field}");
2176 let channel = ca_client.create_channel(&full_name);
2177 let field_owned = field.to_string();
2178 let pv_owned = pv_name.to_string();
2179
2180 // Catch-unwind boundary: a panic from the CA client (e.g. malformed
2181 // wire frame, allocation failure inside epics_rs) would propagate to
2182 // the runtime and abort sibling tasks of this worker thread. Trap it
2183 // here, log with PV+field context, and return normally so the runtime
2184 // remains healthy.
2185 let panic_pv = pv_owned.clone();
2186 let panic_field = field_owned.clone();
2187 tokio::spawn(async move {
2188 let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2189 channel,
2190 pv_owned,
2191 field_owned,
2192 extras,
2193 parent_token,
2194 counters,
2195 ));
2196 if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2197 let msg = panic_payload_msg(&payload);
2198 error!(
2199 pv = panic_pv,
2200 field = panic_field,
2201 "Extra-field monitor panicked: {msg}"
2202 );
2203 }
2204 });
2205}
2206
2207/// Body of the spawned extra-field monitor. Split out so the spawn site
2208/// can wrap it in `catch_unwind`.
2209async fn extra_field_monitor_body(
2210 channel: CaChannel,
2211 pv_owned: String,
2212 field_owned: String,
2213 extras: Arc<ExtraFieldsCache>,
2214 parent_token: CancellationToken,
2215 counters: Arc<PvCounters>,
2216) {
2217 // Initial connect attempt — failure here is non-fatal (the field may
2218 // not exist on every IOC; we just leave the cache empty).
2219 if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
2220 debug!(
2221 pv = pv_owned,
2222 field = field_owned,
2223 "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
2224 );
2225 }
2226
2227 // Exponential backoff for misconfigured fields (e.g. operator
2228 // listed `.HIHI` on a PV that doesn't expose it). Without this
2229 // we'd retry every 5s forever, churning CA search packets and
2230 // file descriptors. The cap is 60s; one warn at the cap so
2231 // ops know to fix archive_fields.
2232 let mut backoff = CA_RETRY_DELAY;
2233 let max_backoff = Duration::from_secs(60);
2234 let mut warned_at_cap = false;
2235
2236 loop {
2237 // Cancel-aware subscribe attempt.
2238 tokio::select! {
2239 _ = parent_token.cancelled() => return,
2240 sub = channel.subscribe() => {
2241 let mut monitor = match sub {
2242 Ok(m) => m,
2243 Err(e) => {
2244 // Java parity (8fe73eb): bump the transient
2245 // counter so a misconfigured `.HIHI` field
2246 // shows up in the rate / drop reports.
2247 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2248 debug!(
2249 pv = pv_owned,
2250 field = field_owned,
2251 ?backoff,
2252 "Extra-field subscribe failed: {e}; retrying"
2253 );
2254 if backoff >= max_backoff && !warned_at_cap {
2255 warn!(
2256 pv = pv_owned,
2257 field = field_owned,
2258 "Extra-field repeatedly fails to subscribe; \
2259 check archive_fields config (now retrying every 60s)"
2260 );
2261 warned_at_cap = true;
2262 }
2263 let sleep_for = backoff;
2264 backoff = (backoff * 2).min(max_backoff);
2265 tokio::select! {
2266 _ = parent_token.cancelled() => return,
2267 _ = tokio::time::sleep(sleep_for) => continue,
2268 }
2269 }
2270 };
2271 // Subscribe succeeded — reset backoff so the next
2272 // failure starts at the short delay again.
2273 backoff = CA_RETRY_DELAY;
2274 warned_at_cap = false;
2275 loop {
2276 tokio::select! {
2277 _ = parent_token.cancelled() => return,
2278 ev = monitor.recv() => match ev {
2279 Some(Ok(snapshot)) => {
2280 extras.insert(
2281 field_owned.clone(),
2282 epics_value_to_field_string(&snapshot.value),
2283 );
2284 }
2285 Some(Err(e)) => {
2286 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2287 debug!(
2288 pv = pv_owned,
2289 field = field_owned,
2290 "Extra-field monitor error: {e}"
2291 );
2292 }
2293 None => break, // resubscribe
2294 }
2295 }
2296 }
2297 }
2298 }
2299 }
2300}
2301
2302/// Format a panic payload (Box<dyn Any>) as a printable string. Mirrors
2303/// what `std::panicking::default_hook` extracts for the message.
2304fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
2305 if let Some(s) = payload.downcast_ref::<&'static str>() {
2306 (*s).to_string()
2307 } else if let Some(s) = payload.downcast_ref::<String>() {
2308 s.clone()
2309 } else {
2310 "<non-string panic payload>".to_string()
2311 }
2312}
2313
2314/// Resolve the data-bearing sub-field of a PVA value. NTScalar /
2315/// NTScalarArray / NTEnum wrap the raw value in a `value` field; bare
2316/// scalars are passed through.
2317fn pv_value_field(field: &PvField) -> &PvField {
2318 match field {
2319 PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2320 _ => field,
2321 }
2322}
2323
2324/// Map a PVA `ScalarValue` to an archiver `ArchiverValue`.
2325fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2326 match s {
2327 ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2328 ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2329 ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2330 ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2331 ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2332 ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2333 ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2334 ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2335 ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2336 ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2337 ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2338 ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2339 }
2340}
2341
2342/// Extract a scalar `ArchiverValue` from a top-level PVA value (which
2343/// may be an NTScalar wrapper). Returns `None` for array/struct types
2344/// that don't reduce to a scalar.
2345fn pv_field_scalar_to_archiver(field: &PvField) -> Option<ArchiverValue> {
2346 match pv_value_field(field) {
2347 PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2348 _ => None,
2349 }
2350}
2351
2352/// Pick the `(ArchDbType, element_count)` to record at archive_pv time
2353/// from an NTScalar / NTScalarArray's introspection-pvget result.
2354/// Returns `None` for unsupported (struct / union / variant) shapes.
2355fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2356 let value = pv_value_field(field);
2357 Some(match value {
2358 PvField::Scalar(s) => (
2359 match s {
2360 ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2361 ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2362 ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2363 ScalarValue::Int(_)
2364 | ScalarValue::UInt(_)
2365 | ScalarValue::Long(_)
2366 | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2367 ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2368 ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2369 ScalarValue::String(_) => ArchDbType::ScalarString,
2370 },
2371 1,
2372 ),
2373 // Array variants — keep the corresponding scalar element type but
2374 // bump element_count. write_loop / PB encoder doesn't currently
2375 // serialise PVA arrays, so callers should reject these for now.
2376 PvField::ScalarArray(arr) => {
2377 let elem = arr.first()?;
2378 let inner = PvField::Scalar(elem.clone());
2379 let (t, _) = pv_field_to_arch_db_type(&inner)?;
2380 (t, arr.len() as i32)
2381 }
2382 _ => return None,
2383 })
2384}
2385
2386/// Map a CA field name (`HIHI`, `LOLO`, `EGU`, …) to the dotted
2387/// pvAccess path under an NTScalar / NTScalarArray. Returns `None`
2388/// for fields that have no PVA equivalent — caller should drop them
2389/// silently rather than building an unsatisfiable pvRequest.
2390fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
2391 Some(match field {
2392 // display sub-structure
2393 "EGU" => "display.units",
2394 "PREC" => "display.precision",
2395 "DESC" => "display.description",
2396 "HOPR" => "display.limitHigh",
2397 "LOPR" => "display.limitLow",
2398 // valueAlarm sub-structure (alarm thresholds)
2399 "HIHI" => "valueAlarm.highAlarmLimit",
2400 "LOLO" => "valueAlarm.lowAlarmLimit",
2401 "HIGH" => "valueAlarm.highWarningLimit",
2402 "LOW" => "valueAlarm.lowWarningLimit",
2403 "HHSV" => "valueAlarm.highAlarmSeverity",
2404 "LLSV" => "valueAlarm.lowAlarmSeverity",
2405 "HSV" => "valueAlarm.highWarningSeverity",
2406 "LSV" => "valueAlarm.lowWarningSeverity",
2407 "HYST" => "valueAlarm.hysteresis",
2408 // control sub-structure (drive limits)
2409 "DRVH" => "control.limitHigh",
2410 "DRVL" => "control.limitLow",
2411 _ => return None,
2412 })
2413}
2414
2415/// Walk a dotted PVA field path (`valueAlarm.highAlarmLimit`) from
2416/// the root [`PvField`]. Returns `None` if any segment is missing or
2417/// the path lands on a non-structure intermediate.
2418fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2419 let mut current = root;
2420 for segment in path.split('.') {
2421 let PvField::Structure(s) = current else {
2422 return None;
2423 };
2424 current = s.get_field(segment)?;
2425 }
2426 Some(current)
2427}
2428
2429/// Stringify a [`ScalarValue`] for storage in the per-sample extras
2430/// map. Java archiver's `archiveFields` blob is a string-string map,
2431/// so we mirror that wire shape.
2432fn scalar_value_to_string(s: &ScalarValue) -> String {
2433 match s {
2434 ScalarValue::Boolean(v) => v.to_string(),
2435 ScalarValue::Byte(v) => v.to_string(),
2436 ScalarValue::Short(v) => v.to_string(),
2437 ScalarValue::Int(v) => v.to_string(),
2438 ScalarValue::Long(v) => v.to_string(),
2439 ScalarValue::UByte(v) => v.to_string(),
2440 ScalarValue::UShort(v) => v.to_string(),
2441 ScalarValue::UInt(v) => v.to_string(),
2442 ScalarValue::ULong(v) => v.to_string(),
2443 ScalarValue::Float(v) => v.to_string(),
2444 ScalarValue::Double(v) => v.to_string(),
2445 ScalarValue::String(s) => s.clone(),
2446 }
2447}
2448
2449/// Element count for a PVA value: 1 for scalar, length for arrays,
2450/// 0 for unsupported shapes (treated as "unknown" by callers).
2451fn pv_field_element_count(field: &PvField) -> i32 {
2452 match pv_value_field(field) {
2453 PvField::Scalar(_) => 1,
2454 PvField::ScalarArray(arr) => arr.len() as i32,
2455 PvField::ScalarArrayTyped(t) => t.len() as i32,
2456 _ => 0,
2457 }
2458}
2459
2460/// Pull `(precision, units)` out of an NTScalar / NTScalarArray's
2461/// `display` sub-structure. Returns `(None, None)` for bare scalars
2462/// or structures missing `display` (some servers omit it).
2463/// Negative precision is the Java EAA "no precision info" sentinel —
2464/// treat it the same as missing so we don't surface "-1" to the UI.
2465fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2466 let PvField::Structure(s) = field else {
2467 return (None, None);
2468 };
2469 let Some(PvField::Structure(disp)) = s.get_field("display") else {
2470 return (None, None);
2471 };
2472 let prec = match disp.get_field("precision") {
2473 Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2474 Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2475 Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2476 _ => None,
2477 };
2478 let egu = match disp.get_field("units") {
2479 Some(PvField::Scalar(ScalarValue::String(u))) => {
2480 let t = u.trim();
2481 if t.is_empty() { None } else { Some(t.to_string()) }
2482 }
2483 _ => None,
2484 };
2485 (prec, egu)
2486}
2487
2488/// Pull the IOC-reported timestamp out of an NTScalar / NTScalarArray.
2489/// Falls back to `SystemTime::now()` when the structure lacks a usable
2490/// `timeStamp` sub-field, so a malformed server doesn't drop samples.
2491fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2492 let PvField::Structure(s) = field else {
2493 return SystemTime::now();
2494 };
2495 let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2496 return SystemTime::now();
2497 };
2498 let secs = match ts.get_field("secondsPastEpoch") {
2499 Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2500 Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2501 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2502 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2503 _ => return SystemTime::now(),
2504 };
2505 let nanos = match ts.get_field("nanoseconds") {
2506 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2507 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2508 _ => 0,
2509 };
2510 SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2511}
2512
2513/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
2514fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2515 match field_type {
2516 DbFieldType::String => ArchDbType::ScalarString,
2517 DbFieldType::Short => ArchDbType::ScalarShort,
2518 DbFieldType::Float => ArchDbType::ScalarFloat,
2519 DbFieldType::Enum => ArchDbType::ScalarEnum,
2520 DbFieldType::Char => ArchDbType::ScalarByte,
2521 DbFieldType::Long => ArchDbType::ScalarInt,
2522 // PB PayloadType has no SCALAR_LONG (i64) — values outside i32 range
2523 // are truncated by the i32 cast in epics_value_to_archiver.
2524 DbFieldType::Int64 => ArchDbType::ScalarInt,
2525 DbFieldType::Double => ArchDbType::ScalarDouble,
2526 }
2527}
2528
2529/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
2530fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2531 match val {
2532 EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2533 EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2534 EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2535 EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2536 EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2537 EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2538 EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2539 EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2540 EpicsValue::ShortArray(v) => {
2541 ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2542 }
2543 EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2544 EpicsValue::EnumArray(v) => {
2545 ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2546 }
2547 EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2548 EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2549 EpicsValue::Int64Array(v) => {
2550 ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2551 }
2552 EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2553 EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2554 }
2555}
2556
2557/// Tunables for [`write_loop_with_config`]. Production uses the
2558/// defaults via [`write_loop`]; tests dial the timeouts down to
2559/// sub-second values so failure-injection cases finish in a few
2560/// hundred milliseconds instead of minutes.
2561#[derive(Debug, Clone)]
2562pub struct WriteLoopConfig {
2563 /// How often to call `flush_ingest_writes` and commit the
2564 /// pending `last_event` timestamps to the registry.
2565 pub flush_period: Duration,
2566 /// Bound on how long write_loop waits for a single
2567 /// `append_event_with_meta` JoinHandle. On timeout the sample
2568 /// is logged as abandoned-but-may-succeed-late and the loop
2569 /// moves on (the spawn_blocking task remains parked on the
2570 /// blocking pool — per-PV serialization in PlainPB keeps any
2571 /// late completion ordered behind subsequent same-PV samples).
2572 pub append_timeout: Duration,
2573 /// Bound on how long write_loop waits for one
2574 /// `flush_ingest_writes` JoinHandle during the periodic flush.
2575 /// On timeout, every pending `ts_updates` entry is deferred to
2576 /// the next cycle (we don't know which PVs reached disk).
2577 pub flush_timeout: Duration,
2578 /// Bound on each individual append issued during the shutdown
2579 /// drain.
2580 pub drain_per_sample_timeout: Duration,
2581 /// Total wall-clock budget for the shutdown drain. Once
2582 /// exceeded, remaining buffered samples are abandoned without
2583 /// even attempting an append, so a wedged STS can't stretch an
2584 /// orderly stop into "kill -9".
2585 pub drain_total_budget: Duration,
2586 /// Bound on the final `flush_writes` issued at shutdown.
2587 pub shutdown_flush_timeout: Duration,
2588}
2589
2590impl Default for WriteLoopConfig {
2591 fn default() -> Self {
2592 Self {
2593 flush_period: Duration::from_secs(10),
2594 append_timeout: Duration::from_secs(30),
2595 flush_timeout: Duration::from_secs(30),
2596 drain_per_sample_timeout: Duration::from_secs(5),
2597 drain_total_budget: Duration::from_secs(30),
2598 shutdown_flush_timeout: Duration::from_secs(15),
2599 }
2600 }
2601}
2602
2603/// Tunables for [`run_sharded_write_pool`].
2604///
2605/// `shards = 1` is the legacy single-worker layout and incurs no
2606/// dispatcher overhead. `shards > 1` spawns a dispatcher that hashes
2607/// each sample's `pv_name` to a fixed shard (so any one PV's samples
2608/// always land in the same per-shard write_loop, preserving
2609/// timestamp ordering) and N parallel shard workers — different PVs
2610/// can append concurrently instead of queueing behind a single task.
2611/// Sites with many active PVs and a fast STS should set this to
2612/// `min(num_cpus, expected concurrent slow appends)`.
2613#[derive(Debug, Clone)]
2614pub struct ShardedWritePoolConfig {
2615 /// Number of parallel shard workers. Must be ≥ 1; `1` is the
2616 /// degenerate case (no dispatcher, behaves identically to a
2617 /// bare `write_loop_with_config`).
2618 pub shards: usize,
2619 /// mpsc capacity of each shard's input channel. The dispatcher
2620 /// `try_send`s into this; when full, the OFFENDING SHARD's
2621 /// drop is recorded under
2622 /// `archiver_dispatcher_shard_overflow_drops_total{shard=N}`
2623 /// and the sample's per-PV `buffer_overflow_drops` counter is
2624 /// bumped. Other shards keep flowing — that's the per-shard
2625 /// isolation guarantee.
2626 pub per_shard_buffer: usize,
2627 /// Per-worker config (timeouts, flush period). Cloned into
2628 /// each shard.
2629 pub write_loop: WriteLoopConfig,
2630}
2631
2632impl Default for ShardedWritePoolConfig {
2633 fn default() -> Self {
2634 Self {
2635 shards: 1,
2636 per_shard_buffer: 4096,
2637 write_loop: WriteLoopConfig::default(),
2638 }
2639 }
2640}
2641
2642// PV-to-shard hash lives in `archiver_core::storage::plainpb` so
2643// the engine's dispatcher and PlainPB's
2644// `flush_ingest_writes_for_shard` agree on which shard owns which
2645// PV. Two definitions would mean shard 0's flush could iterate
2646// PVs the dispatcher routed to shard 1, re-introducing the
2647// misattribution bug.
2648use archiver_core::storage::plainpb::shard_for_pv;
2649
2650/// Run an N-shard write pool: 1 dispatcher (hashes pv_name → shard)
2651/// + N parallel `write_loop_with_config` workers. Each shard has
2652/// independent `ts_updates`, an independent flush ticker, and its
2653/// own per-PV writer slots inside the shared storage plugin (so
2654/// per-PV ordering is naturally preserved by the consistent-hash
2655/// routing — samples for one PV always go to the same shard).
2656///
2657/// `shards == 1` short-circuits the dispatcher and runs a single
2658/// `write_loop_with_config` directly so single-worker deployments
2659/// pay zero overhead.
2660pub async fn run_sharded_write_pool(
2661 storage: Arc<dyn StoragePlugin>,
2662 registry: Arc<PvRegistry>,
2663 rx: mpsc::Receiver<PvSample>,
2664 shutdown: tokio::sync::watch::Receiver<bool>,
2665 cfg: ShardedWritePoolConfig,
2666) {
2667 let n = cfg.shards.max(1);
2668 let WriteLoopConfig {
2669 flush_period,
2670 append_timeout,
2671 flush_timeout,
2672 drain_per_sample_timeout,
2673 drain_total_budget,
2674 shutdown_flush_timeout,
2675 } = cfg.write_loop.clone();
2676
2677 // Coalescing per-PV pending-timestamp map shared between the
2678 // flush owner, every shard worker, and every shard worker's
2679 // spawn_blocking late-success closure. Replaces the previous
2680 // mpsc report channel — see [`PendingReports`] for why.
2681 let pending = Arc::new(PendingReports::new());
2682
2683 // Spawn the global flush owner first so it's ready to flush
2684 // as soon as shards start appending.
2685 let flush_owner_handle = tokio::spawn(flush_owner_loop(
2686 storage.clone(),
2687 registry.clone(),
2688 pending.clone(),
2689 shutdown.clone(),
2690 flush_period,
2691 flush_timeout,
2692 shutdown_flush_timeout,
2693 drain_total_budget,
2694 ));
2695
2696 if n == 1 {
2697 // Fast path: single shard, no dispatcher hop. We become
2698 // the shard worker ourselves and read from `rx` directly.
2699 shard_append_loop(
2700 0,
2701 storage.clone(),
2702 rx,
2703 pending.clone(),
2704 shutdown.clone(),
2705 append_timeout,
2706 drain_per_sample_timeout,
2707 drain_total_budget,
2708 )
2709 .await;
2710 } else {
2711 // Multi-shard path: dispatcher + N shard workers.
2712 let mut shard_txs = Vec::with_capacity(n);
2713 let mut shard_handles = Vec::with_capacity(n);
2714 for shard_idx in 0..n {
2715 let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2716 shard_txs.push(s_tx);
2717 let storage = storage.clone();
2718 let pending = pending.clone();
2719 let shard_shutdown = shutdown.clone();
2720 shard_handles.push(tokio::spawn(shard_append_loop(
2721 shard_idx,
2722 storage,
2723 s_rx,
2724 pending,
2725 shard_shutdown,
2726 append_timeout,
2727 drain_per_sample_timeout,
2728 drain_total_budget,
2729 )));
2730 }
2731
2732 dispatch_loop(rx, shard_txs, shutdown.clone()).await;
2733
2734 // Dispatcher exited → shard sample receivers see EOS or
2735 // the shutdown signal directly via their watch::Receiver.
2736 // Wait for shards to finish their drain branches.
2737 for h in shard_handles {
2738 let _ = h.await;
2739 }
2740 }
2741
2742 // Flush owner uses its own shutdown-watch + drain_total_budget
2743 // grace timer, not an EOS signal — see flush_owner_loop. Wait
2744 // for it to finish.
2745 let _ = flush_owner_handle.await;
2746}
2747
2748/// Reads the upstream main mpsc, hashes each sample's `pv_name`,
2749/// and forwards to the appropriate shard channel via
2750/// `Sender::try_send` for **per-shard isolation**.
2751///
2752/// `try_send` (not `send().await`) is the load-balancing
2753/// invariant: if one slow shard's channel is full, the dispatcher
2754/// drops THAT shard's overflow into `buffer_overflow_drops` and
2755/// keeps routing for the other shards. Using `send().await` would
2756/// block the dispatcher on the slow shard, back-pressuring the
2757/// upstream main channel and starving every other shard's PVs —
2758/// the exact failure mode the sharded layout is meant to prevent.
2759async fn dispatch_loop(
2760 mut rx: mpsc::Receiver<PvSample>,
2761 shard_txs: Vec<mpsc::Sender<PvSample>>,
2762 mut shutdown: tokio::sync::watch::Receiver<bool>,
2763) {
2764 let n = shard_txs.len();
2765 info!(shards = n, "Sharded write dispatcher started");
2766 loop {
2767 tokio::select! {
2768 biased;
2769 // `biased` so we always check shutdown before draining
2770 // another sample — keeps the shutdown latency bounded
2771 // even under a sustained sample storm.
2772 _ = shutdown.changed() => {
2773 if *shutdown.borrow() {
2774 // Drain remaining samples (try_send for the
2775 // same reason as the steady-state branch).
2776 // Mirror the steady-state overflow accounting
2777 // so a saturated shard's drain-time drops are
2778 // visible to operators — silent loss here
2779 // would shadow real samples lost during
2780 // shutdown.
2781 while let Ok(sample) = rx.try_recv() {
2782 let idx = shard_for_pv(&sample.pv_name, n);
2783 match shard_txs[idx].try_send(sample) {
2784 Ok(()) => {}
2785 Err(mpsc::error::TrySendError::Full(s)) => {
2786 if let Some(c) = s.counters.as_ref() {
2787 c.buffer_overflow_drops
2788 .fetch_add(1, Ordering::Relaxed);
2789 }
2790 metrics::counter!(
2791 "archiver_dispatcher_shard_overflow_drops_total",
2792 "shard" => idx.to_string(),
2793 "phase" => "shutdown_drain",
2794 )
2795 .increment(1);
2796 debug!(
2797 shard = idx,
2798 pv = s.pv_name,
2799 "Shutdown-drain shard channel full; sample dropped"
2800 );
2801 }
2802 Err(mpsc::error::TrySendError::Closed(s)) => {
2803 warn!(
2804 shard = idx,
2805 pv = s.pv_name,
2806 "Shutdown-drain shard channel closed; sample dropped"
2807 );
2808 }
2809 }
2810 }
2811 break;
2812 }
2813 }
2814 maybe = rx.recv() => {
2815 match maybe {
2816 Some(sample) => {
2817 let idx = shard_for_pv(&sample.pv_name, n);
2818 match shard_txs[idx].try_send(sample) {
2819 Ok(()) => {}
2820 Err(mpsc::error::TrySendError::Full(s)) => {
2821 // Shard saturated. Surface the
2822 // drop on the SAMPLE's per-PV
2823 // counter so operators can pin
2824 // the loss to a specific PV+shard.
2825 if let Some(c) = s.counters.as_ref() {
2826 c.buffer_overflow_drops
2827 .fetch_add(1, Ordering::Relaxed);
2828 }
2829 metrics::counter!(
2830 "archiver_dispatcher_shard_overflow_drops_total",
2831 "shard" => idx.to_string(),
2832 )
2833 .increment(1);
2834 debug!(
2835 shard = idx,
2836 pv = s.pv_name,
2837 "Shard channel full; sample dropped \
2838 (per-shard isolation)"
2839 );
2840 }
2841 Err(mpsc::error::TrySendError::Closed(s)) => {
2842 // Shard worker died (panic or
2843 // shutdown race). We'll lose this
2844 // sample but keep the dispatcher
2845 // alive for OTHER shards.
2846 warn!(
2847 shard = idx,
2848 pv = s.pv_name,
2849 "Shard channel closed; sample dropped"
2850 );
2851 }
2852 }
2853 }
2854 None => break, // Upstream closed.
2855 }
2856 }
2857 }
2858 }
2859 info!("Sharded write dispatcher exiting");
2860}
2861
2862/// Background writer task — drains samples and writes to storage.
2863///
2864/// Thin wrapper that fills [`WriteLoopConfig`] from [`Default`] and
2865/// the caller-supplied `flush_period`. Production callers use this
2866/// form; the test suite uses [`write_loop_with_config`] directly to
2867/// dial timeouts down.
2868pub async fn write_loop(
2869 storage: Arc<dyn StoragePlugin>,
2870 registry: Arc<PvRegistry>,
2871 rx: mpsc::Receiver<PvSample>,
2872 shutdown: tokio::sync::watch::Receiver<bool>,
2873 flush_period: Duration,
2874) {
2875 let cfg = WriteLoopConfig {
2876 flush_period,
2877 ..Default::default()
2878 };
2879 write_loop_with_config(storage, registry, rx, shutdown, cfg).await
2880}
2881
2882/// RAII guard that clears the `flush_in_flight` flag when dropped,
2883/// even if the surrounding spawn_blocking closure panics. Without
2884/// this, an unwind inside `block_on(flush_ingest_writes)` would
2885/// skip the manual `store(false)` and leave the flag stuck `true`
2886/// forever — every subsequent ticker would see "still in flight"
2887/// and silently skip flushing, freezing the registry's `last_event`.
2888struct FlushInFlightGuard {
2889 flag: Arc<std::sync::atomic::AtomicBool>,
2890}
2891
2892impl Drop for FlushInFlightGuard {
2893 fn drop(&mut self) {
2894 self.flag.store(false, Ordering::Release);
2895 }
2896}
2897
2898/// Run one flush + commit cycle.
2899///
2900/// Spawns `flush_ingest_writes` on the blocking pool with a bounded
2901/// timeout, then commits the pending registry timestamps for every
2902/// PV the flush returned cleanly. Maintains an `in_flight` flag set
2903/// by the spawn_blocking task so a caller (the ticker branch) can
2904/// avoid stacking concurrent flushes when the previous one is still
2905/// running on a wedged FS.
2906///
2907/// Returns `true` if a flush was actually launched, `false` if the
2908/// helper was a no-op (in_flight already set, ts_updates empty).
2909///
2910/// Mutates `ts_updates`:
2911/// * Clean flush: drops `failed` PVs (bytes lost), commits all
2912/// other entries, drops committed entries on success.
2913/// * Deferred PVs: STAY in `ts_updates` for the next cycle.
2914/// Their bytes are still buffered, not lost.
2915/// * Flush error / panic: leaves every entry intact for retry.
2916/// * Flush timeout: CLEARS every entry. We don't know which PVs
2917/// reached disk; the in-flight task may still be running and
2918/// may even fail late, removing a writer from the cache. The
2919/// next flush would then see a "clean" state for that PV and
2920/// wrongly commit. Conservative drop is the only safe move
2921/// (Java archiver has the same trade-off — under-commit on
2922/// timeout, never over-commit). Future samples will repopulate
2923/// `ts_updates` and the next clean flush catches the registry up.
2924async fn run_flush_and_commit(
2925 storage: &Arc<dyn StoragePlugin>,
2926 registry: &Arc<PvRegistry>,
2927 pending: &Arc<PendingReports>,
2928 flush_timeout: Duration,
2929 in_flight: &Arc<std::sync::atomic::AtomicBool>,
2930) -> bool {
2931 if in_flight.load(Ordering::Acquire) {
2932 // Previous flush hasn't finished. Don't queue another —
2933 // we'd just stack spawn_blocking tasks on the wedged FS
2934 // and saturate the blocking pool. Wait for the existing
2935 // one to drain.
2936 return false;
2937 }
2938 // Snapshot the current pending map BEFORE setting in_flight
2939 // so concurrent shards reporting after this point survive
2940 // into the next cycle (their entries will be in
2941 // `pending` but not in `snapshot`, so we won't remove them
2942 // when we clean up). The snapshot is the authoritative view
2943 // of "what we're trying to commit this cycle".
2944 let snapshot = pending.snapshot();
2945 if snapshot.is_empty() {
2946 return false;
2947 }
2948 in_flight.store(true, Ordering::Release);
2949
2950 let storage_for_flush = storage.clone();
2951 let in_flight_for_task = in_flight.clone();
2952 let flush_join = tokio::task::spawn_blocking(move || {
2953 // RAII guard: clears `in_flight` on every exit path
2954 // (return, panic, future-cancellation). The guard's
2955 // existence is the contract — manually storing false
2956 // BEFORE returning would skip cleanup on a panic inside
2957 // block_on / inside flush_ingest_writes, leaving the flag
2958 // stuck `true` and silently freezing all future flushes.
2959 let _guard = FlushInFlightGuard {
2960 flag: in_flight_for_task,
2961 };
2962 let rt = tokio::runtime::Handle::current();
2963 rt.block_on(storage_for_flush.flush_ingest_writes())
2964 });
2965 match tokio::time::timeout(flush_timeout, flush_join).await {
2966 Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
2967 // Failed PVs: bytes lost. Drop every failed PV's
2968 // pending entry UNCONDITIONALLY — regardless of
2969 // whether the snapshot's value still matches.
2970 //
2971 // The previous "remove only if matches snapshot"
2972 // policy assumed a concurrent shard's post-snapshot
2973 // report meant "newer bytes ARE on disk". That
2974 // assumption fails for the eviction/loss-queue path:
2975 // the loss queue records only PV names, so a loss
2976 // event recorded between snapshot and flush-return
2977 // has no way to communicate which writer generation
2978 // (or timestamp horizon) it applies to. A
2979 // post-snapshot pending entry could refer to bytes
2980 // that ARE on disk (fresh writer after eviction) OR
2981 // bytes that were also lost — we can't tell.
2982 //
2983 // Conservative drop is the safe choice: never
2984 // commit a `last_event` for a PV the storage just
2985 // told us had lost bytes, even at the cost of an
2986 // occasional under-commit that the next sample
2987 // resolves.
2988 if !failed.is_empty() {
2989 pending.remove_failed(&failed);
2990 error!(
2991 "STS flush dropped {} PV(s) from timestamp commit \
2992 (bytes never reached disk): {:?}",
2993 failed.len(),
2994 failed
2995 );
2996 }
2997 // Deferred PVs: writer slot busy, bytes still buffered.
2998 // We do NOT remove them from `pending` (next cycle
2999 // will catch them) and we EXCLUDE them from this
3000 // cycle's commit batch.
3001 if !deferred.is_empty() {
3002 debug!(
3003 "STS flush deferred {} PV(s); keeping in pending \
3004 for next cycle: {:?}",
3005 deferred.len(),
3006 deferred
3007 );
3008 }
3009 // Build commit batch from the snapshot, excluding
3010 // failed (bytes lost) and deferred (bytes not on disk
3011 // yet).
3012 let failed_set: std::collections::HashSet<&str> =
3013 failed.iter().map(|s| s.as_str()).collect();
3014 let deferred_set: std::collections::HashSet<&str> =
3015 deferred.iter().map(|s| s.as_str()).collect();
3016 let to_commit: Vec<(&str, SystemTime)> = snapshot
3017 .iter()
3018 .filter(|(pv, _)| {
3019 !failed_set.contains(pv.as_str())
3020 && !deferred_set.contains(pv.as_str())
3021 })
3022 .map(|(pv, ts)| (pv.as_str(), *ts))
3023 .collect();
3024 if to_commit.is_empty() {
3025 return true;
3026 }
3027 match registry.batch_update_timestamps(&to_commit) {
3028 Ok(()) => {
3029 // Remove committed entries from the shared
3030 // map — but only if their current value still
3031 // matches the snapshot's. This preserves
3032 // concurrent updates that arrived between
3033 // snapshot and remove.
3034 let committed_map: std::collections::HashMap<String, SystemTime> =
3035 to_commit
3036 .iter()
3037 .map(|(pv, ts)| ((*pv).to_string(), *ts))
3038 .collect();
3039 pending.remove_committed(&committed_map);
3040 }
3041 Err(e) => {
3042 // Don't remove — retry on next cycle.
3043 // Otherwise a transient SQLite error orphans
3044 // timestamps the disk has but the registry
3045 // doesn't know about, with no retry path.
3046 error!(
3047 "Registry timestamp commit failed; \
3048 keeping {} pending for retry: {e}",
3049 snapshot.len()
3050 );
3051 }
3052 }
3053 }
3054 Ok(Ok(Err(e))) => {
3055 error!(
3056 "STS ingest flush errored; deferring all {} \
3057 pending timestamp commits: {e}",
3058 snapshot.len()
3059 );
3060 }
3061 Ok(Err(join_err)) => {
3062 error!(
3063 "STS ingest flush task panicked; deferring all {} \
3064 pending timestamp commits: {join_err}",
3065 snapshot.len()
3066 );
3067 }
3068 Err(_) => {
3069 // Flush wedged. Conservative drop: the spawn_blocking
3070 // task may still be running and may even fail late
3071 // (which would remove a PV's writer from the cache,
3072 // making the NEXT flush return "clean" and tricking
3073 // the owner into committing a stale value for a PV
3074 // whose old bytes are gone). Drop the snapshot's
3075 // entries from `pending` (only if unchanged), so the
3076 // owner doesn't trust them; future samples rebuild
3077 // pending and the next clean flush catches the
3078 // registry up.
3079 //
3080 // The `in_flight` flag stays TRUE until the
3081 // spawn_blocking task naturally finishes — this
3082 // throttles us from queueing yet another flush onto
3083 // the wedged FS until it clears.
3084 metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
3085 let dropped = snapshot.len();
3086 pending.remove_committed(&snapshot);
3087 error!(
3088 "STS ingest flush timed out after {flush_timeout:?}; \
3089 conservatively dropped {dropped} pending timestamp \
3090 commit(s) (task remains on blocking pool; \
3091 timestamps will be rebuilt from subsequent samples)"
3092 );
3093 }
3094 }
3095 true
3096}
3097
3098/// Coalescing per-PV pending-timestamp map shared between every
3099/// shard worker (incl. their spawn_blocking late-success closures)
3100/// and the single global flush owner.
3101///
3102/// **Why a shared map instead of an mpsc channel?** With a channel,
3103/// a slow flush owner causes the buffer to fill and `try_send`
3104/// drops happen — for a PV that goes silent right after a dropped
3105/// report, the registry's `last_event` is permanently
3106/// under-committed. With a coalescing map keyed by PV, every
3107/// shard call is an `entry().and_modify().or_insert()` that
3108/// always succeeds; concurrent updates from different shards
3109/// never lose data because the map is bounded by **PV count**
3110/// (typical: thousands), not by sample rate (typical: 100k/s).
3111///
3112/// **Coalescing semantics.** A successful append for `(pv, ts)`
3113/// upserts the entry to `max(current, ts)`. A stale late-success
3114/// report (e.g. from a spawn_blocking task that finished after
3115/// shard already wrote a newer sample) does NOT clobber a newer
3116/// committed value.
3117#[derive(Default)]
3118pub struct PendingReports {
3119 inner: dashmap::DashMap<String, SystemTime>,
3120}
3121
3122impl PendingReports {
3123 pub fn new() -> Self {
3124 Self::default()
3125 }
3126
3127 /// Coalescing report. If `pv` already has a newer timestamp,
3128 /// no-op. Always succeeds — this is the channel-replacement
3129 /// invariant that makes silent-PV under-commit impossible.
3130 pub fn report(&self, pv: &str, ts: SystemTime) {
3131 // DashMap's entry() locks one shard internally. Fast-path
3132 // updates use `and_modify`; brand-new PVs go through
3133 // `or_insert`. No external lock contention across shards.
3134 self.inner
3135 .entry(pv.to_string())
3136 .and_modify(|cur| {
3137 if *cur < ts {
3138 *cur = ts;
3139 }
3140 })
3141 .or_insert(ts);
3142 }
3143
3144 /// Snapshot the entire map for a flush cycle. Returns a
3145 /// owned HashMap; the shared map stays intact so concurrent
3146 /// shards can keep reporting during the flush.
3147 pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3148 self.inner
3149 .iter()
3150 .map(|kv| (kv.key().clone(), *kv.value()))
3151 .collect()
3152 }
3153
3154 /// Remove entries whose CURRENT value still equals what we
3155 /// committed. If a concurrent shard advanced an entry to a
3156 /// newer ts after the snapshot, leave it alone — the next
3157 /// flush will pick it up.
3158 pub fn remove_committed(
3159 &self,
3160 committed: &std::collections::HashMap<String, SystemTime>,
3161 ) {
3162 for (pv, &committed_ts) in committed {
3163 // remove_if applies the predicate atomically inside
3164 // the DashMap shard lock.
3165 let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3166 }
3167 }
3168
3169 /// Unconditionally remove every failed PV's entry from the
3170 /// pending map, regardless of its current timestamp.
3171 ///
3172 /// **Why unconditional.** The storage's loss queue carries
3173 /// only PV names — it cannot tell us whether a concurrent
3174 /// shard's post-snapshot report was for bytes that ARE on
3175 /// disk (e.g., a fresh writer opened after eviction) versus
3176 /// bytes that are also gone. The matching `remove_committed`
3177 /// path can leave a post-snapshot entry behind, and the next
3178 /// clean flush would commit it — turning a "PV's bytes were
3179 /// lost" report into a `last_event` lie.
3180 ///
3181 /// The trade-off: a brand-new sample whose bytes truly DID
3182 /// reach disk after the loss event gets dropped from this
3183 /// commit cycle. Future samples re-populate `pending` and the
3184 /// registry catches up. Under-commit is the safe direction;
3185 /// over-commit is never acceptable.
3186 pub fn remove_failed(&self, failed: &[String]) {
3187 for pv in failed {
3188 let _ = self.inner.remove(pv);
3189 }
3190 }
3191
3192 pub fn is_empty(&self) -> bool {
3193 self.inner.is_empty()
3194 }
3195
3196 pub fn len(&self) -> usize {
3197 self.inner.len()
3198 }
3199}
3200
3201/// Same as [`write_loop`] but accepts the full [`WriteLoopConfig`].
3202///
3203/// Thin wrapper that runs a 1-shard sharded pool — kept on the
3204/// public surface for tests and any direct callers. The actual
3205/// work happens in [`run_sharded_write_pool`].
3206pub async fn write_loop_with_config(
3207 storage: Arc<dyn StoragePlugin>,
3208 registry: Arc<PvRegistry>,
3209 rx: mpsc::Receiver<PvSample>,
3210 shutdown: tokio::sync::watch::Receiver<bool>,
3211 cfg: WriteLoopConfig,
3212) {
3213 let pool_cfg = ShardedWritePoolConfig {
3214 shards: 1,
3215 // unused on the 1-shard fast path (rx is forwarded directly)
3216 per_shard_buffer: 4096,
3217 write_loop: cfg,
3218 };
3219 run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3220}
3221
3222/// Per-shard append worker. Drains samples from `sample_rx`,
3223/// drops out-of-order or type-changed samples, then runs each
3224/// `append_event_with_meta` on the blocking pool with a bounded
3225/// timeout. On every success — including the late-success path
3226/// where write_loop's outer timeout already abandoned the
3227/// JoinHandle — coalesces `(pv, ts)` into the shared
3228/// [`PendingReports`] map so the global flush owner sees it on
3229/// the next flush cycle.
3230///
3231/// The shard worker holds NO ts_updates / flush state of its own.
3232/// It tracks `last_ts` and `last_dbr_type` only for the
3233/// per-PV ordering / type-change drop checks; those are local
3234/// to the shard because the dispatcher's consistent hash pins each
3235/// PV to one shard.
3236async fn shard_append_loop(
3237 shard_idx: usize,
3238 storage: Arc<dyn StoragePlugin>,
3239 mut sample_rx: mpsc::Receiver<PvSample>,
3240 pending: Arc<PendingReports>,
3241 mut shutdown: tokio::sync::watch::Receiver<bool>,
3242 append_timeout: Duration,
3243 drain_per_sample_timeout: Duration,
3244 drain_total_budget: Duration,
3245) {
3246 info!(shard = shard_idx, "Shard append loop started");
3247 // Per-PV state for the in-shard sanity drops. The dispatcher's
3248 // consistent hash keeps each PV in one shard, so these maps
3249 // never see cross-shard PVs.
3250 let mut last_ts: std::collections::HashMap<String, SystemTime> =
3251 std::collections::HashMap::new();
3252 let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
3253 std::collections::HashMap::new();
3254
3255 loop {
3256 tokio::select! {
3257 Some(pv_sample) = sample_rx.recv() => {
3258 shard_handle_sample(
3259 shard_idx,
3260 &storage,
3261 &pending,
3262 pv_sample,
3263 &mut last_ts,
3264 &mut last_dbr_type,
3265 append_timeout,
3266 )
3267 .await;
3268 }
3269 _ = shutdown.changed() => {
3270 shard_drain_on_shutdown(
3271 shard_idx,
3272 &storage,
3273 &pending,
3274 &mut sample_rx,
3275 &mut last_ts,
3276 &mut last_dbr_type,
3277 drain_per_sample_timeout,
3278 drain_total_budget,
3279 )
3280 .await;
3281 break;
3282 }
3283 }
3284 }
3285 info!(shard = shard_idx, "Shard append loop exited");
3286}
3287
3288/// Process one sample on the shard's hot path. Extracted so the
3289/// steady-state and shutdown-drain branches can share the same
3290/// "drop checks → spawn_blocking append → report on success"
3291/// recipe.
3292async fn shard_handle_sample(
3293 shard_idx: usize,
3294 storage: &Arc<dyn StoragePlugin>,
3295 pending: &Arc<PendingReports>,
3296 pv_sample: PvSample,
3297 last_ts: &mut std::collections::HashMap<String, SystemTime>,
3298 last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3299 append_timeout: Duration,
3300) {
3301 let ts = pv_sample.sample.timestamp;
3302
3303 // Out-of-order timestamp drop. Storage requires monotonic
3304 // appends per PV; an older timestamp would produce a corrupt
3305 // partition. Tracked via shard-local `last_ts` so the check
3306 // survives flush cycles (the legacy ts_updates-based check
3307 // only caught within-cycle reorderings).
3308 if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
3309 && ts < *prev_ts
3310 {
3311 if let Some(ref c) = pv_sample.counters {
3312 c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
3313 }
3314 debug!(
3315 shard = shard_idx,
3316 pv = pv_sample.pv_name,
3317 ?ts,
3318 ?prev_ts,
3319 "Dropping out-of-order sample"
3320 );
3321 return;
3322 }
3323
3324 // Type-change drop. The first sample defines the PV's wire
3325 // type; later samples with a different DBR get dropped
3326 // (operator must changeTypeForPV first).
3327 let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
3328 if let Some(prev) = prev_type
3329 && prev != pv_sample.dbr_type
3330 {
3331 if let Some(ref c) = pv_sample.counters {
3332 c.type_change_drops.fetch_add(1, Ordering::Relaxed);
3333 // Java parity (9f2234f): record the latest observed
3334 // DBR so the dropped-events report can show what the
3335 // IOC is now sending vs the archived type.
3336 c.latest_observed_dbr
3337 .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
3338 }
3339 debug!(
3340 shard = shard_idx,
3341 pv = pv_sample.pv_name,
3342 ?prev,
3343 new = ?pv_sample.dbr_type,
3344 "Dropping type-changed sample"
3345 );
3346 // Restore prev_type so a single mismatched sample doesn't
3347 // permanently flip our recorded type.
3348 last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
3349 return;
3350 }
3351
3352 let meta = AppendMeta {
3353 element_count: pv_sample.element_count,
3354 ..Default::default()
3355 };
3356 let pv_name_for_post = pv_sample.pv_name.clone();
3357 let counters_for_post = pv_sample.counters.clone();
3358 let counters_in_task = pv_sample.counters.clone();
3359 let storage_for_task = storage.clone();
3360 let pending_for_task = pending.clone();
3361
3362 // Bound each append + isolate sync syscall hangs.
3363 // `spawn_blocking` moves the actual I/O to the blocking
3364 // thread pool, so a stuck NFS write parks a blocking-pool
3365 // thread instead of a runtime worker. The
3366 // `tokio::time::timeout` then bounds how long the shard waits
3367 // for that join — a stuck task is abandoned and the shard
3368 // continues.
3369 //
3370 // Late-success path: a timed-out task may still complete its
3371 // write later. The closure ALWAYS reports into the shared
3372 // `PendingReports` map on Ok (whether or not the shard
3373 // already moved on), so the global flush owner picks up late
3374 // successes too. Per-PV serialization inside PlainPB keeps
3375 // any late completion ordered behind subsequent same-PV
3376 // samples. The map's coalescing semantics ensure a stale late
3377 // success does NOT clobber a newer hot-path commit.
3378 let join = tokio::task::spawn_blocking(move || {
3379 let rt = tokio::runtime::Handle::current();
3380 let res = rt.block_on(storage_for_task.append_event_with_meta(
3381 &pv_sample.pv_name,
3382 pv_sample.dbr_type,
3383 &pv_sample.sample,
3384 &meta,
3385 ));
3386 if res.is_ok() {
3387 metrics::counter!("archiver_events_stored_total").increment(1);
3388 if let Some(c) = counters_in_task.as_ref() {
3389 c.events_stored.fetch_add(1, Ordering::Relaxed);
3390 }
3391 // Coalesce into the shared map. Always succeeds —
3392 // unlike the previous mpsc try_send, a saturated
3393 // flush owner cannot cause this report to drop, so
3394 // a PV that goes silent right after a successful
3395 // append still gets its `last_event` committed once
3396 // the owner gets to its next flush cycle.
3397 pending_for_task.report(&pv_sample.pv_name, ts);
3398 }
3399 res
3400 });
3401 let res = tokio::time::timeout(append_timeout, join).await;
3402 // Conservative ordering high-water (principle 5). Bump
3403 // `last_ts` on EVERY storage-layer return path — success,
3404 // error, panic, timeout — not just on Ok and timeout.
3405 //
3406 // Why all four:
3407 // * Ok — sample on disk; obvious bump.
3408 // * Timeout — sample MAY land on disk via late success;
3409 // bump to keep on-disk order monotonic.
3410 // * Error — current storage contract is binary, but a
3411 // future partial-success-with-error variant
3412 // could leave bytes on disk. Bumping now
3413 // hedges against that.
3414 // * Panic — closure aborted mid-write; storage state
3415 // is undefined. Bumping is the safe choice.
3416 //
3417 // The pre-check earlier in this function already drops
3418 // out-of-order and type-changed samples BEFORE we get here,
3419 // so this bump only ever sets the high-water to a ts the
3420 // shard explicitly accepted into the storage layer.
3421 last_ts.insert(pv_name_for_post.clone(), ts);
3422 match res {
3423 Ok(Ok(Ok(()))) => {
3424 // Report already went out from inside the closure.
3425 }
3426 Ok(Ok(Err(e))) => {
3427 error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
3428 }
3429 Ok(Err(join_err)) => {
3430 error!(
3431 shard = shard_idx,
3432 pv = pv_name_for_post,
3433 "Storage write task panicked: {join_err}"
3434 );
3435 }
3436 Err(_) => {
3437 // Sample MAY still land on disk later (per-PV
3438 // serialization keeps order). The closure will report
3439 // into the shared pending map whenever it eventually
3440 // completes, so registry's `last_event` catches up
3441 // via the late path even though we move on now.
3442 error!(
3443 shard = shard_idx,
3444 pv = pv_name_for_post,
3445 "Storage append timed out after {append_timeout:?}; \
3446 shard abandoning (task remains on blocking pool, \
3447 may still succeed late)"
3448 );
3449 if let Some(ref c) = counters_for_post {
3450 c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
3451 }
3452 }
3453 }
3454}
3455
3456/// Drain the shard's remaining buffered samples on shutdown,
3457/// applying the same bounded-timeout discipline as the hot path.
3458async fn shard_drain_on_shutdown(
3459 shard_idx: usize,
3460 storage: &Arc<dyn StoragePlugin>,
3461 pending: &Arc<PendingReports>,
3462 sample_rx: &mut mpsc::Receiver<PvSample>,
3463 last_ts: &mut std::collections::HashMap<String, SystemTime>,
3464 last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3465 drain_per_sample_timeout: Duration,
3466 drain_total_budget: Duration,
3467) {
3468 let drain_start = std::time::Instant::now();
3469 let mut drained_skipped = 0usize;
3470 while let Ok(pv_sample) = sample_rx.try_recv() {
3471 if drain_start.elapsed() > drain_total_budget {
3472 drained_skipped += 1;
3473 continue;
3474 }
3475 // Reuse the hot-path handler with a tighter timeout.
3476 // Late-success reports still flow into `pending` because
3477 // the closure clones the Arc independently of this
3478 // shard's lifecycle.
3479 shard_handle_sample(
3480 shard_idx,
3481 storage,
3482 pending,
3483 pv_sample,
3484 last_ts,
3485 last_dbr_type,
3486 drain_per_sample_timeout,
3487 )
3488 .await;
3489 }
3490 if drained_skipped > 0 {
3491 warn!(
3492 shard = shard_idx,
3493 "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3494 {drained_skipped} buffered sample(s) abandoned without \
3495 attempting append"
3496 );
3497 }
3498}
3499
3500/// Single global flush owner. Reads from the shared
3501/// [`PendingReports`] map (which all shards coalesce into),
3502/// drives the periodic `flush_ingest_writes`, and commits to the
3503/// registry. Because ALL shards report into this one map and the
3504/// owner is the only consumer, the flush result and the
3505/// pending-commit data live in the same task — no shard 0 vs
3506/// shard 1 attribution skew.
3507async fn flush_owner_loop(
3508 storage: Arc<dyn StoragePlugin>,
3509 registry: Arc<PvRegistry>,
3510 pending: Arc<PendingReports>,
3511 mut shutdown: tokio::sync::watch::Receiver<bool>,
3512 flush_period: Duration,
3513 flush_timeout: Duration,
3514 shutdown_flush_timeout: Duration,
3515 drain_total_budget: Duration,
3516) {
3517 // tokio::time::interval(Duration::ZERO) panics. Config-side
3518 // validation rejects 0 at load time, but defending in depth
3519 // here keeps tests and direct callers (which build configs by
3520 // hand) from crashing the engine on a misconfigured value.
3521 let flush_period = if flush_period.is_zero() {
3522 warn!(
3523 "flush_owner: flush_period was Duration::ZERO; \
3524 clamping to 1s to avoid tokio::time::interval panic"
3525 );
3526 Duration::from_secs(1)
3527 } else {
3528 flush_period
3529 };
3530 info!("Flush owner started");
3531
3532 let mut flush_ticker = tokio::time::interval(flush_period);
3533 flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3534 let _ = flush_ticker.tick().await;
3535
3536 let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3537
3538 // Phase 1: steady state. Ticker-driven flush only. Reports
3539 // accumulate in the shared `PendingReports` map continuously
3540 // (no recv loop — shards write directly), so we don't need
3541 // to multiplex report consumption against the flush.
3542 loop {
3543 tokio::select! {
3544 biased;
3545 _ = shutdown.changed() => {
3546 if *shutdown.borrow() {
3547 break;
3548 }
3549 }
3550 _ = flush_ticker.tick() => {
3551 if !pending.is_empty() {
3552 run_flush_and_commit(
3553 &storage,
3554 ®istry,
3555 &pending,
3556 flush_timeout,
3557 &flush_in_flight,
3558 )
3559 .await;
3560 }
3561 }
3562 }
3563 }
3564
3565 // Phase 2: shutdown grace. Two windows in one budget:
3566 //
3567 // 1. A brief minimum sleep so any in-flight spawn_blocking
3568 // late-success closures can coalesce their reports into
3569 // `pending` before the final flush snapshots it.
3570 //
3571 // 2. Wait for any still-running ticker flush to drain
3572 // (`flush_in_flight` cleared by the spawn_blocking task's
3573 // RAII guard). Without this, the final
3574 // `run_flush_and_commit` would see `in_flight == true`,
3575 // short-circuit, and skip every entry in `pending` —
3576 // data on disk but no registry commit.
3577 //
3578 // Both windows fit inside the operator-configured
3579 // `drain_total_budget`. If a flush is genuinely wedged past
3580 // the budget, we proceed to final flush; that call will see
3581 // `in_flight` still set and short-circuit, but at least we
3582 // didn't pin the process forever. Future samples on restart
3583 // will re-populate `pending` and the registry catches up.
3584 let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3585 let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3586 if !min_grace.is_zero() {
3587 tokio::time::sleep(min_grace).await;
3588 }
3589 while flush_in_flight.load(Ordering::Acquire)
3590 && std::time::Instant::now() < phase2_deadline
3591 {
3592 tokio::time::sleep(Duration::from_millis(50)).await;
3593 }
3594 if flush_in_flight.load(Ordering::Acquire) {
3595 warn!(
3596 "Shutdown grace exhausted while a flush is still in flight; \
3597 final flush will be skipped (pending entries left for next \
3598 process restart to rebuild from re-archived samples)"
3599 );
3600 }
3601
3602 // Final flush + commit. Use shutdown_flush_timeout (typically
3603 // shorter than the steady-state flush_timeout) so a wedged FS
3604 // doesn't block shutdown indefinitely.
3605 info!(
3606 pending_at_shutdown = pending.len(),
3607 "Flush owner running final flush + commit"
3608 );
3609 if !pending.is_empty() {
3610 run_flush_and_commit(
3611 &storage,
3612 ®istry,
3613 &pending,
3614 shutdown_flush_timeout,
3615 &flush_in_flight,
3616 )
3617 .await;
3618 }
3619 info!("Flush owner exiting");
3620}