1use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
2use std::sync::{Arc, Mutex};
3use std::time::{Duration, SystemTime};
4
5use dashmap::DashMap;
6use epics_rs::base::types::{DbFieldType, EpicsValue};
7use epics_rs::ca::client::{CaChannel, CaClient, ConnectionEvent};
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10use tracing::{debug, error, info, warn};
11
12use archiver_core::registry::{PvRecord, PvRegistry, PvStatus, SampleMode};
13use archiver_core::storage::traits::{AppendMeta, StoragePlugin};
14use archiver_core::types::{ArchDbType, ArchiverSample, ArchiverValue};
15
16use crate::policy::PolicyConfig;
17
/// How long to wait for a channel to connect when a PV is first archived,
/// and for the initial connection attempt of an extra-field channel.
const CA_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Per-attempt wait used by the monitor loop while waiting for a channel to
/// (re)connect.
const CA_RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Delay before retrying a failed subscribe; also the connection wait used by
/// the scan loop and the initial backoff of extra-field monitors.
const CA_RETRY_DELAY: Duration = Duration::from_secs(5);
24
/// Oldest IOC timestamp (Unix seconds) that is ever accepted.
/// 662_688_000 is 1991-01-01T00:00:00Z.
const PAST_CUTOFF_UNIX_SECS: i64 = 662_688_000;

/// Decides whether an IOC-supplied timestamp is plausible.
///
/// Returns `true` only when `ts` is at or after [`PAST_CUTOFF_UNIX_SECS`] and
/// lies within `drift_secs` whole seconds of `now`, in either direction.
/// A `ts` before the Unix epoch is always rejected; a `now` before the Unix
/// epoch is treated as second 0.
fn ioc_timestamp_in_window(ts: SystemTime, now: SystemTime, drift_secs: u64) -> bool {
    // Whole seconds since the epoch, or `None` for pre-epoch instants.
    let epoch_secs = |t: SystemTime| -> Option<i64> {
        t.duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|elapsed| elapsed.as_secs() as i64)
    };

    let sample_secs = match epoch_secs(ts) {
        Some(secs) if secs >= PAST_CUTOFF_UNIX_SECS => secs,
        _ => return false,
    };
    let wall_secs = epoch_secs(now).unwrap_or(0);

    (sample_secs - wall_secs).unsigned_abs() <= drift_secs
}
54
/// Coarse connection state of an archived PV, as tracked by the sampling loops.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PvConnectionState {
    /// Initial state; nothing has been observed for the channel yet.
    #[default]
    Idle,
    /// A connection wait timed out before the PV had ever been seen connected.
    Connecting,
    /// At least one sample has been received on the current connection.
    Connected,
    /// The PV was connected earlier and the connection has since been lost.
    Disconnected,
}

impl PvConnectionState {
    /// Returns the variant name as a static string.
    pub fn as_str(self) -> &'static str {
        use PvConnectionState::{Connected, Connecting, Disconnected, Idle};

        match self {
            Idle => "Idle",
            Connecting => "Connecting",
            Connected => "Connected",
            Disconnected => "Disconnected",
        }
    }
}
85
/// Connection bookkeeping for one archived PV, shared between the sampling
/// task and the manager's query methods.
#[derive(Debug, Clone, Default)]
pub struct ConnectionInfo {
    /// Time the first sample was received; the sampling loops set it once and
    /// do not clear it on disconnect.
    pub connected_since: Option<SystemTime>,
    /// Time the most recent sample was received. Reset to `None` on
    /// disconnect, which is how the loops recognise the first sample after a
    /// (re)connect.
    pub last_event_time: Option<SystemTime>,
    /// `true` after a sample has been received and until a disconnect is seen.
    pub is_connected: bool,
    /// Coarse connection state; see [`PvConnectionState`].
    pub state: PvConnectionState,
}
97
/// Lock-free per-PV statistics, updated by the sampling tasks.
///
/// Timestamps are Unix seconds, with `0` meaning "never happened";
/// `latest_observed_dbr` uses `-1` for "nothing observed".
#[derive(Debug)]
pub struct PvCounters {
    /// Samples accepted from the channel (after timestamp filtering).
    pub events_received: AtomicU64,
    /// Samples stored; not written anywhere in this file.
    pub events_stored: AtomicU64,
    /// Unix seconds of the first accepted sample, `0` if none yet.
    pub first_event_unix_secs: AtomicI64,
    /// Times the sample channel was full when a sample was offered.
    pub buffer_overflow_drops: AtomicU64,
    /// Samples dropped because their IOC timestamp was out of window.
    pub timestamp_drops: AtomicU64,
    /// Samples dropped because of a type change; not written in this file.
    pub type_change_drops: AtomicU64,
    /// Number of observed disconnects.
    pub disconnect_count: AtomicU64,
    /// Unix seconds of the most recent disconnect, `0` if none.
    pub last_disconnect_unix_secs: AtomicI64,
    /// Subscribe, monitor and read errors that were retried or skipped.
    pub transient_error_count: AtomicU64,
    /// Most recently observed DBR type code, `-1` if none.
    pub latest_observed_dbr: AtomicI32,
}

impl Default for PvCounters {
    /// All counters and timestamps start at zero; `latest_observed_dbr`
    /// starts at the `-1` sentinel.
    fn default() -> Self {
        let zero_count = || AtomicU64::new(0);
        let zero_stamp = || AtomicI64::new(0);

        Self {
            events_received: zero_count(),
            events_stored: zero_count(),
            first_event_unix_secs: zero_stamp(),
            buffer_overflow_drops: zero_count(),
            timestamp_drops: zero_count(),
            type_change_drops: zero_count(),
            disconnect_count: zero_count(),
            last_disconnect_unix_secs: zero_stamp(),
            transient_error_count: zero_count(),
            latest_observed_dbr: AtomicI32::new(-1),
        }
    }
}
159
160#[derive(Debug, Clone)]
163pub struct PvCountersSnapshot {
164 pub events_received: u64,
165 pub events_stored: u64,
166 pub first_event_unix_secs: Option<i64>,
167 pub buffer_overflow_drops: u64,
168 pub timestamp_drops: u64,
169 pub type_change_drops: u64,
170 pub disconnect_count: u64,
171 pub last_disconnect_unix_secs: Option<i64>,
172 pub transient_error_count: u64,
173 pub latest_observed_dbr: Option<i32>,
176}
177
178impl From<&PvCounters> for PvCountersSnapshot {
179 fn from(c: &PvCounters) -> Self {
180 let first = c.first_event_unix_secs.load(Ordering::Relaxed);
181 let last_disc = c.last_disconnect_unix_secs.load(Ordering::Relaxed);
182 Self {
183 events_received: c.events_received.load(Ordering::Relaxed),
184 events_stored: c.events_stored.load(Ordering::Relaxed),
185 first_event_unix_secs: if first == 0 { None } else { Some(first) },
186 buffer_overflow_drops: c.buffer_overflow_drops.load(Ordering::Relaxed),
187 timestamp_drops: c.timestamp_drops.load(Ordering::Relaxed),
188 type_change_drops: c.type_change_drops.load(Ordering::Relaxed),
189 disconnect_count: c.disconnect_count.load(Ordering::Relaxed),
190 last_disconnect_unix_secs: if last_disc == 0 {
191 None
192 } else {
193 Some(last_disc)
194 },
195 transient_error_count: c.transient_error_count.load(Ordering::Relaxed),
196 latest_observed_dbr: match c.latest_observed_dbr.load(Ordering::Relaxed) {
197 -1 => None,
198 v => Some(v),
199 },
200 }
201 }
202}
203
/// Everything the manager keeps for one actively archived PV.
struct PvHandle {
    /// CA channel for the PV itself; cloned for on-demand live reads.
    #[allow(dead_code)]
    channel: CaChannel,
    /// Root token; cancelling it stops the sampling task and every
    /// extra-field monitor (their tokens are children of this one).
    cancel_token: CancellationToken,
    /// Archive type the PV was registered with.
    #[allow(dead_code)]
    dbr_type: ArchDbType,
    /// Connection state shared with the sampling task.
    conn_info: Arc<Mutex<ConnectionInfo>>,
    /// Latest value of each extra field, keyed by field name.
    extras: Arc<ExtraFieldsCache>,
    /// Cancellation token of each running extra-field monitor, keyed by field name.
    field_tokens: Arc<DashMap<String, CancellationToken>>,
    /// Serialises changes to the set of extra-field monitors.
    update_lock: Arc<tokio::sync::Mutex<()>>,
    /// Statistics shared with the sampling and extra-field tasks.
    counters: Arc<PvCounters>,
}

/// Cache of extra-field values: field name -> latest value rendered as a string.
type ExtraFieldsCache = DashMap<String, String>;

/// Capacity of the bounded channel that carries samples out of the manager.
const SAMPLE_CHANNEL_CAPACITY: usize = 500_000;
238
/// Drop guard that removes `key` from `map` when it goes out of scope, so a
/// "pending" marker is cleared on every exit path, including early returns.
struct PendingGuard<'a> {
    /// Map holding the pending markers.
    map: &'a DashMap<String, ()>,
    /// Entry to remove on drop.
    key: String,
}

impl Drop for PendingGuard<'_> {
    /// Removes the guarded entry; a missing entry is ignored.
    fn drop(&mut self) {
        self.map.remove(&self.key);
    }
}
251
/// Owns the Channel Access client and the per-PV sampling tasks, and keeps
/// the PV registry in step with what is actually being archived.
pub struct ChannelManager {
    /// Shared CA client used to create every channel.
    ca_client: CaClient,
    /// Handles of PVs that currently have a sampling task, keyed by PV name.
    channels: DashMap<String, PvHandle>,
    /// PV names with an `archive_pv` call in flight.
    pending_archives: DashMap<String, ()>,
    /// Per-PV async locks serialising archive/pause/resume/stop/destroy.
    op_locks: DashMap<String, Arc<tokio::sync::Mutex<()>>>,
    /// Storage backend; held but not used in the code shown here.
    #[allow(dead_code)]
    storage: Arc<dyn StoragePlugin>,
    /// Registry of PV records and their status.
    registry: Arc<PvRegistry>,
    /// Sending half of the sample channel; cloned into each sampling task.
    sample_tx: mpsc::Sender<PvSample>,
    /// Optional policy configuration that can override the requested sample mode.
    policy: Option<PolicyConfig>,
    /// Maximum accepted difference, in seconds, between an IOC timestamp and
    /// the server clock for monitored samples.
    server_ioc_drift_secs: u64,
}
277
/// One sample travelling from a sampling task to the consumer of the sample channel.
pub struct PvSample {
    /// Name of the PV the sample belongs to.
    pub pv_name: String,
    /// Archive type the PV was registered with.
    pub dbr_type: ArchDbType,
    /// The value, timestamp and attached field values.
    pub sample: ArchiverSample,
    /// Element count recorded for the PV; the loops in this file always send `Some`.
    pub element_count: Option<i32>,
    /// The PV's counters. NOTE(review): presumably so the consumer can update
    /// `events_stored` — confirm against the consumer.
    pub counters: Option<Arc<PvCounters>>,
}
289
290impl ChannelManager {
291 pub async fn new(
292 storage: Arc<dyn StoragePlugin>,
293 registry: Arc<PvRegistry>,
294 policy: Option<PolicyConfig>,
295 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
296 Self::new_with_drift(storage, registry, policy, 30 * 60).await
297 }
298
299 pub async fn new_with_drift(
303 storage: Arc<dyn StoragePlugin>,
304 registry: Arc<PvRegistry>,
305 policy: Option<PolicyConfig>,
306 server_ioc_drift_secs: u64,
307 ) -> anyhow::Result<(Self, mpsc::Receiver<PvSample>)> {
308 let ca_client = CaClient::new().await.map_err(|e| anyhow::anyhow!("{e}"))?;
309 let (tx, rx) = mpsc::channel(SAMPLE_CHANNEL_CAPACITY);
310
311 let mgr = Self {
312 ca_client,
313 channels: DashMap::new(),
314 pending_archives: DashMap::new(),
315 op_locks: DashMap::new(),
316 storage,
317 registry,
318 sample_tx: tx,
319 policy,
320 server_ioc_drift_secs,
321 };
322
323 Ok((mgr, rx))
324 }
325
326 fn op_lock(&self, pv_name: &str) -> Arc<tokio::sync::Mutex<()>> {
331 if let Some(existing) = self.op_locks.get(pv_name) {
332 return existing.clone();
333 }
334 self.op_locks
335 .entry(pv_name.to_string())
336 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
337 .clone()
338 }
339
340 pub async fn restore_from_registry(&self) -> anyhow::Result<u64> {
347 let active_pvs = self.registry.pvs_by_status(PvStatus::Active)?;
348 let total = active_pvs.len() as u64;
349 info!(total, "Restoring PVs from registry");
350
351 let mut restored = 0u64;
352 for record in active_pvs {
353 if record.alias_for.is_some() {
354 warn!(
355 pv = record.pv_name,
356 target = record.alias_for.as_deref(),
357 "Skipping alias row in restore; aliases are routed, not archived"
358 );
359 continue;
360 }
361 if let Err(e) = self.start_archiving_internal(&record).await {
362 warn!(pv = record.pv_name, "Failed to restore PV: {e}");
363 self.registry.set_status(&record.pv_name, PvStatus::Error)?;
364 } else {
365 restored += 1;
366 }
367 }
368 metrics::gauge!("archiver_pvs_active").set(restored as f64);
369 if restored < total {
370 warn!(
371 restored,
372 failed = total - restored,
373 "Some PVs failed to restore"
374 );
375 }
376
377 Ok(restored)
378 }
379
380 pub async fn archive_pv(&self, pv_name: &str, sample_mode: &SampleMode) -> anyhow::Result<()> {
382 let lock = self.op_lock(pv_name);
384 let _g = lock.lock().await;
385
386 if self.channels.contains_key(pv_name) {
387 anyhow::bail!("PV {pv_name} is already being archived");
388 }
389
390 if self
393 .pending_archives
394 .insert(pv_name.to_string(), ())
395 .is_some()
396 {
397 anyhow::bail!("PV {pv_name} archive operation already in progress");
398 }
399 let _guard = PendingGuard {
400 map: &self.pending_archives,
401 key: pv_name.to_string(),
402 };
403
404 self.archive_pv_inner(pv_name, sample_mode).await
405 }
406
407 async fn archive_pv_inner(
409 &self,
410 pv_name: &str,
411 sample_mode: &SampleMode,
412 ) -> anyhow::Result<()> {
413 if self.channels.contains_key(pv_name) {
415 anyhow::bail!("PV {pv_name} is already being archived");
416 }
417
418 let (effective_mode, matched_policy_name) = if let Some(ref policy) = self.policy {
420 if let Some(p) = policy.find_policy(pv_name) {
421 (p.to_sample_mode(), Some(p.policy_name().to_string()))
422 } else {
423 (sample_mode.clone(), None)
424 }
425 } else {
426 (sample_mode.clone(), None)
427 };
428
429 let channel = self.ca_client.create_channel(pv_name);
431 channel
432 .wait_connected(CA_CONNECT_TIMEOUT)
433 .await
434 .map_err(|e| anyhow::anyhow!("Failed to connect to {pv_name}: {e}"))?;
435
436 let info = self
437 .ca_client
438 .cainfo(pv_name)
439 .await
440 .map_err(|e| anyhow::anyhow!("Failed to get info for {pv_name}: {e}"))?;
441
442 let dbr_type = dbr_field_to_arch_type(info.native_type);
443 let element_count = info.element_count as i32;
444
445 self.registry
447 .register_pv(pv_name, dbr_type, &effective_mode, element_count)?;
448 if let Some(ref name) = matched_policy_name {
451 self.registry.update_policy_name(pv_name, Some(name))?;
452 }
453
454 let record = self
455 .registry
456 .get_pv(pv_name)?
457 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
458 self.start_archiving_internal(&record).await?;
459
460 metrics::gauge!("archiver_pvs_active").increment(1.0);
461 info!(pv = pv_name, ?dbr_type, element_count, "Started archiving");
462 Ok(())
463 }
464
465 async fn start_archiving_internal(&self, record: &PvRecord) -> anyhow::Result<()> {
467 let pv_name = record.pv_name.clone();
468 let dbr_type = record.dbr_type;
469 let element_count = record.element_count;
470 let channel = self.ca_client.create_channel(&pv_name);
471 let cancel_token = CancellationToken::new();
472 let conn_info = Arc::new(Mutex::new(ConnectionInfo::default()));
473 let extras: Arc<ExtraFieldsCache> = Arc::new(DashMap::new());
474 let field_tokens: Arc<DashMap<String, CancellationToken>> = Arc::new(DashMap::new());
475 let update_lock = Arc::new(tokio::sync::Mutex::new(()));
476 let counters = Arc::new(PvCounters::default());
477
478 let _guard = update_lock.lock().await;
484
485 self.channels.insert(
486 pv_name.clone(),
487 PvHandle {
488 channel: channel.clone(),
489 cancel_token: cancel_token.clone(),
490 dbr_type,
491 conn_info: conn_info.clone(),
492 extras: extras.clone(),
493 field_tokens: field_tokens.clone(),
494 update_lock: update_lock.clone(),
495 counters: counters.clone(),
496 },
497 );
498
499 for field in &record.archive_fields {
502 let child = cancel_token.child_token();
503 field_tokens.insert(field.clone(), child.clone());
504 spawn_extra_field_monitor(
505 &self.ca_client,
506 &pv_name,
507 field,
508 extras.clone(),
509 child,
510 counters.clone(),
511 );
512 }
513 metrics::gauge!("archiver_extra_field_tasks").increment(record.archive_fields.len() as f64);
514 drop(_guard);
515
516 let tx = self.sample_tx.clone();
517 let token = cancel_token.clone();
518 let ci = conn_info.clone();
519 let extras_for_loop = extras.clone();
520 let counters_for_loop = counters.clone();
521
522 let drift = self.server_ioc_drift_secs;
523 match &record.sample_mode {
524 SampleMode::Monitor => {
525 tokio::spawn(async move {
526 monitor_loop(
527 pv_name,
528 dbr_type,
529 element_count,
530 channel,
531 tx,
532 token,
533 ci,
534 extras_for_loop,
535 counters_for_loop,
536 drift,
537 )
538 .await;
539 });
540 }
541 SampleMode::Scan { period_secs } => {
542 let period = *period_secs;
543 tokio::spawn(async move {
544 scan_loop(
545 pv_name,
546 dbr_type,
547 element_count,
548 channel,
549 tx,
550 token,
551 period,
552 ci,
553 extras_for_loop,
554 counters_for_loop,
555 )
556 .await;
557 });
558 }
559 }
560
561 Ok(())
562 }
563
564 pub async fn update_archive_fields(
570 &self,
571 pv_name: &str,
572 fields: &[String],
573 ) -> anyhow::Result<()> {
574 self.registry.update_archive_fields(pv_name, fields)?;
577
578 let (parent_token, extras, field_tokens, update_lock, counters) = {
581 let Some(handle) = self.channels.get(pv_name) else {
582 return Ok(());
583 };
584 (
585 handle.cancel_token.clone(),
586 handle.extras.clone(),
587 handle.field_tokens.clone(),
588 handle.update_lock.clone(),
589 handle.counters.clone(),
590 )
591 };
592
593 let _guard = update_lock.lock().await;
596
597 let wanted: std::collections::HashSet<&str> = fields.iter().map(|s| s.as_str()).collect();
598
599 let to_remove: Vec<String> = field_tokens
603 .iter()
604 .filter(|e| !wanted.contains(e.key().as_str()))
605 .map(|e| e.key().clone())
606 .collect();
607 let removed_count = to_remove.len();
608 for key in to_remove {
609 if let Some((_, token)) = field_tokens.remove(&key) {
610 token.cancel();
611 }
612 extras.remove(&key);
613 }
614
615 let mut added_count = 0usize;
618 for f in fields {
619 if !field_tokens.contains_key(f) {
620 let child = parent_token.child_token();
621 field_tokens.insert(f.clone(), child.clone());
622 spawn_extra_field_monitor(
623 &self.ca_client,
624 pv_name,
625 f,
626 extras.clone(),
627 child,
628 counters.clone(),
629 );
630 added_count += 1;
631 }
632 }
633 let net = added_count as i64 - removed_count as i64;
634 if net != 0 {
635 metrics::gauge!("archiver_extra_field_tasks").increment(net as f64);
636 }
637 Ok(())
638 }
639
640 pub async fn pause_pv(&self, pv_name: &str) -> anyhow::Result<()> {
642 let lock = self.op_lock(pv_name);
643 let _g = lock.lock().await;
644 if let Some((_key, handle)) = self.channels.remove(pv_name) {
645 let extra_count = handle.field_tokens.len() as f64;
646 handle.cancel_token.cancel();
647 metrics::gauge!("archiver_pvs_active").decrement(1.0);
648 if extra_count > 0.0 {
649 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
650 }
651 }
652 self.registry.set_status(pv_name, PvStatus::Paused)?;
653 info!(pv = pv_name, "Paused archiving");
654 Ok(())
655 }
656
657 pub async fn resume_pv(&self, pv_name: &str) -> anyhow::Result<()> {
660 let lock = self.op_lock(pv_name);
661 let _g = lock.lock().await;
662
663 let record = self
664 .registry
665 .get_pv(pv_name)?
666 .ok_or_else(|| anyhow::anyhow!("PV {pv_name} not found in registry"))?;
667
668 if record.status == PvStatus::Active && self.channels.contains_key(pv_name) {
670 info!(
671 pv = pv_name,
672 "PV is already actively archived, skipping resume"
673 );
674 return Ok(());
675 }
676
677 if let Some((_key, handle)) = self.channels.remove(pv_name) {
679 let extra_count = handle.field_tokens.len() as f64;
680 handle.cancel_token.cancel();
681 if extra_count > 0.0 {
682 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
683 }
684 }
685
686 self.start_archiving_internal(&record).await?;
688 self.registry.set_status(pv_name, PvStatus::Active)?;
689 metrics::gauge!("archiver_pvs_active").increment(1.0);
690 info!(pv = pv_name, "Resumed archiving");
691 Ok(())
692 }
693
694 pub async fn stop_pv(&self, pv_name: &str) -> anyhow::Result<()> {
697 let lock = self.op_lock(pv_name);
698 let _g = lock.lock().await;
699 if let Some((_key, handle)) = self.channels.remove(pv_name) {
700 let extra_count = handle.field_tokens.len() as f64;
701 handle.cancel_token.cancel();
702 metrics::gauge!("archiver_pvs_active").decrement(1.0);
703 if extra_count > 0.0 {
704 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
705 }
706 }
707 self.registry.set_status(pv_name, PvStatus::Inactive)?;
708 info!(pv = pv_name, "Stopped archiving (inactive)");
709 Ok(())
710 }
711
712 pub async fn destroy_pv(&self, pv_name: &str) -> anyhow::Result<()> {
714 let lock = self.op_lock(pv_name);
715 let _g = lock.lock().await;
716 if let Some((_key, handle)) = self.channels.remove(pv_name) {
717 let extra_count = handle.field_tokens.len() as f64;
718 handle.cancel_token.cancel();
719 metrics::gauge!("archiver_pvs_active").decrement(1.0);
720 if extra_count > 0.0 {
721 metrics::gauge!("archiver_extra_field_tasks").decrement(extra_count);
722 }
723 }
724 self.registry.remove_pv(pv_name)?;
725 info!(pv = pv_name, "Destroyed archiving channel");
731 Ok(())
732 }
733
734 pub fn list_pvs(&self) -> Vec<String> {
736 self.registry.all_pv_names().unwrap_or_else(|e| {
737 warn!("Failed to list PVs: {e}");
738 Vec::new()
739 })
740 }
741
742 pub fn matching_pvs(&self, pattern: &str) -> Vec<String> {
744 self.registry.matching_pvs(pattern).unwrap_or_else(|e| {
745 warn!("Failed to match PVs: {e}");
746 Vec::new()
747 })
748 }
749
    /// Returns the shared PV registry this manager operates on.
    pub fn registry(&self) -> &Arc<PvRegistry> {
        &self.registry
    }
754
755 pub fn get_connection_info(&self, pv: &str) -> Option<ConnectionInfo> {
757 self.channels.get(pv).map(|h| {
758 h.conn_info
759 .lock()
760 .unwrap_or_else(|e| e.into_inner())
761 .clone()
762 })
763 }
764
765 pub fn get_never_connected_pvs(&self) -> Vec<String> {
767 self.channels
768 .iter()
769 .filter(|entry| {
770 let ci = entry
771 .value()
772 .conn_info
773 .lock()
774 .unwrap_or_else(|e| e.into_inner());
775 ci.connected_since.is_none()
776 })
777 .map(|entry| entry.key().clone())
778 .collect()
779 }
780
781 pub fn pv_counters(&self, pv_name: &str) -> Option<Arc<PvCounters>> {
785 self.channels.get(pv_name).map(|h| h.counters.clone())
786 }
787
788 pub fn all_pv_counters(&self) -> Vec<(String, PvCountersSnapshot)> {
791 self.channels
792 .iter()
793 .map(|e| {
794 (
795 e.key().clone(),
796 PvCountersSnapshot::from(&*e.value().counters),
797 )
798 })
799 .collect()
800 }
801
802 pub async fn live_value(
807 &self,
808 pv_name: &str,
809 timeout: Duration,
810 ) -> Option<anyhow::Result<ArchiverValue>> {
811 let channel = self.channels.get(pv_name)?.channel.clone();
812 if channel.wait_connected(timeout).await.is_err() {
816 return Some(Err(anyhow::anyhow!(
817 "channel not connected within {timeout:?}"
818 )));
819 }
820 match tokio::time::timeout(timeout, channel.get()).await {
821 Ok(Ok((_dbr_type, val))) => Some(Ok(epics_value_to_archiver(&val))),
822 Ok(Err(e)) => Some(Err(anyhow::anyhow!("CA get failed: {e}"))),
823 Err(_) => Some(Err(anyhow::anyhow!("CA get timed out after {timeout:?}"))),
824 }
825 }
826
827 pub fn extras_snapshot(&self, pv_name: &str) -> std::collections::HashMap<String, String> {
831 match self.channels.get(pv_name) {
832 Some(handle) => handle
833 .extras
834 .iter()
835 .map(|e| (e.key().clone(), e.value().clone()))
836 .collect(),
837 None => std::collections::HashMap::new(),
838 }
839 }
840
841 pub fn get_currently_disconnected_pvs(&self) -> Vec<String> {
843 self.channels
844 .iter()
845 .filter(|entry| {
846 let ci = entry
847 .value()
848 .conn_info
849 .lock()
850 .unwrap_or_else(|e| e.into_inner());
851 !ci.is_connected
852 })
853 .map(|entry| entry.key().clone())
854 .collect()
855 }
856}
857
/// Sampling task for a PV archived in monitor mode.
///
/// Each pass of the outer loop waits for the channel to connect, subscribes,
/// and forwards every received update to `tx` until the subscription ends;
/// it then records the disconnect and starts over. The task ends when
/// `cancel_token` is cancelled, when the connection-event stream closes, or
/// when the sample channel is closed.
///
/// Updates whose IOC timestamp is outside the accepted window (see
/// `ioc_timestamp_in_window`, tolerance `server_ioc_drift_secs`) are dropped
/// and counted, except for the first update after a (re)connect, which is
/// always forwarded and carries the connection-lost/regained field values.
#[allow(clippy::too_many_arguments)]
async fn monitor_loop(
    pv_name: String,
    dbr_type: ArchDbType,
    element_count: i32,
    channel: CaChannel,
    tx: mpsc::Sender<PvSample>,
    cancel_token: CancellationToken,
    conn_info: Arc<Mutex<ConnectionInfo>>,
    extras: Arc<ExtraFieldsCache>,
    counters: Arc<PvCounters>,
    server_ioc_drift_secs: u64,
) {
    loop {
        // Phase 1: wait until the channel is connected (or we are cancelled).
        tokio::select! {
            _ = cancel_token.cancelled() => return,
            result = channel.wait_connected(CA_RECONNECT_TIMEOUT) => {
                if result.is_err() {
                    // The wait timed out: record the PV as not connected.
                    let was_connected = {
                        let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                        let prev_connected = ci.is_connected;
                        ci.is_connected = false;
                        ci.last_event_time = None;
                        // A PV that was never connected is "Connecting";
                        // one that was connected before is "Disconnected".
                        ci.state = match ci.state {
                            PvConnectionState::Connected => PvConnectionState::Disconnected,
                            PvConnectionState::Disconnected => PvConnectionState::Disconnected,
                            _ => PvConnectionState::Connecting,
                        };
                        prev_connected
                    };
                    // Count the disconnect only on the connected -> not-connected edge.
                    if was_connected {
                        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
                        counters
                            .last_disconnect_unix_secs
                            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
                    }
                    // Subscribe to connection events before re-checking the
                    // state, so a connect that happens in between is not missed.
                    let mut conn_rx = channel.connection_events();

                    if channel
                        .wait_connected(Duration::from_millis(100))
                        .await
                        .is_err()
                    {
                        // Still not connected: wait for a Connected event.
                        loop {
                            tokio::select! {
                                _ = cancel_token.cancelled() => return,
                                event = conn_rx.recv() => {
                                    use tokio::sync::broadcast::error::RecvError;
                                    match event {
                                        Ok(ConnectionEvent::Connected) => break,
                                        Ok(_) => continue,
                                        Err(RecvError::Lagged(_)) => {
                                            // Events were missed; poll the
                                            // channel state directly.
                                            if channel
                                                .wait_connected(Duration::from_millis(100))
                                                .await
                                                .is_ok()
                                            {
                                                break;
                                            }
                                            continue;
                                        }
                                        // The event stream is gone; stop the task.
                                        Err(RecvError::Closed) => return,
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Phase 2: subscribe; on failure wait CA_RETRY_DELAY and start over.
        let mut monitor = match channel.subscribe().await {
            Ok(m) => m,
            Err(e) => {
                counters
                    .transient_error_count
                    .fetch_add(1, Ordering::Relaxed);
                warn!(pv = pv_name, "Subscribe failed: {e}, retrying...");
                tokio::select! {
                    _ = cancel_token.cancelled() => return,
                    _ = tokio::time::sleep(CA_RETRY_DELAY) => continue,
                }
            }
        };

        debug!(pv = pv_name, "Monitor subscription active");

        // Phase 3: forward updates until the subscription ends.
        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => return,
                result = monitor.recv() => {
                    match result {
                        Some(Ok(snapshot)) => {
                            let now = SystemTime::now();
                            // `last_event_time` is cleared on disconnect, so
                            // `None` marks the first update after a (re)connect.
                            let first_after_connect = {
                                let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
                                let first = ci.last_event_time.is_none();
                                if ci.connected_since.is_none() {
                                    ci.connected_since = Some(now);
                                }
                                ci.is_connected = true;
                                ci.last_event_time = Some(now);
                                ci.state = PvConnectionState::Connected;
                                first
                            };
                            // The first update after a connect bypasses the
                            // timestamp window check.
                            if !first_after_connect
                                && !ioc_timestamp_in_window(
                                    snapshot.timestamp,
                                    now,
                                    server_ioc_drift_secs,
                                )
                            {
                                counters.timestamp_drops.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_name,
                                    ?snapshot.timestamp,
                                    "Dropping sample with out-of-window IOC timestamp"
                                );
                                continue;
                            }
                            counters.events_received.fetch_add(1, Ordering::Relaxed);
                            let now_secs = unix_secs(now);
                            // Only the first accepted sample sets this (0 = unset).
                            let _ = counters.first_event_unix_secs.compare_exchange(
                                0,
                                now_secs,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            );
                            let archiver_val = epics_value_to_archiver(&snapshot.value);
                            // The sample keeps the IOC timestamp, not the server time.
                            let mut sample = ArchiverSample::new(snapshot.timestamp, archiver_val);
                            attach_extras(&extras, &mut sample);
                            if first_after_connect {
                                let lost_secs = counters
                                    .last_disconnect_unix_secs
                                    .load(Ordering::Relaxed);
                                attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
                            }
                            let pv_sample = PvSample {
                                pv_name: pv_name.clone(),
                                dbr_type,
                                sample,
                                element_count: Some(element_count),
                                counters: Some(counters.clone()),
                            };
                            // An error here means the receiver is gone, so
                            // the task has nothing left to do.
                            if let Err(pv_sample) = try_send_with_overflow_count(
                                &tx,
                                pv_sample,
                                &counters,
                            )
                            .await
                            {
                                let _ = pv_sample;
                                return;
                            }
                        }
                        Some(Err(e)) => {
                            counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                            warn!(pv = pv_name, "Monitor error: {e}");
                        }
                        // The subscription ended; go back to waiting for a connection.
                        None => break,
                    }
                }
            }
        }

        // The subscription ended: record the disconnect. Clearing
        // `last_event_time` makes the next update count as first-after-connect.
        {
            let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
            ci.is_connected = false;
            ci.last_event_time = None;
            ci.state = PvConnectionState::Disconnected;
        }
        counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
        counters
            .last_disconnect_unix_secs
            .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
        debug!(pv = pv_name, "Monitor ended, waiting for reconnection");
    }
}
1077
/// Converts `t` to whole seconds since the Unix epoch, truncating any
/// sub-second part. Instants before the epoch yield `0`.
fn unix_secs(t: SystemTime) -> i64 {
    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
1083
1084async fn try_send_with_overflow_count(
1088 tx: &mpsc::Sender<PvSample>,
1089 pv_sample: PvSample,
1090 counters: &PvCounters,
1091) -> Result<(), PvSample> {
1092 match tx.try_send(pv_sample) {
1093 Ok(()) => Ok(()),
1094 Err(tokio::sync::mpsc::error::TrySendError::Full(pv_sample)) => {
1095 counters
1096 .buffer_overflow_drops
1097 .fetch_add(1, Ordering::Relaxed);
1098 tx.send(pv_sample).await.map_err(|e| e.0)
1102 }
1103 Err(tokio::sync::mpsc::error::TrySendError::Closed(pv_sample)) => Err(pv_sample),
1104 }
1105}
1106
1107#[allow(clippy::too_many_arguments)]
1109async fn scan_loop(
1110 pv_name: String,
1111 dbr_type: ArchDbType,
1112 element_count: i32,
1113 channel: CaChannel,
1114 tx: mpsc::Sender<PvSample>,
1115 cancel_token: CancellationToken,
1116 period_secs: f64,
1117 conn_info: Arc<Mutex<ConnectionInfo>>,
1118 extras: Arc<ExtraFieldsCache>,
1119 counters: Arc<PvCounters>,
1120) {
1121 let period = Duration::from_secs_f64(period_secs);
1122 let mut interval = tokio::time::interval(period);
1123
1124 loop {
1125 tokio::select! {
1126 _ = cancel_token.cancelled() => return,
1127 _ = interval.tick() => {}
1128 }
1129
1130 if channel.wait_connected(CA_RETRY_DELAY).await.is_err() {
1131 let was_connected = {
1132 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1133 let prev = ci.is_connected;
1134 ci.is_connected = false;
1135 ci.last_event_time = None;
1136 ci.state = match ci.state {
1137 PvConnectionState::Connected => PvConnectionState::Disconnected,
1138 PvConnectionState::Disconnected => PvConnectionState::Disconnected,
1139 _ => PvConnectionState::Connecting,
1140 };
1141 prev
1142 };
1143 if was_connected {
1144 counters.disconnect_count.fetch_add(1, Ordering::Relaxed);
1145 counters
1146 .last_disconnect_unix_secs
1147 .store(unix_secs(SystemTime::now()), Ordering::Relaxed);
1148 }
1149 continue;
1150 }
1151
1152 match channel.get().await {
1153 Ok((_dbr_type, epics_val)) => {
1154 let now = SystemTime::now();
1155 let first_after_connect = {
1156 let mut ci = conn_info.lock().unwrap_or_else(|e| e.into_inner());
1157 let first = ci.last_event_time.is_none();
1158 if ci.connected_since.is_none() {
1159 ci.connected_since = Some(now);
1160 }
1161 ci.is_connected = true;
1162 ci.last_event_time = Some(now);
1163 ci.state = PvConnectionState::Connected;
1164 first
1165 };
1166 counters.events_received.fetch_add(1, Ordering::Relaxed);
1167 let now_secs = unix_secs(now);
1168 let _ = counters.first_event_unix_secs.compare_exchange(
1169 0,
1170 now_secs,
1171 Ordering::Relaxed,
1172 Ordering::Relaxed,
1173 );
1174 let archiver_val = epics_value_to_archiver(&epics_val);
1175 let mut sample = ArchiverSample::new(now, archiver_val);
1176 attach_extras(&extras, &mut sample);
1177 if first_after_connect {
1178 let lost_secs = counters.last_disconnect_unix_secs.load(Ordering::Relaxed);
1179 attach_cnx_lost_headers(&mut sample, lost_secs, now_secs);
1180 }
1181 let pv_sample = PvSample {
1182 pv_name: pv_name.clone(),
1183 dbr_type,
1184 sample,
1185 element_count: Some(element_count),
1186 counters: Some(counters.clone()),
1187 };
1188 if try_send_with_overflow_count(&tx, pv_sample, &counters)
1189 .await
1190 .is_err()
1191 {
1192 return;
1193 }
1194 }
1195 Err(e) => {
1196 counters
1197 .transient_error_count
1198 .fetch_add(1, Ordering::Relaxed);
1199 debug!(pv = pv_name, "Scan read error: {e}");
1200 }
1201 }
1202 }
1203}
1204
1205fn attach_cnx_lost_headers(sample: &mut ArchiverSample, lost_secs: i64, now_secs: i64) {
1211 sample
1212 .field_values
1213 .push(("cnxlostepsecs".to_string(), lost_secs.to_string()));
1214 sample
1215 .field_values
1216 .push(("cnxregainedepsecs".to_string(), now_secs.to_string()));
1217 sample
1218 .field_values
1219 .push(("startup".to_string(), "true".to_string()));
1220}
1221
1222fn attach_extras(extras: &ExtraFieldsCache, sample: &mut ArchiverSample) {
1226 if extras.is_empty() {
1227 return;
1228 }
1229 let mut entries: Vec<(String, String)> = extras
1230 .iter()
1231 .map(|e| (e.key().clone(), e.value().clone()))
1232 .collect();
1233 entries.sort_by(|a, b| a.0.cmp(&b.0));
1234 sample.field_values = entries;
1235}
1236
1237fn epics_value_to_field_string(val: &EpicsValue) -> String {
1242 match val {
1243 EpicsValue::String(s) => s.clone(),
1244 EpicsValue::Short(v) => v.to_string(),
1245 EpicsValue::Float(v) => v.to_string(),
1246 EpicsValue::Enum(v) => v.to_string(),
1247 EpicsValue::Char(v) => v.to_string(),
1248 EpicsValue::Long(v) => v.to_string(),
1249 EpicsValue::Double(v) => v.to_string(),
1250 EpicsValue::ShortArray(v) => format!("{v:?}"),
1251 EpicsValue::FloatArray(v) => format!("{v:?}"),
1252 EpicsValue::EnumArray(v) => format!("{v:?}"),
1253 EpicsValue::DoubleArray(v) => format!("{v:?}"),
1254 EpicsValue::LongArray(v) => format!("{v:?}"),
1255 EpicsValue::CharArray(v) => String::from_utf8_lossy(v).into_owned(),
1256 EpicsValue::StringArray(v) => format!("{v:?}"),
1257 }
1258}
1259
1260fn spawn_extra_field_monitor(
1264 ca_client: &CaClient,
1265 pv_name: &str,
1266 field: &str,
1267 extras: Arc<ExtraFieldsCache>,
1268 parent_token: CancellationToken,
1269 counters: Arc<PvCounters>,
1270) {
1271 let full_name = format!("{pv_name}.{field}");
1272 let channel = ca_client.create_channel(&full_name);
1273 let field_owned = field.to_string();
1274 let pv_owned = pv_name.to_string();
1275
1276 let panic_pv = pv_owned.clone();
1282 let panic_field = field_owned.clone();
1283 tokio::spawn(async move {
1284 let body = std::panic::AssertUnwindSafe(extra_field_monitor_body(
1285 channel,
1286 pv_owned,
1287 field_owned,
1288 extras,
1289 parent_token,
1290 counters,
1291 ));
1292 if let Err(payload) = futures::FutureExt::catch_unwind(body).await {
1293 let msg = panic_payload_msg(&payload);
1294 error!(
1295 pv = panic_pv,
1296 field = panic_field,
1297 "Extra-field monitor panicked: {msg}"
1298 );
1299 }
1300 });
1301}
1302
/// Body of an extra-field monitor task.
///
/// Subscribes to `channel` and stores each received value, rendered as a
/// string, in `extras` under `field_owned`. Failed subscribes are retried
/// with exponential backoff starting at `CA_RETRY_DELAY` and capped at 60
/// seconds. When a subscription ends, the task subscribes again. Returns
/// when `parent_token` is cancelled.
async fn extra_field_monitor_body(
    channel: CaChannel,
    pv_owned: String,
    field_owned: String,
    extras: Arc<ExtraFieldsCache>,
    parent_token: CancellationToken,
    counters: Arc<PvCounters>,
) {
    // A connect timeout is not fatal: the subscribe loop below keeps retrying.
    if channel.wait_connected(CA_CONNECT_TIMEOUT).await.is_err() {
        debug!(
            pv = pv_owned,
            field = field_owned,
            "Extra-field channel did not connect within timeout (will keep retrying via subscribe)"
        );
    }

    let mut backoff = CA_RETRY_DELAY;
    let max_backoff = Duration::from_secs(60);
    // Ensures the "repeatedly fails" warning is logged once per failure streak.
    let mut warned_at_cap = false;

    loop {
        tokio::select! {
            _ = parent_token.cancelled() => return,
            sub = channel.subscribe() => {
                let mut monitor = match sub {
                    Ok(m) => m,
                    Err(e) => {
                        counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                        debug!(
                            pv = pv_owned,
                            field = field_owned,
                            ?backoff,
                            "Extra-field subscribe failed: {e}; retrying"
                        );
                        // Escalate to a warning once the backoff has reached its cap.
                        if backoff >= max_backoff && !warned_at_cap {
                            warn!(
                                pv = pv_owned,
                                field = field_owned,
                                "Extra-field repeatedly fails to subscribe; \
                                 check archive_fields config (now retrying every 60s)"
                            );
                            warned_at_cap = true;
                        }
                        // Sleep for the current backoff, then double it for
                        // the next failure (up to the cap).
                        let sleep_for = backoff;
                        backoff = (backoff * 2).min(max_backoff);
                        tokio::select! {
                            _ = parent_token.cancelled() => return,
                            _ = tokio::time::sleep(sleep_for) => continue,
                        }
                    }
                };
                // A successful subscribe resets the backoff and the warning latch.
                backoff = CA_RETRY_DELAY;
                warned_at_cap = false;
                loop {
                    tokio::select! {
                        _ = parent_token.cancelled() => return,
                        ev = monitor.recv() => match ev {
                            Some(Ok(snapshot)) => {
                                extras.insert(
                                    field_owned.clone(),
                                    epics_value_to_field_string(&snapshot.value),
                                );
                            }
                            Some(Err(e)) => {
                                counters.transient_error_count.fetch_add(1, Ordering::Relaxed);
                                debug!(
                                    pv = pv_owned,
                                    field = field_owned,
                                    "Extra-field monitor error: {e}"
                                );
                            }
                            // The subscription ended; subscribe again.
                            None => break,
                        }
                    }
                }
            }
        }
    }
}
1397
/// Renders a panic payload as text.
///
/// Returns the payload's contents when it is a `&'static str` or a `String`,
/// and the placeholder `"<non-string panic payload>"` for any other type.
fn panic_payload_msg(payload: &Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_owned())
}
1409
1410fn dbr_field_to_arch_type(field_type: DbFieldType) -> ArchDbType {
1412 match field_type {
1413 DbFieldType::String => ArchDbType::ScalarString,
1414 DbFieldType::Short => ArchDbType::ScalarShort,
1415 DbFieldType::Float => ArchDbType::ScalarFloat,
1416 DbFieldType::Enum => ArchDbType::ScalarEnum,
1417 DbFieldType::Char => ArchDbType::ScalarByte,
1418 DbFieldType::Long => ArchDbType::ScalarInt,
1419 DbFieldType::Double => ArchDbType::ScalarDouble,
1420 }
1421}
1422
1423fn epics_value_to_archiver(val: &EpicsValue) -> ArchiverValue {
1425 match val {
1426 EpicsValue::String(s) => ArchiverValue::ScalarString(s.clone()),
1427 EpicsValue::Short(v) => ArchiverValue::ScalarShort(*v as i32),
1428 EpicsValue::Float(v) => ArchiverValue::ScalarFloat(*v),
1429 EpicsValue::Enum(v) => ArchiverValue::ScalarEnum(*v as i32),
1430 EpicsValue::Char(v) => ArchiverValue::ScalarByte(vec![*v]),
1431 EpicsValue::Long(v) => ArchiverValue::ScalarInt(*v),
1432 EpicsValue::Double(v) => ArchiverValue::ScalarDouble(*v),
1433 EpicsValue::ShortArray(v) => {
1434 ArchiverValue::VectorShort(v.iter().map(|x| *x as i32).collect())
1435 }
1436 EpicsValue::FloatArray(v) => ArchiverValue::VectorFloat(v.clone()),
1437 EpicsValue::EnumArray(v) => {
1438 ArchiverValue::VectorEnum(v.iter().map(|x| *x as i32).collect())
1439 }
1440 EpicsValue::DoubleArray(v) => ArchiverValue::VectorDouble(v.clone()),
1441 EpicsValue::LongArray(v) => ArchiverValue::VectorInt(v.clone()),
1442 EpicsValue::CharArray(v) => ArchiverValue::VectorChar(v.clone()),
1443 EpicsValue::StringArray(v) => ArchiverValue::VectorString(v.clone()),
1444 }
1445}
1446
1447pub async fn write_loop(
1449 storage: Arc<dyn StoragePlugin>,
1450 registry: Arc<PvRegistry>,
1451 mut rx: mpsc::Receiver<PvSample>,
1452 mut shutdown: tokio::sync::watch::Receiver<bool>,
1453 flush_period: Duration,
1454) {
1455 info!("Write loop started");
1456 let mut ts_updates: std::collections::HashMap<String, SystemTime> =
1460 std::collections::HashMap::new();
1461 let mut last_dbr_type: std::collections::HashMap<String, ArchDbType> =
1465 std::collections::HashMap::new();
1466 let mut last_flush = std::time::Instant::now();
1467
1468 loop {
1469 tokio::select! {
1470 Some(pv_sample) = rx.recv() => {
1471 let ts = pv_sample.sample.timestamp;
1472
1473 if let Some(prev_ts) = ts_updates.get(&pv_sample.pv_name)
1477 && ts < *prev_ts
1478 {
1479 if let Some(ref c) = pv_sample.counters {
1480 c.timestamp_drops.fetch_add(1, Ordering::Relaxed);
1481 }
1482 debug!(
1483 pv = pv_sample.pv_name,
1484 ?ts,
1485 ?prev_ts,
1486 "Dropping out-of-order sample"
1487 );
1488 continue;
1489 }
1490
1491 let prev_type = last_dbr_type
1495 .insert(pv_sample.pv_name.clone(), pv_sample.dbr_type);
1496 if let Some(prev) = prev_type
1497 && prev != pv_sample.dbr_type
1498 {
1499 if let Some(ref c) = pv_sample.counters {
1500 c.type_change_drops.fetch_add(1, Ordering::Relaxed);
1501 c.latest_observed_dbr
1505 .store(pv_sample.dbr_type as i32, Ordering::Relaxed);
1506 }
1507 debug!(
1508 pv = pv_sample.pv_name,
1509 ?prev,
1510 new = ?pv_sample.dbr_type,
1511 "Dropping type-changed sample"
1512 );
1513 last_dbr_type.insert(pv_sample.pv_name.clone(), prev);
1517 continue;
1518 }
1519
1520 let meta = AppendMeta {
1521 element_count: pv_sample.element_count,
1522 ..Default::default()
1523 };
1524 if let Err(e) = storage
1525 .append_event_with_meta(
1526 &pv_sample.pv_name,
1527 pv_sample.dbr_type,
1528 &pv_sample.sample,
1529 &meta,
1530 )
1531 .await
1532 {
1533 error!(pv = pv_sample.pv_name, "Write error: {e}");
1534 } else {
1535 metrics::counter!("archiver_events_stored_total").increment(1);
1536 if let Some(ref c) = pv_sample.counters {
1537 c.events_stored.fetch_add(1, Ordering::Relaxed);
1538 }
1539 ts_updates.insert(pv_sample.pv_name, ts);
1540 }
1541
1542 if last_flush.elapsed() > flush_period && !ts_updates.is_empty() {
1544 let refs: Vec<(&str, SystemTime)> = ts_updates
1545 .iter()
1546 .map(|(name, ts)| (name.as_str(), *ts))
1547 .collect();
1548 if let Err(e) = registry.batch_update_timestamps(&refs) {
1549 error!("Failed to flush timestamps: {e}");
1550 }
1551 if let Err(e) = storage.flush_writes().await {
1552 error!("Failed to flush storage writes: {e}");
1553 }
1554 ts_updates.clear();
1555 last_flush = std::time::Instant::now();
1556 }
1557 }
1558 _ = shutdown.changed() => {
1559 while let Ok(pv_sample) = rx.try_recv() {
1561 let meta = AppendMeta {
1562 element_count: pv_sample.element_count,
1563 ..Default::default()
1564 };
1565 if let Err(e) = storage
1566 .append_event_with_meta(
1567 &pv_sample.pv_name,
1568 pv_sample.dbr_type,
1569 &pv_sample.sample,
1570 &meta,
1571 )
1572 .await
1573 {
1574 warn!(pv = pv_sample.pv_name, "Shutdown drain write error: {e}");
1575 }
1576 }
1577 if !ts_updates.is_empty() {
1579 let refs: Vec<(&str, SystemTime)> = ts_updates
1580 .iter()
1581 .map(|(name, ts)| (name.as_str(), *ts))
1582 .collect();
1583 if let Err(e) = registry.batch_update_timestamps(&refs) {
1584 warn!("Shutdown timestamp flush failed: {e}");
1585 }
1586 }
1587 if let Err(e) = storage.flush_writes().await {
1588 warn!("Shutdown storage flush failed: {e}");
1589 }
1590 info!("Write loop shutting down");
1591 break;
1592 }
1593 }
1594 }
1595}