spvirit_server/
monitor.rs1use std::collections::HashMap;
6
7use tokio::sync::{Mutex, mpsc};
8use tracing::debug;
9
10use spvirit_codec::spvirit_encode::{
11 encode_monitor_data_response_filtered, encode_monitor_data_response_payload,
12};
13use spvirit_types::NtPayload;
14
15use crate::state::MonitorSub;
16
17pub struct MonitorRegistry {
19 pub monitors: Mutex<HashMap<String, Vec<MonitorSub>>>,
21 pub conns: Mutex<HashMap<u64, mpsc::Sender<Vec<u8>>>>,
23}
24
25impl MonitorRegistry {
26 pub fn new() -> Self {
27 Self {
28 monitors: Mutex::new(HashMap::new()),
29 conns: Mutex::new(HashMap::new()),
30 }
31 }
32
33 pub async fn send_msg(&self, conn_id: u64, msg: Vec<u8>) {
35 let conns = self.conns.lock().await;
36 if let Some(tx) = conns.get(&conn_id) {
37 let _ = tx.send(msg).await;
38 }
39 }
40
41 pub async fn notify_monitors(&self, pv_name: &str, payload: &NtPayload) {
43 let mut to_send: Vec<(u64, Vec<u8>)> = Vec::new();
44 {
45 let mut monitors = self.monitors.lock().await;
46 if let Some(list) = monitors.get_mut(pv_name) {
47 for sub in list.iter_mut() {
48 if !sub.running {
49 continue;
50 }
51 if sub.pipeline_enabled && sub.nfree == 0 {
52 continue;
53 }
54 let subcmd = 0x00;
55 if sub.pipeline_enabled && sub.nfree > 0 {
56 sub.nfree -= 1;
57 }
58 let msg = if let Some(ref desc) = sub.filtered_desc {
59 encode_monitor_data_response_filtered(
60 sub.ioid,
61 subcmd,
62 payload,
63 desc,
64 sub.version,
65 sub.is_be,
66 )
67 } else {
68 encode_monitor_data_response_payload(
69 sub.ioid,
70 subcmd,
71 payload,
72 sub.version,
73 sub.is_be,
74 )
75 };
76 to_send.push((sub.conn_id, msg));
77 }
78 }
79 }
80
81 for (conn_id, msg) in to_send {
82 self.send_msg(conn_id, msg).await;
83 debug!("Monitor update pv='{}' conn={}", pv_name, conn_id);
84 }
85 }
86
87 pub async fn send_monitor_update_for(
89 &self,
90 pv_name: &str,
91 conn_id: u64,
92 ioid: u32,
93 payload: &NtPayload,
94 ) {
95 let mut to_send: Option<(u64, Vec<u8>)> = None;
96 {
97 let mut monitors = self.monitors.lock().await;
98 if let Some(list) = monitors.get_mut(pv_name) {
99 if let Some(sub) = list
100 .iter_mut()
101 .find(|s| s.conn_id == conn_id && s.ioid == ioid)
102 {
103 if !sub.running {
104 return;
105 }
106 if sub.pipeline_enabled && sub.nfree == 0 {
107 return;
108 }
109 let subcmd = 0x00;
110 if sub.pipeline_enabled && sub.nfree > 0 {
111 sub.nfree -= 1;
112 }
113 let msg = if let Some(ref desc) = sub.filtered_desc {
114 encode_monitor_data_response_filtered(
115 sub.ioid,
116 subcmd,
117 payload,
118 desc,
119 sub.version,
120 sub.is_be,
121 )
122 } else {
123 encode_monitor_data_response_payload(
124 sub.ioid,
125 subcmd,
126 payload,
127 sub.version,
128 sub.is_be,
129 )
130 };
131 to_send = Some((sub.conn_id, msg));
132 }
133 }
134 }
135
136 if let Some((conn_id, msg)) = to_send {
137 self.send_msg(conn_id, msg).await;
138 }
139 }
140
141 pub async fn update_monitor_subscription(
143 &self,
144 conn_id: u64,
145 ioid: u32,
146 pv_name: &str,
147 running: bool,
148 nfree: Option<u32>,
149 pipeline_enabled: Option<bool>,
150 ) -> bool {
151 let mut monitors = self.monitors.lock().await;
152 if let Some(list) = monitors.get_mut(pv_name) {
153 if let Some(sub) = list
154 .iter_mut()
155 .find(|s| s.conn_id == conn_id && s.ioid == ioid)
156 {
157 sub.running = running;
158 if let Some(v) = nfree {
159 sub.nfree = v;
160 }
161 if let Some(enabled) = pipeline_enabled {
162 if enabled {
163 sub.pipeline_enabled = true;
164 }
165 }
166 return true;
167 }
168 }
169 false
170 }
171
172 pub async fn remove_monitor_subscription(&self, conn_id: u64, ioid: u32, pv_name: &str) {
174 let mut monitors = self.monitors.lock().await;
175 if let Some(list) = monitors.get_mut(pv_name) {
176 list.retain(|s| s.conn_id != conn_id || s.ioid != ioid);
177 }
178 }
179
180 pub async fn cleanup_connection(&self, conn_id: u64) {
182 {
183 let mut monitors = self.monitors.lock().await;
184 for list in monitors.values_mut() {
185 list.retain(|s| s.conn_id != conn_id);
186 }
187 }
188 {
189 let mut conns = self.conns.lock().await;
190 conns.remove(&conn_id);
191 }
192 }
193}