archiver_engine/channel_manager.rs
1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::pvdata::{PvField, ScalarValue};
11use tokio::sync::mpsc;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, error, info, warn};
14
15use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
16use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
17use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
18
19use crate::policy::PolicyConfig;
20
/// Timeout for initial CA channel connection.
///
/// Also used in this file as the per-call bound on the PVA `pvconnect`
/// and `pvget` round-trips performed when a PVA PV is first archived.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Timeout for CA reconnection attempts in the monitor loop.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay before retrying a failed CA subscription.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
27
/// Hard floor on accepted timestamps. Mirrors Java's `PAST_CUTOFF_TIMESTAMP`
/// of 1991-01-01 — earlier than that, the timestamp is almost certainly a
/// stale uninitialised IOC clock or a sentinel.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000; // 1991-01-01 00:00:00 UTC

/// Check a freshly-received sample timestamp against the floor and the
/// wall clock.
///
/// Returns `true` when the sample should be accepted and `false` when it
/// should be dropped (the caller is expected to count the drop, e.g. in
/// `timestamp_drops`). A timestamp is accepted when both hold:
///
/// * it is not earlier than [`PAST_CUTOFF_UNIX_SECS`], and
/// * it lies within ±`drift_secs` of `now`, compared at whole-second
///   resolution.
///
/// `drift_secs` is the configured `server_ioc_drift_secs` (Java parity
/// 6538631), so per-site tuning doesn't require recompiling. `now` is
/// passed in (rather than calling `SystemTime::now()` here) so the test
/// suite can pin time deterministically. A `now` that predates the Unix
/// epoch is treated as second 0.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    /// Whole seconds since the Unix epoch, or `None` when the instant is
    /// before the epoch or does not fit in an `i64`.
    fn unix_secs(t: SystemTime) -> Option<i64> {
        let since_epoch = t.duration_since(SystemTime::UNIX_EPOCH).ok()?;
        i64::try_from(since_epoch.as_secs()).ok()
    }

    // Pre-epoch / unrepresentable sample timestamps can never clear the floor.
    let Some(ts_secs) = unix_secs(ts) else {
        return false;
    };
    if ts_secs < PAST_CUTOFF_UNIX_SECS {
        return false;
    }

    // `abs_diff` cannot overflow, unlike a raw subtraction followed by abs.
    let now_secs = unix_secs(now).unwrap_or(0);
    ts_secs.abs_diff(now_secs) <= drift_secs
}
57
/// Coarse connection lifecycle of a PV, reported by `getPVDetails`
/// (Java parity dea7acb). The four states let an operator tell a name
/// that never resolved apart from one that is still connecting or one
/// that connected and later went down.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Initial state: no connection attempt has been made, or none has
    /// got past `wait_connected` yet.
    #[default]
    Idle,
    /// The first `wait_connected` on this channel handle is in flight.
    Connecting,
    /// The channel reported a successful connect; samples are flowing
    /// or the channel is otherwise live.
    Connected,
    /// The channel connected at least once and the monitor loop has
    /// since dropped. Kept separate from `Idle` so a regression is
    /// distinguishable from a never-resolved name.
    Disconnected,
}

impl PvConnectionState {
    /// Stable display name of the state; identical to the variant name.
    pub fn as_str(self) -> &'static str {
        use PvConnectionState::{Connected, Connecting, Disconnected, Idle};

        match self {
            Idle => "Idle",
            Connecting => "Connecting",
            Connected => "Connected",
            Disconnected => "Disconnected",
        }
    }
}
88
/// Connection state tracked per PV.
///
/// One instance lives behind an `Arc<Mutex<..>>` on each PV handle and is
/// shared with the task spawned for that PV. `Default` yields `None` /
/// `false` for the first three fields and `PvConnectionState::Idle` for
/// `state`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Presumably the wall-clock time the current connection was
    /// established — the writers are outside this view; TODO confirm.
    pub connected_since: Option<SystemTime>,
    /// Presumably the wall-clock time of the most recent event seen for
    /// the PV — TODO confirm against the monitor/scan loops.
    pub last_event_time: Option<SystemTime>,
    /// Boolean connected flag. NOTE(review): overlaps with `state`;
    /// confirm which of the two the report endpoints treat as canonical.
    pub is_connected: bool,
    /// Java parity (dea7acb): discrete connection state for the
    /// PVDetails report. Operators can distinguish never-connected,
    /// currently-connecting, and previously-connected-now-down.
    pub state: PvConnectionState,
}
100
/// Diagnostic counters kept per PV and surfaced by the BPL drop / rate /
/// connection reports. Every count is monotonic over the PV's lifetime;
/// rate handlers derive deltas relative to `first_event_unix_secs`.
///
/// These live outside `ConnectionInfo` so that transient disconnects do
/// not reset them and so report endpoints can read them without a lock.
#[derive(Debug)]
pub struct PvCounters {
    /// Events produced by monitor/scan, counting those dropped later
    /// before reaching storage.
    pub events_received: AtomicU64,
    /// Events successfully written to storage.
    pub events_stored: AtomicU64,
    /// Unix-epoch seconds of the very first event seen for this PV.
    /// `0` stands for "no event yet" (there is no atomic `Option`).
    pub first_event_unix_secs: AtomicI64,
    /// Events rejected by the bounded sample channel because the write
    /// loop fell behind the producer. Reported by
    /// `getDroppedEventsBufferOverflowReport`.
    pub buffer_overflow_drops: AtomicU64,
    /// Events whose timestamp went backwards relative to the previously
    /// stored event. Java archiver's `DroppedEventsTimestampReport`.
    pub timestamp_drops: AtomicU64,
    /// Events whose runtime DBR type differed from the type stored in
    /// the PvRecord. The engine drops them: mixing types inside one PB
    /// partition would corrupt downstream readers.
    pub type_change_drops: AtomicU64,
    /// Disconnect transitions observed on this PV's CA channel.
    /// `LostConnectionsReport`.
    pub disconnect_count: AtomicU64,
    /// Unix-epoch seconds of the most recent disconnect transition.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Transient subscribe / monitor-recv / scan-read errors (Java
    /// parity 8fe73eb). Unlike `disconnect_count`, these are recoverable
    /// per-attempt failures, not confirmed link drops.
    pub transient_error_count: AtomicU64,
    /// Most recent DBR type received from CA that disagreed with the
    /// type recorded at archive time (Java parity 9f2234f). `-1` means
    /// no mismatch has ever been seen.
    pub latest_observed_dbr: AtomicI32,
    /// Failed DBR_CTRL metadata refresh attempts (timeout, transport
    /// error, missing display info). Worth checking for PVs that show
    /// empty PREC/EGU.
    pub metadata_fetch_failures: AtomicU64,
    /// `storage.append_event_with_meta` calls that exceeded the
    /// per-sample storage timeout. Kept apart from
    /// `buffer_overflow_drops` (mpsc channel saturation) so "the storage
    /// tier is wedged" can be told from "the writer can't keep up with
    /// the producer".
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    /// All counters and timestamps start at zero, except
    /// `latest_observed_dbr`, which starts at its `-1` "never observed"
    /// sentinel (the reason this impl cannot simply be derived).
    fn default() -> Self {
        let zero_count = || AtomicU64::new(0);
        let zero_secs = || AtomicI64::new(0);

        Self {
            events_received: zero_count(),
            events_stored: zero_count(),
            first_event_unix_secs: zero_secs(),
            buffer_overflow_drops: zero_count(),
            timestamp_drops: zero_count(),
            type_change_drops: zero_count(),
            disconnect_count: zero_count(),
            last_disconnect_unix_secs: zero_secs(),
            transient_error_count: zero_count(),
            latest_observed_dbr: AtomicI32::new(-1),
            metadata_fetch_failures: zero_count(),
            storage_append_timeouts: zero_count(),
        }
    }
}
174
175/// Read-only snapshot of `PvCounters` — owned values so callers can
176/// move them across threads / serialise without reaching into Atomics.
177#[derive(Debug, Clone)]
178pub struct PvCountersSnapshot {
179 pub events_received: u64,
180 pub events_stored: u64,
181 pub first_event_unix_secs: Option<i64>,
182 pub buffer_overflow_drops: u64,
183 pub timestamp_drops: u64,
184 pub type_change_drops: u64,
185 pub disconnect_count: u64,
186 pub last_disconnect_unix_secs: Option<i64>,
187 pub transient_error_count: u64,
188 /// `Some(dbr_type as i32)` if a type-change mismatch has been
189 /// observed; `None` otherwise (Java parity 9f2234f).
190 pub latest_observed_dbr: Option<i32>,
191 pub metadata_fetch_failures: u64,
192 pub storage_append_timeouts: u64,
193}
194
195impl From<&PvCounters> for PvCountersSnapshot {
196 fn from(c: &PvCounters) -> Self {
197 let first = c.first_event_unix_secs.load(Ordering::Relaxed);
198 let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
199 Self {
200 events_received: c.events_received.load(Ordering::Relaxed),
201 events_stored: c.events_stored.load(Ordering::Relaxed),
202 first_event_unix_secs: if first == 0 { None } else { Some(first) },
203 buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
204 timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
205 type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
206 disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
207 last_disconnect_unix_secs: if last_disc == 0 {
208 None
209 } else {
210 Some(last_disc)
211 },
212 transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
213 latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
214 -1 => None,
215 v => Some(v),
216 },
217 metadata_fetch_failures: c.metadata_fetch_failures.load(Ordering::Relaxed),
218 storage_append_timeouts: c.storage_append_timeouts.load(Ordering::Relaxed),
219 }
220 }
221}
222
/// Handle for a running PV archiving task.
///
/// One handle is stored per active PV in `ChannelManager::channels`.
/// The `Arc` fields are shared with the tasks spawned for the PV, so the
/// manager can read or cancel them without owning the tasks.
struct PvHandle {
    /// `Some` for CA-acquired PVs (used by `try_get_value` for the
    /// live-value RPC); `None` for PVA-acquired PVs since PVA exposes
    /// no Clone-able channel handle — live_value for those routes
    /// through `pva_client` directly.
    channel: Option<CaChannel>,
    /// Root cancellation token for this PV. Clones are handed to the
    /// spawned tasks and the per-field tokens are its children;
    /// `pause_pv` / `resume_pv` cancel it after removing the handle.
    cancel_token: CancellationToken,
    /// Archive type taken from the PV's registry record when the task
    /// was started. Stored but not read by the code in this view, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection state shared with the task(s) spawned for this PV.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of metadata fields (.HIHI, .LOLO, .EGU, ...) attached to
    /// every sample emitted for this PV. Populated by per-field monitor tasks
    /// owned by `cancel_token` (and per-field child tokens in `field_tokens`),
    /// so stopping the PV stops all of them.
    extras: Arc<ExtraFieldsCache>,
    /// Per-field cancellation tokens — child tokens of `cancel_token`. Keyed
    /// by field name (e.g. "HIHI"). Lets `update_archive_fields` cancel one
    /// field's task without disturbing the others or the main PV.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises concurrent `update_archive_fields` calls for this PV so
    /// add/remove/respawn never race with itself.
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Diagnostic counters surfaced through the BPL drop / rate /
    /// connection reports. Lock-free reads; updates from the producer
    /// (monitor/scan) and the writer happen on different threads.
    counters: Arc<PvCounters>,
}
251
/// Thread-safe cache of latest extra-field values for one PV.
/// Each map entry is `(field_name, stringified_value)`.
///
/// Shared as `Arc<ExtraFieldsCache>` between the PV handle and the tasks
/// spawned for that PV.
type ExtraFieldsCache = DashMap<String, String>;
255
/// Default capacity for the bounded sample channel.
/// This limits memory usage when producers outpace the storage writer.
/// At ~200 bytes per sample, 500K entries ≈ 100 MB worst-case.
///
/// Passed to `mpsc::channel` when the manager is constructed.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
260
261/// RAII guard that removes a key from `pending_archives` on drop,
262/// ensuring cleanup even if the owning future is cancelled.
263struct PendingGuard<'a> {
264 map: &'a DashMap<String, ()>,
265 key: String,
266}
267
268impl Drop for PendingGuard<'_> {
269 fn drop(&mut self) {
270 self.map.remove(&self.key);
271 }
272}
273
/// Manages EPICS Channel Access + pvAccess connections and dispatches
/// archived samples to storage.
///
/// Samples are not written here directly: spawned per-PV tasks push
/// `PvSample`s into `sample_tx`, and the matching receiver is returned
/// to the caller by the constructor.
pub struct ChannelManager {
    /// The CA client context.
    ca_client: CaClient,
    /// The PVA client context (lazily used only when a PV's registry
    /// row carries `Protocol::Pva`).
    pva_client: PvaClient,
    /// Active channels: PV name → handle with cancellation.
    channels: DashMap<String, PvHandle>,
    /// PVs currently being archived (in-progress CA connect). Prevents TOCTOU races
    /// where two concurrent `archive_pv` calls could double-subscribe the same PV.
    pending_archives: DashMap<String, ()>,
    /// Per-PV mutex serialising archive/pause/resume/stop/destroy on a single
    /// PV. Without this, e.g. `pause_pv` racing with `resume_pv` can leave
    /// the registry status and the channel map disagreeing.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend. Stored at construction but not read by the
    /// methods in this view, hence the `dead_code` allowance.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// PV metadata registry.
    registry: Arc<PvRegistry>,
    /// Sample sender for the write thread. Cloned into every spawned
    /// per-PV task.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration. When a policy matches a PV name it
    /// overrides the caller-supplied sample mode at archive time.
    policy: Option<PolicyConfig>,
    /// Per-site IOC drift bound (Java parity 6538631).
    server_ioc_drift_secs: u64,
}
303
/// A sample ready to be written to storage.
///
/// This is the item type of the bounded channel created by the
/// `ChannelManager` constructor.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archive type of the sample's value.
    pub dbr_type: ArchDbType,
    /// The sample payload itself.
    pub sample: ArchiverSample,
    /// Element count for the PV, when known.
    /// NOTE(review): presumably `None` for producers that do not track
    /// it — confirm against the loops that build `PvSample`.
    pub element_count: Option<i32>,
    /// Counter handle used by the write loop to record timestamp /
    /// type-change drops. None on samples produced before counter
    /// support was wired up — write_loop tolerates the absence.
    pub counters: Option<Arc<PvCounters>>,
}
315
316impl ChannelManager {
317 pub async fn new(
318 storage: Arc<dyn StoragePlugin>,
319 registry: Arc<PvRegistry>,
320 policy: Option<PolicyConfig>,
321 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
322 Self::new_with_drift(storage, registry, policy, 30 * 60).await
323 }
324
325 /// Construct with an explicit IOC drift bound. Java parity (6538631):
326 /// keeps tests + sites that don't surface `EngineConfig` on the
327 /// existing default while letting the daemon plumb a configured value.
328 pub async fn new_with_drift(
329 storage: Arc<dyn StoragePlugin>,
330 registry: Arc<PvRegistry>,
331 policy: Option<PolicyConfig>,
332 server_ioc_drift_secs: u64,
333 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
334 let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
335 let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
336 let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
337
338 let mgr = Self {
339 ca_client,
340 pva_client,
341 channels: DashMap::new(),
342 pending_archives: DashMap::new(),
343 op_locks: DashMap::new(),
344 storage,
345 registry,
346 sample_tx: tx,
347 policy,
348 server_ioc_drift_secs,
349 };
350
351 Ok((mgr, rx))
352 }
353
354 /// Get-or-insert the per-PV operation mutex. The returned `Arc<Mutex>`
355 /// is what callers should `.lock().await` on; holding the entry guard
356 /// (via `entry().or_insert_with`) across the await would deadlock the
357 /// DashMap shard.
358 fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
359 if let Some(existing) = self.op_locks.get(pv_name) {
360 return existing.clone();
361 }
362 self.op_locks
363 .entry(pv_name.to_string())
364 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
365 .clone()
366 }
367
368 /// Restore all active PVs from the registry (called on startup).
369 ///
370 /// `pvs_by_status(Active)` already filters out alias rows (they carry
371 /// `status='alias'`), but we re-check `alias_for.is_some()` here so a
372 /// future schema change can't silently re-introduce double-archiving
373 /// of the underlying IOC PV.
374 pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
375 let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
376 let total = active_pvs.len() as u64;
377 info!(total, "Restoring PVs from registry");
378
379 let mut restored = 0u64;
380 for record in active_pvs {
381 if record.alias_for.is_some() {
382 warn!(
383 pv = record.pv_name,
384 target = record.alias_for.as_deref(),
385 "Skipping alias row in restore; aliases are routed, not archived"
386 );
387 continue;
388 }
389 if let Err(e) = self.start_archiving_internal(&record).await {
390 warn!(pv = record.pv_name, "Failed to restore PV: {e}");
391 self.registry.set_status(&record.pv_name, PvStatus::Error)?;
392 } else {
393 restored += 1;
394 }
395 }
396 metrics::gauge!("archiver_pvs_active").set(restored as f64);
397 if restored < total {
398 warn!(
399 restored,
400 failed = total - restored,
401 "Some PVs failed to restore"
402 );
403 }
404
405 Ok(restored)
406 }
407
408 /// Start archiving a new PV.
409 pub async fn archive_pv(
410 &self,
411 pv_name: &str,
412 sample_mode: &SampleMode,
413 protocol: Protocol,
414 ) -> anyhow::Result<()> {
415 // Serialise with pause/resume/stop/destroy on the same PV.
416 let lock = self.op_lock(pv_name);
417 let _g = lock.lock().await;
418
419 if self.channels.contains_key(pv_name) {
420 anyhow::bail!("PV {pv_name} is already being archived");
421 }
422
423 // Atomically claim the PV to prevent concurrent archive_pv races.
424 // The guard ensures cleanup even if this future is cancelled.
425 if self
426 .pending_archives
427 .insert(pv_name.to_string(), ())
428 .is_some()
429 {
430 anyhow::bail!("PV {pv_name} archive operation already in progress");
431 }
432 let _guard = PendingGuard {
433 map: &self.pending_archives,
434 key: pv_name.to_string(),
435 };
436
437 match protocol {
438 Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
439 Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
440 }
441 }
442
443 /// Inner implementation of archive_pv, separated for cleanup safety.
444 async fn archive_pv_inner(
445 &self,
446 pv_name: &str,
447 sample_mode: &SampleMode,
448 ) -> anyhow::Result<()> {
449 // Re-check after acquiring the pending slot (another task may have completed).
450 if self.channels.contains_key(pv_name) {
451 anyhow::bail!("PV {pv_name} is already being archived");
452 }
453
454 // Check policy override.
455 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
456 if let Some(p) = policy.find_policy(pv_name) {
457 (p.to_sample_mode(), Some(p.policy_name().to_string()))
458 } else {
459 (sample_mode.clone(), None)
460 }
461 } else {
462 (sample_mode.clone(), None)
463 };
464
465 // Connect to discover the native type.
466 let channel = self.ca_client.create_channel(pv_name);
467 channel
468 .wait_connected(CA_CONNECT_TIMEOUT)
469 .await
470 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
471
472 let info = self
473 .ca_client
474 .cainfo(pv_name)
475 .await
476 .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
477
478 let dbr_type = dbr_field_to_arch_type(info.native_type);
479 let element_count = info.element_count as i32;
480
481 // Register in SQLite. (CA path — PVA acquisition uses a separate
482 // entry point, so this branch is always Protocol::Ca.)
483 self.registry
484 .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
485 // Java parity (b30f1a6): persist the matched policy's stable name so
486 // audit / metrics paths know which policy governed this archive.
487 if let Some(ref name) = matched_policy_name {
488 self.registry.update_policy_name(pv_name, Some(name))?;
489 }
490
491 let record = self
492 .registry
493 .get_pv(pv_name)?
494 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
495 self.start_archiving_internal(&record).await?;
496
497 metrics::gauge!("archiver_pvs_active").increment(1.0);
498 info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
499 Ok(())
500 }
501
502 /// PVA equivalent of [`Self::archive_pv_inner`]. Connects via the
503 /// pvAccess client, infers archive type from a one-shot pvget, then
504 /// hands off to a `monitor_loop_pva` that mirrors the CA monitor
505 /// loop's lifecycle (samples → write_loop, cancel_token shutdown).
506 async fn archive_pv_inner_pva(
507 &self,
508 pv_name: &str,
509 sample_mode: &SampleMode,
510 ) -> anyhow::Result<()> {
511 if self.channels.contains_key(pv_name) {
512 anyhow::bail!("PV {pv_name} is already being archived");
513 }
514
515 // Apply policy override (same surface as CA path).
516 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
517 if let Some(p) = policy.find_policy(pv_name) {
518 (p.to_sample_mode(), Some(p.policy_name().to_string()))
519 } else {
520 (sample_mode.clone(), None)
521 }
522 } else {
523 (sample_mode.clone(), None)
524 };
525
526 // Connect + introspect: a one-shot pvget tells us the value
527 // shape. We rely on it instead of a `cainfo` analogue because
528 // PVA channels carry their type only via fetched data.
529 let connect = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvconnect(pv_name))
530 .await
531 .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
532 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
533 debug!(pv = pv_name, server = %connect, "PVA channel connected");
534
535 let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget(pv_name))
536 .await
537 .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
538 .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
539 let (dbr_type, element_count) = pv_field_to_arch_db_type(&initial).ok_or_else(|| {
540 anyhow::anyhow!(
541 "PV {pv_name}: PVA value is not a scalar/scalar-array; structured types not yet supported"
542 )
543 })?;
544
545 // Register with Pva flag.
546 self.registry.register_pv_with_protocol(
547 pv_name,
548 dbr_type,
549 &effective_mode,
550 element_count,
551 Protocol::Pva,
552 )?;
553 if let Some(ref name) = matched_policy_name {
554 self.registry.update_policy_name(pv_name, Some(name))?;
555 }
556 // Persist PREC/EGU extracted from the same pvget — equivalent
557 // to the CA path's DBR_CTRL refresh. Non-fatal on error.
558 let (prec, egu) = pv_field_extract_display(&initial);
559 if (prec.is_some() || egu.is_some())
560 && let Err(e) = self
561 .registry
562 .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
563 {
564 debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
565 }
566
567 let record = self
568 .registry
569 .get_pv(pv_name)?
570 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
571 self.start_archiving_internal(&record).await?;
572
573 metrics::gauge!("archiver_pvs_active").increment(1.0);
574 info!(
575 pv = pv_name,
576 ?dbr_type,
577 element_count,
578 protocol = "pva",
579 "Started archiving"
580 );
581 Ok(())
582 }
583
584 /// Internal: start a subscription for a PV record. Routes by
585 /// `record.protocol`; the CA branch keeps the historical behaviour,
586 /// the PVA branch goes through `start_archiving_internal_pva`.
587 async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
588 if record.protocol == Protocol::Pva {
589 return self.start_archiving_internal_pva(record).await;
590 }
591 let pv_name = record.pv_name.clone();
592 let dbr_type = record.dbr_type;
593 let element_count = record.element_count;
594 let channel = self.ca_client.create_channel(&pv_name);
595 let cancel_token = CancellationToken::new();
596 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
597 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
598 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
599 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
600 let counters = Arc::new(PvCounters::default());
601
602 // Hold update_lock around the whole insert+spawn block so a
603 // concurrent update_archive_fields can't observe the empty
604 // field_tokens map and spawn its own copies of the same fields
605 // before we get to the spawn loop. update_archive_fields acquires
606 // the same update_lock before mutating tokens.
607 let _guard = update_lock.lock().await;
608
609 self.channels.insert(
610 pv_name.clone(),
611 PvHandle {
612 channel: Some(channel.clone()),
613 cancel_token: cancel_token.clone(),
614 dbr_type,
615 conn_info: conn_info.clone(),
616 extras: extras.clone(),
617 field_tokens: field_tokens.clone(),
618 update_lock: update_lock.clone(),
619 counters: counters.clone(),
620 },
621 );
622
623 // Start one monitor task per archive_field with a child cancel token,
624 // tracked so update_archive_fields can stop individual fields.
625 for field in &record.archive_fields {
626 let child = cancel_token.child_token();
627 field_tokens.insert(field.clone(), child.clone());
628 spawn_extra_field_monitor(
629 &self.ca_client,
630 &pv_name,
631 field,
632 extras.clone(),
633 child,
634 counters.clone(),
635 );
636 }
637 metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
638 drop(_guard);
639
640 let tx = self.sample_tx.clone();
641 let token = cancel_token.clone();
642 let ci = conn_info.clone();
643 let extras_for_loop = extras.clone();
644 let counters_for_loop = counters.clone();
645 let registry_for_loop = self.registry.clone();
646
647 let drift = self.server_ioc_drift_secs;
648 match &record.sample_mode {
649 SampleMode::Monitor => {
650 tokio::spawn(async move {
651 monitor_loop(
652 pv_name,
653 dbr_type,
654 element_count,
655 channel,
656 tx,
657 token,
658 ci,
659 registry_for_loop,
660 extras_for_loop,
661 counters_for_loop,
662 drift,
663 )
664 .await;
665 });
666 }
667 SampleMode::Scan { period_secs } => {
668 let period = *period_secs;
669 tokio::spawn(async move {
670 scan_loop(
671 pv_name,
672 dbr_type,
673 element_count,
674 channel,
675 tx,
676 token,
677 period,
678 ci,
679 registry_for_loop,
680 extras_for_loop,
681 counters_for_loop,
682 )
683 .await;
684 });
685 }
686 }
687
688 Ok(())
689 }
690
691 /// PVA equivalent of `start_archiving_internal`. Spawns a single
692 /// callback-driven monitor task; pvAccess auto-reconnect inside
693 /// `pvmonitor_handle` removes the explicit reconnect loop the CA
694 /// path needs. PVA records use `channel: None` on the [`PvHandle`]
695 /// and skip per-field "extras" subscriptions (`.HIHI`/`.LOLO`/…)
696 /// since pvAccess wraps those in the NTScalar value structure
697 /// rather than separate channels.
698 async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
699 let pv_name = record.pv_name.clone();
700 let dbr_type = record.dbr_type;
701 let element_count = record.element_count;
702 let cancel_token = CancellationToken::new();
703 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
704 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
705 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
706 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
707 let counters = Arc::new(PvCounters::default());
708
709 self.channels.insert(
710 pv_name.clone(),
711 PvHandle {
712 channel: None,
713 cancel_token: cancel_token.clone(),
714 dbr_type,
715 conn_info: conn_info.clone(),
716 extras: extras.clone(),
717 field_tokens: field_tokens.clone(),
718 update_lock,
719 counters: counters.clone(),
720 },
721 );
722
723 let tx = self.sample_tx.clone();
724 let pva_client = self.pva_client.clone();
725 let token = cancel_token.clone();
726 let ci = conn_info.clone();
727 let counters_for_loop = counters.clone();
728 let drift = self.server_ioc_drift_secs;
729
730 match &record.sample_mode {
731 SampleMode::Monitor => {
732 let pv_name_loop = pv_name.clone();
733 let archive_fields_loop = record.archive_fields.clone();
734 let extras_for_loop = extras.clone();
735 tokio::spawn(async move {
736 monitor_loop_pva(
737 pv_name_loop,
738 dbr_type,
739 element_count,
740 pva_client,
741 tx,
742 token,
743 ci,
744 counters_for_loop,
745 drift,
746 archive_fields_loop,
747 extras_for_loop,
748 )
749 .await;
750 });
751 }
752 SampleMode::Scan { period_secs } => {
753 let period = *period_secs;
754 let pv_name_loop = pv_name.clone();
755 let archive_fields_loop = record.archive_fields.clone();
756 let extras_for_loop = extras.clone();
757 tokio::spawn(async move {
758 scan_loop_pva(
759 pv_name_loop,
760 dbr_type,
761 pva_client,
762 tx,
763 token,
764 period,
765 ci,
766 counters_for_loop,
767 drift,
768 archive_fields_loop,
769 extras_for_loop,
770 )
771 .await;
772 });
773 }
774 }
775
776 // Auxiliary periodic refreshers: keep PREC/EGU current after
777 // IOC restarts, and approximate disconnect events from a long
778 // sample gap. Both bind to the same cancel_token so a stop /
779 // delete reaps every PVA-side task in one go.
780 {
781 let pv_name = pv_name.clone();
782 let pva_client = self.pva_client.clone();
783 let registry = self.registry.clone();
784 let cancel = cancel_token.clone();
785 let counters_for_refresh = counters.clone();
786 tokio::spawn(async move {
787 pva_metadata_refresh_loop(
788 pv_name,
789 pva_client,
790 registry,
791 cancel,
792 counters_for_refresh,
793 )
794 .await;
795 });
796 }
797 {
798 let pv_name = pv_name.clone();
799 let conn_info = conn_info.clone();
800 let counters_for_watch = counters.clone();
801 let cancel = cancel_token.clone();
802 let sample_mode = record.sample_mode.clone();
803 tokio::spawn(async move {
804 pva_state_watchdog(pv_name, conn_info, counters_for_watch, cancel, sample_mode)
805 .await;
806 });
807 }
808
809 Ok(())
810 }
811
812 /// Replace the archive_fields list for a running PV. Cancels per-field
813 /// monitor tasks for fields that left the set, spawns fresh ones for
814 /// fields that joined, and leaves unchanged fields running. Serialised
815 /// per-PV by an async mutex so concurrent callers can't double-spawn.
816 /// The main PV keeps running.
817 pub async fn update_archive_fields(
818 &self,
819 pv_name: &str,
820 fields: &[String],
821 ) -> anyhow::Result<()> {
822 // Persist first so a restart sees the new set even if the engine
823 // half of the update fails partway through.
824 self.registry.update_archive_fields(pv_name, fields)?;
825
826 // If the PV isn't currently active there's nothing more to do —
827 // start_archiving_internal will pick up the new fields on resume.
828 let (parent_token, extras, field_tokens, update_lock, counters) = {
829 let Some(handle) = self.channels.get(pv_name) else {
830 return Ok(());
831 };
832 (
833 handle.cancel_token.clone(),
834 handle.extras.clone(),
835 handle.field_tokens.clone(),
836 handle.update_lock.clone(),
837 handle.counters.clone(),
838 )
839 };
840
841 // Serialise so two concurrent updates can't both decide the same
842 // field is missing and spawn it twice.
843 let _guard = update_lock.lock().await;
844
845 let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
846
847 // Cancel + drop tasks for fields that left the set. Removing the
848 // entry from `field_tokens` also drops our handle on the child
849 // token; the spawned task observes `cancelled()` and exits.
850 let to_remove: Vec<String> = field_tokens
851 .iter()
852 .filter(|e| !wanted.contains(e.key().as_str()))
853 .map(|e| e.key().clone())
854 .collect();
855 let removed_count = to_remove.len();
856 for key in to_remove {
857 if let Some((_, token)) = field_tokens.remove(&key) {
858 token.cancel();
859 }
860 extras.remove(&key);
861 }
862
863 // Spawn tasks for fields newly added. Existing fields keep their
864 // task and their last cached value.
865 let mut added_count = 0usize;
866 for f in fields {
867 if !field_tokens.contains_key(f) {
868 let child = parent_token.child_token();
869 field_tokens.insert(f.clone(), child.clone());
870 spawn_extra_field_monitor(
871 &self.ca_client,
872 pv_name,
873 f,
874 extras.clone(),
875 child,
876 counters.clone(),
877 );
878 added_count += 1;
879 }
880 }
881 let net = added_count as i64 - removed_count as i64;
882 if net != 0 {
883 metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
884 }
885 Ok(())
886 }
887
888 /// Pause archiving for a PV.
889 pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
890 let lock = self.op_lock(pv_name);
891 let _g = lock.lock().await;
892 if let Some((_key, handle)) = self.channels.remove(pv_name) {
893 let extra_count = handle.field_tokens.len() as f64;
894 handle.cancel_token.cancel();
895 metrics::gauge!("archiver_pvs_active").decrement(1.0);
896 if extra_count > 0.0 {
897 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
898 }
899 }
900 self.registry.set_status(pv_name, PvStatus::Paused)?;
901 info!(pv = pv_name, "Paused archiving");
902 Ok(())
903 }
904
905 /// Resume a paused PV. Only paused or error PVs can be resumed;
906 /// calling resume on an already-active PV is a no-op (returns Ok).
907 pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
908 let lock = self.op_lock(pv_name);
909 let _g = lock.lock().await;
910
911 let record = self
912 .registry
913 .get_pv(pv_name)?
914 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
915
916 // Guard: if already active with a live task, nothing to do.
917 if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
918 info!(
919 pv = pv_name,
920 "PV is already actively archived, skipping resume"
921 );
922 return Ok(());
923 }
924
925 // Cancel any orphaned task to prevent duplicate subscriptions.
926 if let Some((_key, handle)) = self.channels.remove(pv_name) {
927 let extra_count = handle.field_tokens.len() as f64;
928 handle.cancel_token.cancel();
929 if extra_count > 0.0 {
930 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
931 }
932 }
933
934 // Start the task first; only mark Active if it succeeds.
935 self.start_archiving_internal(&record).await?;
936 self.registry.set_status(pv_name, PvStatus::Active)?;
937 metrics::gauge!("archiver_pvs_active").increment(1.0);
938 info!(pv = pv_name, "Resumed archiving");
939 Ok(())
940 }
941
942 /// Stop archiving a PV without removing it from the registry.
943 /// Sets the PV status to Inactive (data retained, monitoring stopped).
944 pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
945 let lock = self.op_lock(pv_name);
946 let _g = lock.lock().await;
947 if let Some((_key, handle)) = self.channels.remove(pv_name) {
948 let extra_count = handle.field_tokens.len() as f64;
949 handle.cancel_token.cancel();
950 metrics::gauge!("archiver_pvs_active").decrement(1.0);
951 if extra_count > 0.0 {
952 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
953 }
954 }
955 self.registry.set_status(pv_name, PvStatus::Inactive)?;
956 info!(pv = pv_name, "Stopped archiving (inactive)");
957 Ok(())
958 }
959
960 /// Remove a PV from archiving entirely.
961 pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
962 let lock = self.op_lock(pv_name);
963 let _g = lock.lock().await;
964 if let Some((_key, handle)) = self.channels.remove(pv_name) {
965 let extra_count = handle.field_tokens.len() as f64;
966 handle.cancel_token.cancel();
967 metrics::gauge!("archiver_pvs_active").decrement(1.0);
968 if extra_count > 0.0 {
969 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
970 }
971 }
972 self.registry.remove_pv(pv_name)?;
973 // Don't remove the op_locks entry here: a concurrent caller may have
974 // already taken a clone of this Arc and be queued on it; removing
975 // would let a fresh caller obtain a new mutex and the queued one
976 // would race them. The map is bounded by the lifetime universe of
977 // PV names, which is acceptable.
978 info!(pv = pv_name, "Destroyed archiving channel");
979 Ok(())
980 }
981
982 /// List all currently archived PV names (from registry).
983 pub fn list_pvs(&self) -> Vec<String> {
984 self.registry.all_pv_names().unwrap_or_else(|e| {
985 warn!("Failed to list PVs: {e}");
986 Vec::new()
987 })
988 }
989
990 /// Match PVs by glob pattern (from registry).
991 pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
992 self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
993 warn!("Failed to match PVs: {e}");
994 Vec::new()
995 })
996 }
997
998 /// Get the registry reference.
999 pub fn registry(&self) -> &Arc<PvRegistry> {
1000 &self.registry
1001 }
1002
1003 /// Get connection info for a PV.
1004 pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1005 self.channels.get(pv).map(|h| {
1006 h.conn_info
1007 .lock()
1008 .unwrap_or_else(|e| e.into_inner())
1009 .clone()
1010 })
1011 }
1012
1013 /// Get PV names that have never received any event (connected_since == None).
1014 pub fn get_never_connected_pvs(&self) -> Vec<String> {
1015 self.channels
1016 .iter()
1017 .filter(|entry| {
1018 let ci = entry
1019 .value()
1020 .conn_info
1021 .lock()
1022 .unwrap_or_else(|e| e.into_inner());
1023 ci.connected_since.is_none()
1024 })
1025 .map(|entry| entry.key().clone())
1026 .collect()
1027 }
1028
1029 /// Snapshot the diagnostic counters for one PV. Returns None if the
1030 /// PV isn't actively archived. The returned Arc is the live counter
1031 /// — callers read with `Ordering::Relaxed`.
1032 pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1033 self.channels.get(pv_name).map(|h| h.counters.clone())
1034 }
1035
1036 /// Snapshot every active PV's counters. Returns `(pv_name,
1037 /// PvCountersSnapshot)` so callers don't have to handle Arc.
1038 pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1039 self.channels
1040 .iter()
1041 .map(|e| {
1042 (
1043 e.key().clone(),
1044 PvCountersSnapshot::from(&*e.value().counters),
1045 )
1046 })
1047 .collect()
1048 }
1049
1050 /// One-shot CA `get` against the running channel for `pv`. Returns
1051 /// `None` if the PV isn't actively archived. The timeout caps how
1052 /// long the caller will wait for a value when the IOC is slow.
1053 /// Powers `getEngineDataAction` / `getDataAtTimeEngine`.
1054 pub async fn live_value(
1055 &self,
1056 pv_name: &str,
1057 timeout: Duration,
1058 ) -> Option<anyhow::Result<ArchiverValue>> {
1059 let channel_opt = self.channels.get(pv_name)?.channel.clone();
1060 let Some(channel) = channel_opt else {
1061 // PVA-acquired PV — live_value via pva_client.pvget.
1062 let pva = self.pva_client.clone();
1063 let name = pv_name.to_string();
1064 let res = tokio::time::timeout(timeout, pva.pvget(&name)).await;
1065 return Some(match res {
1066 Ok(Ok(field)) => pv_field_scalar_to_archiver(&field)
1067 .ok_or_else(|| anyhow::anyhow!("PVA value not a scalar")),
1068 Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1069 Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1070 });
1071 };
1072 // Wait briefly for connection — channel.get on a disconnected
1073 // channel would otherwise return an error from deep in the CA
1074 // stack. Capped by the same timeout the caller chose.
1075 if channel.wait_connected(timeout).await.is_err() {
1076 return Some(Err(anyhow::anyhow!(
1077 "channel not connected within {timeout:?}"
1078 )));
1079 }
1080 match tokio::time::timeout(timeout, channel.get()).await {
1081 Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1082 Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1083 Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1084 }
1085 }
1086
1087 /// Snapshot the latest cached extra-field values for `pv` —
1088 /// `(field_name, stringified_value)` pairs. Empty map when the PV
1089 /// isn't archived or has no archive_fields configured.
1090 pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1091 match self.channels.get(pv_name) {
1092 Some(handle) => handle
1093 .extras
1094 .iter()
1095 .map(|e| (e.key().clone(), e.value().clone()))
1096 .collect(),
1097 None => std::collections::HashMap::new(),
1098 }
1099 }
1100
1101 /// Get PV names that are currently disconnected (is_connected == false).
1102 pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1103 self.channels
1104 .iter()
1105 .filter(|entry| {
1106 let ci = entry
1107 .value()
1108 .conn_info
1109 .lock()
1110 .unwrap_or_else(|e| e.into_inner());
1111 !ci.is_connected
1112 })
1113 .map(|entry| entry.key().clone())
1114 .collect()
1115 }
1116}
1117
1118/// Read DBR_CTRL metadata from the channel and persist `precision`/`units`
1119/// to the registry when they differ from stored values. Best-effort: any
1120/// failure (timeout, transport error, missing display info, non-numeric
1121/// type) is logged at debug and silently ignored. Skips the SQL write
1122/// entirely when the in-memory record already matches, so reconnect
1123/// storms don't churn the database.
1124async fn refresh_ctrl_metadata(
1125 channel: &CaChannel,
1126 registry: &PvRegistry,
1127 pv_name: &str,
1128 counters: &PvCounters,
1129) {
1130 // 15s — long enough for slow-network IOCs (Java's default `epicsTimeout`
1131 // is 30s; 15s splits the difference). Failures count toward
1132 // `metadata_fetch_failures` so operators can spot PVs that never get
1133 // PREC/EGU populated.
1134 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1135 let snapshot = match tokio::time::timeout(
1136 FETCH_TIMEOUT,
1137 channel.get_with_metadata(DbrClass::Ctrl),
1138 )
1139 .await
1140 {
1141 Ok(Ok(s)) => s,
1142 Ok(Err(e)) => {
1143 counters
1144 .metadata_fetch_failures
1145 .fetch_add(1, Ordering::Relaxed);
1146 debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1147 return;
1148 }
1149 Err(_) => {
1150 counters
1151 .metadata_fetch_failures
1152 .fetch_add(1, Ordering::Relaxed);
1153 debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1154 return;
1155 }
1156 };
1157 let Some(display) = snapshot.display else {
1158 // Non-numeric type (string, enum, …) — DBR_CTRL has no
1159 // DisplayInfo. Not a failure; just nothing to persist.
1160 return;
1161 };
1162
1163 // Negative precision is the Java EAA "no precision info" sentinel —
1164 // persisting "-1" as the PREC string would surface as a literal -1
1165 // in the UI / API. Treat it the same as missing.
1166 let new_prec_opt: Option<String> = if display.precision < 0 {
1167 None
1168 } else {
1169 Some(display.precision.to_string())
1170 };
1171 let new_egu_trimmed = display.units.trim();
1172 let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1173 None
1174 } else {
1175 Some(new_egu_trimmed)
1176 };
1177
1178 let stored = match registry.get_pv(pv_name) {
1179 Ok(Some(r)) => r,
1180 Ok(None) => return,
1181 Err(e) => {
1182 debug!(
1183 pv = pv_name,
1184 "Registry read for metadata compare failed: {e}"
1185 );
1186 return;
1187 }
1188 };
1189
1190 let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1191 (Some(s), Some(n)) => s != n,
1192 (None, Some(_)) => true,
1193 // Don't overwrite a populated PREC with the "no info" sentinel.
1194 _ => false,
1195 };
1196 let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1197 (Some(s), Some(n)) => s != n,
1198 (None, Some(_)) => true,
1199 // Don't overwrite a populated EGU with empty, and don't
1200 // bother writing None over None.
1201 _ => false,
1202 };
1203 if !prec_changed && !egu_changed {
1204 return;
1205 }
1206
1207 let prec_arg = if prec_changed {
1208 new_prec_opt.as_deref()
1209 } else {
1210 None
1211 };
1212 let egu_arg = if egu_changed { new_egu_opt } else { None };
1213 if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1214 warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1215 } else {
1216 debug!(
1217 pv = pv_name,
1218 prec = ?prec_arg, egu = ?egu_arg,
1219 "Refreshed PREC/EGU from DBR_CTRL"
1220 );
1221 }
1222}
1223
1224/// Body shared by the two PVA monitor callback variants
1225/// (`pvmonitor_handle` takes `(FieldDesc, PvField)`,
1226/// `pvmonitor_with_request` takes `(PvField)`). Lifted out so we
1227/// can write the once-per-event logic in one place.
1228#[allow(clippy::too_many_arguments)]
1229fn pva_handle_event(
1230 field: &PvField,
1231 pv_name: &str,
1232 dbr_type: ArchDbType,
1233 tx: &mpsc::Sender<PvSample>,
1234 conn_info: &Mutex<ConnectionInfo>,
1235 counters: &Arc<PvCounters>,
1236 extras: &ExtraFieldsCache,
1237 extras_paths: &[(String, &'static str)],
1238 drift_secs: u64,
1239) {
1240 let now = SystemTime::now();
1241 let first_after_connect = {
1242 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1243 let first = ci.last_event_time.is_none();
1244 if ci.connected_since.is_none() {
1245 ci.connected_since = Some(now);
1246 }
1247 ci.is_connected = true;
1248 ci.last_event_time = Some(now);
1249 ci.state = PvConnectionState::Connected;
1250 first
1251 };
1252 if first_after_connect {
1253 counters
1254 .first_event_unix_secs
1255 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1256 .ok();
1257 }
1258 counters.events_received.fetch_add(1, Ordering::Relaxed);
1259
1260 let Some(value) = pv_field_scalar_to_archiver(field) else {
1261 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1262 debug!(pv = pv_name, "PVA event has non-scalar value; dropping");
1263 return;
1264 };
1265
1266 let ts = pv_field_extract_timestamp(field);
1267 if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1268 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1269 debug!(
1270 pv = pv_name,
1271 ?ts,
1272 "Dropping PVA sample with out-of-window timestamp"
1273 );
1274 return;
1275 }
1276 let elem_count = match pv_field_element_count(field) {
1277 0 => 1,
1278 n => n,
1279 };
1280 for (field_name, path) in extras_paths {
1281 if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1282 extras.insert(field_name.clone(), scalar_value_to_string(s));
1283 }
1284 }
1285 let mut sample = ArchiverSample::new(ts, value);
1286 attach_extras(extras, &mut sample);
1287 let pv_sample = PvSample {
1288 pv_name: pv_name.to_string(),
1289 dbr_type,
1290 sample,
1291 element_count: Some(elem_count),
1292 counters: Some(counters.clone()),
1293 };
1294 if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1295 counters
1296 .buffer_overflow_drops
1297 .fetch_add(1, Ordering::Relaxed);
1298 }
1299}
1300
1301/// PVA monitor loop: subscribes once and parks until cancellation.
1302/// Each fan-in event is decoded inline, packaged as a [`PvSample`],
1303/// and non-blocking-pushed into the storage write_loop's channel —
1304/// blocking would stall the pvAccess reactor thread.
1305///
1306/// When `archive_fields` is non-empty, the subscription requests an
1307/// explicit pvRequest with the mapped sub-field paths so the IOC
1308/// includes them in every monitor event; the callback then mirrors
1309/// the CA path's "extras" cache by stringifying each requested field.
1310#[allow(clippy::too_many_arguments)]
1311async fn monitor_loop_pva(
1312 pv_name: String,
1313 dbr_type: ArchDbType,
1314 element_count: i32,
1315 pva_client: PvaClient,
1316 tx: mpsc::Sender<PvSample>,
1317 cancel_token: CancellationToken,
1318 conn_info: Arc<Mutex<ConnectionInfo>>,
1319 counters: Arc<PvCounters>,
1320 server_ioc_drift_secs: u64,
1321 archive_fields: Vec<String>,
1322 extras: Arc<ExtraFieldsCache>,
1323) {
1324 let drift_secs = server_ioc_drift_secs;
1325 // Static element_count is unused — each callback derives the
1326 // current array length from the live PvField, so an NTScalarArray
1327 // whose size changes between events still gets tagged correctly.
1328 let _ = element_count;
1329
1330 // Build the (CA name, PVA path) pairs once. Skip CA fields with
1331 // no PVA equivalent so a misconfigured archive_fields list
1332 // doesn't poison the pvRequest.
1333 let extras_paths: Vec<(String, &'static str)> = archive_fields
1334 .iter()
1335 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1336 .collect();
1337 let request_expr = if extras_paths.is_empty() {
1338 None
1339 } else {
1340 let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1341 .field("value")
1342 .field("alarm")
1343 .field("timeStamp");
1344 for (_, path) in &extras_paths {
1345 builder = builder.field(*path);
1346 }
1347 Some(builder.build())
1348 };
1349
1350 // Subscribe loop with retry. The custom-request path uses
1351 // `pvmonitor_with_request` (no SubscriptionHandle) inside a
1352 // tokio::select! so cancellation drops the future cleanly; the
1353 // empty-request path keeps the original SubscriptionHandle form
1354 // since that's the simpler path for the common case.
1355 loop {
1356 if let Some(ref req) = request_expr {
1357 // Custom-request path: 1-arg callback. pvmonitor_with_request
1358 // returns a future that runs to completion; race it against
1359 // the cancel signal.
1360 let pv_name_cb = pv_name.clone();
1361 let tx_cb = tx.clone();
1362 let conn_info_cb = conn_info.clone();
1363 let counters_cb = counters.clone();
1364 let extras_cb = extras.clone();
1365 let extras_paths_cb = extras_paths.clone();
1366 let cb = move |field: &PvField| {
1367 pva_handle_event(
1368 field,
1369 &pv_name_cb,
1370 dbr_type,
1371 &tx_cb,
1372 &conn_info_cb,
1373 &counters_cb,
1374 &extras_cb,
1375 &extras_paths_cb,
1376 drift_secs,
1377 );
1378 };
1379 tokio::select! {
1380 _ = cancel_token.cancelled() => {
1381 debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1382 return;
1383 }
1384 res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1385 match res {
1386 Ok(()) => {
1387 debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1388 }
1389 Err(e) => {
1390 counters
1391 .transient_error_count
1392 .fetch_add(1, Ordering::Relaxed);
1393 warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1394 }
1395 }
1396 tokio::select! {
1397 _ = cancel_token.cancelled() => return,
1398 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1399 }
1400 }
1401 }
1402 } else {
1403 // Empty-request fast path: 2-arg callback. Keep the
1404 // SubscriptionHandle so the inner reactor task lives
1405 // until our explicit drop.
1406 let pv_name_cb = pv_name.clone();
1407 let tx_cb = tx.clone();
1408 let conn_info_cb = conn_info.clone();
1409 let counters_cb = counters.clone();
1410 let extras_cb = extras.clone();
1411 let extras_paths_cb = extras_paths.clone();
1412 let cb = move |_desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1413 pva_handle_event(
1414 field,
1415 &pv_name_cb,
1416 dbr_type,
1417 &tx_cb,
1418 &conn_info_cb,
1419 &counters_cb,
1420 &extras_cb,
1421 &extras_paths_cb,
1422 drift_secs,
1423 );
1424 };
1425 let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1426 Ok(h) => h,
1427 Err(e) => {
1428 counters
1429 .transient_error_count
1430 .fetch_add(1, Ordering::Relaxed);
1431 warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1432 tokio::select! {
1433 _ = cancel_token.cancelled() => return,
1434 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1435 }
1436 }
1437 };
1438 debug!(pv = pv_name, "PVA monitor active");
1439 cancel_token.cancelled().await;
1440 debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1441 drop(handle);
1442 return;
1443 }
1444 }
1445}
1446
1447/// PVA Scan loop: periodic `pvget` instead of streaming monitor.
1448/// Mirrors the CA scan_loop's per-tick connection check + sample emit.
1449#[allow(clippy::too_many_arguments)]
1450async fn scan_loop_pva(
1451 pv_name: String,
1452 dbr_type: ArchDbType,
1453 pva_client: PvaClient,
1454 tx: mpsc::Sender<PvSample>,
1455 cancel_token: CancellationToken,
1456 period_secs: f64,
1457 conn_info: Arc<Mutex<ConnectionInfo>>,
1458 counters: Arc<PvCounters>,
1459 server_ioc_drift_secs: u64,
1460 archive_fields: Vec<String>,
1461 extras: Arc<ExtraFieldsCache>,
1462) {
1463 let extras_paths: Vec<(String, &'static str)> = archive_fields
1464 .iter()
1465 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1466 .collect();
1467 let period = Duration::from_secs_f64(period_secs);
1468 let mut interval = tokio::time::interval(period);
1469 let drift_secs = server_ioc_drift_secs;
1470 // Hard timeout per pvget — at most one tick of slack so a stuck
1471 // PVA server can't accumulate pending requests.
1472 let pvget_timeout = period.max(Duration::from_secs(5));
1473
1474 loop {
1475 tokio::select! {
1476 _ = cancel_token.cancelled() => return,
1477 _ = interval.tick() => {}
1478 }
1479
1480 let res = tokio::time::timeout(pvget_timeout, pva_client.pvget(&pv_name)).await;
1481 let field = match res {
1482 Ok(Ok(f)) => f,
1483 Ok(Err(e)) => {
1484 let was_connected = {
1485 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1486 let prev = ci.is_connected;
1487 ci.is_connected = false;
1488 ci.last_event_time = None;
1489 ci.state = match ci.state {
1490 PvConnectionState::Connected => PvConnectionState::Disconnected,
1491 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1492 _ => PvConnectionState::Connecting,
1493 };
1494 prev
1495 };
1496 if was_connected {
1497 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1498 counters
1499 .last_disconnect_unix_secs
1500 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1501 }
1502 counters
1503 .transient_error_count
1504 .fetch_add(1, Ordering::Relaxed);
1505 debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1506 continue;
1507 }
1508 Err(_) => {
1509 counters
1510 .transient_error_count
1511 .fetch_add(1, Ordering::Relaxed);
1512 debug!(pv = pv_name, "PVA scan pvget timed out");
1513 continue;
1514 }
1515 };
1516
1517 let now = SystemTime::now();
1518 let first_after_connect = {
1519 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1520 let first = ci.last_event_time.is_none();
1521 if ci.connected_since.is_none() {
1522 ci.connected_since = Some(now);
1523 }
1524 ci.is_connected = true;
1525 ci.last_event_time = Some(now);
1526 ci.state = PvConnectionState::Connected;
1527 first
1528 };
1529 counters.events_received.fetch_add(1, Ordering::Relaxed);
1530 if first_after_connect {
1531 counters
1532 .first_event_unix_secs
1533 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1534 .ok();
1535 }
1536
1537 let Some(value) = pv_field_scalar_to_archiver(&field) else {
1538 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1539 debug!(pv = pv_name, "PVA scan returned non-scalar value; dropping");
1540 continue;
1541 };
1542 // Scan mode: timestamp the sample at receive time (no IOC
1543 // timestamp available reliably for periodic pvget, mirroring
1544 // CA scan_loop's `now` policy).
1545 let _ = drift_secs; // referenced for symmetry; not used here
1546 let elem_count = match pv_field_element_count(&field) {
1547 0 => 1,
1548 n => n,
1549 };
1550 // Refresh extras from this scan's pvget result (the pvget
1551 // returns the full NTScalar so all sub-fields are available
1552 // for free — no extra channel spawn needed).
1553 for (field_name, path) in &extras_paths {
1554 if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1555 extras.insert(field_name.clone(), scalar_value_to_string(s));
1556 }
1557 }
1558 let mut sample = ArchiverSample::new(now, value);
1559 attach_extras(&extras, &mut sample);
1560 let pv_sample = PvSample {
1561 pv_name: pv_name.clone(),
1562 dbr_type,
1563 sample,
1564 element_count: Some(elem_count),
1565 counters: Some(counters.clone()),
1566 };
1567 if let Err(rejected) = try_send_with_overflow_count(&tx, pv_sample, &counters).await {
1568 // Channel closed (write_loop down). Cooperative shutdown.
1569 let _ = rejected;
1570 return;
1571 }
1572 }
1573}
1574
1575/// Periodic refresh task for PVA-acquired PVs: re-fetches DBR_CTRL-
1576/// equivalent metadata (`display.units`, `display.precision`) every
1577/// few minutes and persists when changed. PVA's monitor callback API
1578/// doesn't surface explicit reconnect events the way CA's
1579/// `ConnectionEvent` does, so a dedicated periodic poll is the
1580/// simplest way to keep PREC/EGU fresh after IOC restarts.
1581async fn pva_metadata_refresh_loop(
1582 pv_name: String,
1583 pva_client: PvaClient,
1584 registry: Arc<PvRegistry>,
1585 cancel_token: CancellationToken,
1586 counters: Arc<PvCounters>,
1587) {
1588 // 5 minutes — operators almost never change PREC/EGU at runtime;
1589 // shorter cadence wastes registry/network and longer leaves UI
1590 // stale across IOC restarts. Same magic number Java EAA uses
1591 // for its `metaFieldsRefresh` cron.
1592 const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1593 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1594 let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1595 // Skip the immediate first tick — initial pvget already populated
1596 // PREC/EGU at archive_pv time.
1597 tick.tick().await;
1598 loop {
1599 tokio::select! {
1600 _ = cancel_token.cancelled() => return,
1601 _ = tick.tick() => {}
1602 }
1603
1604 let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1605 let field = match res {
1606 Ok(Ok(f)) => f,
1607 _ => {
1608 counters
1609 .metadata_fetch_failures
1610 .fetch_add(1, Ordering::Relaxed);
1611 continue;
1612 }
1613 };
1614 let (new_prec, new_egu) = pv_field_extract_display(&field);
1615 let stored = match registry.get_pv(&pv_name) {
1616 Ok(Some(r)) => r,
1617 _ => continue,
1618 };
1619 let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1620 (Some(s), Some(n)) => s != n,
1621 (None, Some(_)) => true,
1622 _ => false,
1623 };
1624 let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1625 (Some(s), Some(n)) => s != n,
1626 (None, Some(_)) => true,
1627 _ => false,
1628 };
1629 if !prec_changed && !egu_changed {
1630 continue;
1631 }
1632 let prec_arg = if prec_changed {
1633 new_prec.as_deref()
1634 } else {
1635 None
1636 };
1637 let egu_arg = if egu_changed {
1638 new_egu.as_deref()
1639 } else {
1640 None
1641 };
1642 if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1643 warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1644 } else {
1645 debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1646 }
1647 }
1648}
1649
1650/// State watchdog for PVA-acquired PVs: a dedicated task that flips
1651/// `conn_info.state` to `Disconnected` when no event has arrived
1652/// within `STALE_THRESHOLD`. PVA's callback API doesn't surface
1653/// explicit channel-state changes — we approximate by treating a
1654/// long event gap as a disconnect. Trade-off: a genuinely silent PV
1655/// (alarm-only, low-rate scan) appears disconnected. Operators can
1656/// raise the threshold by env var if their PV cadence is slower.
1657async fn pva_state_watchdog(
1658 pv_name: String,
1659 conn_info: Arc<Mutex<ConnectionInfo>>,
1660 counters: Arc<PvCounters>,
1661 cancel_token: CancellationToken,
1662 sample_mode: SampleMode,
1663) {
1664 const POLL_INTERVAL: Duration = Duration::from_secs(5);
1665 // Threshold scales with the expected event cadence so a slow Scan
1666 // PV (period=300 s) doesn't flap on every tick. For Monitor mode
1667 // the floor is 60 s — most production PVs change at least that
1668 // often. Operators who archive truly silent PVs (alarm-only) will
1669 // see stale disconnected status; that's a known limitation.
1670 let stale_threshold = match sample_mode {
1671 SampleMode::Monitor => Duration::from_secs(60),
1672 SampleMode::Scan { period_secs } => Duration::from_secs_f64((period_secs * 3.0).max(60.0)),
1673 };
1674 let mut interval = tokio::time::interval(POLL_INTERVAL);
1675 interval.tick().await;
1676 loop {
1677 tokio::select! {
1678 _ = cancel_token.cancelled() => return,
1679 _ = interval.tick() => {}
1680 }
1681 let stale_now = {
1682 let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1683 if !ci.is_connected {
1684 continue;
1685 }
1686 match ci.last_event_time {
1687 Some(t) => SystemTime::now()
1688 .duration_since(t)
1689 .map(|d| d > stale_threshold)
1690 .unwrap_or(false),
1691 None => false,
1692 }
1693 };
1694 if stale_now {
1695 let was_connected = {
1696 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1697 let prev = ci.is_connected;
1698 ci.is_connected = false;
1699 ci.state = PvConnectionState::Disconnected;
1700 prev
1701 };
1702 if was_connected {
1703 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1704 counters
1705 .last_disconnect_unix_secs
1706 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1707 debug!(
1708 pv = pv_name,
1709 "PVA watchdog: marking disconnected after stale heartbeat"
1710 );
1711 }
1712 }
1713 }
1714}
1715
1716/// Monitor loop: subscribe to a channel and forward values.
1717#[allow(clippy::too_many_arguments)]
1718async fn monitor_loop(
1719 pv_name: String,
1720 dbr_type: ArchDbType,
1721 element_count: i32,
1722 channel: CaChannel,
1723 tx: mpsc::Sender<PvSample>,
1724 cancel_token: CancellationToken,
1725 conn_info: Arc<Mutex<ConnectionInfo>>,
1726 registry: Arc<PvRegistry>,
1727 extras: Arc<ExtraFieldsCache>,
1728 counters: Arc<PvCounters>,
1729 server_ioc_drift_secs: u64,
1730) {
1731 loop {
1732 // Wait for connection, respecting cancellation.
1733 tokio::select! {
1734 _ = cancel_token.cancelled() => return,
1735 result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
1736 if result.is_err() {
1737 let was_connected = {
1738 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1739 let prev_connected = ci.is_connected;
1740 ci.is_connected = false;
1741 ci.last_event_time = None;
1742 // Connecting if we've never seen a connect, otherwise
1743 // demote from Connected to Disconnected on a real loss.
1744 ci.state = match ci.state {
1745 PvConnectionState::Connected => PvConnectionState::Disconnected,
1746 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1747 _ => PvConnectionState::Connecting,
1748 };
1749 prev_connected
1750 };
1751 // A search-failure timeout that demotes us out of
1752 // Connected counts as a fresh disconnect — without
1753 // this, only the post-monitor `break` path bumps the
1754 // counters and a flapping PV silently under-reports
1755 // its drops while subsequent `attach_cnx_lost_headers`
1756 // calls carry a stale `cnxlostepsecs`.
1757 if was_connected {
1758 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1759 counters
1760 .last_disconnect_unix_secs
1761 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1762 }
1763 // Subscribe BEFORE re-checking the current state. If we
1764 // subscribed after a state check, a Connected event
1765 // firing in between would be lost forever, leaving this
1766 // loop hung until the next disconnect/reconnect cycle.
1767 let mut conn_rx = channel.connection_events();
1768
1769 // Close the remaining race: the channel may have become
1770 // connected between the outer `wait_connected` timeout
1771 // and our `subscribe()` above, in which case no new event
1772 // will arrive. A short re-probe catches that case.
1773 if channel
1774 .wait_connected(Duration::from_millis(100))
1775 .await
1776 .is_err()
1777 {
1778 loop {
1779 tokio::select! {
1780 _ = cancel_token.cancelled() => return,
1781 event = conn_rx.recv() => {
1782 use tokio::sync::broadcast::error::RecvError;
1783 match event {
1784 Ok(ConnectionEvent::Connected) => break,
1785 Ok(_) => continue,
1786 // Lagged: we missed some events but
1787 // the channel is still live. Re-probe
1788 // state and otherwise keep waiting.
1789 Err(RecvError::Lagged(_)) => {
1790 if channel
1791 .wait_connected(Duration::from_millis(100))
1792 .await
1793 .is_ok()
1794 {
1795 break;
1796 }
1797 continue;
1798 }
1799 Err(RecvError::Closed) => return,
1800 }
1801 }
1802 }
1803 }
1804 }
1805 }
1806 }
1807 }
1808
1809 // Refresh DBR_CTRL metadata once per (re)connect so the registry's
1810 // PREC/EGU stay in sync with the IOC. Fire-and-forget so a slow
1811 // CA stack can't gate `subscribe()` (and therefore the first
1812 // sample) on a metadata round-trip. The spawned task is bound to
1813 // `cancel_token` so it terminates promptly when the PV is
1814 // stopped/deleted (otherwise a late metadata write could land on
1815 // a deleted-or-re-registered PV row).
1816 {
1817 let channel = channel.clone();
1818 let registry = registry.clone();
1819 let counters = counters.clone();
1820 let pv_name = pv_name.clone();
1821 let cancel = cancel_token.clone();
1822 tokio::spawn(async move {
1823 tokio::select! {
1824 _ = cancel.cancelled() => {}
1825 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
1826 }
1827 });
1828 }
1829
1830 // Subscribe.
1831 let mut monitor = match channel.subscribe().await {
1832 Ok(m) => m,
1833 Err(e) => {
1834 counters
1835 .transient_error_count
1836 .fetch_add(1, Ordering::Relaxed);
1837 warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
1838 tokio::select! {
1839 _ = cancel_token.cancelled() => return,
1840 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1841 }
1842 }
1843 };
1844
1845 debug!(pv = pv_name, "Monitor subscription active");
1846
1847 // Receive values with cancellation.
1848 loop {
1849 tokio::select! {
1850 _ = cancel_token.cancelled() => return,
1851 result = monitor.recv() => {
1852 match result {
1853 Some(Ok(snapshot)) => {
1854 let now = SystemTime::now();
1855 // Java parity (11e554d0): use the IOC-reported
1856 // timestamp, not receive-time, so latency
1857 // doesn't smear sample times. First sample
1858 // after connect is accepted unconditionally
1859 // — legitimate backfill on reconnect can
1860 // include older timestamps. Subsequent
1861 // samples whose IOC clock is more than
1862 // SERVER_IOC_DRIFT_SECS away from wall clock,
1863 // or earlier than the 1991 floor, are
1864 // dropped + counted.
1865 let first_after_connect = {
1866 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1867 let first = ci.last_event_time.is_none();
1868 if ci.connected_since.is_none() {
1869 ci.connected_since = Some(now);
1870 }
1871 ci.is_connected = true;
1872 ci.last_event_time = Some(now);
1873 ci.state = PvConnectionState::Connected;
1874 first
1875 };
1876 if !first_after_connect
1877 && !ioc_timestamp_in_window(
1878 snapshot.timestamp,
1879 now,
1880 server_ioc_drift_secs,
1881 )
1882 {
1883 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1884 debug!(
1885 pv = pv_name,
1886 ?snapshot.timestamp,
1887 "Dropping sample with out-of-window IOC timestamp"
1888 );
1889 continue;
1890 }
1891 counters.events_received.fetch_add(1, Ordering::Relaxed);
1892 // CAS the first-event timestamp once. 0 sentinel
1893 // means "no event yet"; replace with now.
1894 let now_secs = unix_secs(now);
1895 let _ = counters.first_event_unix_secs.compare_exchange(
1896 0,
1897 now_secs,
1898 Ordering::Relaxed,
1899 Ordering::Relaxed,
1900 );
1901 let archiver_val = epics_value_to_archiver(&snapshot.value);
1902 let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
1903 attach_extras(&extras, &mut sample);
1904 if first_after_connect {
1905 let lost_secs = counters
1906 .last_disconnect_unix_secs
1907 .load(Ordering::Relaxed);
1908 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1909 }
1910 let pv_sample = PvSample {
1911 pv_name: pv_name.clone(),
1912 dbr_type,
1913 sample,
1914 element_count: Some(element_count),
1915 counters: Some(counters.clone()),
1916 };
1917 if let Err(pv_sample) = try_send_with_overflow_count(
1918 &tx,
1919 pv_sample,
1920 &counters,
1921 )
1922 .await
1923 {
1924 let _ = pv_sample;
1925 return; // Write loop shut down
1926 }
1927 }
1928 Some(Err(e)) => {
1929 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1930 warn!(pv = pv_name, "Monitor error: {e}");
1931 }
1932 None => break, // Monitor ended, reconnect
1933 }
1934 }
1935 }
1936 }
1937
1938 // Monitor ended (disconnect) — loop back to reconnect. Reset
1939 // `last_event_time` so the first sample after reconnect is
1940 // treated as `first_after_connect` and bypasses the drift
1941 // filter; without this, an IOC that comes back with a
1942 // legitimate backfill timestamp older than the 30 min window
1943 // would be silently dropped.
1944 {
1945 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1946 ci.is_connected = false;
1947 ci.last_event_time = None;
1948 ci.state = PvConnectionState::Disconnected;
1949 }
1950 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1951 counters
1952 .last_disconnect_unix_secs
1953 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1954 debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
1955 }
1956}
1957
/// Whole seconds elapsed between the Unix epoch and `t`.
///
/// A `t` that lies before the epoch yields `0` instead of an error or a
/// negative number.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        // `duration_since` fails only when `t` precedes the epoch.
        Err(_) => 0,
    }
}
1963
1964/// Send a sample and count buffer-overflow events. Tries non-blocking
1965/// first; if the bounded channel is full we increment the counter and
1966/// fall back to a blocking send so backpressure still works.
1967async fn try_send_with_overflow_count(
1968 tx: &mpsc::Sender<PvSample>,
1969 pv_sample: PvSample,
1970 counters: &PvCounters,
1971) -> Result<(), PvSample> {
1972 match tx.try_send(pv_sample) {
1973 Ok(()) => Ok(()),
1974 Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1975 counters
1976 .buffer_overflow_drops
1977 .fetch_add(1, Ordering::Relaxed);
1978 // Backpressure to the producer; this awaits until the writer
1979 // drains some space. We count the saturation event but don't
1980 // actually drop the sample.
1981 tx.send(pv_sample).await.map_err(|e| e.0)
1982 }
1983 Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1984 }
1985}
1986
1987/// Scan loop: periodically read a channel value.
1988#[allow(clippy::too_many_arguments)]
1989async fn scan_loop(
1990 pv_name: String,
1991 dbr_type: ArchDbType,
1992 element_count: i32,
1993 channel: CaChannel,
1994 tx: mpsc::Sender<PvSample>,
1995 cancel_token: CancellationToken,
1996 period_secs: f64,
1997 conn_info: Arc<Mutex<ConnectionInfo>>,
1998 registry: Arc<PvRegistry>,
1999 extras: Arc<ExtraFieldsCache>,
2000 counters: Arc<PvCounters>,
2001) {
2002 let period = Duration::from_secs_f64(period_secs);
2003 let mut interval = tokio::time::interval(period);
2004 // Reset on every detected disconnect so we re-fetch metadata after
2005 // each reconnect (mirrors monitor_loop's once-per-(re)connect cadence).
2006 let mut metadata_done = false;
2007
2008 loop {
2009 tokio::select! {
2010 _ = cancel_token.cancelled() => return,
2011 _ = interval.tick() => {}
2012 }
2013
2014 if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2015 let was_connected = {
2016 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2017 let prev = ci.is_connected;
2018 ci.is_connected = false;
2019 ci.last_event_time = None;
2020 ci.state = match ci.state {
2021 PvConnectionState::Connected => PvConnectionState::Disconnected,
2022 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2023 _ => PvConnectionState::Connecting,
2024 };
2025 prev
2026 };
2027 if was_connected {
2028 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2029 counters
2030 .last_disconnect_unix_secs
2031 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2032 }
2033 metadata_done = false;
2034 continue;
2035 }
2036
2037 if !metadata_done {
2038 // Fire-and-forget; matches monitor_loop's once-per-(re)connect
2039 // semantics. The local `metadata_done` flag exists because
2040 // scan_loop's outer iteration is per-tick (vs monitor_loop's
2041 // per-reconnect), so we'd otherwise spawn a fetch every tick.
2042 // Bound to `cancel_token` so a stopped/deleted PV doesn't get
2043 // a stale metadata write from an in-flight task.
2044 let channel = channel.clone();
2045 let registry = registry.clone();
2046 let counters = counters.clone();
2047 let pv_name = pv_name.clone();
2048 let cancel = cancel_token.clone();
2049 tokio::spawn(async move {
2050 tokio::select! {
2051 _ = cancel.cancelled() => {}
2052 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
2053 }
2054 });
2055 metadata_done = true;
2056 }
2057
2058 match channel.get().await {
2059 Ok((_dbr_type, epics_val)) => {
2060 let now = SystemTime::now();
2061 let first_after_connect = {
2062 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2063 let first = ci.last_event_time.is_none();
2064 if ci.connected_since.is_none() {
2065 ci.connected_since = Some(now);
2066 }
2067 ci.is_connected = true;
2068 ci.last_event_time = Some(now);
2069 ci.state = PvConnectionState::Connected;
2070 first
2071 };
2072 counters.events_received.fetch_add(1, Ordering::Relaxed);
2073 let now_secs = unix_secs(now);
2074 let _ = counters.first_event_unix_secs.compare_exchange(
2075 0,
2076 now_secs,
2077 Ordering::Relaxed,
2078 Ordering::Relaxed,
2079 );
2080 let archiver_val = epics_value_to_archiver(&epics_val);
2081 let mut sample = ArchiverSample::new(now, archiver_val);
2082 attach_extras(&extras, &mut sample);
2083 if first_after_connect {
2084 let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2085 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2086 }
2087 let pv_sample = PvSample {
2088 pv_name: pv_name.clone(),
2089 dbr_type,
2090 sample,
2091 element_count: Some(element_count),
2092 counters: Some(counters.clone()),
2093 };
2094 if try_send_with_overflow_count(&tx, pv_sample, &counters)
2095 .await
2096 .is_err()
2097 {
2098 return;
2099 }
2100 }
2101 Err(e) => {
2102 counters
2103 .transient_error_count
2104 .fetch_add(1, Ordering::Relaxed);
2105 debug!(pv = pv_name, "Scan read error: {e}");
2106 }
2107 }
2108 }
2109}
2110
2111/// Java parity (ed07feb): tag the first sample after (re)connect with
2112/// `cnxlostepsecs` / `cnxregainedepsecs` / `startup` so consumers can
2113/// detect archiver restarts and PV resumes. Unconditional emission —
2114/// on a clean startup `lost_secs` is 0, which is itself a valid value
2115/// for downstream gap detection.
2116fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2117 sample
2118 .field_values
2119 .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2120 sample
2121 .field_values
2122 .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2123 sample
2124 .field_values
2125 .push(("startup".to_string(), "true".to_string()));
2126}
2127
2128/// Snapshot the extras cache into `sample.field_values`. We sort for stable
2129/// PB output across runs (the protobuf field is repeated; consumers that
2130/// diff/compare files appreciate determinism).
2131fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2132 if extras.is_empty() {
2133 return;
2134 }
2135 let mut entries: Vec<(String, String)> = extras
2136 .iter()
2137 .map(|e| (e.key().clone(), e.value().clone()))
2138 .collect();
2139 entries.sort_by(|a, b| a.0.cmp(&b.0));
2140 sample.field_values = entries;
2141}
2142
2143/// Render an EpicsValue as the string we'll persist in the metadata field
2144/// slot. Numeric scalars get `Display`-format; strings pass through; arrays
2145/// fall back to JSON-ish bracket notation. Stays in sync with what Java
2146/// archiver writes into PVTypeInfo's `archiveFields` blob (a string map).
2147fn epics_value_to_field_string(val: &EpicsValue) -> String {
2148 match val {
2149 EpicsValue::String(s) => s.clone(),
2150 EpicsValue::Short(v) => v.to_string(),
2151 EpicsValue::Float(v) => v.to_string(),
2152 EpicsValue::Enum(v) => v.to_string(),
2153 EpicsValue::Char(v) => v.to_string(),
2154 EpicsValue::Long(v) => v.to_string(),
2155 EpicsValue::Int64(v) => v.to_string(),
2156 EpicsValue::Double(v) => v.to_string(),
2157 EpicsValue::ShortArray(v) => format!("{v:?}"),
2158 EpicsValue::FloatArray(v) => format!("{v:?}"),
2159 EpicsValue::EnumArray(v) => format!("{v:?}"),
2160 EpicsValue::DoubleArray(v) => format!("{v:?}"),
2161 EpicsValue::LongArray(v) => format!("{v:?}"),
2162 EpicsValue::Int64Array(v) => format!("{v:?}"),
2163 EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2164 EpicsValue::StringArray(v) => format!("{v:?}"),
2165 }
2166}
2167
2168/// Spawn a long-running task that subscribes to `<pv>.<field>` and updates
2169/// `extras` with each event. Owned by `parent_token` so pause/destroy cleans
2170/// it up alongside the main PV.
2171fn spawn_extra_field_monitor(
2172 ca_client: &CaClient,
2173 pv_name: &str,
2174 field: &str,
2175 extras: Arc<ExtraFieldsCache>,
2176 parent_token: CancellationToken,
2177 counters: Arc<PvCounters>,
2178) {
2179 let full_name = format!("{pv_name}.{field}");
2180 let channel = ca_client.create_channel(&full_name);
2181 let field_owned = field.to_string();
2182 let pv_owned = pv_name.to_string();
2183
2184 // Catch-unwind boundary: a panic from the CA client (e.g. malformed
2185 // wire frame, allocation failure inside epics_rs) would propagate to
2186 // the runtime and abort sibling tasks of this worker thread. Trap it
2187 // here, log with PV+field context, and return normally so the runtime
2188 // remains healthy.
2189 let panic_pv = pv_owned.clone();
2190 let panic_field = field_owned.clone();
2191 tokio::spawn(async move {
2192 let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2193 channel,
2194 pv_owned,
2195 field_owned,
2196 extras,
2197 parent_token,
2198 counters,
2199 ));
2200 if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2201 let msg = panic_payload_msg(&payload);
2202 error!(
2203 pv = panic_pv,
2204 field = panic_field,
2205 "Extra-field monitor panicked: {msg}"
2206 );
2207 }
2208 });
2209}
2210
2211/// Body of the spawned extra-field monitor. Split out so the spawn site
2212/// can wrap it in `catch_unwind`.
2213async fn extra_field_monitor_body(
2214 channel: CaChannel,
2215 pv_owned: String,
2216 field_owned: String,
2217 extras: Arc<ExtraFieldsCache>,
2218 parent_token: CancellationToken,
2219 counters: Arc<PvCounters>,
2220) {
2221 // Initial connect attempt — failure here is non-fatal (the field may
2222 // not exist on every IOC; we just leave the cache empty).
2223 if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
2224 debug!(
2225 pv = pv_owned,
2226 field = field_owned,
2227 "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
2228 );
2229 }
2230
2231 // Exponential backoff for misconfigured fields (e.g. operator
2232 // listed `.HIHI` on a PV that doesn't expose it). Without this
2233 // we'd retry every 5s forever, churning CA search packets and
2234 // file descriptors. The cap is 60s; one warn at the cap so
2235 // ops know to fix archive_fields.
2236 let mut backoff = CA_RETRY_DELAY;
2237 let max_backoff = Duration::from_secs(60);
2238 let mut warned_at_cap = false;
2239
2240 loop {
2241 // Cancel-aware subscribe attempt.
2242 tokio::select! {
2243 _ = parent_token.cancelled() => return,
2244 sub = channel.subscribe() => {
2245 let mut monitor = match sub {
2246 Ok(m) => m,
2247 Err(e) => {
2248 // Java parity (8fe73eb): bump the transient
2249 // counter so a misconfigured `.HIHI` field
2250 // shows up in the rate / drop reports.
2251 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2252 debug!(
2253 pv = pv_owned,
2254 field = field_owned,
2255 ?backoff,
2256 "Extra-field subscribe failed: {e}; retrying"
2257 );
2258 if backoff >= max_backoff && !warned_at_cap {
2259 warn!(
2260 pv = pv_owned,
2261 field = field_owned,
2262 "Extra-field repeatedly fails to subscribe; \
2263 check archive_fields config (now retrying every 60s)"
2264 );
2265 warned_at_cap = true;
2266 }
2267 let sleep_for = backoff;
2268 backoff = (backoff * 2).min(max_backoff);
2269 tokio::select! {
2270 _ = parent_token.cancelled() => return,
2271 _ = tokio::time::sleep(sleep_for) => continue,
2272 }
2273 }
2274 };
2275 // Subscribe succeeded — reset backoff so the next
2276 // failure starts at the short delay again.
2277 backoff = CA_RETRY_DELAY;
2278 warned_at_cap = false;
2279 loop {
2280 tokio::select! {
2281 _ = parent_token.cancelled() => return,
2282 ev = monitor.recv() => match ev {
2283 Some(Ok(snapshot)) => {
2284 extras.insert(
2285 field_owned.clone(),
2286 epics_value_to_field_string(&snapshot.value),
2287 );
2288 }
2289 Some(Err(e)) => {
2290 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2291 debug!(
2292 pv = pv_owned,
2293 field = field_owned,
2294 "Extra-field monitor error: {e}"
2295 );
2296 }
2297 None => break, // resubscribe
2298 }
2299 }
2300 }
2301 }
2302 }
2303 }
2304}
2305
/// Extract a printable message from a panic payload (`Box<dyn Any>`).
///
/// Handles the two payload types produced by `panic!` with a message,
/// `&'static str` and `String`; anything else is reported as
/// `<non-string panic payload>`.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
2317
2318/// Resolve the data-bearing sub-field of a PVA value. NTScalar /
2319/// NTScalarArray / NTEnum wrap the raw value in a `value` field; bare
2320/// scalars are passed through.
2321fn pv_value_field(field: &PvField) -> &PvField {
2322 match field {
2323 PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2324 _ => field,
2325 }
2326}
2327
2328/// Map a PVA `ScalarValue` to an archiver `ArchiverValue`.
2329fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2330 match s {
2331 ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2332 ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2333 ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2334 ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2335 ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2336 ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2337 ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2338 ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2339 ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2340 ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2341 ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2342 ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2343 }
2344}
2345
2346/// Extract a scalar `ArchiverValue` from a top-level PVA value (which
2347/// may be an NTScalar wrapper). Returns `None` for array/struct types
2348/// that don't reduce to a scalar.
2349fn pv_field_scalar_to_archiver(field: &PvField) -> Option<ArchiverValue> {
2350 match pv_value_field(field) {
2351 PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2352 _ => None,
2353 }
2354}
2355
2356/// Pick the `(ArchDbType, element_count)` to record at archive_pv time
2357/// from an NTScalar / NTScalarArray's introspection-pvget result.
2358/// Returns `None` for unsupported (struct / union / variant) shapes.
2359fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2360 let value = pv_value_field(field);
2361 Some(match value {
2362 PvField::Scalar(s) => (
2363 match s {
2364 ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2365 ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2366 ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2367 ScalarValue::Int(_)
2368 | ScalarValue::UInt(_)
2369 | ScalarValue::Long(_)
2370 | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2371 ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2372 ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2373 ScalarValue::String(_) => ArchDbType::ScalarString,
2374 },
2375 1,
2376 ),
2377 // Array variants — keep the corresponding scalar element type but
2378 // bump element_count. write_loop / PB encoder doesn't currently
2379 // serialise PVA arrays, so callers should reject these for now.
2380 PvField::ScalarArray(arr) => {
2381 let elem = arr.first()?;
2382 let inner = PvField::Scalar(elem.clone());
2383 let (t, _) = pv_field_to_arch_db_type(&inner)?;
2384 (t, arr.len() as i32)
2385 }
2386 _ => return None,
2387 })
2388}
2389
/// Translate a CA field name (`HIHI`, `LOLO`, `EGU`, …) into the dotted
/// pvAccess path below an NTScalar / NTScalarArray.
///
/// The lookup is exact and case-sensitive. `None` means the field has no
/// PVA equivalent; callers should drop such fields silently rather than
/// build an unsatisfiable pvRequest.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const FIELD_PATHS: &[(&str, &str)] = &[
        // display sub-structure
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm sub-structure (alarm thresholds)
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control sub-structure (drive limits)
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];

    FIELD_PATHS
        .iter()
        .find(|(ca_name, _)| *ca_name == field)
        .map(|&(_, pva_path)| pva_path)
}
2418
2419/// Walk a dotted PVA field path (`valueAlarm.highAlarmLimit`) from
2420/// the root [`PvField`]. Returns `None` if any segment is missing or
2421/// the path lands on a non-structure intermediate.
2422fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2423 let mut current = root;
2424 for segment in path.split('.') {
2425 let PvField::Structure(s) = current else {
2426 return None;
2427 };
2428 current = s.get_field(segment)?;
2429 }
2430 Some(current)
2431}
2432
2433/// Stringify a [`ScalarValue`] for storage in the per-sample extras
2434/// map. Java archiver's `archiveFields` blob is a string-string map,
2435/// so we mirror that wire shape.
2436fn scalar_value_to_string(s: &ScalarValue) -> String {
2437 match s {
2438 ScalarValue::Boolean(v) => v.to_string(),
2439 ScalarValue::Byte(v) => v.to_string(),
2440 ScalarValue::Short(v) => v.to_string(),
2441 ScalarValue::Int(v) => v.to_string(),
2442 ScalarValue::Long(v) => v.to_string(),
2443 ScalarValue::UByte(v) => v.to_string(),
2444 ScalarValue::UShort(v) => v.to_string(),
2445 ScalarValue::UInt(v) => v.to_string(),
2446 ScalarValue::ULong(v) => v.to_string(),
2447 ScalarValue::Float(v) => v.to_string(),
2448 ScalarValue::Double(v) => v.to_string(),
2449 ScalarValue::String(s) => s.clone(),
2450 }
2451}
2452
2453/// Element count for a PVA value: 1 for scalar, length for arrays,
2454/// 0 for unsupported shapes (treated as "unknown" by callers).
2455fn pv_field_element_count(field: &PvField) -> i32 {
2456 match pv_value_field(field) {
2457 PvField::Scalar(_) => 1,
2458 PvField::ScalarArray(arr) => arr.len() as i32,
2459 PvField::ScalarArrayTyped(t) => t.len() as i32,
2460 _ => 0,
2461 }
2462}
2463
2464/// Pull `(precision, units)` out of an NTScalar / NTScalarArray's
2465/// `display` sub-structure. Returns `(None, None)` for bare scalars
2466/// or structures missing `display` (some servers omit it).
2467/// Negative precision is the Java EAA "no precision info" sentinel —
2468/// treat it the same as missing so we don't surface "-1" to the UI.
2469fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2470 let PvField::Structure(s) = field else {
2471 return (None, None);
2472 };
2473 let Some(PvField::Structure(disp)) = s.get_field("display") else {
2474 return (None, None);
2475 };
2476 let prec = match disp.get_field("precision") {
2477 Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2478 Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2479 Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2480 _ => None,
2481 };
2482 let egu = match disp.get_field("units") {
2483 Some(PvField::Scalar(ScalarValue::String(u))) => {
2484 let t = u.trim();
2485 if t.is_empty() {
2486 None
2487 } else {
2488 Some(t.to_string())
2489 }
2490 }
2491 _ => None,
2492 };
2493 (prec, egu)
2494}
2495
2496/// Pull the IOC-reported timestamp out of an NTScalar / NTScalarArray.
2497/// Falls back to `SystemTime::now()` when the structure lacks a usable
2498/// `timeStamp` sub-field, so a malformed server doesn't drop samples.
2499fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2500 let PvField::Structure(s) = field else {
2501 return SystemTime::now();
2502 };
2503 let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2504 return SystemTime::now();
2505 };
2506 let secs = match ts.get_field("secondsPastEpoch") {
2507 Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2508 Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2509 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2510 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2511 _ => return SystemTime::now(),
2512 };
2513 let nanos = match ts.get_field("nanoseconds") {
2514 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2515 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2516 _ => 0,
2517 };
2518 SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2519}
2520
2521/// Convert epics-base-rs DbFieldType to archiver ArchDbType.
2522fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2523 match field_type {
2524 DbFieldType::String => ArchDbType::ScalarString,
2525 DbFieldType::Short => ArchDbType::ScalarShort,
2526 DbFieldType::Float => ArchDbType::ScalarFloat,
2527 DbFieldType::Enum => ArchDbType::ScalarEnum,
2528 DbFieldType::Char => ArchDbType::ScalarByte,
2529 DbFieldType::Long => ArchDbType::ScalarInt,
2530 // PB PayloadType has no SCALAR_LONG (i64) — values outside i32 range
2531 // are truncated by the i32 cast in epics_value_to_archiver.
2532 DbFieldType::Int64 => ArchDbType::ScalarInt,
2533 DbFieldType::Double => ArchDbType::ScalarDouble,
2534 }
2535}
2536
2537/// Convert epics-base-rs EpicsValue to archiver ArchiverValue.
2538fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2539 match val {
2540 EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2541 EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2542 EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2543 EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2544 EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2545 EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2546 EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2547 EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2548 EpicsValue::ShortArray(v) => {
2549 ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2550 }
2551 EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2552 EpicsValue::EnumArray(v) => {
2553 ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2554 }
2555 EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2556 EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2557 EpicsValue::Int64Array(v) => {
2558 ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2559 }
2560 EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2561 EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2562 }
2563}
2564
/// Timing knobs for [`write_loop_with_config`].
///
/// Production code takes the defaults through [`write_loop`]. Tests shrink
/// the timeouts to sub-second values so failure-injection cases complete
/// in a few hundred milliseconds rather than minutes.
#[derive(Clone, Debug)]
pub struct WriteLoopConfig {
    /// Interval between `flush_ingest_writes` calls, which also commit the
    /// pending `last_event` timestamps to the registry.
    pub flush_period: Duration,
    /// Upper bound on the wait for one `append_event_with_meta`
    /// JoinHandle. When it elapses the sample is logged as
    /// abandoned-but-may-succeed-late and the loop moves on; the
    /// spawn_blocking task stays parked on the blocking pool, and per-PV
    /// serialization in PlainPB keeps any late completion ordered behind
    /// subsequent same-PV samples.
    pub append_timeout: Duration,
    /// Upper bound on the wait for one `flush_ingest_writes` JoinHandle
    /// during the periodic flush. When it elapses every pending
    /// `ts_updates` entry is deferred to the next cycle, because it is
    /// unknown which PVs reached disk.
    pub flush_timeout: Duration,
    /// Upper bound on each individual append issued while draining at
    /// shutdown.
    pub drain_per_sample_timeout: Duration,
    /// Overall wall-clock budget for the shutdown drain. Past this budget
    /// the remaining buffered samples are abandoned without an append
    /// attempt, so a wedged STS can't stretch an orderly stop into
    /// "kill -9".
    pub drain_total_budget: Duration,
    /// Upper bound on the final `flush_writes` issued at shutdown.
    pub shutdown_flush_timeout: Duration,
}
2597
2598impl Default for WriteLoopConfig {
2599 fn default() -> Self {
2600 Self {
2601 flush_period: Duration::from_secs(10),
2602 append_timeout: Duration::from_secs(30),
2603 flush_timeout: Duration::from_secs(30),
2604 drain_per_sample_timeout: Duration::from_secs(5),
2605 drain_total_budget: Duration::from_secs(30),
2606 shutdown_flush_timeout: Duration::from_secs(15),
2607 }
2608 }
2609}
2610
2611/// Tunables for [`run_sharded_write_pool`].
2612///
2613/// `shards = 1` is the legacy single-worker layout and incurs no
2614/// dispatcher overhead. `shards > 1` spawns a dispatcher that hashes
2615/// each sample's `pv_name` to a fixed shard (so any one PV's samples
2616/// always land in the same per-shard write_loop, preserving
2617/// timestamp ordering) and N parallel shard workers — different PVs
2618/// can append concurrently instead of queueing behind a single task.
2619/// Sites with many active PVs and a fast STS should set this to
2620/// `min(num_cpus, expected concurrent slow appends)`.
2621#[derive(Debug, Clone)]
2622pub struct ShardedWritePoolConfig {
2623 /// Number of parallel shard workers. Must be ≥ 1; `1` is the
2624 /// degenerate case (no dispatcher, behaves identically to a
2625 /// bare `write_loop_with_config`).
2626 pub shards: usize,
2627 /// mpsc capacity of each shard's input channel. The dispatcher
2628 /// `try_send`s into this; when full, the OFFENDING SHARD's
2629 /// drop is recorded under
2630 /// `archiver_dispatcher_shard_overflow_drops_total{shard=N}`
2631 /// and the sample's per-PV `buffer_overflow_drops` counter is
2632 /// bumped. Other shards keep flowing — that's the per-shard
2633 /// isolation guarantee.
2634 pub per_shard_buffer: usize,
2635 /// Per-worker config (timeouts, flush period). Cloned into
2636 /// each shard.
2637 pub write_loop: WriteLoopConfig,
2638}
2639
2640impl Default for ShardedWritePoolConfig {
2641 fn default() -> Self {
2642 Self {
2643 shards: 1,
2644 per_shard_buffer: 4096,
2645 write_loop: WriteLoopConfig::default(),
2646 }
2647 }
2648}
2649
2650// PV-to-shard hash lives in `archiver_core::storage::plainpb` so
2651// the engine's dispatcher and PlainPB's
2652// `flush_ingest_writes_for_shard` agree on which shard owns which
2653// PV. Two definitions would mean shard 0's flush could iterate
2654// PVs the dispatcher routed to shard 1, re-introducing the
2655// misattribution bug.
2656use archiver_core::storage::plainpb::shard_for_pv;
2657
2658/// Run an N-shard write pool: 1 dispatcher (hashes pv_name → shard)
2659/// plus N parallel `write_loop_with_config` workers. Each shard has
2660/// independent `ts_updates`, an independent flush ticker, and its
2661/// own per-PV writer slots inside the shared storage plugin (so
2662/// per-PV ordering is naturally preserved by the consistent-hash
2663/// routing — samples for one PV always go to the same shard).
2664///
2665/// `shards == 1` short-circuits the dispatcher and runs a single
2666/// `write_loop_with_config` directly so single-worker deployments
2667/// pay zero overhead.
2668pub async fn run_sharded_write_pool(
2669 storage: Arc<dyn StoragePlugin>,
2670 registry: Arc<PvRegistry>,
2671 rx: mpsc::Receiver<PvSample>,
2672 shutdown: tokio::sync::watch::Receiver<bool>,
2673 cfg: ShardedWritePoolConfig,
2674) {
2675 let n = cfg.shards.max(1);
2676
2677 // Coalescing per-PV pending-timestamp map shared between the
2678 // flush owner, every shard worker, and every shard worker's
2679 // spawn_blocking late-success closure. Replaces the previous
2680 // mpsc report channel — see [`PendingReports`] for why.
2681 let pending = Arc::new(PendingReports::new());
2682
2683 // Spawn the global flush owner first so it's ready to flush
2684 // as soon as shards start appending.
2685 let flush_owner_handle = tokio::spawn(flush_owner_loop(
2686 storage.clone(),
2687 registry.clone(),
2688 pending.clone(),
2689 shutdown.clone(),
2690 cfg.write_loop.clone(),
2691 ));
2692
2693 if n == 1 {
2694 // Fast path: single shard, no dispatcher hop. We become
2695 // the shard worker ourselves and read from `rx` directly.
2696 shard_append_loop(
2697 0,
2698 storage.clone(),
2699 rx,
2700 pending.clone(),
2701 shutdown.clone(),
2702 cfg.write_loop.clone(),
2703 )
2704 .await;
2705 } else {
2706 // Multi-shard path: dispatcher + N shard workers.
2707 let mut shard_txs = Vec::with_capacity(n);
2708 let mut shard_handles = Vec::with_capacity(n);
2709 for shard_idx in 0..n {
2710 let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2711 shard_txs.push(s_tx);
2712 let storage = storage.clone();
2713 let pending = pending.clone();
2714 let shard_shutdown = shutdown.clone();
2715 let shard_cfg = cfg.write_loop.clone();
2716 shard_handles.push(tokio::spawn(shard_append_loop(
2717 shard_idx,
2718 storage,
2719 s_rx,
2720 pending,
2721 shard_shutdown,
2722 shard_cfg,
2723 )));
2724 }
2725
2726 dispatch_loop(rx, shard_txs, shutdown.clone()).await;
2727
2728 // Dispatcher exited → shard sample receivers see EOS or
2729 // the shutdown signal directly via their watch::Receiver.
2730 // Wait for shards to finish their drain branches.
2731 for h in shard_handles {
2732 let _ = h.await;
2733 }
2734 }
2735
2736 // Flush owner uses its own shutdown-watch + drain_total_budget
2737 // grace timer, not an EOS signal — see flush_owner_loop. Wait
2738 // for it to finish.
2739 let _ = flush_owner_handle.await;
2740}
2741
2742/// Reads the upstream main mpsc, hashes each sample's `pv_name`,
2743/// and forwards to the appropriate shard channel via
2744/// `Sender::try_send` for **per-shard isolation**.
2745///
2746/// `try_send` (not `send().await`) is the load-balancing
2747/// invariant: if one slow shard's channel is full, the dispatcher
2748/// drops THAT shard's overflow into `buffer_overflow_drops` and
2749/// keeps routing for the other shards. Using `send().await` would
2750/// block the dispatcher on the slow shard, back-pressuring the
2751/// upstream main channel and starving every other shard's PVs —
2752/// the exact failure mode the sharded layout is meant to prevent.
2753async fn dispatch_loop(
2754 mut rx: mpsc::Receiver<PvSample>,
2755 shard_txs: Vec<mpsc::Sender<PvSample>>,
2756 mut shutdown: tokio::sync::watch::Receiver<bool>,
2757) {
2758 let n = shard_txs.len();
2759 info!(shards = n, "Sharded write dispatcher started");
2760 loop {
2761 tokio::select! {
2762 biased;
2763 // `biased` so we always check shutdown before draining
2764 // another sample — keeps the shutdown latency bounded
2765 // even under a sustained sample storm.
2766 _ = shutdown.changed() => {
2767 if *shutdown.borrow() {
2768 // Drain remaining samples (try_send for the
2769 // same reason as the steady-state branch).
2770 // Mirror the steady-state overflow accounting
2771 // so a saturated shard's drain-time drops are
2772 // visible to operators — silent loss here
2773 // would shadow real samples lost during
2774 // shutdown.
2775 while let Ok(sample) = rx.try_recv() {
2776 let idx = shard_for_pv(&sample.pv_name, n);
2777 match shard_txs[idx].try_send(sample) {
2778 Ok(()) => {}
2779 Err(mpsc::error::TrySendError::Full(s)) => {
2780 if let Some(c) = s.counters.as_ref() {
2781 c.buffer_overflow_drops
2782 .fetch_add(1, Ordering::Relaxed);
2783 }
2784 metrics::counter!(
2785 "archiver_dispatcher_shard_overflow_drops_total",
2786 "shard" => idx.to_string(),
2787 "phase" => "shutdown_drain",
2788 )
2789 .increment(1);
2790 debug!(
2791 shard = idx,
2792 pv = s.pv_name,
2793 "Shutdown-drain shard channel full; sample dropped"
2794 );
2795 }
2796 Err(mpsc::error::TrySendError::Closed(s)) => {
2797 warn!(
2798 shard = idx,
2799 pv = s.pv_name,
2800 "Shutdown-drain shard channel closed; sample dropped"
2801 );
2802 }
2803 }
2804 }
2805 break;
2806 }
2807 }
2808 maybe = rx.recv() => {
2809 match maybe {
2810 Some(sample) => {
2811 let idx = shard_for_pv(&sample.pv_name, n);
2812 match shard_txs[idx].try_send(sample) {
2813 Ok(()) => {}
2814 Err(mpsc::error::TrySendError::Full(s)) => {
2815 // Shard saturated. Surface the
2816 // drop on the SAMPLE's per-PV
2817 // counter so operators can pin
2818 // the loss to a specific PV+shard.
2819 if let Some(c) = s.counters.as_ref() {
2820 c.buffer_overflow_drops
2821 .fetch_add(1, Ordering::Relaxed);
2822 }
2823 metrics::counter!(
2824 "archiver_dispatcher_shard_overflow_drops_total",
2825 "shard" => idx.to_string(),
2826 )
2827 .increment(1);
2828 debug!(
2829 shard = idx,
2830 pv = s.pv_name,
2831 "Shard channel full; sample dropped \
2832 (per-shard isolation)"
2833 );
2834 }
2835 Err(mpsc::error::TrySendError::Closed(s)) => {
2836 // Shard worker died (panic or
2837 // shutdown race). We'll lose this
2838 // sample but keep the dispatcher
2839 // alive for OTHER shards.
2840 warn!(
2841 shard = idx,
2842 pv = s.pv_name,
2843 "Shard channel closed; sample dropped"
2844 );
2845 }
2846 }
2847 }
2848 None => break, // Upstream closed.
2849 }
2850 }
2851 }
2852 }
2853 info!("Sharded write dispatcher exiting");
2854}
2855
2856/// Background writer task — drains samples and writes to storage.
2857///
2858/// Thin wrapper that fills [`WriteLoopConfig`] from [`Default`] and
2859/// the caller-supplied `flush_period`. Production callers use this
2860/// form; the test suite uses [`write_loop_with_config`] directly to
2861/// dial timeouts down.
2862pub async fn write_loop(
2863 storage: Arc<dyn StoragePlugin>,
2864 registry: Arc<PvRegistry>,
2865 rx: mpsc::Receiver<PvSample>,
2866 shutdown: tokio::sync::watch::Receiver<bool>,
2867 flush_period: Duration,
2868) {
2869 let cfg = WriteLoopConfig {
2870 flush_period,
2871 ..Default::default()
2872 };
2873 write_loop_with_config(storage, registry, rx, shutdown, cfg).await
2874}
2875
/// Drop guard for the flush owner's `flush_in_flight` flag.
///
/// Whoever holds the guard owns the "a flush is running" state;
/// dropping it stores `false` into the flag. Because the reset
/// lives in `Drop`, it also happens when the holder unwinds from a
/// panic. A hand-written `store(false)` at the end of the flush
/// closure would be skipped on unwind, leaving the flag stuck at
/// `true`, and every later flush tick would then be skipped as
/// "still in flight".
struct FlushInFlightGuard {
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    /// Lower the in-flight flag.
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
2891
2892/// Run one flush + commit cycle.
2893///
2894/// Spawns `flush_ingest_writes` on the blocking pool with a bounded
2895/// timeout, then commits the pending registry timestamps for every
2896/// PV the flush returned cleanly. Maintains an `in_flight` flag set
2897/// by the spawn_blocking task so a caller (the ticker branch) can
2898/// avoid stacking concurrent flushes when the previous one is still
2899/// running on a wedged FS.
2900///
2901/// Returns `true` if a flush was actually launched, `false` if the
2902/// helper was a no-op (in_flight already set, ts_updates empty).
2903///
2904/// Mutates `ts_updates`:
2905/// * Clean flush: drops `failed` PVs (bytes lost), commits all
2906/// other entries, drops committed entries on success.
2907/// * Deferred PVs: STAY in `ts_updates` for the next cycle.
2908/// Their bytes are still buffered, not lost.
2909/// * Flush error / panic: leaves every entry intact for retry.
2910/// * Flush timeout: CLEARS every entry. We don't know which PVs
2911/// reached disk; the in-flight task may still be running and
2912/// may even fail late, removing a writer from the cache. The
2913/// next flush would then see a "clean" state for that PV and
2914/// wrongly commit. Conservative drop is the only safe move
2915/// (Java archiver has the same trade-off — under-commit on
2916/// timeout, never over-commit). Future samples will repopulate
2917/// `ts_updates` and the next clean flush catches the registry up.
2918async fn run_flush_and_commit(
2919 storage: &Arc<dyn StoragePlugin>,
2920 registry: &Arc<PvRegistry>,
2921 pending: &Arc<PendingReports>,
2922 flush_timeout: Duration,
2923 in_flight: &Arc<std::sync::atomic::AtomicBool>,
2924) -> bool {
2925 if in_flight.load(Ordering::Acquire) {
2926 // Previous flush hasn't finished. Don't queue another —
2927 // we'd just stack spawn_blocking tasks on the wedged FS
2928 // and saturate the blocking pool. Wait for the existing
2929 // one to drain.
2930 return false;
2931 }
2932 // Snapshot the current pending map BEFORE setting in_flight
2933 // so concurrent shards reporting after this point survive
2934 // into the next cycle (their entries will be in
2935 // `pending` but not in `snapshot`, so we won't remove them
2936 // when we clean up). The snapshot is the authoritative view
2937 // of "what we're trying to commit this cycle".
2938 let snapshot = pending.snapshot();
2939 if snapshot.is_empty() {
2940 return false;
2941 }
2942 in_flight.store(true, Ordering::Release);
2943
2944 let storage_for_flush = storage.clone();
2945 let in_flight_for_task = in_flight.clone();
2946 let flush_join = tokio::task::spawn_blocking(move || {
2947 // RAII guard: clears `in_flight` on every exit path
2948 // (return, panic, future-cancellation). The guard's
2949 // existence is the contract — manually storing false
2950 // BEFORE returning would skip cleanup on a panic inside
2951 // block_on / inside flush_ingest_writes, leaving the flag
2952 // stuck `true` and silently freezing all future flushes.
2953 let _guard = FlushInFlightGuard {
2954 flag: in_flight_for_task,
2955 };
2956 let rt = tokio::runtime::Handle::current();
2957 rt.block_on(storage_for_flush.flush_ingest_writes())
2958 });
2959 match tokio::time::timeout(flush_timeout, flush_join).await {
2960 Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
2961 // Failed PVs: bytes lost. Drop every failed PV's
2962 // pending entry UNCONDITIONALLY — regardless of
2963 // whether the snapshot's value still matches.
2964 //
2965 // The previous "remove only if matches snapshot"
2966 // policy assumed a concurrent shard's post-snapshot
2967 // report meant "newer bytes ARE on disk". That
2968 // assumption fails for the eviction/loss-queue path:
2969 // the loss queue records only PV names, so a loss
2970 // event recorded between snapshot and flush-return
2971 // has no way to communicate which writer generation
2972 // (or timestamp horizon) it applies to. A
2973 // post-snapshot pending entry could refer to bytes
2974 // that ARE on disk (fresh writer after eviction) OR
2975 // bytes that were also lost — we can't tell.
2976 //
2977 // Conservative drop is the safe choice: never
2978 // commit a `last_event` for a PV the storage just
2979 // told us had lost bytes, even at the cost of an
2980 // occasional under-commit that the next sample
2981 // resolves.
2982 if !failed.is_empty() {
2983 pending.remove_failed(&failed);
2984 error!(
2985 "STS flush dropped {} PV(s) from timestamp commit \
2986 (bytes never reached disk): {:?}",
2987 failed.len(),
2988 failed
2989 );
2990 }
2991 // Deferred PVs: writer slot busy, bytes still buffered.
2992 // We do NOT remove them from `pending` (next cycle
2993 // will catch them) and we EXCLUDE them from this
2994 // cycle's commit batch.
2995 if !deferred.is_empty() {
2996 debug!(
2997 "STS flush deferred {} PV(s); keeping in pending \
2998 for next cycle: {:?}",
2999 deferred.len(),
3000 deferred
3001 );
3002 }
3003 // Build commit batch from the snapshot, excluding
3004 // failed (bytes lost) and deferred (bytes not on disk
3005 // yet).
3006 let failed_set: std::collections::HashSet<&str> =
3007 failed.iter().map(|s| s.as_str()).collect();
3008 let deferred_set: std::collections::HashSet<&str> =
3009 deferred.iter().map(|s| s.as_str()).collect();
3010 let to_commit: Vec<(&str, SystemTime)> = snapshot
3011 .iter()
3012 .filter(|(pv, _)| {
3013 !failed_set.contains(pv.as_str()) && !deferred_set.contains(pv.as_str())
3014 })
3015 .map(|(pv, ts)| (pv.as_str(), *ts))
3016 .collect();
3017 if to_commit.is_empty() {
3018 return true;
3019 }
3020 match registry.batch_update_timestamps(&to_commit) {
3021 Ok(()) => {
3022 // Remove committed entries from the shared
3023 // map — but only if their current value still
3024 // matches the snapshot's. This preserves
3025 // concurrent updates that arrived between
3026 // snapshot and remove.
3027 let committed_map: std::collections::HashMap<String, SystemTime> = to_commit
3028 .iter()
3029 .map(|(pv, ts)| ((*pv).to_string(), *ts))
3030 .collect();
3031 pending.remove_committed(&committed_map);
3032 }
3033 Err(e) => {
3034 // Don't remove — retry on next cycle.
3035 // Otherwise a transient SQLite error orphans
3036 // timestamps the disk has but the registry
3037 // doesn't know about, with no retry path.
3038 error!(
3039 "Registry timestamp commit failed; \
3040 keeping {} pending for retry: {e}",
3041 snapshot.len()
3042 );
3043 }
3044 }
3045 }
3046 Ok(Ok(Err(e))) => {
3047 error!(
3048 "STS ingest flush errored; deferring all {} \
3049 pending timestamp commits: {e}",
3050 snapshot.len()
3051 );
3052 }
3053 Ok(Err(join_err)) => {
3054 error!(
3055 "STS ingest flush task panicked; deferring all {} \
3056 pending timestamp commits: {join_err}",
3057 snapshot.len()
3058 );
3059 }
3060 Err(_) => {
3061 // Flush wedged. Conservative drop: the spawn_blocking
3062 // task may still be running and may even fail late
3063 // (which would remove a PV's writer from the cache,
3064 // making the NEXT flush return "clean" and tricking
3065 // the owner into committing a stale value for a PV
3066 // whose old bytes are gone). Drop the snapshot's
3067 // entries from `pending` (only if unchanged), so the
3068 // owner doesn't trust them; future samples rebuild
3069 // pending and the next clean flush catches the
3070 // registry up.
3071 //
3072 // The `in_flight` flag stays TRUE until the
3073 // spawn_blocking task naturally finishes — this
3074 // throttles us from queueing yet another flush onto
3075 // the wedged FS until it clears.
3076 metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
3077 let dropped = snapshot.len();
3078 pending.remove_committed(&snapshot);
3079 error!(
3080 "STS ingest flush timed out after {flush_timeout:?}; \
3081 conservatively dropped {dropped} pending timestamp \
3082 commit(s) (task remains on blocking pool; \
3083 timestamps will be rebuilt from subsequent samples)"
3084 );
3085 }
3086 }
3087 true
3088}
3089
3090/// Coalescing per-PV pending-timestamp map shared between every
3091/// shard worker (incl. their spawn_blocking late-success closures)
3092/// and the single global flush owner.
3093///
3094/// **Why a shared map instead of an mpsc channel?** With a channel,
3095/// a slow flush owner causes the buffer to fill and `try_send`
3096/// drops happen — for a PV that goes silent right after a dropped
3097/// report, the registry's `last_event` is permanently
3098/// under-committed. With a coalescing map keyed by PV, every
3099/// shard call is an `entry().and_modify().or_insert()` that
3100/// always succeeds; concurrent updates from different shards
3101/// never lose data because the map is bounded by **PV count**
3102/// (typical: thousands), not by sample rate (typical: 100k/s).
3103///
3104/// **Coalescing semantics.** A successful append for `(pv, ts)`
3105/// upserts the entry to `max(current, ts)`. A stale late-success
3106/// report (e.g. from a spawn_blocking task that finished after
3107/// shard already wrote a newer sample) does NOT clobber a newer
3108/// committed value.
3109#[derive(Default)]
3110pub struct PendingReports {
3111 inner: dashmap::DashMap<String, SystemTime>,
3112}
3113
3114impl PendingReports {
3115 pub fn new() -> Self {
3116 Self::default()
3117 }
3118
3119 /// Coalescing report. If `pv` already has a newer timestamp,
3120 /// no-op. Always succeeds — this is the channel-replacement
3121 /// invariant that makes silent-PV under-commit impossible.
3122 pub fn report(&self, pv: &str, ts: SystemTime) {
3123 // DashMap's entry() locks one shard internally. Fast-path
3124 // updates use `and_modify`; brand-new PVs go through
3125 // `or_insert`. No external lock contention across shards.
3126 self.inner
3127 .entry(pv.to_string())
3128 .and_modify(|cur| {
3129 if *cur < ts {
3130 *cur = ts;
3131 }
3132 })
3133 .or_insert(ts);
3134 }
3135
3136 /// Snapshot the entire map for a flush cycle. Returns a
3137 /// owned HashMap; the shared map stays intact so concurrent
3138 /// shards can keep reporting during the flush.
3139 pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3140 self.inner
3141 .iter()
3142 .map(|kv| (kv.key().clone(), *kv.value()))
3143 .collect()
3144 }
3145
3146 /// Remove entries whose CURRENT value still equals what we
3147 /// committed. If a concurrent shard advanced an entry to a
3148 /// newer ts after the snapshot, leave it alone — the next
3149 /// flush will pick it up.
3150 pub fn remove_committed(&self, committed: &std::collections::HashMap<String, SystemTime>) {
3151 for (pv, &committed_ts) in committed {
3152 // remove_if applies the predicate atomically inside
3153 // the DashMap shard lock.
3154 let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3155 }
3156 }
3157
3158 /// Unconditionally remove every failed PV's entry from the
3159 /// pending map, regardless of its current timestamp.
3160 ///
3161 /// **Why unconditional.** The storage's loss queue carries
3162 /// only PV names — it cannot tell us whether a concurrent
3163 /// shard's post-snapshot report was for bytes that ARE on
3164 /// disk (e.g., a fresh writer opened after eviction) versus
3165 /// bytes that are also gone. The matching `remove_committed`
3166 /// path can leave a post-snapshot entry behind, and the next
3167 /// clean flush would commit it — turning a "PV's bytes were
3168 /// lost" report into a `last_event` lie.
3169 ///
3170 /// The trade-off: a brand-new sample whose bytes truly DID
3171 /// reach disk after the loss event gets dropped from this
3172 /// commit cycle. Future samples re-populate `pending` and the
3173 /// registry catches up. Under-commit is the safe direction;
3174 /// over-commit is never acceptable.
3175 pub fn remove_failed(&self, failed: &[String]) {
3176 for pv in failed {
3177 let _ = self.inner.remove(pv);
3178 }
3179 }
3180
3181 pub fn is_empty(&self) -> bool {
3182 self.inner.is_empty()
3183 }
3184
3185 pub fn len(&self) -> usize {
3186 self.inner.len()
3187 }
3188}
3189
3190/// Same as [`write_loop`] but accepts the full [`WriteLoopConfig`].
3191///
3192/// Thin wrapper that runs a 1-shard sharded pool — kept on the
3193/// public surface for tests and any direct callers. The actual
3194/// work happens in [`run_sharded_write_pool`].
3195pub async fn write_loop_with_config(
3196 storage: Arc<dyn StoragePlugin>,
3197 registry: Arc<PvRegistry>,
3198 rx: mpsc::Receiver<PvSample>,
3199 shutdown: tokio::sync::watch::Receiver<bool>,
3200 cfg: WriteLoopConfig,
3201) {
3202 let pool_cfg = ShardedWritePoolConfig {
3203 shards: 1,
3204 // unused on the 1-shard fast path (rx is forwarded directly)
3205 per_shard_buffer: 4096,
3206 write_loop: cfg,
3207 };
3208 run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3209}
3210
3211/// Per-shard append worker. Drains samples from `sample_rx`,
3212/// drops out-of-order or type-changed samples, then runs each
3213/// `append_event_with_meta` on the blocking pool with a bounded
3214/// timeout. On every success — including the late-success path
3215/// where write_loop's outer timeout already abandoned the
3216/// JoinHandle — coalesces `(pv, ts)` into the shared
3217/// [`PendingReports`] map so the global flush owner sees it on
3218/// the next flush cycle.
3219///
3220/// The shard worker holds NO ts_updates / flush state of its own.
3221/// It tracks `last_ts` and `last_dbr_type` only for the
3222/// per-PV ordering / type-change drop checks; those are local
3223/// to the shard because the dispatcher's consistent hash pins each
3224/// PV to one shard.
3225async fn shard_append_loop(
3226 shard_idx: usize,
3227 storage: Arc<dyn StoragePlugin>,
3228 mut sample_rx: mpsc::Receiver<PvSample>,
3229 pending: Arc<PendingReports>,
3230 mut shutdown: tokio::sync::watch::Receiver<bool>,
3231 cfg: WriteLoopConfig,
3232) {
3233 let append_timeout = cfg.append_timeout;
3234 info!(shard = shard_idx, "Shard append loop started");
3235 // Per-PV state for the in-shard sanity drops. The dispatcher's
3236 // consistent hash keeps each PV in one shard, so these maps
3237 // never see cross-shard PVs.
3238 let mut last_ts: std::collections::HashMap<String, SystemTime> =
3239 std::collections::HashMap::new();
3240 let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
3241 std::collections::HashMap::new();
3242
3243 loop {
3244 tokio::select! {
3245 Some(pv_sample) = sample_rx.recv() => {
3246 shard_handle_sample(
3247 shard_idx,
3248 &storage,
3249 &pending,
3250 pv_sample,
3251 &mut last_ts,
3252 &mut last_dbr_type,
3253 append_timeout,
3254 )
3255 .await;
3256 }
3257 _ = shutdown.changed() => {
3258 shard_drain_on_shutdown(
3259 shard_idx,
3260 &storage,
3261 &pending,
3262 &mut sample_rx,
3263 &mut last_ts,
3264 &mut last_dbr_type,
3265 &cfg,
3266 )
3267 .await;
3268 break;
3269 }
3270 }
3271 }
3272 info!(shard = shard_idx, "Shard append loop exited");
3273}
3274
3275/// Process one sample on the shard's hot path. Extracted so the
3276/// steady-state and shutdown-drain branches can share the same
3277/// "drop checks → spawn_blocking append → report on success"
3278/// recipe.
3279async fn shard_handle_sample(
3280 shard_idx: usize,
3281 storage: &Arc<dyn StoragePlugin>,
3282 pending: &Arc<PendingReports>,
3283 pv_sample: PvSample,
3284 last_ts: &mut std::collections::HashMap<String, SystemTime>,
3285 last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3286 append_timeout: Duration,
3287) {
3288 let ts = pv_sample.sample.timestamp;
3289
3290 // Out-of-order timestamp drop. Storage requires monotonic
3291 // appends per PV; an older timestamp would produce a corrupt
3292 // partition. Tracked via shard-local `last_ts` so the check
3293 // survives flush cycles (the legacy ts_updates-based check
3294 // only caught within-cycle reorderings).
3295 if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
3296 && ts < *prev_ts
3297 {
3298 if let Some(ref c) = pv_sample.counters {
3299 c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
3300 }
3301 debug!(
3302 shard = shard_idx,
3303 pv = pv_sample.pv_name,
3304 ?ts,
3305 ?prev_ts,
3306 "Dropping out-of-order sample"
3307 );
3308 return;
3309 }
3310
3311 // Type-change drop. The first sample defines the PV's wire
3312 // type; later samples with a different DBR get dropped
3313 // (operator must changeTypeForPV first).
3314 let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
3315 if let Some(prev) = prev_type
3316 && prev != pv_sample.dbr_type
3317 {
3318 if let Some(ref c) = pv_sample.counters {
3319 c.type_change_drops.fetch_add(1, Ordering::Relaxed);
3320 // Java parity (9f2234f): record the latest observed
3321 // DBR so the dropped-events report can show what the
3322 // IOC is now sending vs the archived type.
3323 c.latest_observed_dbr
3324 .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
3325 }
3326 debug!(
3327 shard = shard_idx,
3328 pv = pv_sample.pv_name,
3329 ?prev,
3330 new = ?pv_sample.dbr_type,
3331 "Dropping type-changed sample"
3332 );
3333 // Restore prev_type so a single mismatched sample doesn't
3334 // permanently flip our recorded type.
3335 last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
3336 return;
3337 }
3338
3339 let meta = AppendMeta {
3340 element_count: pv_sample.element_count,
3341 ..Default::default()
3342 };
3343 let pv_name_for_post = pv_sample.pv_name.clone();
3344 let counters_for_post = pv_sample.counters.clone();
3345 let counters_in_task = pv_sample.counters.clone();
3346 let storage_for_task = storage.clone();
3347 let pending_for_task = pending.clone();
3348
3349 // Bound each append + isolate sync syscall hangs.
3350 // `spawn_blocking` moves the actual I/O to the blocking
3351 // thread pool, so a stuck NFS write parks a blocking-pool
3352 // thread instead of a runtime worker. The
3353 // `tokio::time::timeout` then bounds how long the shard waits
3354 // for that join — a stuck task is abandoned and the shard
3355 // continues.
3356 //
3357 // Late-success path: a timed-out task may still complete its
3358 // write later. The closure ALWAYS reports into the shared
3359 // `PendingReports` map on Ok (whether or not the shard
3360 // already moved on), so the global flush owner picks up late
3361 // successes too. Per-PV serialization inside PlainPB keeps
3362 // any late completion ordered behind subsequent same-PV
3363 // samples. The map's coalescing semantics ensure a stale late
3364 // success does NOT clobber a newer hot-path commit.
3365 let join = tokio::task::spawn_blocking(move || {
3366 let rt = tokio::runtime::Handle::current();
3367 let res = rt.block_on(storage_for_task.append_event_with_meta(
3368 &pv_sample.pv_name,
3369 pv_sample.dbr_type,
3370 &pv_sample.sample,
3371 &meta,
3372 ));
3373 if res.is_ok() {
3374 metrics::counter!("archiver_events_stored_total").increment(1);
3375 if let Some(c) = counters_in_task.as_ref() {
3376 c.events_stored.fetch_add(1, Ordering::Relaxed);
3377 }
3378 // Coalesce into the shared map. Always succeeds —
3379 // unlike the previous mpsc try_send, a saturated
3380 // flush owner cannot cause this report to drop, so
3381 // a PV that goes silent right after a successful
3382 // append still gets its `last_event` committed once
3383 // the owner gets to its next flush cycle.
3384 pending_for_task.report(&pv_sample.pv_name, ts);
3385 }
3386 res
3387 });
3388 let res = tokio::time::timeout(append_timeout, join).await;
3389 // Conservative ordering high-water (principle 5). Bump
3390 // `last_ts` on EVERY storage-layer return path — success,
3391 // error, panic, timeout — not just on Ok and timeout.
3392 //
3393 // Why all four:
3394 // * Ok — sample on disk; obvious bump.
3395 // * Timeout — sample MAY land on disk via late success;
3396 // bump to keep on-disk order monotonic.
3397 // * Error — current storage contract is binary, but a
3398 // future partial-success-with-error variant
3399 // could leave bytes on disk. Bumping now
3400 // hedges against that.
3401 // * Panic — closure aborted mid-write; storage state
3402 // is undefined. Bumping is the safe choice.
3403 //
3404 // The pre-check earlier in this function already drops
3405 // out-of-order and type-changed samples BEFORE we get here,
3406 // so this bump only ever sets the high-water to a ts the
3407 // shard explicitly accepted into the storage layer.
3408 last_ts.insert(pv_name_for_post.clone(), ts);
3409 match res {
3410 Ok(Ok(Ok(()))) => {
3411 // Report already went out from inside the closure.
3412 }
3413 Ok(Ok(Err(e))) => {
3414 error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
3415 }
3416 Ok(Err(join_err)) => {
3417 error!(
3418 shard = shard_idx,
3419 pv = pv_name_for_post,
3420 "Storage write task panicked: {join_err}"
3421 );
3422 }
3423 Err(_) => {
3424 // Sample MAY still land on disk later (per-PV
3425 // serialization keeps order). The closure will report
3426 // into the shared pending map whenever it eventually
3427 // completes, so registry's `last_event` catches up
3428 // via the late path even though we move on now.
3429 error!(
3430 shard = shard_idx,
3431 pv = pv_name_for_post,
3432 "Storage append timed out after {append_timeout:?}; \
3433 shard abandoning (task remains on blocking pool, \
3434 may still succeed late)"
3435 );
3436 if let Some(ref c) = counters_for_post {
3437 c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
3438 }
3439 }
3440 }
3441}
3442
3443/// Drain the shard's remaining buffered samples on shutdown,
3444/// applying the same bounded-timeout discipline as the hot path.
3445async fn shard_drain_on_shutdown(
3446 shard_idx: usize,
3447 storage: &Arc<dyn StoragePlugin>,
3448 pending: &Arc<PendingReports>,
3449 sample_rx: &mut mpsc::Receiver<PvSample>,
3450 last_ts: &mut std::collections::HashMap<String, SystemTime>,
3451 last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3452 cfg: &WriteLoopConfig,
3453) {
3454 let drain_per_sample_timeout = cfg.drain_per_sample_timeout;
3455 let drain_total_budget = cfg.drain_total_budget;
3456 let drain_start = std::time::Instant::now();
3457 let mut drained_skipped = 0usize;
3458 while let Ok(pv_sample) = sample_rx.try_recv() {
3459 if drain_start.elapsed() > drain_total_budget {
3460 drained_skipped += 1;
3461 continue;
3462 }
3463 // Reuse the hot-path handler with a tighter timeout.
3464 // Late-success reports still flow into `pending` because
3465 // the closure clones the Arc independently of this
3466 // shard's lifecycle.
3467 shard_handle_sample(
3468 shard_idx,
3469 storage,
3470 pending,
3471 pv_sample,
3472 last_ts,
3473 last_dbr_type,
3474 drain_per_sample_timeout,
3475 )
3476 .await;
3477 }
3478 if drained_skipped > 0 {
3479 warn!(
3480 shard = shard_idx,
3481 "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3482 {drained_skipped} buffered sample(s) abandoned without \
3483 attempting append"
3484 );
3485 }
3486}
3487
3488/// Single global flush owner. Reads from the shared
3489/// [`PendingReports`] map (which all shards coalesce into),
3490/// drives the periodic `flush_ingest_writes`, and commits to the
3491/// registry. Because ALL shards report into this one map and the
3492/// owner is the only consumer, the flush result and the
3493/// pending-commit data live in the same task — no shard 0 vs
3494/// shard 1 attribution skew.
3495async fn flush_owner_loop(
3496 storage: Arc<dyn StoragePlugin>,
3497 registry: Arc<PvRegistry>,
3498 pending: Arc<PendingReports>,
3499 mut shutdown: tokio::sync::watch::Receiver<bool>,
3500 cfg: WriteLoopConfig,
3501) {
3502 let flush_period = cfg.flush_period;
3503 let flush_timeout = cfg.flush_timeout;
3504 let shutdown_flush_timeout = cfg.shutdown_flush_timeout;
3505 let drain_total_budget = cfg.drain_total_budget;
3506 // tokio::time::interval(Duration::ZERO) panics. Config-side
3507 // validation rejects 0 at load time, but defending in depth
3508 // here keeps tests and direct callers (which build configs by
3509 // hand) from crashing the engine on a misconfigured value.
3510 let flush_period = if flush_period.is_zero() {
3511 warn!(
3512 "flush_owner: flush_period was Duration::ZERO; \
3513 clamping to 1s to avoid tokio::time::interval panic"
3514 );
3515 Duration::from_secs(1)
3516 } else {
3517 flush_period
3518 };
3519 info!("Flush owner started");
3520
3521 let mut flush_ticker = tokio::time::interval(flush_period);
3522 flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3523 let _ = flush_ticker.tick().await;
3524
3525 let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3526
3527 // Phase 1: steady state. Ticker-driven flush only. Reports
3528 // accumulate in the shared `PendingReports` map continuously
3529 // (no recv loop — shards write directly), so we don't need
3530 // to multiplex report consumption against the flush.
3531 loop {
3532 tokio::select! {
3533 biased;
3534 _ = shutdown.changed() => {
3535 if *shutdown.borrow() {
3536 break;
3537 }
3538 }
3539 _ = flush_ticker.tick() => {
3540 if !pending.is_empty() {
3541 run_flush_and_commit(
3542 &storage,
3543 ®istry,
3544 &pending,
3545 flush_timeout,
3546 &flush_in_flight,
3547 )
3548 .await;
3549 }
3550 }
3551 }
3552 }
3553
3554 // Phase 2: shutdown grace. Two windows in one budget:
3555 //
3556 // 1. A brief minimum sleep so any in-flight spawn_blocking
3557 // late-success closures can coalesce their reports into
3558 // `pending` before the final flush snapshots it.
3559 //
3560 // 2. Wait for any still-running ticker flush to drain
3561 // (`flush_in_flight` cleared by the spawn_blocking task's
3562 // RAII guard). Without this, the final
3563 // `run_flush_and_commit` would see `in_flight == true`,
3564 // short-circuit, and skip every entry in `pending` —
3565 // data on disk but no registry commit.
3566 //
3567 // Both windows fit inside the operator-configured
3568 // `drain_total_budget`. If a flush is genuinely wedged past
3569 // the budget, we proceed to final flush; that call will see
3570 // `in_flight` still set and short-circuit, but at least we
3571 // didn't pin the process forever. Future samples on restart
3572 // will re-populate `pending` and the registry catches up.
3573 let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3574 let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3575 if !min_grace.is_zero() {
3576 tokio::time::sleep(min_grace).await;
3577 }
3578 while flush_in_flight.load(Ordering::Acquire) && std::time::Instant::now() < phase2_deadline {
3579 tokio::time::sleep(Duration::from_millis(50)).await;
3580 }
3581 if flush_in_flight.load(Ordering::Acquire) {
3582 warn!(
3583 "Shutdown grace exhausted while a flush is still in flight; \
3584 final flush will be skipped (pending entries left for next \
3585 process restart to rebuild from re-archived samples)"
3586 );
3587 }
3588
3589 // Final flush + commit. Use shutdown_flush_timeout (typically
3590 // shorter than the steady-state flush_timeout) so a wedged FS
3591 // doesn't block shutdown indefinitely.
3592 info!(
3593 pending_at_shutdown = pending.len(),
3594 "Flush owner running final flush + commit"
3595 );
3596 if !pending.is_empty() {
3597 run_flush_and_commit(
3598 &storage,
3599 ®istry,
3600 &pending,
3601 shutdown_flush_timeout,
3602 &flush_in_flight,
3603 )
3604 .await;
3605 }
3606 info!("Flush owner exiting");
3607}