Skip to main content

spvirit_server/
monitor.rs

1//! Monitor subscription management for the PVA server.
2//!
3//! Tracks per-PV subscriber lists and dispatches monitor update messages.
4
5use std::collections::HashMap;
6
7use tokio::sync::{Mutex, mpsc};
8use tracing::debug;
9
10use spvirit_codec::spvirit_encode::{
11    encode_monitor_data_response_filtered, encode_monitor_data_response_payload,
12};
13use spvirit_types::NtPayload;
14
15use crate::state::MonitorSub;
16
17/// Active connection channels and monitor subscriptions managed by the server.
18pub struct MonitorRegistry {
19    /// PV name → list of active monitor subscriptions.
20    pub monitors: Mutex<HashMap<String, Vec<MonitorSub>>>,
21    /// Connection id → message sender.
22    pub conns: Mutex<HashMap<u64, mpsc::Sender<Vec<u8>>>>,
23}
24
25impl MonitorRegistry {
26    pub fn new() -> Self {
27        Self {
28            monitors: Mutex::new(HashMap::new()),
29            conns: Mutex::new(HashMap::new()),
30        }
31    }
32
33    /// Send a raw message to a connection.
34    pub async fn send_msg(&self, conn_id: u64, msg: Vec<u8>) {
35        let conns = self.conns.lock().await;
36        if let Some(tx) = conns.get(&conn_id) {
37            let _ = tx.send(msg).await;
38        }
39    }
40
41    /// Broadcast a monitor update for `pv_name` to all running subscribers.
42    pub async fn notify_monitors(&self, pv_name: &str, payload: &NtPayload) {
43        let mut to_send: Vec<(u64, Vec<u8>)> = Vec::new();
44        {
45            let mut monitors = self.monitors.lock().await;
46            if let Some(list) = monitors.get_mut(pv_name) {
47                for sub in list.iter_mut() {
48                    if !sub.running {
49                        continue;
50                    }
51                    if sub.pipeline_enabled && sub.nfree == 0 {
52                        continue;
53                    }
54                    let subcmd = 0x00;
55                    if sub.pipeline_enabled && sub.nfree > 0 {
56                        sub.nfree -= 1;
57                    }
58                    let msg = if let Some(ref desc) = sub.filtered_desc {
59                        encode_monitor_data_response_filtered(
60                            sub.ioid,
61                            subcmd,
62                            payload,
63                            desc,
64                            sub.version,
65                            sub.is_be,
66                        )
67                    } else {
68                        encode_monitor_data_response_payload(
69                            sub.ioid,
70                            subcmd,
71                            payload,
72                            sub.version,
73                            sub.is_be,
74                        )
75                    };
76                    to_send.push((sub.conn_id, msg));
77                }
78            }
79        }
80
81        for (conn_id, msg) in to_send {
82            self.send_msg(conn_id, msg).await;
83            debug!("Monitor update pv='{}' conn={}", pv_name, conn_id);
84        }
85    }
86
87    /// Send a monitor update to a specific subscriber.
88    pub async fn send_monitor_update_for(
89        &self,
90        pv_name: &str,
91        conn_id: u64,
92        ioid: u32,
93        payload: &NtPayload,
94    ) {
95        let mut to_send: Option<(u64, Vec<u8>)> = None;
96        {
97            let mut monitors = self.monitors.lock().await;
98            if let Some(list) = monitors.get_mut(pv_name) {
99                if let Some(sub) = list
100                    .iter_mut()
101                    .find(|s| s.conn_id == conn_id && s.ioid == ioid)
102                {
103                    if !sub.running {
104                        return;
105                    }
106                    if sub.pipeline_enabled && sub.nfree == 0 {
107                        return;
108                    }
109                    let subcmd = 0x00;
110                    if sub.pipeline_enabled && sub.nfree > 0 {
111                        sub.nfree -= 1;
112                    }
113                    let msg = if let Some(ref desc) = sub.filtered_desc {
114                        encode_monitor_data_response_filtered(
115                            sub.ioid,
116                            subcmd,
117                            payload,
118                            desc,
119                            sub.version,
120                            sub.is_be,
121                        )
122                    } else {
123                        encode_monitor_data_response_payload(
124                            sub.ioid,
125                            subcmd,
126                            payload,
127                            sub.version,
128                            sub.is_be,
129                        )
130                    };
131                    to_send = Some((sub.conn_id, msg));
132                }
133            }
134        }
135
136        if let Some((conn_id, msg)) = to_send {
137            self.send_msg(conn_id, msg).await;
138        }
139    }
140
141    /// Update a monitor subscription's running/pipeline state.
142    pub async fn update_monitor_subscription(
143        &self,
144        conn_id: u64,
145        ioid: u32,
146        pv_name: &str,
147        running: bool,
148        nfree: Option<u32>,
149        pipeline_enabled: Option<bool>,
150    ) -> bool {
151        let mut monitors = self.monitors.lock().await;
152        if let Some(list) = monitors.get_mut(pv_name) {
153            if let Some(sub) = list
154                .iter_mut()
155                .find(|s| s.conn_id == conn_id && s.ioid == ioid)
156            {
157                sub.running = running;
158                if let Some(v) = nfree {
159                    sub.nfree = v;
160                }
161                if let Some(enabled) = pipeline_enabled {
162                    if enabled {
163                        sub.pipeline_enabled = true;
164                    }
165                }
166                return true;
167            }
168        }
169        false
170    }
171
172    /// Remove a monitor subscription.
173    pub async fn remove_monitor_subscription(&self, conn_id: u64, ioid: u32, pv_name: &str) {
174        let mut monitors = self.monitors.lock().await;
175        if let Some(list) = monitors.get_mut(pv_name) {
176            list.retain(|s| s.conn_id != conn_id || s.ioid != ioid);
177        }
178    }
179
180    /// Remove all subscriptions and connection entries for a given connection.
181    pub async fn cleanup_connection(&self, conn_id: u64) {
182        {
183            let mut monitors = self.monitors.lock().await;
184            for list in monitors.values_mut() {
185                list.retain(|s| s.conn_id != conn_id);
186            }
187        }
188        {
189            let mut conns = self.conns.lock().await;
190            conns.remove(&conn_id);
191        }
192    }
193}