1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use epics_base_rs::client::{CaChannel, CaClient, ConnectionEvent};
7use tokio::sync::Notify;
8
9use crate::channel_store::ChannelStore;
10use crate::variables::ChannelDef;
11
12pub struct Channel {
14 pub def: ChannelDef,
15 pub ch_id: usize,
16 ca_channel: Option<CaChannel>,
17 connected: Arc<AtomicBool>,
18}
19
20impl Channel {
21 pub fn new(def: ChannelDef, ch_id: usize) -> Self {
22 Self {
23 def,
24 ch_id,
25 ca_channel: None,
26 connected: Arc::new(AtomicBool::new(false)),
27 }
28 }
29
30 fn resolve_pv_name(&self, macros: &HashMap<String, String>) -> String {
32 let mut name = self.def.pv_name.clone();
33 for (key, val) in macros {
34 name = name.replace(&format!("{{{key}}}"), val);
35 }
36 name
37 }
38
39 pub async fn connect(
41 &mut self,
42 ca_client: &CaClient,
43 macros: &HashMap<String, String>,
44 store: Arc<ChannelStore>,
45 dirty_flags: Vec<Arc<Vec<AtomicBool>>>,
46 ss_wakeups: Vec<Arc<Notify>>,
47 event_flags: Option<Arc<crate::event_flag::EventFlagSet>>,
48 ) {
49 let pv_name = self.resolve_pv_name(macros);
50 if pv_name.is_empty() {
51 return;
52 }
53
54 let ca_channel = ca_client.create_channel(&pv_name);
55
56 if ca_channel
58 .wait_connected(Duration::from_secs(5))
59 .await
60 .is_ok()
61 {
62 self.connected.store(true, Ordering::Release);
63 }
64
65 if self.def.monitored {
67 let ch_id = self.ch_id;
68 let connected = self.connected.clone();
69 let sync_ef = self.def.sync_ef;
70
71 let mut conn_rx = ca_channel.connection_events();
73 let connected_watcher = connected.clone();
74 tokio::spawn(async move {
75 while let Ok(event) = conn_rx.recv().await {
76 match event {
77 ConnectionEvent::Connected => {
78 connected_watcher.store(true, Ordering::Release);
79 }
80 ConnectionEvent::Disconnected => {
81 connected_watcher.store(false, Ordering::Release);
82 }
83 ConnectionEvent::AccessRightsChanged { .. } => {}
84 }
85 }
86 });
87
88 match ca_channel.subscribe().await {
90 Ok(mut monitor) => {
91 tokio::spawn(async move {
92 while let Some(result) = monitor.recv().await {
93 if let Ok(value) = result {
94 store.set(ch_id, value);
96
97 for ss_dirty in &dirty_flags {
99 if let Some(flag) = ss_dirty.get(ch_id) {
100 flag.store(true, Ordering::Release);
101 }
102 }
103
104 if let Some(ef_id) = sync_ef {
106 if let Some(efs) = &event_flags {
107 efs.set(ef_id);
108 }
109 }
110
111 for notify in &ss_wakeups {
113 notify.notify_one();
114 }
115 }
116 }
117 });
118 }
119 Err(e) => {
120 tracing::warn!("failed to subscribe to {pv_name}: {e}");
121 }
122 }
123 }
124
125 self.ca_channel = Some(ca_channel);
126 }
127
128 pub fn is_connected(&self) -> bool {
129 self.connected.load(Ordering::Acquire)
130 }
131
132 pub fn ca_channel(&self) -> Option<&CaChannel> {
133 self.ca_channel.as_ref()
134 }
135}