Skip to main content

epics_seq/
channel.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use epics_base_rs::client::{CaChannel, CaClient, ConnectionEvent};
7use tokio::sync::Notify;
8
9use crate::channel_store::ChannelStore;
10use crate::variables::ChannelDef;
11
12/// Active runtime channel: manages CA lifecycle, monitor, and reconnect.
13pub struct Channel {
14    pub def: ChannelDef,
15    pub ch_id: usize,
16    ca_channel: Option<CaChannel>,
17    connected: Arc<AtomicBool>,
18}
19
20impl Channel {
21    pub fn new(def: ChannelDef, ch_id: usize) -> Self {
22        Self {
23            def,
24            ch_id,
25            ca_channel: None,
26            connected: Arc::new(AtomicBool::new(false)),
27        }
28    }
29
30    /// Resolve macro substitutions in the PV name.
31    fn resolve_pv_name(&self, macros: &HashMap<String, String>) -> String {
32        let mut name = self.def.pv_name.clone();
33        for (key, val) in macros {
34            name = name.replace(&format!("{{{key}}}"), val);
35        }
36        name
37    }
38
39    /// Connect to the CA server and optionally start monitoring.
40    pub async fn connect(
41        &mut self,
42        ca_client: &CaClient,
43        macros: &HashMap<String, String>,
44        store: Arc<ChannelStore>,
45        dirty_flags: Vec<Arc<Vec<AtomicBool>>>,
46        ss_wakeups: Vec<Arc<Notify>>,
47        event_flags: Option<Arc<crate::event_flag::EventFlagSet>>,
48    ) {
49        let pv_name = self.resolve_pv_name(macros);
50        if pv_name.is_empty() {
51            return;
52        }
53
54        let ca_channel = ca_client.create_channel(&pv_name);
55
56        // Wait for initial connection
57        if ca_channel
58            .wait_connected(Duration::from_secs(5))
59            .await
60            .is_ok()
61        {
62            self.connected.store(true, Ordering::Release);
63        }
64
65        // Spawn monitor task if this channel is monitored
66        if self.def.monitored {
67            let ch_id = self.ch_id;
68            let connected = self.connected.clone();
69            let sync_ef = self.def.sync_ef;
70
71            // Spawn connection watcher
72            let mut conn_rx = ca_channel.connection_events();
73            let connected_watcher = connected.clone();
74            tokio::spawn(async move {
75                while let Ok(event) = conn_rx.recv().await {
76                    match event {
77                        ConnectionEvent::Connected => {
78                            connected_watcher.store(true, Ordering::Release);
79                        }
80                        ConnectionEvent::Disconnected => {
81                            connected_watcher.store(false, Ordering::Release);
82                        }
83                        ConnectionEvent::AccessRightsChanged { .. } => {}
84                    }
85                }
86            });
87
88            // Spawn monitor task
89            match ca_channel.subscribe().await {
90                Ok(mut monitor) => {
91                    tokio::spawn(async move {
92                        while let Some(result) = monitor.recv().await {
93                            if let Ok(value) = result {
94                                // Update channel store
95                                store.set(ch_id, value);
96
97                                // Mark dirty for all state sets
98                                for ss_dirty in &dirty_flags {
99                                    if let Some(flag) = ss_dirty.get(ch_id) {
100                                        flag.store(true, Ordering::Release);
101                                    }
102                                }
103
104                                // Set synced event flag if configured
105                                if let Some(ef_id) = sync_ef {
106                                    if let Some(efs) = &event_flags {
107                                        efs.set(ef_id);
108                                    }
109                                }
110
111                                // Wake all state sets
112                                for notify in &ss_wakeups {
113                                    notify.notify_one();
114                                }
115                            }
116                        }
117                    });
118                }
119                Err(e) => {
120                    tracing::warn!("failed to subscribe to {pv_name}: {e}");
121                }
122            }
123        }
124
125        self.ca_channel = Some(ca_channel);
126    }
127
128    pub fn is_connected(&self) -> bool {
129        self.connected.load(Ordering::Acquire)
130    }
131
132    pub fn ca_channel(&self) -> Option<&CaChannel> {
133        self.ca_channel.as_ref()
134    }
135}