Skip to main content

spvirit_server/
monitor.rs

1//! Monitor subscription management for the PVA server.
2//!
3//! Tracks per-PV subscriber lists and dispatches monitor update messages.
4
5use std::collections::HashMap;
6
7use tokio::sync::{mpsc, Mutex};
8use tracing::debug;
9
10use spvirit_codec::spvirit_encode::encode_monitor_data_response_payload;
11use spvirit_types::NtPayload;
12
13use crate::state::MonitorSub;
14
15/// Active connection channels and monitor subscriptions managed by the server.
16pub struct MonitorRegistry {
17    /// PV name → list of active monitor subscriptions.
18    pub monitors: Mutex<HashMap<String, Vec<MonitorSub>>>,
19    /// Connection id → message sender.
20    pub conns: Mutex<HashMap<u64, mpsc::Sender<Vec<u8>>>>,
21}
22
23impl MonitorRegistry {
24    pub fn new() -> Self {
25        Self {
26            monitors: Mutex::new(HashMap::new()),
27            conns: Mutex::new(HashMap::new()),
28        }
29    }
30
31    /// Send a raw message to a connection.
32    pub async fn send_msg(&self, conn_id: u64, msg: Vec<u8>) {
33        let conns = self.conns.lock().await;
34        if let Some(tx) = conns.get(&conn_id) {
35            let _ = tx.send(msg).await;
36        }
37    }
38
39    /// Broadcast a monitor update for `pv_name` to all running subscribers.
40    pub async fn notify_monitors(&self, pv_name: &str, payload: &NtPayload) {
41        let mut to_send: Vec<(u64, Vec<u8>)> = Vec::new();
42        {
43            let mut monitors = self.monitors.lock().await;
44            if let Some(list) = monitors.get_mut(pv_name) {
45                for sub in list.iter_mut() {
46                    if !sub.running {
47                        continue;
48                    }
49                    if sub.pipeline_enabled && sub.nfree == 0 {
50                        continue;
51                    }
52                    let subcmd = 0x00;
53                    if sub.pipeline_enabled && sub.nfree > 0 {
54                        sub.nfree -= 1;
55                    }
56                    let msg = encode_monitor_data_response_payload(
57                        sub.ioid,
58                        subcmd,
59                        payload,
60                        sub.version,
61                        sub.is_be,
62                    );
63                    to_send.push((sub.conn_id, msg));
64                }
65            }
66        }
67
68        for (conn_id, msg) in to_send {
69            self.send_msg(conn_id, msg).await;
70            debug!("Monitor update pv='{}' conn={}", pv_name, conn_id);
71        }
72    }
73
74    /// Send a monitor update to a specific subscriber.
75    pub async fn send_monitor_update_for(
76        &self,
77        pv_name: &str,
78        conn_id: u64,
79        ioid: u32,
80        payload: &NtPayload,
81    ) {
82        let mut to_send: Option<(u64, Vec<u8>)> = None;
83        {
84            let mut monitors = self.monitors.lock().await;
85            if let Some(list) = monitors.get_mut(pv_name) {
86                if let Some(sub) = list
87                    .iter_mut()
88                    .find(|s| s.conn_id == conn_id && s.ioid == ioid)
89                {
90                    if !sub.running {
91                        return;
92                    }
93                    if sub.pipeline_enabled && sub.nfree == 0 {
94                        return;
95                    }
96                    let subcmd = 0x00;
97                    if sub.pipeline_enabled && sub.nfree > 0 {
98                        sub.nfree -= 1;
99                    }
100                    let msg = encode_monitor_data_response_payload(
101                        sub.ioid,
102                        subcmd,
103                        payload,
104                        sub.version,
105                        sub.is_be,
106                    );
107                    to_send = Some((sub.conn_id, msg));
108                }
109            }
110        }
111
112        if let Some((conn_id, msg)) = to_send {
113            self.send_msg(conn_id, msg).await;
114        }
115    }
116
117    /// Update a monitor subscription's running/pipeline state.
118    pub async fn update_monitor_subscription(
119        &self,
120        conn_id: u64,
121        ioid: u32,
122        pv_name: &str,
123        running: bool,
124        nfree: Option<u32>,
125        pipeline_enabled: Option<bool>,
126    ) -> bool {
127        let mut monitors = self.monitors.lock().await;
128        if let Some(list) = monitors.get_mut(pv_name) {
129            if let Some(sub) = list
130                .iter_mut()
131                .find(|s| s.conn_id == conn_id && s.ioid == ioid)
132            {
133                sub.running = running;
134                if let Some(v) = nfree {
135                    sub.nfree = v;
136                }
137                if let Some(enabled) = pipeline_enabled {
138                    if enabled {
139                        sub.pipeline_enabled = true;
140                    }
141                }
142                return true;
143            }
144        }
145        false
146    }
147
148    /// Remove a monitor subscription.
149    pub async fn remove_monitor_subscription(
150        &self,
151        conn_id: u64,
152        ioid: u32,
153        pv_name: &str,
154    ) {
155        let mut monitors = self.monitors.lock().await;
156        if let Some(list) = monitors.get_mut(pv_name) {
157            list.retain(|s| s.conn_id != conn_id || s.ioid != ioid);
158        }
159    }
160
161    /// Remove all subscriptions and connection entries for a given connection.
162    pub async fn cleanup_connection(&self, conn_id: u64) {
163        {
164            let mut monitors = self.monitors.lock().await;
165            for list in monitors.values_mut() {
166                list.retain(|s| s.conn_id != conn_id);
167            }
168        }
169        {
170            let mut conns = self.conns.lock().await;
171            conns.remove(&conn_id);
172        }
173    }
174}