1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::server::snapshot::DbrClass;
7use epics_rs::base::types::{DbFieldType, EpicsValue};
8use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
9use epics_rs::pva::client_native::PvaClient;
10use epics_rs::pva::proto::ByteOrder;
11use epics_rs::pva::pvdata::encode::{encode_pv_field, encode_type_desc};
12use epics_rs::pva::pvdata::{PvField, ScalarType, ScalarValue, TypedScalarArray};
13use tokio::sync::mpsc;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info, warn};
16
17use archiver_core::registry::{Protocol, PvRecord, PvRegistry, PvStatus, SampleMode};
18use archiver_core::storage::traits::{AppendMeta, IngestFlushResult, StoragePlugin};
19use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
20
21use crate::policy::PolicyConfig;
22
/// Upper bound on the initial connection step when a PV is first archived.
/// Also reused as the limit for the PVA `pvconnect` / `pvget_full` calls in
/// `archive_pv_inner_pva`.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// 30-second reconnect timeout.
/// NOTE(review): not referenced in the visible part of this file — presumably
/// consumed by the CA monitor/scan loops further down; confirm.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Pause before retrying after a failed subscribe or descriptor fetch.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
29
/// Oldest IOC timestamp that is still accepted, in seconds since the Unix
/// epoch (662_688_000 s is 1991-01-01T00:00:00Z).
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000;

/// Decides whether an IOC-supplied timestamp is plausible.
///
/// Returns `false` when `ts` lies before [`PAST_CUTOFF_UNIX_SECS`] (which
/// includes every pre-epoch time), otherwise `true` only if `ts` is within
/// `drift_secs` whole seconds of `now`, in either direction.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // A timestamp before the epoch cannot be expressed as a duration; map it
    // to the smallest value so the cutoff test below rejects it.
    let sample_secs = match ts.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => i64::MIN,
    };
    if sample_secs < PAST_CUTOFF_UNIX_SECS {
        return false;
    }

    // A server clock before the epoch is treated as the epoch itself.
    let server_secs = now
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs() as i64);

    (sample_secs - server_secs).unsigned_abs() <= drift_secs
}
59
/// Connection lifecycle state tracked for an archived PV.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Starting state; this is the `Default` value.
    #[default]
    Idle,
    Connecting,
    /// Set by the event path once a value has been received.
    Connected,
    Disconnected,
}

impl PvConnectionState {
    /// Returns the variant name as a static string.
    pub fn as_str(self) -> &'static str {
        match self {
            PvConnectionState::Connected => "Connected",
            PvConnectionState::Connecting => "Connecting",
            PvConnectionState::Disconnected => "Disconnected",
            PvConnectionState::Idle => "Idle",
        }
    }
}
90
/// Connection bookkeeping for one archived PV, shared between the sampling
/// task and the `ChannelManager` query methods behind an `Arc<Mutex<_>>`.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Time the PV was first seen connected; `None` until then.
    /// `get_never_connected_pvs` reports PVs for which this is still `None`.
    pub connected_since: Option<SystemTime>,
    /// Server-side receive time of the most recent event, if any.
    pub last_event_time: Option<SystemTime>,
    /// Whether the PV is currently considered connected.
    /// `get_currently_disconnected_pvs` reports PVs for which this is `false`.
    pub is_connected: bool,
    /// Lifecycle state; starts at `PvConnectionState::Idle` via `Default`.
    pub state: PvConnectionState,
}
102
/// Lock-free per-PV statistics, shared between the sampling tasks and the
/// reporting API through an `Arc`.
///
/// Two sentinel conventions are used by [`PvCountersSnapshot`]: a `*_unix_secs`
/// field equal to `0` means "never happened", and `latest_observed_dbr == -1`
/// means "nothing observed yet".
#[derive(Debug)]
pub struct PvCounters {
    /// Events delivered by the IOC (incremented once per event handled).
    pub events_received: AtomicU64,
    /// Events stored. NOTE(review): not written in the visible part of this
    /// file — presumably maintained by the consumer of `PvSample`; confirm.
    pub events_stored: AtomicU64,
    /// Unix seconds of the first event; `0` while unset.
    pub first_event_unix_secs: AtomicI64,
    /// Samples discarded because the sample channel was full.
    pub buffer_overflow_drops: AtomicU64,
    /// Samples discarded because their timestamp was outside the accepted window.
    pub timestamp_drops: AtomicU64,
    /// Events discarded because the value had no archiver mapping.
    pub type_change_drops: AtomicU64,
    pub disconnect_count: AtomicU64,
    /// Unix seconds of the most recent disconnect; `0` while unset.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Recoverable errors, e.g. a failed PVA descriptor fetch or subscribe.
    pub transient_error_count: AtomicU64,
    /// Most recently observed DBR type code; `-1` while unset.
    pub latest_observed_dbr: AtomicI32,
    /// Metadata fetches that failed or timed out.
    pub metadata_fetch_failures: AtomicU64,
    pub storage_append_timeouts: AtomicU64,
}

impl Default for PvCounters {
    /// All counters start at zero; `latest_observed_dbr` starts at its `-1`
    /// "unset" sentinel.
    fn default() -> Self {
        let zero = || AtomicU64::new(0);
        let unset_secs = || AtomicI64::new(0);
        Self {
            events_received: zero(),
            events_stored: zero(),
            first_event_unix_secs: unset_secs(),
            buffer_overflow_drops: zero(),
            timestamp_drops: zero(),
            type_change_drops: zero(),
            disconnect_count: zero(),
            last_disconnect_unix_secs: unset_secs(),
            transient_error_count: zero(),
            latest_observed_dbr: AtomicI32::new(-1),
            metadata_fetch_failures: zero(),
            storage_append_timeouts: zero(),
        }
    }
}

/// Plain-value copy of [`PvCounters`] with the sentinels decoded to `Option`.
#[derive(Debug, Clone)]
pub struct PvCountersSnapshot {
    pub events_received: u64,
    pub events_stored: u64,
    /// `None` when the underlying counter still holds `0`.
    pub first_event_unix_secs: Option<i64>,
    pub buffer_overflow_drops: u64,
    pub timestamp_drops: u64,
    pub type_change_drops: u64,
    pub disconnect_count: u64,
    /// `None` when the underlying counter still holds `0`.
    pub last_disconnect_unix_secs: Option<i64>,
    pub transient_error_count: u64,
    /// `None` when the underlying counter still holds `-1`.
    pub latest_observed_dbr: Option<i32>,
    pub metadata_fetch_failures: u64,
    pub storage_append_timeouts: u64,
}

impl From<&PvCounters> for PvCountersSnapshot {
    /// Reads every counter with relaxed ordering. The fields are loaded one by
    /// one, so the snapshot is not an atomic cut across counters.
    fn from(c: &PvCounters) -> Self {
        let count = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        // `0` is the "unset" sentinel for the *_unix_secs fields.
        let secs = |cell: &AtomicI64| match cell.load(Ordering::Relaxed) {
            0 => None,
            value => Some(value),
        };

        let first_event_unix_secs = secs(&c.first_event_unix_secs);
        let last_disconnect_unix_secs = secs(&c.last_disconnect_unix_secs);

        Self {
            events_received: count(&c.events_received),
            events_stored: count(&c.events_stored),
            first_event_unix_secs,
            buffer_overflow_drops: count(&c.buffer_overflow_drops),
            timestamp_drops: count(&c.timestamp_drops),
            type_change_drops: count(&c.type_change_drops),
            disconnect_count: count(&c.disconnect_count),
            last_disconnect_unix_secs,
            transient_error_count: count(&c.transient_error_count),
            latest_observed_dbr: Some(c.latest_observed_dbr.load(Ordering::Relaxed))
                .filter(|&code| code != -1),
            metadata_fetch_failures: count(&c.metadata_fetch_failures),
            storage_append_timeouts: count(&c.storage_append_timeouts),
        }
    }
}
224
/// Everything the manager keeps for one PV that is currently being archived.
/// Stored in `ChannelManager::channels`, keyed by PV name.
struct PvHandle {
    /// CA channel for CA-archived PVs; `None` for PVs archived over PVA.
    channel: Option<CaChannel>,
    /// Root cancellation token for the PV's tasks. Cancelled on
    /// pause/stop/destroy; per-field tokens are created as its children.
    cancel_token: CancellationToken,
    // Stored at creation but not read back anywhere in this struct's users,
    // hence the lint suppression.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection bookkeeping shared with the sampling task.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest values of the extra archive fields (see `ExtraFieldsCache`).
    extras: Arc<ExtraFieldsCache>,
    /// One cancellation token per extra-field monitor, keyed by field name.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises changes to the set of extra-field monitors
    /// (initial spawn vs. `update_archive_fields`).
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Statistics shared with the PV's tasks.
    counters: Arc<PvCounters>,
}
253
/// Concurrent map from an extra archive field's name to its most recent
/// value, rendered as a string.
type ExtraFieldsCache = DashMap<String, String>;

/// Capacity of the bounded `PvSample` channel created in
/// `ChannelManager::new_with_drift`; producers that find it full drop the sample.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
262
/// Scope guard that removes `key` from `map` when dropped, so a
/// pending-archive marker is cleared on every exit path (including `?`
/// early returns).
struct PendingGuard<'a> {
    /// Map holding the in-flight markers.
    map: &'a DashMap<String, ()>,
    /// Entry to remove on drop.
    key: String,
}

impl Drop for PendingGuard<'_> {
    /// Clears the marker this guard owns.
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}
275
/// Owns the CA/PVA clients and the per-PV sampling tasks, and keeps the
/// registry and metrics in step with which PVs are being archived.
pub struct ChannelManager {
    /// Channel Access client used for CA PVs and extra-field monitors.
    ca_client: CaClient,
    /// pvAccess client used for PVA PVs.
    pva_client: PvaClient,
    /// Live handles, keyed by PV name. A PV is "being archived" iff it has an entry.
    channels: DashMap<String, PvHandle>,
    /// PV names with an `archive_pv` call currently in flight.
    pending_archives: DashMap<String, ()>,
    /// Per-PV async mutexes that serialise archive/pause/resume/stop/destroy.
    /// Entries are created on demand by `op_lock`.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    // Held but not used by the methods visible in this impl.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// Persistent PV registry (status, sample mode, metadata).
    registry: Arc<PvRegistry>,
    /// Producer side of the sample channel; cloned into every sampling task.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration that can override the requested sample mode.
    policy: Option<PolicyConfig>,
    /// Maximum accepted difference, in seconds, between an IOC timestamp and
    /// the server clock.
    server_ioc_drift_secs: u64,
}
305
/// One sample travelling from a sampling task to the consumer of the sample channel.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archiver type the PV was registered with.
    pub dbr_type: ArchDbType,
    /// Timestamped value, with any extra fields attached.
    pub sample: ArchiverSample,
    /// Element count observed for this sample, when known.
    pub element_count: Option<i32>,
    /// Counters of the originating PV, when available.
    /// NOTE(review): presumably so the consumer can update storage-side
    /// counters such as `events_stored` — confirm against the consumer.
    pub counters: Option<Arc<PvCounters>>,
}
317
318impl ChannelManager {
319 pub async fn new(
320 storage: Arc<dyn StoragePlugin>,
321 registry: Arc<PvRegistry>,
322 policy: Option<PolicyConfig>,
323 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
324 Self::new_with_drift(storage, registry, policy, 30 * 60).await
325 }
326
327 pub async fn new_with_drift(
331 storage: Arc<dyn StoragePlugin>,
332 registry: Arc<PvRegistry>,
333 policy: Option<PolicyConfig>,
334 server_ioc_drift_secs: u64,
335 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
336 let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
337 let pva_client = PvaClient::new().map_err(|e| anyhow::anyhow!("{e}"))?;
338 let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
339
340 let mgr = Self {
341 ca_client,
342 pva_client,
343 channels: DashMap::new(),
344 pending_archives: DashMap::new(),
345 op_locks: DashMap::new(),
346 storage,
347 registry,
348 sample_tx: tx,
349 policy,
350 server_ioc_drift_secs,
351 };
352
353 Ok((mgr, rx))
354 }
355
356 fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
361 if let Some(existing) = self.op_locks.get(pv_name) {
362 return existing.clone();
363 }
364 self.op_locks
365 .entry(pv_name.to_string())
366 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
367 .clone()
368 }
369
370 pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
377 let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
378 let total = active_pvs.len() as u64;
379 info!(total, "Restoring PVs from registry");
380
381 let mut restored = 0u64;
382 for record in active_pvs {
383 if record.alias_for.is_some() {
384 warn!(
385 pv = record.pv_name,
386 target = record.alias_for.as_deref(),
387 "Skipping alias row in restore; aliases are routed, not archived"
388 );
389 continue;
390 }
391 if let Err(e) = self.start_archiving_internal(&record).await {
392 warn!(pv = record.pv_name, "Failed to restore PV: {e}");
393 self.registry.set_status(&record.pv_name, PvStatus::Error)?;
394 } else {
395 restored += 1;
396 }
397 }
398 metrics::gauge!("archiver_pvs_active").set(restored as f64);
399 if restored < total {
400 warn!(
401 restored,
402 failed = total - restored,
403 "Some PVs failed to restore"
404 );
405 }
406
407 Ok(restored)
408 }
409
410 pub async fn archive_pv(
412 &self,
413 pv_name: &str,
414 sample_mode: &SampleMode,
415 protocol: Protocol,
416 ) -> anyhow::Result<()> {
417 let lock = self.op_lock(pv_name);
419 let _g = lock.lock().await;
420
421 if self.channels.contains_key(pv_name) {
422 anyhow::bail!("PV {pv_name} is already being archived");
423 }
424
425 if self
428 .pending_archives
429 .insert(pv_name.to_string(), ())
430 .is_some()
431 {
432 anyhow::bail!("PV {pv_name} archive operation already in progress");
433 }
434 let _guard = PendingGuard {
435 map: &self.pending_archives,
436 key: pv_name.to_string(),
437 };
438
439 match protocol {
440 Protocol::Ca => self.archive_pv_inner(pv_name, sample_mode).await,
441 Protocol::Pva => self.archive_pv_inner_pva(pv_name, sample_mode).await,
442 }
443 }
444
445 async fn archive_pv_inner(
447 &self,
448 pv_name: &str,
449 sample_mode: &SampleMode,
450 ) -> anyhow::Result<()> {
451 if self.channels.contains_key(pv_name) {
453 anyhow::bail!("PV {pv_name} is already being archived");
454 }
455
456 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
458 if let Some(p) = policy.find_policy(pv_name) {
459 (p.to_sample_mode(), Some(p.policy_name().to_string()))
460 } else {
461 (sample_mode.clone(), None)
462 }
463 } else {
464 (sample_mode.clone(), None)
465 };
466
467 let channel = self.ca_client.create_channel(pv_name);
469 channel
470 .wait_connected(CA_CONNECT_TIMEOUT)
471 .await
472 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
473
474 let info = self
475 .ca_client
476 .cainfo(pv_name)
477 .await
478 .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
479
480 let dbr_type = dbr_field_to_arch_type(info.native_type);
481 let element_count = info.element_count as i32;
482
483 self.registry
486 .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
487 if let Some(ref name) = matched_policy_name {
490 self.registry.update_policy_name(pv_name, Some(name))?;
491 }
492
493 let record = self
494 .registry
495 .get_pv(pv_name)?
496 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
497 self.start_archiving_internal(&record).await?;
498
499 metrics::gauge!("archiver_pvs_active").increment(1.0);
500 info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
501 Ok(())
502 }
503
504 async fn archive_pv_inner_pva(
509 &self,
510 pv_name: &str,
511 sample_mode: &SampleMode,
512 ) -> anyhow::Result<()> {
513 if self.channels.contains_key(pv_name) {
514 anyhow::bail!("PV {pv_name} is already being archived");
515 }
516
517 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
519 if let Some(p) = policy.find_policy(pv_name) {
520 (p.to_sample_mode(), Some(p.policy_name().to_string()))
521 } else {
522 (sample_mode.clone(), None)
523 }
524 } else {
525 (sample_mode.clone(), None)
526 };
527
528 let connect = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvconnect(pv_name))
532 .await
533 .map_err(|_| anyhow::anyhow!("PVA connect to {pv_name} timed out"))?
534 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name} via PVA: {e}"))?;
535 debug!(pv = pv_name, server = %connect, "PVA channel connected");
536
537 let initial = tokio::time::timeout(CA_CONNECT_TIMEOUT, self.pva_client.pvget_full(pv_name))
543 .await
544 .map_err(|_| anyhow::anyhow!("PVA pvget for {pv_name} timed out"))?
545 .map_err(|e| anyhow::anyhow!("Failed to pvget {pv_name}: {e}"))?;
546 let (dbr_type, element_count) =
547 pv_field_to_arch_db_type(&initial.value).ok_or_else(|| {
548 anyhow::anyhow!("PV {pv_name}: empty PVA scalar-array; cannot infer element type")
549 })?;
550
551 self.registry.register_pv_with_protocol(
553 pv_name,
554 dbr_type,
555 &effective_mode,
556 element_count,
557 Protocol::Pva,
558 )?;
559 if let Some(ref name) = matched_policy_name {
560 self.registry.update_policy_name(pv_name, Some(name))?;
561 }
562 let (prec, egu) = pv_field_extract_display(&initial.value);
565 if (prec.is_some() || egu.is_some())
566 && let Err(e) = self
567 .registry
568 .update_metadata(pv_name, prec.as_deref(), egu.as_deref())
569 {
570 debug!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
571 }
572
573 let record = self
574 .registry
575 .get_pv(pv_name)?
576 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
577 self.start_archiving_internal(&record).await?;
578
579 metrics::gauge!("archiver_pvs_active").increment(1.0);
580 info!(
581 pv = pv_name,
582 ?dbr_type,
583 element_count,
584 protocol = "pva",
585 "Started archiving"
586 );
587 Ok(())
588 }
589
590 async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
594 if record.protocol == Protocol::Pva {
595 return self.start_archiving_internal_pva(record).await;
596 }
597 let pv_name = record.pv_name.clone();
598 let dbr_type = record.dbr_type;
599 let element_count = record.element_count;
600 let channel = self.ca_client.create_channel(&pv_name);
601 let cancel_token = CancellationToken::new();
602 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
603 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
604 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
605 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
606 let counters = Arc::new(PvCounters::default());
607
608 let _guard = update_lock.lock().await;
614
615 self.channels.insert(
616 pv_name.clone(),
617 PvHandle {
618 channel: Some(channel.clone()),
619 cancel_token: cancel_token.clone(),
620 dbr_type,
621 conn_info: conn_info.clone(),
622 extras: extras.clone(),
623 field_tokens: field_tokens.clone(),
624 update_lock: update_lock.clone(),
625 counters: counters.clone(),
626 },
627 );
628
629 for field in &record.archive_fields {
632 let child = cancel_token.child_token();
633 field_tokens.insert(field.clone(), child.clone());
634 spawn_extra_field_monitor(
635 &self.ca_client,
636 &pv_name,
637 field,
638 extras.clone(),
639 child,
640 counters.clone(),
641 );
642 }
643 metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
644 drop(_guard);
645
646 let tx = self.sample_tx.clone();
647 let token = cancel_token.clone();
648 let ci = conn_info.clone();
649 let extras_for_loop = extras.clone();
650 let counters_for_loop = counters.clone();
651 let registry_for_loop = self.registry.clone();
652
653 let drift = self.server_ioc_drift_secs;
654 match &record.sample_mode {
655 SampleMode::Monitor => {
656 tokio::spawn(async move {
657 monitor_loop(
658 pv_name,
659 dbr_type,
660 element_count,
661 channel,
662 tx,
663 token,
664 ci,
665 registry_for_loop,
666 extras_for_loop,
667 counters_for_loop,
668 drift,
669 )
670 .await;
671 });
672 }
673 SampleMode::Scan { period_secs } => {
674 let period = *period_secs;
675 tokio::spawn(async move {
676 scan_loop(
677 pv_name,
678 dbr_type,
679 element_count,
680 channel,
681 tx,
682 token,
683 period,
684 ci,
685 registry_for_loop,
686 extras_for_loop,
687 counters_for_loop,
688 )
689 .await;
690 });
691 }
692 }
693
694 Ok(())
695 }
696
697 async fn start_archiving_internal_pva(&self, record: &PvRecord) -> anyhow::Result<()> {
705 let pv_name = record.pv_name.clone();
706 let dbr_type = record.dbr_type;
707 let element_count = record.element_count;
708 let cancel_token = CancellationToken::new();
709 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
710 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
711 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
712 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
713 let counters = Arc::new(PvCounters::default());
714
715 self.channels.insert(
716 pv_name.clone(),
717 PvHandle {
718 channel: None,
719 cancel_token: cancel_token.clone(),
720 dbr_type,
721 conn_info: conn_info.clone(),
722 extras: extras.clone(),
723 field_tokens: field_tokens.clone(),
724 update_lock,
725 counters: counters.clone(),
726 },
727 );
728
729 let tx = self.sample_tx.clone();
730 let pva_client = self.pva_client.clone();
731 let token = cancel_token.clone();
732 let ci = conn_info.clone();
733 let counters_for_loop = counters.clone();
734 let drift = self.server_ioc_drift_secs;
735
736 match &record.sample_mode {
737 SampleMode::Monitor => {
738 let pv_name_loop = pv_name.clone();
739 let archive_fields_loop = record.archive_fields.clone();
740 let extras_for_loop = extras.clone();
741 tokio::spawn(async move {
742 monitor_loop_pva(
743 pv_name_loop,
744 dbr_type,
745 element_count,
746 pva_client,
747 tx,
748 token,
749 ci,
750 counters_for_loop,
751 drift,
752 archive_fields_loop,
753 extras_for_loop,
754 )
755 .await;
756 });
757 }
758 SampleMode::Scan { period_secs } => {
759 let period = *period_secs;
760 let pv_name_loop = pv_name.clone();
761 let archive_fields_loop = record.archive_fields.clone();
762 let extras_for_loop = extras.clone();
763 tokio::spawn(async move {
764 scan_loop_pva(
765 pv_name_loop,
766 dbr_type,
767 pva_client,
768 tx,
769 token,
770 period,
771 ci,
772 counters_for_loop,
773 drift,
774 archive_fields_loop,
775 extras_for_loop,
776 )
777 .await;
778 });
779 }
780 }
781
782 {
787 let pv_name = pv_name.clone();
788 let pva_client = self.pva_client.clone();
789 let registry = self.registry.clone();
790 let cancel = cancel_token.clone();
791 let counters_for_refresh = counters.clone();
792 tokio::spawn(async move {
793 pva_metadata_refresh_loop(
794 pv_name,
795 pva_client,
796 registry,
797 cancel,
798 counters_for_refresh,
799 )
800 .await;
801 });
802 }
803 {
804 let pv_name = pv_name.clone();
805 let conn_info = conn_info.clone();
806 let counters_for_watch = counters.clone();
807 let cancel = cancel_token.clone();
808 let sample_mode = record.sample_mode.clone();
809 tokio::spawn(async move {
810 pva_state_watchdog(pv_name, conn_info, counters_for_watch, cancel, sample_mode)
811 .await;
812 });
813 }
814
815 Ok(())
816 }
817
818 pub async fn update_archive_fields(
824 &self,
825 pv_name: &str,
826 fields: &[String],
827 ) -> anyhow::Result<()> {
828 self.registry.update_archive_fields(pv_name, fields)?;
831
832 let (parent_token, extras, field_tokens, update_lock, counters) = {
835 let Some(handle) = self.channels.get(pv_name) else {
836 return Ok(());
837 };
838 (
839 handle.cancel_token.clone(),
840 handle.extras.clone(),
841 handle.field_tokens.clone(),
842 handle.update_lock.clone(),
843 handle.counters.clone(),
844 )
845 };
846
847 let _guard = update_lock.lock().await;
850
851 let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
852
853 let to_remove: Vec<String> = field_tokens
857 .iter()
858 .filter(|e| !wanted.contains(e.key().as_str()))
859 .map(|e| e.key().clone())
860 .collect();
861 let removed_count = to_remove.len();
862 for key in to_remove {
863 if let Some((_, token)) = field_tokens.remove(&key) {
864 token.cancel();
865 }
866 extras.remove(&key);
867 }
868
869 let mut added_count = 0usize;
872 for f in fields {
873 if !field_tokens.contains_key(f) {
874 let child = parent_token.child_token();
875 field_tokens.insert(f.clone(), child.clone());
876 spawn_extra_field_monitor(
877 &self.ca_client,
878 pv_name,
879 f,
880 extras.clone(),
881 child,
882 counters.clone(),
883 );
884 added_count += 1;
885 }
886 }
887 let net = added_count as i64 - removed_count as i64;
888 if net != 0 {
889 metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
890 }
891 Ok(())
892 }
893
894 pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
896 let lock = self.op_lock(pv_name);
897 let _g = lock.lock().await;
898 if let Some((_key, handle)) = self.channels.remove(pv_name) {
899 let extra_count = handle.field_tokens.len() as f64;
900 handle.cancel_token.cancel();
901 metrics::gauge!("archiver_pvs_active").decrement(1.0);
902 if extra_count > 0.0 {
903 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
904 }
905 }
906 self.registry.set_status(pv_name, PvStatus::Paused)?;
907 info!(pv = pv_name, "Paused archiving");
908 Ok(())
909 }
910
911 pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
914 let lock = self.op_lock(pv_name);
915 let _g = lock.lock().await;
916
917 let record = self
918 .registry
919 .get_pv(pv_name)?
920 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
921
922 if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
924 info!(
925 pv = pv_name,
926 "PV is already actively archived, skipping resume"
927 );
928 return Ok(());
929 }
930
931 if let Some((_key, handle)) = self.channels.remove(pv_name) {
933 let extra_count = handle.field_tokens.len() as f64;
934 handle.cancel_token.cancel();
935 if extra_count > 0.0 {
936 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
937 }
938 }
939
940 self.start_archiving_internal(&record).await?;
942 self.registry.set_status(pv_name, PvStatus::Active)?;
943 metrics::gauge!("archiver_pvs_active").increment(1.0);
944 info!(pv = pv_name, "Resumed archiving");
945 Ok(())
946 }
947
948 pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
951 let lock = self.op_lock(pv_name);
952 let _g = lock.lock().await;
953 if let Some((_key, handle)) = self.channels.remove(pv_name) {
954 let extra_count = handle.field_tokens.len() as f64;
955 handle.cancel_token.cancel();
956 metrics::gauge!("archiver_pvs_active").decrement(1.0);
957 if extra_count > 0.0 {
958 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
959 }
960 }
961 self.registry.set_status(pv_name, PvStatus::Inactive)?;
962 info!(pv = pv_name, "Stopped archiving (inactive)");
963 Ok(())
964 }
965
966 pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
968 let lock = self.op_lock(pv_name);
969 let _g = lock.lock().await;
970 if let Some((_key, handle)) = self.channels.remove(pv_name) {
971 let extra_count = handle.field_tokens.len() as f64;
972 handle.cancel_token.cancel();
973 metrics::gauge!("archiver_pvs_active").decrement(1.0);
974 if extra_count > 0.0 {
975 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
976 }
977 }
978 self.registry.remove_pv(pv_name)?;
979 info!(pv = pv_name, "Destroyed archiving channel");
985 Ok(())
986 }
987
988 pub fn list_pvs(&self) -> Vec<String> {
990 self.registry.all_pv_names().unwrap_or_else(|e| {
991 warn!("Failed to list PVs: {e}");
992 Vec::new()
993 })
994 }
995
996 pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
998 self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
999 warn!("Failed to match PVs: {e}");
1000 Vec::new()
1001 })
1002 }
1003
    /// Borrows the shared PV registry this manager writes to.
    pub fn registry(&self) -> &Arc<PvRegistry> {
        &self.registry
    }
1008
1009 pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
1011 self.channels.get(pv).map(|h| {
1012 h.conn_info
1013 .lock()
1014 .unwrap_or_else(|e| e.into_inner())
1015 .clone()
1016 })
1017 }
1018
1019 pub fn get_never_connected_pvs(&self) -> Vec<String> {
1021 self.channels
1022 .iter()
1023 .filter(|entry| {
1024 let ci = entry
1025 .value()
1026 .conn_info
1027 .lock()
1028 .unwrap_or_else(|e| e.into_inner());
1029 ci.connected_since.is_none()
1030 })
1031 .map(|entry| entry.key().clone())
1032 .collect()
1033 }
1034
1035 pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
1039 self.channels.get(pv_name).map(|h| h.counters.clone())
1040 }
1041
1042 pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
1045 self.channels
1046 .iter()
1047 .map(|e| {
1048 (
1049 e.key().clone(),
1050 PvCountersSnapshot::from(&*e.value().counters),
1051 )
1052 })
1053 .collect()
1054 }
1055
1056 pub async fn live_value(
1061 &self,
1062 pv_name: &str,
1063 timeout: Duration,
1064 ) -> Option<anyhow::Result<ArchiverValue>> {
1065 let channel_opt = self.channels.get(pv_name)?.channel.clone();
1066 let Some(channel) = channel_opt else {
1067 let pva = self.pva_client.clone();
1072 let name = pv_name.to_string();
1073 let res = tokio::time::timeout(timeout, pva.pvget_full(&name)).await;
1074 return Some(match res {
1075 Ok(Ok(result)) => pv_field_scalar_to_archiver(&result.value, &result.introspection)
1076 .ok_or_else(|| anyhow::anyhow!("PVA value has no archiver mapping")),
1077 Ok(Err(e)) => Err(anyhow::anyhow!("PVA get failed: {e}")),
1078 Err(_) => Err(anyhow::anyhow!("PVA get timed out after {timeout:?}")),
1079 });
1080 };
1081 if channel.wait_connected(timeout).await.is_err() {
1085 return Some(Err(anyhow::anyhow!(
1086 "channel not connected within {timeout:?}"
1087 )));
1088 }
1089 match tokio::time::timeout(timeout, channel.get()).await {
1090 Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
1091 Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
1092 Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
1093 }
1094 }
1095
1096 pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
1100 match self.channels.get(pv_name) {
1101 Some(handle) => handle
1102 .extras
1103 .iter()
1104 .map(|e| (e.key().clone(), e.value().clone()))
1105 .collect(),
1106 None => std::collections::HashMap::new(),
1107 }
1108 }
1109
1110 pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
1112 self.channels
1113 .iter()
1114 .filter(|entry| {
1115 let ci = entry
1116 .value()
1117 .conn_info
1118 .lock()
1119 .unwrap_or_else(|e| e.into_inner());
1120 !ci.is_connected
1121 })
1122 .map(|entry| entry.key().clone())
1123 .collect()
1124 }
1125}
1126
1127async fn refresh_ctrl_metadata(
1134 channel: &CaChannel,
1135 registry: &PvRegistry,
1136 pv_name: &str,
1137 counters: &PvCounters,
1138) {
1139 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1144 let snapshot = match tokio::time::timeout(
1145 FETCH_TIMEOUT,
1146 channel.get_with_metadata(DbrClass::Ctrl),
1147 )
1148 .await
1149 {
1150 Ok(Ok(s)) => s,
1151 Ok(Err(e)) => {
1152 counters
1153 .metadata_fetch_failures
1154 .fetch_add(1, Ordering::Relaxed);
1155 debug!(pv = pv_name, "Ctrl metadata fetch failed: {e}");
1156 return;
1157 }
1158 Err(_) => {
1159 counters
1160 .metadata_fetch_failures
1161 .fetch_add(1, Ordering::Relaxed);
1162 debug!(pv = pv_name, "Ctrl metadata fetch timed out");
1163 return;
1164 }
1165 };
1166 let Some(display) = snapshot.display else {
1167 return;
1170 };
1171
1172 let new_prec_opt: Option<String> = if display.precision < 0 {
1176 None
1177 } else {
1178 Some(display.precision.to_string())
1179 };
1180 let new_egu_trimmed = display.units.trim();
1181 let new_egu_opt: Option<&str> = if new_egu_trimmed.is_empty() {
1182 None
1183 } else {
1184 Some(new_egu_trimmed)
1185 };
1186
1187 let stored = match registry.get_pv(pv_name) {
1188 Ok(Some(r)) => r,
1189 Ok(None) => return,
1190 Err(e) => {
1191 debug!(
1192 pv = pv_name,
1193 "Registry read for metadata compare failed: {e}"
1194 );
1195 return;
1196 }
1197 };
1198
1199 let prec_changed = match (stored.prec.as_deref(), new_prec_opt.as_deref()) {
1200 (Some(s), Some(n)) => s != n,
1201 (None, Some(_)) => true,
1202 _ => false,
1204 };
1205 let egu_changed = match (stored.egu.as_deref(), new_egu_opt) {
1206 (Some(s), Some(n)) => s != n,
1207 (None, Some(_)) => true,
1208 _ => false,
1211 };
1212 if !prec_changed && !egu_changed {
1213 return;
1214 }
1215
1216 let prec_arg = if prec_changed {
1217 new_prec_opt.as_deref()
1218 } else {
1219 None
1220 };
1221 let egu_arg = if egu_changed { new_egu_opt } else { None };
1222 if let Err(e) = registry.update_metadata(pv_name, prec_arg, egu_arg) {
1223 warn!(pv = pv_name, "Failed to persist PREC/EGU: {e}");
1224 } else {
1225 debug!(
1226 pv = pv_name,
1227 prec = ?prec_arg, egu = ?egu_arg,
1228 "Refreshed PREC/EGU from DBR_CTRL"
1229 );
1230 }
1231}
1232
1233#[allow(clippy::too_many_arguments)]
1246fn pva_handle_event(
1247 field: &PvField,
1248 canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
1249 pv_name: &str,
1250 dbr_type: ArchDbType,
1251 tx: &mpsc::Sender<PvSample>,
1252 conn_info: &Mutex<ConnectionInfo>,
1253 counters: &Arc<PvCounters>,
1254 extras: &ExtraFieldsCache,
1255 extras_paths: &[(String, &'static str)],
1256 drift_secs: u64,
1257) {
1258 let now = SystemTime::now();
1259 let first_after_connect = {
1260 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1261 let first = ci.last_event_time.is_none();
1262 if ci.connected_since.is_none() {
1263 ci.connected_since = Some(now);
1264 }
1265 ci.is_connected = true;
1266 ci.last_event_time = Some(now);
1267 ci.state = PvConnectionState::Connected;
1268 first
1269 };
1270 if first_after_connect {
1271 counters
1272 .first_event_unix_secs
1273 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1274 .ok();
1275 }
1276 counters.events_received.fetch_add(1, Ordering::Relaxed);
1277
1278 let Some(value) = pv_field_scalar_to_archiver(field, canonical_desc) else {
1279 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1280 debug!(pv = pv_name, "PVA event has no archiver mapping; dropping");
1281 return;
1282 };
1283
1284 let ts = pv_field_extract_timestamp(field);
1285 if !first_after_connect && !ioc_timestamp_in_window(ts, now, drift_secs) {
1286 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1287 debug!(
1288 pv = pv_name,
1289 ?ts,
1290 "Dropping PVA sample with out-of-window timestamp"
1291 );
1292 return;
1293 }
1294 let elem_count = match pv_field_element_count(field) {
1295 0 => 1,
1296 n => n,
1297 };
1298 refresh_nt_enum_extras(field, extras);
1299 for (field_name, path) in extras_paths {
1300 if let Some(PvField::Scalar(s)) = pv_field_walk_path(field, path) {
1301 extras.insert(field_name.clone(), scalar_value_to_string(s));
1302 }
1303 }
1304 let mut sample = ArchiverSample::new(ts, value);
1305 attach_extras(extras, &mut sample);
1306 let pv_sample = PvSample {
1307 pv_name: pv_name.to_string(),
1308 dbr_type,
1309 sample,
1310 element_count: Some(elem_count),
1311 counters: Some(counters.clone()),
1312 };
1313 if let Err(tokio::sync::mpsc::error::TrySendError::Full(_)) = tx.try_send(pv_sample) {
1314 counters
1315 .buffer_overflow_drops
1316 .fetch_add(1, Ordering::Relaxed);
1317 }
1318}
1319
1320#[allow(clippy::too_many_arguments)]
1330async fn monitor_loop_pva(
1331 pv_name: String,
1332 dbr_type: ArchDbType,
1333 element_count: i32,
1334 pva_client: PvaClient,
1335 tx: mpsc::Sender<PvSample>,
1336 cancel_token: CancellationToken,
1337 conn_info: Arc<Mutex<ConnectionInfo>>,
1338 counters: Arc<PvCounters>,
1339 server_ioc_drift_secs: u64,
1340 archive_fields: Vec<String>,
1341 extras: Arc<ExtraFieldsCache>,
1342) {
1343 let drift_secs = server_ioc_drift_secs;
1344 let _ = element_count;
1348
1349 let extras_paths: Vec<(String, &'static str)> = archive_fields
1353 .iter()
1354 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1355 .collect();
1356 let request_expr = if extras_paths.is_empty() {
1357 None
1358 } else {
1359 let mut builder = epics_rs::pva::pv_request::PvRequestBuilder::new()
1360 .field("value")
1361 .field("alarm")
1362 .field("timeStamp");
1363 for (_, path) in &extras_paths {
1364 builder = builder.field(*path);
1365 }
1366 Some(builder.build())
1367 };
1368
1369 loop {
1375 if let Some(ref req) = request_expr {
1376 let canonical = match pva_client.pvinfo(&pv_name).await {
1385 Ok(d) => Arc::new(d),
1386 Err(e) => {
1387 counters
1388 .transient_error_count
1389 .fetch_add(1, Ordering::Relaxed);
1390 warn!(
1391 pv = pv_name,
1392 "PVA pvinfo failed: {e}; cannot capture canonical descriptor — retrying subscribe"
1393 );
1394 tokio::select! {
1395 _ = cancel_token.cancelled() => return,
1396 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1397 }
1398 }
1399 };
1400 let pv_name_cb = pv_name.clone();
1401 let tx_cb = tx.clone();
1402 let conn_info_cb = conn_info.clone();
1403 let counters_cb = counters.clone();
1404 let extras_cb = extras.clone();
1405 let extras_paths_cb = extras_paths.clone();
1406 let canonical_cb = canonical.clone();
1407 let cb = move |field: &PvField| {
1408 pva_handle_event(
1409 field,
1410 &canonical_cb,
1411 &pv_name_cb,
1412 dbr_type,
1413 &tx_cb,
1414 &conn_info_cb,
1415 &counters_cb,
1416 &extras_cb,
1417 &extras_paths_cb,
1418 drift_secs,
1419 );
1420 };
1421 tokio::select! {
1422 _ = cancel_token.cancelled() => {
1423 debug!(pv = pv_name, "PVA monitor (custom request) cancelled");
1424 return;
1425 }
1426 res = pva_client.pvmonitor_with_request(&pv_name, req, cb) => {
1427 match res {
1428 Ok(()) => {
1429 debug!(pv = pv_name, "PVA pvmonitor_with_request returned Ok; resubscribing");
1430 }
1431 Err(e) => {
1432 counters
1433 .transient_error_count
1434 .fetch_add(1, Ordering::Relaxed);
1435 warn!(pv = pv_name, "PVA pvmonitor_with_request failed: {e}; retrying");
1436 }
1437 }
1438 tokio::select! {
1439 _ = cancel_token.cancelled() => return,
1440 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1441 }
1442 }
1443 }
1444 } else {
1445 let pv_name_cb = pv_name.clone();
1453 let tx_cb = tx.clone();
1454 let conn_info_cb = conn_info.clone();
1455 let counters_cb = counters.clone();
1456 let extras_cb = extras.clone();
1457 let extras_paths_cb = extras_paths.clone();
1458 let cb = move |desc: &epics_rs::pva::pvdata::FieldDesc, field: &PvField| {
1459 pva_handle_event(
1460 field,
1461 desc,
1462 &pv_name_cb,
1463 dbr_type,
1464 &tx_cb,
1465 &conn_info_cb,
1466 &counters_cb,
1467 &extras_cb,
1468 &extras_paths_cb,
1469 drift_secs,
1470 );
1471 };
1472 let handle = match pva_client.pvmonitor_handle(&pv_name, cb).await {
1473 Ok(h) => h,
1474 Err(e) => {
1475 counters
1476 .transient_error_count
1477 .fetch_add(1, Ordering::Relaxed);
1478 warn!(pv = pv_name, "PVA pvmonitor failed: {e}; retrying");
1479 tokio::select! {
1480 _ = cancel_token.cancelled() => return,
1481 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1482 }
1483 }
1484 };
1485 debug!(pv = pv_name, "PVA monitor active");
1486 cancel_token.cancelled().await;
1487 debug!(pv = pv_name, "PVA monitor cancelled; dropping subscription");
1488 drop(handle);
1489 return;
1490 }
1491 }
1492}
1493
1494#[allow(clippy::too_many_arguments)]
1497async fn scan_loop_pva(
1498 pv_name: String,
1499 dbr_type: ArchDbType,
1500 pva_client: PvaClient,
1501 tx: mpsc::Sender<PvSample>,
1502 cancel_token: CancellationToken,
1503 period_secs: f64,
1504 conn_info: Arc<Mutex<ConnectionInfo>>,
1505 counters: Arc<PvCounters>,
1506 server_ioc_drift_secs: u64,
1507 archive_fields: Vec<String>,
1508 extras: Arc<ExtraFieldsCache>,
1509) {
1510 let extras_paths: Vec<(String, &'static str)> = archive_fields
1511 .iter()
1512 .filter_map(|f| ca_archive_field_to_pva_path(f).map(|p| (f.clone(), p)))
1513 .collect();
1514 let period = Duration::from_secs_f64(period_secs);
1515 let mut interval = tokio::time::interval(period);
1516 let drift_secs = server_ioc_drift_secs;
1517 let pvget_timeout = period.max(Duration::from_secs(5));
1520
1521 loop {
1522 tokio::select! {
1523 _ = cancel_token.cancelled() => return,
1524 _ = interval.tick() => {}
1525 }
1526
1527 let res = tokio::time::timeout(pvget_timeout, pva_client.pvget_full(&pv_name)).await;
1532 let (field, canonical) = match res {
1533 Ok(Ok(r)) => (r.value, r.introspection),
1534 Ok(Err(e)) => {
1535 let was_connected = {
1536 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1537 let prev = ci.is_connected;
1538 ci.is_connected = false;
1539 ci.last_event_time = None;
1540 ci.state = match ci.state {
1541 PvConnectionState::Connected => PvConnectionState::Disconnected,
1542 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1543 _ => PvConnectionState::Connecting,
1544 };
1545 prev
1546 };
1547 if was_connected {
1548 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1549 counters
1550 .last_disconnect_unix_secs
1551 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1552 }
1553 counters
1554 .transient_error_count
1555 .fetch_add(1, Ordering::Relaxed);
1556 debug!(pv = pv_name, "PVA scan pvget failed: {e}");
1557 continue;
1558 }
1559 Err(_) => {
1560 counters
1561 .transient_error_count
1562 .fetch_add(1, Ordering::Relaxed);
1563 debug!(pv = pv_name, "PVA scan pvget timed out");
1564 continue;
1565 }
1566 };
1567
1568 let now = SystemTime::now();
1569 let first_after_connect = {
1570 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1571 let first = ci.last_event_time.is_none();
1572 if ci.connected_since.is_none() {
1573 ci.connected_since = Some(now);
1574 }
1575 ci.is_connected = true;
1576 ci.last_event_time = Some(now);
1577 ci.state = PvConnectionState::Connected;
1578 first
1579 };
1580 counters.events_received.fetch_add(1, Ordering::Relaxed);
1581 if first_after_connect {
1582 counters
1583 .first_event_unix_secs
1584 .compare_exchange(0, unix_secs(now), Ordering::Relaxed, Ordering::Relaxed)
1585 .ok();
1586 }
1587
1588 let Some(value) = pv_field_scalar_to_archiver(&field, &canonical) else {
1589 counters.type_change_drops.fetch_add(1, Ordering::Relaxed);
1590 debug!(
1591 pv = pv_name,
1592 "PVA scan value has no archiver mapping; dropping"
1593 );
1594 continue;
1595 };
1596 let _ = drift_secs; let elem_count = match pv_field_element_count(&field) {
1601 0 => 1,
1602 n => n,
1603 };
1604 refresh_nt_enum_extras(&field, &extras);
1608 for (field_name, path) in &extras_paths {
1609 if let Some(PvField::Scalar(s)) = pv_field_walk_path(&field, path) {
1610 extras.insert(field_name.clone(), scalar_value_to_string(s));
1611 }
1612 }
1613 let mut sample = ArchiverSample::new(now, value);
1614 attach_extras(&extras, &mut sample);
1615 let pv_sample = PvSample {
1616 pv_name: pv_name.clone(),
1617 dbr_type,
1618 sample,
1619 element_count: Some(elem_count),
1620 counters: Some(counters.clone()),
1621 };
1622 if let Err(rejected) = try_send_with_overflow_count(&tx, pv_sample, &counters).await {
1623 let _ = rejected;
1625 return;
1626 }
1627 }
1628}
1629
1630async fn pva_metadata_refresh_loop(
1637 pv_name: String,
1638 pva_client: PvaClient,
1639 registry: Arc<PvRegistry>,
1640 cancel_token: CancellationToken,
1641 counters: Arc<PvCounters>,
1642) {
1643 const REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
1648 const FETCH_TIMEOUT: Duration = Duration::from_secs(15);
1649 let mut tick = tokio::time::interval(REFRESH_INTERVAL);
1650 tick.tick().await;
1653 loop {
1654 tokio::select! {
1655 _ = cancel_token.cancelled() => return,
1656 _ = tick.tick() => {}
1657 }
1658
1659 let res = tokio::time::timeout(FETCH_TIMEOUT, pva_client.pvget(&pv_name)).await;
1660 let field = match res {
1661 Ok(Ok(f)) => f,
1662 _ => {
1663 counters
1664 .metadata_fetch_failures
1665 .fetch_add(1, Ordering::Relaxed);
1666 continue;
1667 }
1668 };
1669 let (new_prec, new_egu) = pv_field_extract_display(&field);
1670 let stored = match registry.get_pv(&pv_name) {
1671 Ok(Some(r)) => r,
1672 _ => continue,
1673 };
1674 let prec_changed = match (stored.prec.as_deref(), new_prec.as_deref()) {
1675 (Some(s), Some(n)) => s != n,
1676 (None, Some(_)) => true,
1677 _ => false,
1678 };
1679 let egu_changed = match (stored.egu.as_deref(), new_egu.as_deref()) {
1680 (Some(s), Some(n)) => s != n,
1681 (None, Some(_)) => true,
1682 _ => false,
1683 };
1684 if !prec_changed && !egu_changed {
1685 continue;
1686 }
1687 let prec_arg = if prec_changed {
1688 new_prec.as_deref()
1689 } else {
1690 None
1691 };
1692 let egu_arg = if egu_changed {
1693 new_egu.as_deref()
1694 } else {
1695 None
1696 };
1697 if let Err(e) = registry.update_metadata(&pv_name, prec_arg, egu_arg) {
1698 warn!(pv = pv_name, "Failed to persist PVA PREC/EGU: {e}");
1699 } else {
1700 debug!(pv = pv_name, prec = ?prec_arg, egu = ?egu_arg, "Refreshed PVA display");
1701 }
1702 }
1703}
1704
1705async fn pva_state_watchdog(
1713 pv_name: String,
1714 conn_info: Arc<Mutex<ConnectionInfo>>,
1715 counters: Arc<PvCounters>,
1716 cancel_token: CancellationToken,
1717 sample_mode: SampleMode,
1718) {
1719 const POLL_INTERVAL: Duration = Duration::from_secs(5);
1720 let stale_threshold = match sample_mode {
1726 SampleMode::Monitor => Duration::from_secs(60),
1727 SampleMode::Scan { period_secs } => Duration::from_secs_f64((period_secs * 3.0).max(60.0)),
1728 };
1729 let mut interval = tokio::time::interval(POLL_INTERVAL);
1730 interval.tick().await;
1731 loop {
1732 tokio::select! {
1733 _ = cancel_token.cancelled() => return,
1734 _ = interval.tick() => {}
1735 }
1736 let stale_now = {
1737 let ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1738 if !ci.is_connected {
1739 continue;
1740 }
1741 match ci.last_event_time {
1742 Some(t) => SystemTime::now()
1743 .duration_since(t)
1744 .map(|d| d > stale_threshold)
1745 .unwrap_or(false),
1746 None => false,
1747 }
1748 };
1749 if stale_now {
1750 let was_connected = {
1751 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1752 let prev = ci.is_connected;
1753 ci.is_connected = false;
1754 ci.state = PvConnectionState::Disconnected;
1755 prev
1756 };
1757 if was_connected {
1758 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1759 counters
1760 .last_disconnect_unix_secs
1761 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1762 debug!(
1763 pv = pv_name,
1764 "PVA watchdog: marking disconnected after stale heartbeat"
1765 );
1766 }
1767 }
1768 }
1769}
1770
1771#[allow(clippy::too_many_arguments)]
1773async fn monitor_loop(
1774 pv_name: String,
1775 dbr_type: ArchDbType,
1776 element_count: i32,
1777 channel: CaChannel,
1778 tx: mpsc::Sender<PvSample>,
1779 cancel_token: CancellationToken,
1780 conn_info: Arc<Mutex<ConnectionInfo>>,
1781 registry: Arc<PvRegistry>,
1782 extras: Arc<ExtraFieldsCache>,
1783 counters: Arc<PvCounters>,
1784 server_ioc_drift_secs: u64,
1785) {
1786 loop {
1787 tokio::select! {
1789 _ = cancel_token.cancelled() => return,
1790 result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
1791 if result.is_err() {
1792 let was_connected = {
1793 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1794 let prev_connected = ci.is_connected;
1795 ci.is_connected = false;
1796 ci.last_event_time = None;
1797 ci.state = match ci.state {
1800 PvConnectionState::Connected => PvConnectionState::Disconnected,
1801 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1802 _ => PvConnectionState::Connecting,
1803 };
1804 prev_connected
1805 };
1806 if was_connected {
1813 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1814 counters
1815 .last_disconnect_unix_secs
1816 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1817 }
1818 let mut conn_rx = channel.connection_events();
1823
1824 if channel
1829 .wait_connected(Duration::from_millis(100))
1830 .await
1831 .is_err()
1832 {
1833 loop {
1834 tokio::select! {
1835 _ = cancel_token.cancelled() => return,
1836 event = conn_rx.recv() => {
1837 use tokio::sync::broadcast::error::RecvError;
1838 match event {
1839 Ok(ConnectionEvent::Connected) => break,
1840 Ok(_) => continue,
1841 Err(RecvError::Lagged(_)) => {
1845 if channel
1846 .wait_connected(Duration::from_millis(100))
1847 .await
1848 .is_ok()
1849 {
1850 break;
1851 }
1852 continue;
1853 }
1854 Err(RecvError::Closed) => return,
1855 }
1856 }
1857 }
1858 }
1859 }
1860 }
1861 }
1862 }
1863
1864 {
1872 let channel = channel.clone();
1873 let registry = registry.clone();
1874 let counters = counters.clone();
1875 let pv_name = pv_name.clone();
1876 let cancel = cancel_token.clone();
1877 tokio::spawn(async move {
1878 tokio::select! {
1879 _ = cancel.cancelled() => {}
1880 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
1881 }
1882 });
1883 }
1884
1885 let mut monitor = match channel.subscribe().await {
1887 Ok(m) => m,
1888 Err(e) => {
1889 counters
1890 .transient_error_count
1891 .fetch_add(1, Ordering::Relaxed);
1892 warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
1893 tokio::select! {
1894 _ = cancel_token.cancelled() => return,
1895 _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
1896 }
1897 }
1898 };
1899
1900 debug!(pv = pv_name, "Monitor subscription active");
1901
1902 loop {
1904 tokio::select! {
1905 _ = cancel_token.cancelled() => return,
1906 result = monitor.recv() => {
1907 match result {
1908 Some(Ok(snapshot)) => {
1909 let now = SystemTime::now();
1910 let first_after_connect = {
1921 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1922 let first = ci.last_event_time.is_none();
1923 if ci.connected_since.is_none() {
1924 ci.connected_since = Some(now);
1925 }
1926 ci.is_connected = true;
1927 ci.last_event_time = Some(now);
1928 ci.state = PvConnectionState::Connected;
1929 first
1930 };
1931 if !first_after_connect
1932 && !ioc_timestamp_in_window(
1933 snapshot.timestamp,
1934 now,
1935 server_ioc_drift_secs,
1936 )
1937 {
1938 counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1939 debug!(
1940 pv = pv_name,
1941 ?snapshot.timestamp,
1942 "Dropping sample with out-of-window IOC timestamp"
1943 );
1944 continue;
1945 }
1946 counters.events_received.fetch_add(1, Ordering::Relaxed);
1947 let now_secs = unix_secs(now);
1950 let _ = counters.first_event_unix_secs.compare_exchange(
1951 0,
1952 now_secs,
1953 Ordering::Relaxed,
1954 Ordering::Relaxed,
1955 );
1956 let archiver_val = epics_value_to_archiver(&snapshot.value);
1957 let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
1958 attach_extras(&extras, &mut sample);
1959 if first_after_connect {
1960 let lost_secs = counters
1961 .last_disconnect_unix_secs
1962 .load(Ordering::Relaxed);
1963 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1964 }
1965 let pv_sample = PvSample {
1966 pv_name: pv_name.clone(),
1967 dbr_type,
1968 sample,
1969 element_count: Some(element_count),
1970 counters: Some(counters.clone()),
1971 };
1972 if let Err(pv_sample) = try_send_with_overflow_count(
1973 &tx,
1974 pv_sample,
1975 &counters,
1976 )
1977 .await
1978 {
1979 let _ = pv_sample;
1980 return; }
1982 }
1983 Some(Err(e)) => {
1984 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
1985 warn!(pv = pv_name, "Monitor error: {e}");
1986 }
1987 None => break, }
1989 }
1990 }
1991 }
1992
1993 {
2000 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2001 ci.is_connected = false;
2002 ci.last_event_time = None;
2003 ci.state = PvConnectionState::Disconnected;
2004 }
2005 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2006 counters
2007 .last_disconnect_unix_secs
2008 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2009 debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
2010 }
2011}
2012
/// Whole seconds between the Unix epoch and `t`.
///
/// Returns 0 when `t` lies before the epoch.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        Err(_) => 0,
    }
}
2018
2019async fn try_send_with_overflow_count(
2023 tx: &mpsc::Sender<PvSample>,
2024 pv_sample: PvSample,
2025 counters: &PvCounters,
2026) -> Result<(), PvSample> {
2027 match tx.try_send(pv_sample) {
2028 Ok(()) => Ok(()),
2029 Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
2030 counters
2031 .buffer_overflow_drops
2032 .fetch_add(1, Ordering::Relaxed);
2033 tx.send(pv_sample).await.map_err(|e| e.0)
2037 }
2038 Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
2039 }
2040}
2041
2042#[allow(clippy::too_many_arguments)]
2044async fn scan_loop(
2045 pv_name: String,
2046 dbr_type: ArchDbType,
2047 element_count: i32,
2048 channel: CaChannel,
2049 tx: mpsc::Sender<PvSample>,
2050 cancel_token: CancellationToken,
2051 period_secs: f64,
2052 conn_info: Arc<Mutex<ConnectionInfo>>,
2053 registry: Arc<PvRegistry>,
2054 extras: Arc<ExtraFieldsCache>,
2055 counters: Arc<PvCounters>,
2056) {
2057 let period = Duration::from_secs_f64(period_secs);
2058 let mut interval = tokio::time::interval(period);
2059 let mut metadata_done = false;
2062
2063 loop {
2064 tokio::select! {
2065 _ = cancel_token.cancelled() => return,
2066 _ = interval.tick() => {}
2067 }
2068
2069 if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
2070 let was_connected = {
2071 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2072 let prev = ci.is_connected;
2073 ci.is_connected = false;
2074 ci.last_event_time = None;
2075 ci.state = match ci.state {
2076 PvConnectionState::Connected => PvConnectionState::Disconnected,
2077 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
2078 _ => PvConnectionState::Connecting,
2079 };
2080 prev
2081 };
2082 if was_connected {
2083 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
2084 counters
2085 .last_disconnect_unix_secs
2086 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
2087 }
2088 metadata_done = false;
2089 continue;
2090 }
2091
2092 if !metadata_done {
2093 let channel = channel.clone();
2100 let registry = registry.clone();
2101 let counters = counters.clone();
2102 let pv_name = pv_name.clone();
2103 let cancel = cancel_token.clone();
2104 tokio::spawn(async move {
2105 tokio::select! {
2106 _ = cancel.cancelled() => {}
2107 _ = refresh_ctrl_metadata(&channel, ®istry, &pv_name, &counters) => {}
2108 }
2109 });
2110 metadata_done = true;
2111 }
2112
2113 match channel.get().await {
2114 Ok((_dbr_type, epics_val)) => {
2115 let now = SystemTime::now();
2116 let first_after_connect = {
2117 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
2118 let first = ci.last_event_time.is_none();
2119 if ci.connected_since.is_none() {
2120 ci.connected_since = Some(now);
2121 }
2122 ci.is_connected = true;
2123 ci.last_event_time = Some(now);
2124 ci.state = PvConnectionState::Connected;
2125 first
2126 };
2127 counters.events_received.fetch_add(1, Ordering::Relaxed);
2128 let now_secs = unix_secs(now);
2129 let _ = counters.first_event_unix_secs.compare_exchange(
2130 0,
2131 now_secs,
2132 Ordering::Relaxed,
2133 Ordering::Relaxed,
2134 );
2135 let archiver_val = epics_value_to_archiver(&epics_val);
2136 let mut sample = ArchiverSample::new(now, archiver_val);
2137 attach_extras(&extras, &mut sample);
2138 if first_after_connect {
2139 let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
2140 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
2141 }
2142 let pv_sample = PvSample {
2143 pv_name: pv_name.clone(),
2144 dbr_type,
2145 sample,
2146 element_count: Some(element_count),
2147 counters: Some(counters.clone()),
2148 };
2149 if try_send_with_overflow_count(&tx, pv_sample, &counters)
2150 .await
2151 .is_err()
2152 {
2153 return;
2154 }
2155 }
2156 Err(e) => {
2157 counters
2158 .transient_error_count
2159 .fetch_add(1, Ordering::Relaxed);
2160 debug!(pv = pv_name, "Scan read error: {e}");
2161 }
2162 }
2163 }
2164}
2165
2166fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
2172 sample
2173 .field_values
2174 .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
2175 sample
2176 .field_values
2177 .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
2178 sample
2179 .field_values
2180 .push(("startup".to_string(), "true".to_string()));
2181}
2182
2183fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
2187 if extras.is_empty() {
2188 return;
2189 }
2190 let mut entries: Vec<(String, String)> = extras
2191 .iter()
2192 .map(|e| (e.key().clone(), e.value().clone()))
2193 .collect();
2194 entries.sort_by(|a, b| a.0.cmp(&b.0));
2195 sample.field_values = entries;
2196}
2197
2198fn epics_value_to_field_string(val: &EpicsValue) -> String {
2203 match val {
2204 EpicsValue::String(s) => s.clone(),
2205 EpicsValue::Short(v) => v.to_string(),
2206 EpicsValue::Float(v) => v.to_string(),
2207 EpicsValue::Enum(v) => v.to_string(),
2208 EpicsValue::Char(v) => v.to_string(),
2209 EpicsValue::Long(v) => v.to_string(),
2210 EpicsValue::Int64(v) => v.to_string(),
2211 EpicsValue::Double(v) => v.to_string(),
2212 EpicsValue::ShortArray(v) => format!("{v:?}"),
2213 EpicsValue::FloatArray(v) => format!("{v:?}"),
2214 EpicsValue::EnumArray(v) => format!("{v:?}"),
2215 EpicsValue::DoubleArray(v) => format!("{v:?}"),
2216 EpicsValue::LongArray(v) => format!("{v:?}"),
2217 EpicsValue::Int64Array(v) => format!("{v:?}"),
2218 EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
2219 EpicsValue::StringArray(v) => format!("{v:?}"),
2220 }
2221}
2222
2223fn spawn_extra_field_monitor(
2227 ca_client: &CaClient,
2228 pv_name: &str,
2229 field: &str,
2230 extras: Arc<ExtraFieldsCache>,
2231 parent_token: CancellationToken,
2232 counters: Arc<PvCounters>,
2233) {
2234 let full_name = format!("{pv_name}.{field}");
2235 let channel = ca_client.create_channel(&full_name);
2236 let field_owned = field.to_string();
2237 let pv_owned = pv_name.to_string();
2238
2239 let panic_pv = pv_owned.clone();
2245 let panic_field = field_owned.clone();
2246 tokio::spawn(async move {
2247 let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
2248 channel,
2249 pv_owned,
2250 field_owned,
2251 extras,
2252 parent_token,
2253 counters,
2254 ));
2255 if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
2256 let msg = panic_payload_msg(&payload);
2257 error!(
2258 pv = panic_pv,
2259 field = panic_field,
2260 "Extra-field monitor panicked: {msg}"
2261 );
2262 }
2263 });
2264}
2265
2266async fn extra_field_monitor_body(
2269 channel: CaChannel,
2270 pv_owned: String,
2271 field_owned: String,
2272 extras: Arc<ExtraFieldsCache>,
2273 parent_token: CancellationToken,
2274 counters: Arc<PvCounters>,
2275) {
2276 if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
2279 debug!(
2280 pv = pv_owned,
2281 field = field_owned,
2282 "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
2283 );
2284 }
2285
2286 let mut backoff = CA_RETRY_DELAY;
2292 let max_backoff = Duration::from_secs(60);
2293 let mut warned_at_cap = false;
2294
2295 loop {
2296 tokio::select! {
2298 _ = parent_token.cancelled() => return,
2299 sub = channel.subscribe() => {
2300 let mut monitor = match sub {
2301 Ok(m) => m,
2302 Err(e) => {
2303 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2307 debug!(
2308 pv = pv_owned,
2309 field = field_owned,
2310 ?backoff,
2311 "Extra-field subscribe failed: {e}; retrying"
2312 );
2313 if backoff >= max_backoff && !warned_at_cap {
2314 warn!(
2315 pv = pv_owned,
2316 field = field_owned,
2317 "Extra-field repeatedly fails to subscribe; \
2318 check archive_fields config (now retrying every 60s)"
2319 );
2320 warned_at_cap = true;
2321 }
2322 let sleep_for = backoff;
2323 backoff = (backoff * 2).min(max_backoff);
2324 tokio::select! {
2325 _ = parent_token.cancelled() => return,
2326 _ = tokio::time::sleep(sleep_for) => continue,
2327 }
2328 }
2329 };
2330 backoff = CA_RETRY_DELAY;
2333 warned_at_cap = false;
2334 loop {
2335 tokio::select! {
2336 _ = parent_token.cancelled() => return,
2337 ev = monitor.recv() => match ev {
2338 Some(Ok(snapshot)) => {
2339 extras.insert(
2340 field_owned.clone(),
2341 epics_value_to_field_string(&snapshot.value),
2342 );
2343 }
2344 Some(Err(e)) => {
2345 counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
2346 debug!(
2347 pv = pv_owned,
2348 field = field_owned,
2349 "Extra-field monitor error: {e}"
2350 );
2351 }
2352 None => break, }
2354 }
2355 }
2356 }
2357 }
2358 }
2359}
2360
/// Extracts a printable message from a panic payload.
///
/// Handles the two payload types produced by `panic!` (`&'static str` and
/// `String`); any other payload yields a fixed placeholder.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}
2372
2373fn pv_value_field(field: &PvField) -> &PvField {
2377 match field {
2378 PvField::Structure(s) => s.get_field("value").unwrap_or(field),
2379 _ => field,
2380 }
2381}
2382
2383fn scalar_value_to_archiver(s: &ScalarValue) -> ArchiverValue {
2385 match s {
2386 ScalarValue::Boolean(v) => ArchiverValue::ScalarEnum(*v as i32),
2387 ScalarValue::Byte(v) => ArchiverValue::ScalarByte(vec![*v as u8]),
2388 ScalarValue::UByte(v) => ArchiverValue::ScalarByte(vec![*v]),
2389 ScalarValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2390 ScalarValue::UShort(v) => ArchiverValue::ScalarShort(*v as i32),
2391 ScalarValue::Int(v) => ArchiverValue::ScalarInt(*v),
2392 ScalarValue::UInt(v) => ArchiverValue::ScalarInt(*v as i32),
2393 ScalarValue::Long(v) => ArchiverValue::ScalarInt(*v as i32),
2394 ScalarValue::ULong(v) => ArchiverValue::ScalarInt(*v as i32),
2395 ScalarValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2396 ScalarValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2397 ScalarValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2398 }
2399}
2400
2401fn pv_field_scalar_to_archiver(
2415 field: &PvField,
2416 canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
2417) -> Option<ArchiverValue> {
2418 if let Some((index, _)) = nt_enum_parts(field) {
2423 return Some(ArchiverValue::ScalarEnum(index));
2424 }
2425 match pv_value_field(field) {
2426 PvField::Scalar(s) => Some(scalar_value_to_archiver(s)),
2427 PvField::ScalarArrayTyped(arr) => Some(typed_scalar_array_to_archiver(arr)),
2428 PvField::ScalarArray(items) => {
2429 let st = items.first()?.scalar_type();
2433 let typed = TypedScalarArray::from_scalar_values(items, st)?;
2434 Some(typed_scalar_array_to_archiver(&typed))
2435 }
2436 _ => Some(ArchiverValue::V4GenericBytes(
2441 encode_pv_field_self_describing(field, canonical_desc),
2442 )),
2443 }
2444}
2445
2446fn pv_field_to_arch_db_type(field: &PvField) -> Option<(ArchDbType, i32)> {
2456 if nt_enum_parts(field).is_some() {
2459 return Some((ArchDbType::ScalarEnum, 1));
2460 }
2461 let value = pv_value_field(field);
2462 Some(match value {
2463 PvField::Scalar(s) => (scalar_value_to_arch_db_type(s), 1),
2464 PvField::ScalarArray(arr) => {
2465 let elem = arr.first()?;
2466 (
2467 scalar_type_to_waveform(elem.scalar_type()),
2468 arr.len() as i32,
2469 )
2470 }
2471 PvField::ScalarArrayTyped(arr) => {
2472 (scalar_type_to_waveform(arr.scalar_type()), arr.len() as i32)
2473 }
2474 _ => (ArchDbType::V4GenericBytes, 1),
2475 })
2476}
2477
2478fn scalar_value_to_arch_db_type(s: &ScalarValue) -> ArchDbType {
2480 match s {
2481 ScalarValue::Boolean(_) => ArchDbType::ScalarEnum,
2482 ScalarValue::Byte(_) | ScalarValue::UByte(_) => ArchDbType::ScalarByte,
2483 ScalarValue::Short(_) | ScalarValue::UShort(_) => ArchDbType::ScalarShort,
2484 ScalarValue::Int(_)
2485 | ScalarValue::UInt(_)
2486 | ScalarValue::Long(_)
2487 | ScalarValue::ULong(_) => ArchDbType::ScalarInt,
2488 ScalarValue::Float(_) => ArchDbType::ScalarFloat,
2489 ScalarValue::Double(_) => ArchDbType::ScalarDouble,
2490 ScalarValue::String(_) => ArchDbType::ScalarString,
2491 }
2492}
2493
2494fn scalar_type_to_waveform(st: ScalarType) -> ArchDbType {
2496 match st {
2497 ScalarType::Boolean => ArchDbType::WaveformEnum,
2498 ScalarType::Byte | ScalarType::UByte => ArchDbType::WaveformByte,
2499 ScalarType::Short | ScalarType::UShort => ArchDbType::WaveformShort,
2500 ScalarType::Int | ScalarType::UInt | ScalarType::Long | ScalarType::ULong => {
2501 ArchDbType::WaveformInt
2502 }
2503 ScalarType::Float => ArchDbType::WaveformFloat,
2504 ScalarType::Double => ArchDbType::WaveformDouble,
2505 ScalarType::String => ArchDbType::WaveformString,
2506 }
2507}
2508
2509fn typed_scalar_array_to_archiver(arr: &TypedScalarArray) -> ArchiverValue {
2515 match arr {
2516 TypedScalarArray::Boolean(a) => {
2517 ArchiverValue::VectorEnum(a.iter().map(|&b| b as i32).collect())
2518 }
2519 TypedScalarArray::Byte(a) => {
2520 ArchiverValue::VectorChar(a.iter().map(|&v| v as u8).collect())
2521 }
2522 TypedScalarArray::UByte(a) => ArchiverValue::VectorChar(a.to_vec()),
2523 TypedScalarArray::Short(a) => {
2524 ArchiverValue::VectorShort(a.iter().map(|&v| v as i32).collect())
2525 }
2526 TypedScalarArray::UShort(a) => {
2527 ArchiverValue::VectorShort(a.iter().map(|&v| v as i32).collect())
2528 }
2529 TypedScalarArray::Int(a) => ArchiverValue::VectorInt(a.to_vec()),
2530 TypedScalarArray::UInt(a) => {
2531 ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2532 }
2533 TypedScalarArray::Long(a) => {
2534 ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2535 }
2536 TypedScalarArray::ULong(a) => {
2537 ArchiverValue::VectorInt(a.iter().map(|&v| v as i32).collect())
2538 }
2539 TypedScalarArray::Float(a) => ArchiverValue::VectorFloat(a.to_vec()),
2540 TypedScalarArray::Double(a) => ArchiverValue::VectorDouble(a.to_vec()),
2541 TypedScalarArray::String(a) => ArchiverValue::VectorString(a.to_vec()),
2542 }
2543}
2544
2545fn nt_enum_parts(field: &PvField) -> Option<(i32, Option<Vec<String>>)> {
2556 let PvField::Structure(root) = field else {
2557 return None;
2558 };
2559 if !root.struct_id.starts_with("epics:nt/NTEnum") {
2560 return None;
2561 }
2562 let PvField::Structure(inner) = root.get_field("value")? else {
2563 return None;
2564 };
2565 let index = match inner.get_field("index")? {
2566 PvField::Scalar(ScalarValue::Int(v)) => *v,
2567 PvField::Scalar(ScalarValue::Long(v)) => *v as i32,
2568 PvField::Scalar(ScalarValue::Short(v)) => *v as i32,
2569 PvField::Scalar(ScalarValue::UInt(v)) => *v as i32,
2570 PvField::Scalar(ScalarValue::ULong(v)) => *v as i32,
2571 PvField::Scalar(ScalarValue::UShort(v)) => *v as i32,
2572 _ => return None,
2573 };
2574 let choices = match inner.get_field("choices") {
2575 Some(PvField::ScalarArrayTyped(TypedScalarArray::String(arr))) => Some(arr.to_vec()),
2576 Some(PvField::ScalarArray(items)) => {
2577 let mut out = Vec::with_capacity(items.len());
2578 for s in items {
2579 if let ScalarValue::String(c) = s {
2580 out.push(c.clone());
2581 } else {
2582 return None; }
2584 }
2585 Some(out)
2586 }
2587 Some(_) => return None,
2588 None => None,
2589 };
2590 Some((index, choices))
2591}
2592
/// Serialises enum choice labels as a compact JSON array of strings.
///
/// Quotes, backslashes and control characters below U+0020 are escaped
/// (`\n`, `\r`, `\t` by name, the rest as `\u00XX`); every other character is
/// written unchanged.
fn nt_enum_choices_to_extras(choices: &[String]) -> String {
    use std::fmt::Write;

    // Brackets plus, per label, its bytes, two quotes and a comma.
    let estimate: usize = choices.iter().map(|label| label.len() + 3).sum();
    let mut json = String::with_capacity(2 + estimate);

    json.push('[');
    let mut needs_comma = false;
    for label in choices {
        if needs_comma {
            json.push(',');
        }
        needs_comma = true;

        json.push('"');
        for ch in label.chars() {
            match ch {
                '"' => json.push_str("\\\""),
                '\\' => json.push_str("\\\\"),
                '\n' => json.push_str("\\n"),
                '\r' => json.push_str("\\r"),
                '\t' => json.push_str("\\t"),
                // Remaining control characters have no short escape.
                ctl @ '\u{0}'..='\u{1f}' => {
                    let _ = write!(json, "\\u{:04x}", u32::from(ctl));
                }
                plain => json.push(plain),
            }
        }
        json.push('"');
    }
    json.push(']');
    json
}
2626
2627fn refresh_nt_enum_extras(field: &PvField, extras: &ExtraFieldsCache) {
2633 if let Some((_, Some(choices))) = nt_enum_parts(field) {
2634 extras.insert("enum_strs".to_string(), nt_enum_choices_to_extras(&choices));
2635 }
2636}
2637
2638fn encode_pv_field_self_describing(
2655 field: &PvField,
2656 canonical_desc: &epics_rs::pva::pvdata::FieldDesc,
2657) -> Vec<u8> {
2658 let mut out = Vec::with_capacity(256);
2659 encode_type_desc(canonical_desc, ByteOrder::Big, &mut out);
2660 encode_pv_field(field, canonical_desc, ByteOrder::Big, &mut out);
2661 out
2662}
2663
/// Maps a Channel Access record field name to the dotted path of the
/// equivalent member in a PVA structure.
///
/// The lookup is case-sensitive. Returns `None` for names without a mapping.
fn ca_archive_field_to_pva_path(field: &str) -> Option<&'static str> {
    const FIELD_PATHS: &[(&str, &str)] = &[
        // display
        ("EGU", "display.units"),
        ("PREC", "display.precision"),
        ("DESC", "display.description"),
        ("HOPR", "display.limitHigh"),
        ("LOPR", "display.limitLow"),
        // valueAlarm
        ("HIHI", "valueAlarm.highAlarmLimit"),
        ("LOLO", "valueAlarm.lowAlarmLimit"),
        ("HIGH", "valueAlarm.highWarningLimit"),
        ("LOW", "valueAlarm.lowWarningLimit"),
        ("HHSV", "valueAlarm.highAlarmSeverity"),
        ("LLSV", "valueAlarm.lowAlarmSeverity"),
        ("HSV", "valueAlarm.highWarningSeverity"),
        ("LSV", "valueAlarm.lowWarningSeverity"),
        ("HYST", "valueAlarm.hysteresis"),
        // control
        ("DRVH", "control.limitHigh"),
        ("DRVL", "control.limitLow"),
    ];

    FIELD_PATHS
        .iter()
        .find(|(ca_name, _)| *ca_name == field)
        .map(|&(_, pva_path)| pva_path)
}
2692
2693fn pv_field_walk_path<'a>(root: &'a PvField, path: &str) -> Option<&'a PvField> {
2697 let mut current = root;
2698 for segment in path.split('.') {
2699 let PvField::Structure(s) = current else {
2700 return None;
2701 };
2702 current = s.get_field(segment)?;
2703 }
2704 Some(current)
2705}
2706
2707fn scalar_value_to_string(s: &ScalarValue) -> String {
2711 match s {
2712 ScalarValue::Boolean(v) => v.to_string(),
2713 ScalarValue::Byte(v) => v.to_string(),
2714 ScalarValue::Short(v) => v.to_string(),
2715 ScalarValue::Int(v) => v.to_string(),
2716 ScalarValue::Long(v) => v.to_string(),
2717 ScalarValue::UByte(v) => v.to_string(),
2718 ScalarValue::UShort(v) => v.to_string(),
2719 ScalarValue::UInt(v) => v.to_string(),
2720 ScalarValue::ULong(v) => v.to_string(),
2721 ScalarValue::Float(v) => v.to_string(),
2722 ScalarValue::Double(v) => v.to_string(),
2723 ScalarValue::String(s) => s.clone(),
2724 }
2725}
2726
2727fn pv_field_element_count(field: &PvField) -> i32 {
2730 match pv_value_field(field) {
2731 PvField::Scalar(_) => 1,
2732 PvField::ScalarArray(arr) => arr.len() as i32,
2733 PvField::ScalarArrayTyped(t) => t.len() as i32,
2734 _ => 0,
2735 }
2736}
2737
2738fn pv_field_extract_display(field: &PvField) -> (Option<String>, Option<String>) {
2744 let PvField::Structure(s) = field else {
2745 return (None, None);
2746 };
2747 let Some(PvField::Structure(disp)) = s.get_field("display") else {
2748 return (None, None);
2749 };
2750 let prec = match disp.get_field("precision") {
2751 Some(PvField::Scalar(ScalarValue::Int(p))) if *p >= 0 => Some(p.to_string()),
2752 Some(PvField::Scalar(ScalarValue::Short(p))) if *p >= 0 => Some(p.to_string()),
2753 Some(PvField::Scalar(ScalarValue::Long(p))) if *p >= 0 => Some(p.to_string()),
2754 _ => None,
2755 };
2756 let egu = match disp.get_field("units") {
2757 Some(PvField::Scalar(ScalarValue::String(u))) => {
2758 let t = u.trim();
2759 if t.is_empty() {
2760 None
2761 } else {
2762 Some(t.to_string())
2763 }
2764 }
2765 _ => None,
2766 };
2767 (prec, egu)
2768}
2769
2770fn pv_field_extract_timestamp(field: &PvField) -> SystemTime {
2774 let PvField::Structure(s) = field else {
2775 return SystemTime::now();
2776 };
2777 let Some(PvField::Structure(ts)) = s.get_field("timeStamp") else {
2778 return SystemTime::now();
2779 };
2780 let secs = match ts.get_field("secondsPastEpoch") {
2781 Some(PvField::Scalar(ScalarValue::Long(v))) => *v as u64,
2782 Some(PvField::Scalar(ScalarValue::ULong(v))) => *v,
2783 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u64,
2784 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v as u64,
2785 _ => return SystemTime::now(),
2786 };
2787 let nanos = match ts.get_field("nanoseconds") {
2788 Some(PvField::Scalar(ScalarValue::Int(v))) => *v as u32,
2789 Some(PvField::Scalar(ScalarValue::UInt(v))) => *v,
2790 _ => 0,
2791 };
2792 SystemTime::UNIX_EPOCH + Duration::new(secs, nanos)
2793}
2794
2795fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
2797 match field_type {
2798 DbFieldType::String => ArchDbType::ScalarString,
2799 DbFieldType::Short => ArchDbType::ScalarShort,
2800 DbFieldType::Float => ArchDbType::ScalarFloat,
2801 DbFieldType::Enum => ArchDbType::ScalarEnum,
2802 DbFieldType::Char => ArchDbType::ScalarByte,
2803 DbFieldType::Long => ArchDbType::ScalarInt,
2804 DbFieldType::Int64 => ArchDbType::ScalarInt,
2807 DbFieldType::Double => ArchDbType::ScalarDouble,
2808 }
2809}
2810
2811fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
2813 match val {
2814 EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
2815 EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
2816 EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
2817 EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
2818 EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
2819 EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
2820 EpicsValue::Int64(v) => ArchiverValue::ScalarInt(*v as i32),
2821 EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
2822 EpicsValue::ShortArray(v) => {
2823 ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
2824 }
2825 EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
2826 EpicsValue::EnumArray(v) => {
2827 ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
2828 }
2829 EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
2830 EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
2831 EpicsValue::Int64Array(v) => {
2832 ArchiverValue::VectorInt(v.iter().map(|x| *x as i32).collect())
2833 }
2834 EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
2835 EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
2836 }
2837}
2838
/// Timing knobs shared by the shard append loops and the flush owner.
#[derive(Debug, Clone)]
pub struct WriteLoopConfig {
    /// Interval between periodic flush + timestamp-commit cycles.
    /// `flush_owner_loop` clamps a zero period to one second.
    pub flush_period: Duration,
    /// Upper bound for a single storage append during normal operation.
    pub append_timeout: Duration,
    /// Upper bound for a periodic storage flush.
    pub flush_timeout: Duration,
    /// Upper bound for a single storage append while draining at shutdown.
    pub drain_per_sample_timeout: Duration,
    /// Total time a shard may spend draining at shutdown; the flush owner
    /// also uses it to bound its wait for an in-flight flush.
    pub drain_total_budget: Duration,
    /// Upper bound for the final flush performed at shutdown.
    pub shutdown_flush_timeout: Duration,
}

impl Default for WriteLoopConfig {
    /// 10 s flush period, 30 s append/flush timeouts, 5 s per drained sample
    /// within a 30 s drain budget, and a 15 s final-flush timeout.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            flush_period: secs(10),
            append_timeout: secs(30),
            flush_timeout: secs(30),
            drain_per_sample_timeout: secs(5),
            drain_total_budget: secs(30),
            shutdown_flush_timeout: secs(15),
        }
    }
}
2884
2885#[derive(Debug, Clone)]
2896pub struct ShardedWritePoolConfig {
2897 pub shards: usize,
2901 pub per_shard_buffer: usize,
2909 pub write_loop: WriteLoopConfig,
2912}
2913
2914impl Default for ShardedWritePoolConfig {
2915 fn default() -> Self {
2916 Self {
2917 shards: 1,
2918 per_shard_buffer: 4096,
2919 write_loop: WriteLoopConfig::default(),
2920 }
2921 }
2922}
2923
2924use archiver_core::storage::plainpb::shard_for_pv;
2931
2932pub async fn run_sharded_write_pool(
2943 storage: Arc<dyn StoragePlugin>,
2944 registry: Arc<PvRegistry>,
2945 rx: mpsc::Receiver<PvSample>,
2946 shutdown: tokio::sync::watch::Receiver<bool>,
2947 cfg: ShardedWritePoolConfig,
2948) {
2949 let n = cfg.shards.max(1);
2950
2951 let pending = Arc::new(PendingReports::new());
2956
2957 let flush_owner_handle = tokio::spawn(flush_owner_loop(
2960 storage.clone(),
2961 registry.clone(),
2962 pending.clone(),
2963 shutdown.clone(),
2964 cfg.write_loop.clone(),
2965 ));
2966
2967 if n == 1 {
2968 shard_append_loop(
2971 0,
2972 storage.clone(),
2973 rx,
2974 pending.clone(),
2975 shutdown.clone(),
2976 cfg.write_loop.clone(),
2977 )
2978 .await;
2979 } else {
2980 let mut shard_txs = Vec::with_capacity(n);
2982 let mut shard_handles = Vec::with_capacity(n);
2983 for shard_idx in 0..n {
2984 let (s_tx, s_rx) = mpsc::channel::<PvSample>(cfg.per_shard_buffer);
2985 shard_txs.push(s_tx);
2986 let storage = storage.clone();
2987 let pending = pending.clone();
2988 let shard_shutdown = shutdown.clone();
2989 let shard_cfg = cfg.write_loop.clone();
2990 shard_handles.push(tokio::spawn(shard_append_loop(
2991 shard_idx,
2992 storage,
2993 s_rx,
2994 pending,
2995 shard_shutdown,
2996 shard_cfg,
2997 )));
2998 }
2999
3000 dispatch_loop(rx, shard_txs, shutdown.clone()).await;
3001
3002 for h in shard_handles {
3006 let _ = h.await;
3007 }
3008 }
3009
3010 let _ = flush_owner_handle.await;
3014}
3015
3016async fn dispatch_loop(
3028 mut rx: mpsc::Receiver<PvSample>,
3029 shard_txs: Vec<mpsc::Sender<PvSample>>,
3030 mut shutdown: tokio::sync::watch::Receiver<bool>,
3031) {
3032 let n = shard_txs.len();
3033 info!(shards = n, "Sharded write dispatcher started");
3034 loop {
3035 tokio::select! {
3036 biased;
3037 _ = shutdown.changed() => {
3041 if *shutdown.borrow() {
3042 while let Ok(sample) = rx.try_recv() {
3050 let idx = shard_for_pv(&sample.pv_name, n);
3051 match shard_txs[idx].try_send(sample) {
3052 Ok(()) => {}
3053 Err(mpsc::error::TrySendError::Full(s)) => {
3054 if let Some(c) = s.counters.as_ref() {
3055 c.buffer_overflow_drops
3056 .fetch_add(1, Ordering::Relaxed);
3057 }
3058 metrics::counter!(
3059 "archiver_dispatcher_shard_overflow_drops_total",
3060 "shard" => idx.to_string(),
3061 "phase" => "shutdown_drain",
3062 )
3063 .increment(1);
3064 debug!(
3065 shard = idx,
3066 pv = s.pv_name,
3067 "Shutdown-drain shard channel full; sample dropped"
3068 );
3069 }
3070 Err(mpsc::error::TrySendError::Closed(s)) => {
3071 warn!(
3072 shard = idx,
3073 pv = s.pv_name,
3074 "Shutdown-drain shard channel closed; sample dropped"
3075 );
3076 }
3077 }
3078 }
3079 break;
3080 }
3081 }
3082 maybe = rx.recv() => {
3083 match maybe {
3084 Some(sample) => {
3085 let idx = shard_for_pv(&sample.pv_name, n);
3086 match shard_txs[idx].try_send(sample) {
3087 Ok(()) => {}
3088 Err(mpsc::error::TrySendError::Full(s)) => {
3089 if let Some(c) = s.counters.as_ref() {
3094 c.buffer_overflow_drops
3095 .fetch_add(1, Ordering::Relaxed);
3096 }
3097 metrics::counter!(
3098 "archiver_dispatcher_shard_overflow_drops_total",
3099 "shard" => idx.to_string(),
3100 )
3101 .increment(1);
3102 debug!(
3103 shard = idx,
3104 pv = s.pv_name,
3105 "Shard channel full; sample dropped \
3106 (per-shard isolation)"
3107 );
3108 }
3109 Err(mpsc::error::TrySendError::Closed(s)) => {
3110 warn!(
3115 shard = idx,
3116 pv = s.pv_name,
3117 "Shard channel closed; sample dropped"
3118 );
3119 }
3120 }
3121 }
3122 None => break, }
3124 }
3125 }
3126 }
3127 info!("Sharded write dispatcher exiting");
3128}
3129
3130pub async fn write_loop(
3137 storage: Arc<dyn StoragePlugin>,
3138 registry: Arc<PvRegistry>,
3139 rx: mpsc::Receiver<PvSample>,
3140 shutdown: tokio::sync::watch::Receiver<bool>,
3141 flush_period: Duration,
3142) {
3143 let cfg = WriteLoopConfig {
3144 flush_period,
3145 ..Default::default()
3146 };
3147 write_loop_with_config(storage, registry, rx, shutdown, cfg).await
3148}
3149
/// Drop guard that clears the shared "flush in flight" flag.
///
/// The flag is reset to `false` whenever the guard goes out of scope,
/// including during unwinding.
struct FlushInFlightGuard {
    flag: Arc<std::sync::atomic::AtomicBool>,
}

impl Drop for FlushInFlightGuard {
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
3165
/// Flushes buffered ingest writes and then commits the reported per-PV
/// timestamps to the registry.
///
/// Returns `false` without doing anything when a previous flush is still
/// marked in flight or when there is nothing pending; returns `true` once a
/// flush has been attempted, whatever its outcome.
///
/// Outcome handling:
/// * flush succeeded: PVs listed in `failed` are removed from `pending`,
///   PVs listed in `deferred` stay pending, and all other snapshot entries
///   are committed via `batch_update_timestamps` and then removed from
///   `pending` (only if their timestamp is unchanged since the snapshot);
/// * registry commit failed, flush returned an error, or the flush task
///   failed to join: `pending` is left as it is;
/// * flush timed out: the snapshot entries are removed from `pending`
///   without being committed.
async fn run_flush_and_commit(
    storage: &Arc<dyn StoragePlugin>,
    registry: &Arc<PvRegistry>,
    pending: &Arc<PendingReports>,
    flush_timeout: Duration,
    in_flight: &Arc<std::sync::atomic::AtomicBool>,
) -> bool {
    // A flush that outlived its timeout keeps the flag set until its task
    // finishes; do not start a second one on top of it.
    if in_flight.load(Ordering::Acquire) {
        return false;
    }
    // Snapshot before flushing: only timestamps reported up to this point
    // are candidates for this commit.
    let snapshot = pending.snapshot();
    if snapshot.is_empty() {
        return false;
    }
    in_flight.store(true, Ordering::Release);

    let storage_for_flush = storage.clone();
    let in_flight_for_task = in_flight.clone();
    let flush_join = tokio::task::spawn_blocking(move || {
        // Clears the in-flight flag when the task ends, even if this
        // function has already given up waiting for it.
        let _guard = FlushInFlightGuard {
            flag: in_flight_for_task,
        };
        let rt = tokio::runtime::Handle::current();
        rt.block_on(storage_for_flush.flush_ingest_writes())
    });
    match tokio::time::timeout(flush_timeout, flush_join).await {
        Ok(Ok(Ok(IngestFlushResult { failed, deferred }))) => {
            if !failed.is_empty() {
                // Removed unconditionally, regardless of the stored timestamp.
                pending.remove_failed(&failed);
                error!(
                    "STS flush dropped {} PV(s) from timestamp commit \
                     (bytes never reached disk): {:?}",
                    failed.len(),
                    failed
                );
            }
            if !deferred.is_empty() {
                debug!(
                    "STS flush deferred {} PV(s); keeping in pending \
                     for next cycle: {:?}",
                    deferred.len(),
                    deferred
                );
            }
            let failed_set: std::collections::HashSet<&str> =
                failed.iter().map(|s| s.as_str()).collect();
            let deferred_set: std::collections::HashSet<&str> =
                deferred.iter().map(|s| s.as_str()).collect();
            // Commit only PVs that the flush neither failed nor deferred.
            let to_commit: Vec<(&str, SystemTime)> = snapshot
                .iter()
                .filter(|(pv, _)| {
                    !failed_set.contains(pv.as_str()) && !deferred_set.contains(pv.as_str())
                })
                .map(|(pv, ts)| (pv.as_str(), *ts))
                .collect();
            if to_commit.is_empty() {
                return true;
            }
            match registry.batch_update_timestamps(&to_commit) {
                Ok(()) => {
                    let committed_map: std::collections::HashMap<String, SystemTime> = to_commit
                        .iter()
                        .map(|(pv, ts)| ((*pv).to_string(), *ts))
                        .collect();
                    // Entries that received a newer timestamp since the
                    // snapshot stay pending for the next cycle.
                    pending.remove_committed(&committed_map);
                }
                Err(e) => {
                    error!(
                        "Registry timestamp commit failed; \
                         keeping {} pending for retry: {e}",
                        snapshot.len()
                    );
                }
            }
        }
        Ok(Ok(Err(e))) => {
            error!(
                "STS ingest flush errored; deferring all {} \
                 pending timestamp commits: {e}",
                snapshot.len()
            );
        }
        Ok(Err(join_err)) => {
            error!(
                "STS ingest flush task panicked; deferring all {} \
                 pending timestamp commits: {join_err}",
                snapshot.len()
            );
        }
        Err(_) => {
            metrics::counter!("archiver_storage_flush_timeouts_total").increment(1);
            let dropped = snapshot.len();
            // The outcome of the flush is unknown, so the snapshot entries
            // are dropped rather than committed.
            pending.remove_committed(&snapshot);
            error!(
                "STS ingest flush timed out after {flush_timeout:?}; \
                 conservatively dropped {dropped} pending timestamp \
                 commit(s) (task remains on blocking pool; \
                 timestamps will be rebuilt from subsequent samples)"
            );
        }
    }
    true
}
3363
3364#[derive(Default)]
3384pub struct PendingReports {
3385 inner: dashmap::DashMap<String, SystemTime>,
3386}
3387
3388impl PendingReports {
3389 pub fn new() -> Self {
3390 Self::default()
3391 }
3392
3393 pub fn report(&self, pv: &str, ts: SystemTime) {
3397 self.inner
3401 .entry(pv.to_string())
3402 .and_modify(|cur| {
3403 if *cur < ts {
3404 *cur = ts;
3405 }
3406 })
3407 .or_insert(ts);
3408 }
3409
3410 pub fn snapshot(&self) -> std::collections::HashMap<String, SystemTime> {
3414 self.inner
3415 .iter()
3416 .map(|kv| (kv.key().clone(), *kv.value()))
3417 .collect()
3418 }
3419
3420 pub fn remove_committed(&self, committed: &std::collections::HashMap<String, SystemTime>) {
3425 for (pv, &committed_ts) in committed {
3426 let _ = self.inner.remove_if(pv, |_, v| *v == committed_ts);
3429 }
3430 }
3431
3432 pub fn remove_failed(&self, failed: &[String]) {
3450 for pv in failed {
3451 let _ = self.inner.remove(pv);
3452 }
3453 }
3454
3455 pub fn is_empty(&self) -> bool {
3456 self.inner.is_empty()
3457 }
3458
3459 pub fn len(&self) -> usize {
3460 self.inner.len()
3461 }
3462}
3463
3464pub async fn write_loop_with_config(
3470 storage: Arc<dyn StoragePlugin>,
3471 registry: Arc<PvRegistry>,
3472 rx: mpsc::Receiver<PvSample>,
3473 shutdown: tokio::sync::watch::Receiver<bool>,
3474 cfg: WriteLoopConfig,
3475) {
3476 let pool_cfg = ShardedWritePoolConfig {
3477 shards: 1,
3478 per_shard_buffer: 4096,
3480 write_loop: cfg,
3481 };
3482 run_sharded_write_pool(storage, registry, rx, shutdown, pool_cfg).await;
3483}
3484
/// Append loop of one shard: receives samples from `sample_rx` and hands
/// each to `shard_handle_sample` with `cfg.append_timeout`.
///
/// The per-PV "last timestamp" and "last DBR type" maps live here and are
/// passed to the handler, so that state is private to the shard.
///
/// When `shutdown.changed()` completes, the loop drains whatever is already
/// buffered via `shard_drain_on_shutdown` and exits.
async fn shard_append_loop(
    shard_idx: usize,
    storage: Arc<dyn StoragePlugin>,
    mut sample_rx: mpsc::Receiver<PvSample>,
    pending: Arc<PendingReports>,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
    cfg: WriteLoopConfig,
) {
    let append_timeout = cfg.append_timeout;
    info!(shard = shard_idx, "Shard append loop started");
    // Newest timestamp handled per PV; used to drop out-of-order samples.
    let mut last_ts: std::collections::HashMap<String, SystemTime> =
        std::collections::HashMap::new();
    // DBR type accepted per PV; used to drop samples whose type changed.
    let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
        std::collections::HashMap::new();

    loop {
        tokio::select! {
            // A closed channel (`None`) does not match this pattern, so only
            // the shutdown branch remains to end the loop.
            Some(pv_sample) = sample_rx.recv() => {
                shard_handle_sample(
                    shard_idx,
                    &storage,
                    &pending,
                    pv_sample,
                    &mut last_ts,
                    &mut last_dbr_type,
                    append_timeout,
                )
                .await;
            }
            // NOTE(review): the result and the new value are ignored, so any
            // change (or a dropped sender) triggers the drain — confirm that
            // the flag is only ever set to `true`.
            _ = shutdown.changed() => {
                shard_drain_on_shutdown(
                    shard_idx,
                    &storage,
                    &pending,
                    &mut sample_rx,
                    &mut last_ts,
                    &mut last_dbr_type,
                    &cfg,
                )
                .await;
                break;
            }
        }
    }
    info!(shard = shard_idx, "Shard append loop exited");
}
3548
/// Validates one sample against the shard's per-PV state and appends it to
/// storage with a time limit.
///
/// A sample is dropped (and counted on its counters, if any) when
/// * its timestamp is strictly older than the last one handled for the PV, or
/// * its DBR type differs from the type previously recorded for the PV.
///
/// Otherwise the append runs on the blocking pool. On success the stored
/// counters are bumped and the timestamp is reported to `pending`. Errors,
/// join failures and timeouts are logged; nothing is returned to the caller.
async fn shard_handle_sample(
    shard_idx: usize,
    storage: &Arc<dyn StoragePlugin>,
    pending: &Arc<PendingReports>,
    pv_sample: PvSample,
    last_ts: &mut std::collections::HashMap<String, SystemTime>,
    last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
    append_timeout: Duration,
) {
    let ts = pv_sample.sample.timestamp;

    // Equal timestamps pass; only strictly older samples are rejected.
    if let Some(prev_ts) = last_ts.get(&pv_sample.pv_name)
        && ts < *prev_ts
    {
        if let Some(ref c) = pv_sample.counters {
            c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
        }
        debug!(
            shard = shard_idx,
            pv = pv_sample.pv_name,
            ?ts,
            ?prev_ts,
            "Dropping out-of-order sample"
        );
        return;
    }

    // Optimistically record the new type; `insert` hands back the old one.
    let prev_type = last_dbr_type.insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
    if let Some(prev) = prev_type
        && prev != pv_sample.dbr_type
    {
        if let Some(ref c) = pv_sample.counters {
            c.type_change_drops.fetch_add(1, Ordering::Relaxed);
            c.latest_observed_dbr
                .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
        }
        debug!(
            shard = shard_idx,
            pv = pv_sample.pv_name,
            ?prev,
            new = ?pv_sample.dbr_type,
            "Dropping type-changed sample"
        );
        // Undo the optimistic insert so the first-seen type stays in force.
        last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
        return;
    }

    let meta = AppendMeta {
        element_count: pv_sample.element_count,
        ..Default::default()
    };
    // `pv_sample` moves into the blocking task; keep what is needed after it.
    let pv_name_for_post = pv_sample.pv_name.clone();
    let counters_for_post = pv_sample.counters.clone();
    let counters_in_task = pv_sample.counters.clone();
    let storage_for_task = storage.clone();
    let pending_for_task = pending.clone();

    let join = tokio::task::spawn_blocking(move || {
        let rt = tokio::runtime::Handle::current();
        let res = rt.block_on(storage_for_task.append_event_with_meta(
            &pv_sample.pv_name,
            pv_sample.dbr_type,
            &pv_sample.sample,
            &meta,
        ));
        // Success bookkeeping happens inside the task, so it also runs when
        // the append finishes after the caller has stopped waiting.
        if res.is_ok() {
            metrics::counter!("archiver_events_stored_total").increment(1);
            if let Some(c) = counters_in_task.as_ref() {
                c.events_stored.fetch_add(1, Ordering::Relaxed);
            }
            pending_for_task.report(&pv_sample.pv_name, ts);
        }
        res
    });
    let res = tokio::time::timeout(append_timeout, join).await;
    // Recorded whatever the outcome of the append was.
    last_ts.insert(pv_name_for_post.clone(), ts);
    match res {
        Ok(Ok(Ok(()))) => {
        }
        Ok(Ok(Err(e))) => {
            error!(shard = shard_idx, pv = pv_name_for_post, "Write error: {e}");
        }
        Ok(Err(join_err)) => {
            error!(
                shard = shard_idx,
                pv = pv_name_for_post,
                "Storage write task panicked: {join_err}"
            );
        }
        Err(_) => {
            error!(
                shard = shard_idx,
                pv = pv_name_for_post,
                "Storage append timed out after {append_timeout:?}; \
                 shard abandoning (task remains on blocking pool, \
                 may still succeed late)"
            );
            if let Some(ref c) = counters_for_post {
                c.storage_append_timeouts.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
3716
3717async fn shard_drain_on_shutdown(
3720 shard_idx: usize,
3721 storage: &Arc<dyn StoragePlugin>,
3722 pending: &Arc<PendingReports>,
3723 sample_rx: &mut mpsc::Receiver<PvSample>,
3724 last_ts: &mut std::collections::HashMap<String, SystemTime>,
3725 last_dbr_type: &mut std::collections::HashMap<String, ArchDbType>,
3726 cfg: &WriteLoopConfig,
3727) {
3728 let drain_per_sample_timeout = cfg.drain_per_sample_timeout;
3729 let drain_total_budget = cfg.drain_total_budget;
3730 let drain_start = std::time::Instant::now();
3731 let mut drained_skipped = 0usize;
3732 while let Ok(pv_sample) = sample_rx.try_recv() {
3733 if drain_start.elapsed() > drain_total_budget {
3734 drained_skipped += 1;
3735 continue;
3736 }
3737 shard_handle_sample(
3742 shard_idx,
3743 storage,
3744 pending,
3745 pv_sample,
3746 last_ts,
3747 last_dbr_type,
3748 drain_per_sample_timeout,
3749 )
3750 .await;
3751 }
3752 if drained_skipped > 0 {
3753 warn!(
3754 shard = shard_idx,
3755 "Shutdown drain budget ({drain_total_budget:?}) exhausted; \
3756 {drained_skipped} buffered sample(s) abandoned without \
3757 attempting append"
3758 );
3759 }
3760}
3761
3762async fn flush_owner_loop(
3770 storage: Arc<dyn StoragePlugin>,
3771 registry: Arc<PvRegistry>,
3772 pending: Arc<PendingReports>,
3773 mut shutdown: tokio::sync::watch::Receiver<bool>,
3774 cfg: WriteLoopConfig,
3775) {
3776 let flush_period = cfg.flush_period;
3777 let flush_timeout = cfg.flush_timeout;
3778 let shutdown_flush_timeout = cfg.shutdown_flush_timeout;
3779 let drain_total_budget = cfg.drain_total_budget;
3780 let flush_period = if flush_period.is_zero() {
3785 warn!(
3786 "flush_owner: flush_period was Duration::ZERO; \
3787 clamping to 1s to avoid tokio::time::interval panic"
3788 );
3789 Duration::from_secs(1)
3790 } else {
3791 flush_period
3792 };
3793 info!("Flush owner started");
3794
3795 let mut flush_ticker = tokio::time::interval(flush_period);
3796 flush_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
3797 let _ = flush_ticker.tick().await;
3798
3799 let flush_in_flight = Arc::new(std::sync::atomic::AtomicBool::new(false));
3800
3801 loop {
3806 tokio::select! {
3807 biased;
3808 _ = shutdown.changed() => {
3809 if *shutdown.borrow() {
3810 break;
3811 }
3812 }
3813 _ = flush_ticker.tick() => {
3814 if !pending.is_empty() {
3815 run_flush_and_commit(
3816 &storage,
3817 ®istry,
3818 &pending,
3819 flush_timeout,
3820 &flush_in_flight,
3821 )
3822 .await;
3823 }
3824 }
3825 }
3826 }
3827
3828 let phase2_deadline = std::time::Instant::now() + drain_total_budget;
3848 let min_grace = std::cmp::min(drain_total_budget, Duration::from_millis(200));
3849 if !min_grace.is_zero() {
3850 tokio::time::sleep(min_grace).await;
3851 }
3852 while flush_in_flight.load(Ordering::Acquire) && std::time::Instant::now() < phase2_deadline {
3853 tokio::time::sleep(Duration::from_millis(50)).await;
3854 }
3855 if flush_in_flight.load(Ordering::Acquire) {
3856 warn!(
3857 "Shutdown grace exhausted while a flush is still in flight; \
3858 final flush will be skipped (pending entries left for next \
3859 process restart to rebuild from re-archived samples)"
3860 );
3861 }
3862
3863 info!(
3867 pending_at_shutdown = pending.len(),
3868 "Flush owner running final flush + commit"
3869 );
3870 if !pending.is_empty() {
3871 run_flush_and_commit(
3872 &storage,
3873 ®istry,
3874 &pending,
3875 shutdown_flush_timeout,
3876 &flush_in_flight,
3877 )
3878 .await;
3879 }
3880 info!("Flush owner exiting");
3881}
3882
3883#[cfg(test)]
3884mod pva_mapping_tests {
3885 use super::*;
3886 use epics_rs::pva::pvdata::PvStructure;
3887 use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
3888 use std::io::Cursor;
3889
    /// Builds an `epics:nt/NTTable:1.0` structure with a two-entry `labels`
    /// string array and a `value` sub-structure holding two double columns
    /// (`x` = 1,2,3 and `y` = 4,5,6).
    fn make_nttable() -> PvField {
        let mut table = PvStructure::new("epics:nt/NTTable:1.0");
        let labels =
            TypedScalarArray::String(vec!["x".to_string(), "y".to_string()].into_iter().collect());
        table
            .fields
            .push(("labels".into(), PvField::ScalarArrayTyped(labels)));
        // The column container itself carries no struct id.
        let mut value_inner = PvStructure::new("");
        value_inner.fields.push((
            "x".into(),
            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.0, 2.0, 3.0].into())),
        ));
        value_inner.fields.push((
            "y".into(),
            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![4.0, 5.0, 6.0].into())),
        ));
        table
            .fields
            .push(("value".into(), PvField::Structure(value_inner)));
        PvField::Structure(table)
    }
3913
3914 #[test]
3915 fn nttable_classifies_as_v4_generic_bytes() {
3916 let pv = make_nttable();
3917 let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
3918 assert_eq!(db, ArchDbType::V4GenericBytes);
3919 assert_eq!(ec, 1);
3920 }
3921
    /// The bytes produced for an NTTable decode back (descriptor first, then
    /// value) into a structure with the same struct id, a two-entry string
    /// `labels` array and a two-field `value` sub-structure.
    #[test]
    fn nttable_round_trips_through_self_describing_bytes() {
        let pv = make_nttable();
        let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
        let bytes = match av {
            ArchiverValue::V4GenericBytes(b) => b,
            other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
        };
        // Decode using only what is in the buffer: descriptor, then value.
        let mut cur = Cursor::new(bytes.as_slice());
        let desc = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc decode");
        let decoded = decode_pv_field(&desc, &mut cur, ByteOrder::Big).expect("value decode");
        let PvField::Structure(s) = decoded else {
            panic!("expected Structure after decode");
        };
        assert_eq!(s.struct_id, "epics:nt/NTTable:1.0");
        let labels = s
            .fields
            .iter()
            .find_map(|(n, f)| if n == "labels" { Some(f) } else { None })
            .expect("labels");
        let Some(PvField::ScalarArrayTyped(TypedScalarArray::String(arr))) = Some(labels) else {
            panic!("labels not a string array");
        };
        assert_eq!(arr.len(), 2);
        let value = s
            .fields
            .iter()
            .find_map(|(n, f)| {
                if n == "value" {
                    if let PvField::Structure(v) = f {
                        Some(v)
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .expect("value substruct");
        assert_eq!(value.fields.len(), 2);
    }
3964
    /// An NTScalarArray with a three-element double `value` is classified as
    /// `WaveformDouble` with element count 3 and converts to a
    /// `VectorDouble` holding the same numbers.
    #[test]
    fn nt_scalar_array_double_classifies_as_waveform() {
        let mut wrapper = PvStructure::new("epics:nt/NTScalarArray:1.0");
        wrapper.fields.push((
            "value".into(),
            PvField::ScalarArrayTyped(TypedScalarArray::Double(vec![1.5, 2.5, 3.5].into())),
        ));
        let pv = PvField::Structure(wrapper);

        let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
        assert_eq!(db, ArchDbType::WaveformDouble);
        assert_eq!(ec, 3);
        let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
        match av {
            ArchiverValue::VectorDouble(v) => assert_eq!(v, vec![1.5, 2.5, 3.5]),
            other => panic!("expected VectorDouble, got {:?}", other.db_type()),
        }
    }
3987
3988 #[test]
3992 fn nt_enum_classifies_as_scalar_enum() {
3993 let pv = make_nt_enum(2, &["Zero", "One", "Two"]);
3994 let (db, ec) = pv_field_to_arch_db_type(&pv).expect("classified");
3995 assert_eq!(db, ArchDbType::ScalarEnum);
3996 assert_eq!(ec, 1);
3997 }
3998
3999 #[test]
4001 fn nt_enum_value_is_index() {
4002 let pv = make_nt_enum(2, &["Off", "On"]);
4003 let av = pv_field_scalar_to_archiver(&pv, &pv.descriptor()).expect("converted");
4004 match av {
4005 ArchiverValue::ScalarEnum(i) => assert_eq!(i, 2),
4006 other => panic!("expected ScalarEnum, got {:?}", other.db_type()),
4007 }
4008 }
4009
    /// `refresh_nt_enum_extras` caches the choice labels under `enum_strs`
    /// as a JSON array of strings.
    #[test]
    fn nt_enum_extras_carries_json_choices() {
        let pv = make_nt_enum(1, &["Off", "On", "Trip"]);
        let extras: ExtraFieldsCache = DashMap::new();
        refresh_nt_enum_extras(&pv, &extras);
        let stored = extras
            .get("enum_strs")
            .map(|s| s.value().clone())
            .expect("enum_strs cached");
        assert_eq!(stored, r#"["Off","On","Trip"]"#);
    }
4025
    /// A union array encoded with `encode_pv_field_self_describing` decodes
    /// back to the same selectors and values, and the decoded descriptor
    /// still lists both declared variants in order.
    #[test]
    fn union_array_roundtrips_with_canonical_descriptor() {
        use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
        use epics_rs::pva::pvdata::{FieldDesc, UnionItem};
        use std::io::Cursor;

        let variants_desc = vec![
            ("intVal".to_string(), FieldDesc::Scalar(ScalarType::Int)),
            ("dblVal".to_string(), FieldDesc::Scalar(ScalarType::Double)),
        ];
        let canonical = FieldDesc::UnionArray {
            struct_id: String::new(),
            variants: variants_desc.clone(),
        };
        // One element per variant.
        let items = vec![
            UnionItem {
                selector: 0,
                variant_name: "intVal".into(),
                value: PvField::Scalar(ScalarValue::Int(42)),
            },
            UnionItem {
                selector: 1,
                variant_name: "dblVal".into(),
                value: PvField::Scalar(ScalarValue::Double(1.5)),
            },
        ];
        let field = PvField::UnionArray(items);

        let wire = encode_pv_field_self_describing(&field, &canonical);
        let mut cur = Cursor::new(wire.as_slice());
        let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
        let val_back = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
        let PvField::UnionArray(items_back) = val_back else {
            panic!("expected UnionArray, got {val_back:?}");
        };
        assert_eq!(items_back.len(), 2);
        assert_eq!(items_back[0].selector, 0);
        assert!(matches!(
            items_back[0].value,
            PvField::Scalar(ScalarValue::Int(42))
        ));
        assert_eq!(items_back[1].selector, 1);
        match &items_back[1].value {
            PvField::Scalar(ScalarValue::Double(d)) => assert!((*d - 1.5).abs() < 1e-9),
            other => panic!("variant 1 not Double, got {other:?}"),
        }

        // The descriptor itself must survive the round trip as well.
        match desc_back {
            FieldDesc::UnionArray { variants, .. } => {
                assert_eq!(variants.len(), 2);
                assert_eq!(variants[0].0, "intVal");
                assert!(matches!(variants[0].1, FieldDesc::Scalar(ScalarType::Int)));
                assert_eq!(variants[1].0, "dblVal");
                assert!(matches!(
                    variants[1].1,
                    FieldDesc::Scalar(ScalarType::Double)
                ));
            }
            other => panic!("expected UnionArray descriptor, got {other:?}"),
        }
    }
4095
    /// An NTNDArray whose `value` union currently selects `doubleValue` is
    /// archived as `V4GenericBytes`. When converted against a canonical
    /// descriptor that declares eleven array variants, the decoded bytes
    /// still carry the selected value and all eleven variants in the
    /// descriptor.
    #[test]
    fn nt_nd_array_union_preserves_all_variants_via_canonical_desc() {
        use epics_rs::pva::pvdata::FieldDesc;
        use epics_rs::pva::pvdata::encode::{decode_pv_field, decode_type_desc};
        use std::io::Cursor;

        // Declared union variants, in selector order (doubleValue is 10).
        let variant_specs: &[(&str, ScalarType)] = &[
            ("booleanValue", ScalarType::Boolean),
            ("byteValue", ScalarType::Byte),
            ("shortValue", ScalarType::Short),
            ("intValue", ScalarType::Int),
            ("longValue", ScalarType::Long),
            ("ubyteValue", ScalarType::UByte),
            ("ushortValue", ScalarType::UShort),
            ("uintValue", ScalarType::UInt),
            ("ulongValue", ScalarType::ULong),
            ("floatValue", ScalarType::Float),
            ("doubleValue", ScalarType::Double),
        ];
        let union_variants: Vec<(String, FieldDesc)> = variant_specs
            .iter()
            .map(|(n, st)| (n.to_string(), FieldDesc::ScalarArray(*st)))
            .collect();
        let canonical = FieldDesc::Structure {
            struct_id: "epics:nt/NTNDArray:1.0".into(),
            fields: vec![(
                "value".into(),
                FieldDesc::Union {
                    struct_id: String::new(),
                    variants: union_variants.clone(),
                },
            )],
        };

        // The value only knows about the variant that is currently selected.
        let mut root = PvStructure::new("epics:nt/NTNDArray:1.0");
        root.fields.push((
            "value".into(),
            PvField::Union {
                selector: 10,
                variant_name: "doubleValue".into(),
                value: Box::new(PvField::ScalarArrayTyped(TypedScalarArray::Double(
                    vec![1.0, 2.0, 3.0].into(),
                ))),
            },
        ));
        let pv = PvField::Structure(root);

        let av = pv_field_scalar_to_archiver(&pv, &canonical).expect("converted");
        let bytes = match av {
            ArchiverValue::V4GenericBytes(b) => b,
            other => panic!("expected V4GenericBytes, got {:?}", other.db_type()),
        };

        let mut cur = Cursor::new(bytes.as_slice());
        let desc_back = decode_type_desc(&mut cur, ByteOrder::Big).expect("desc");
        let decoded = decode_pv_field(&desc_back, &mut cur, ByteOrder::Big).expect("value");
        let PvField::Structure(s) = decoded else {
            panic!("expected Structure root");
        };
        let value = s
            .fields
            .iter()
            .find_map(|(n, f)| if n == "value" { Some(f) } else { None })
            .expect("value");
        let PvField::Union {
            selector,
            variant_name,
            value: inner,
        } = value
        else {
            panic!("expected Union, got {value:?}");
        };
        assert_eq!(*selector, 10);
        assert_eq!(variant_name, "doubleValue");
        let PvField::ScalarArrayTyped(TypedScalarArray::Double(arr)) = inner.as_ref() else {
            panic!("expected Double[]");
        };
        assert_eq!(arr.as_ref(), &[1.0, 2.0, 3.0]);

        // Descriptor check: every declared variant must still be present.
        let value_field_desc = match &desc_back {
            FieldDesc::Structure { fields, .. } => fields
                .iter()
                .find_map(|(n, f)| if n == "value" { Some(f) } else { None })
                .expect("value field in desc"),
            other => panic!("expected Structure descriptor, got {other:?}"),
        };
        let FieldDesc::Union {
            variants: v_back, ..
        } = value_field_desc
        else {
            panic!("expected Union descriptor for value");
        };
        assert_eq!(
            v_back.len(),
            variant_specs.len(),
            "all sibling Union variants must survive the wire round-trip — \
             this is what value-recovery descriptor would have dropped"
        );
        for (i, (expected_name, expected_st)) in variant_specs.iter().enumerate() {
            assert_eq!(&v_back[i].0, expected_name);
            assert!(matches!(v_back[i].1, FieldDesc::ScalarArray(st) if st == *expected_st));
        }
    }
4222
/// A structure whose `value` member carries `index` + `choices` fields but
/// whose struct ids are not the NTEnum ones (root id `my:custom/Thing:1.0`,
/// inner id empty) is expected to classify as `V4GenericBytes`.
#[test]
fn structure_without_nt_enum_id_is_v4_bytes() {
    // Enum-shaped payload, deliberately built with an empty struct id.
    let choices = TypedScalarArray::String(["a", "b"].iter().map(|s| s.to_string()).collect());
    let mut enum_like = PvStructure::new("");
    enum_like
        .fields
        .push(("index".into(), PvField::Scalar(ScalarValue::Int(1))));
    enum_like
        .fields
        .push(("choices".into(), PvField::ScalarArrayTyped(choices)));

    // Wrap it in a root structure carrying a custom (non-NT) id.
    let mut custom_root = PvStructure::new("my:custom/Thing:1.0");
    custom_root
        .fields
        .push(("value".into(), PvField::Structure(enum_like)));
    let field = PvField::Structure(custom_root);

    let (db_type, _) = pv_field_to_arch_db_type(&field).expect("classified");
    assert_eq!(db_type, ArchDbType::V4GenericBytes);
}
4247
4248 fn make_nt_enum(index: i32, choices: &[&str]) -> PvField {
4249 let mut inner = PvStructure::new("enum_t");
4250 inner
4251 .fields
4252 .push(("index".into(), PvField::Scalar(ScalarValue::Int(index))));
4253 let arr = TypedScalarArray::String(
4254 choices
4255 .iter()
4256 .map(|s| s.to_string())
4257 .collect::<Vec<_>>()
4258 .into(),
4259 );
4260 inner
4261 .fields
4262 .push(("choices".into(), PvField::ScalarArrayTyped(arr)));
4263 let mut root = PvStructure::new("epics:nt/NTEnum:1.0");
4264 root.fields
4265 .push(("value".into(), PvField::Structure(inner)));
4266 PvField::Structure(root)
4267 }
4268
/// An `epics:nt/NTScalar:1.0` structure with a double `value` must classify
/// as `ScalarDouble` with an element count of 1, and convert to an
/// `ArchiverValue::ScalarDouble` carrying the same number.
#[test]
fn nt_scalar_double_still_scalar() {
    let mut nt_scalar = PvStructure::new("epics:nt/NTScalar:1.0");
    nt_scalar
        .fields
        .push(("value".into(), PvField::Scalar(ScalarValue::Double(42.5))));
    let pv = PvField::Structure(nt_scalar);

    // Classification: archiver type and element count.
    let (db_type, element_count) = pv_field_to_arch_db_type(&pv).expect("classified");
    assert_eq!(db_type, ArchDbType::ScalarDouble);
    assert_eq!(element_count, 1);

    // Conversion: driven by the descriptor derived from the value itself.
    let descriptor = pv.descriptor();
    let converted = pv_field_scalar_to_archiver(&pv, &descriptor).expect("converted");
    if let ArchiverValue::ScalarDouble(v) = &converted {
        assert!((*v - 42.5).abs() < 1e-9);
    } else {
        panic!("expected ScalarDouble, got {:?}", converted.db_type());
    }
}
4287}