spvirit_server/
monitor.rs1use std::collections::HashMap;
6
7use tokio::sync::{mpsc, Mutex};
8use tracing::debug;
9
10use spvirit_codec::spvirit_encode::encode_monitor_data_response_payload;
11use spvirit_types::NtPayload;
12
13use crate::state::MonitorSub;
14
/// Shared registry of monitor subscriptions and outbound connection
/// channels. Both maps are guarded by async (tokio) mutexes so they can
/// be locked across `.await` points.
pub struct MonitorRegistry {
    /// Active monitor subscriptions, keyed by PV name. Each PV may have
    /// multiple subscribers (one `MonitorSub` per (connection, ioid)).
    pub monitors: Mutex<HashMap<String, Vec<MonitorSub>>>,
    /// Outbound byte-message channels, keyed by connection id. Encoded
    /// protocol frames are pushed here for the per-connection writer task.
    pub conns: Mutex<HashMap<u64, mpsc::Sender<Vec<u8>>>>,
}
22
23impl MonitorRegistry {
24 pub fn new() -> Self {
25 Self {
26 monitors: Mutex::new(HashMap::new()),
27 conns: Mutex::new(HashMap::new()),
28 }
29 }
30
31 pub async fn send_msg(&self, conn_id: u64, msg: Vec<u8>) {
33 let conns = self.conns.lock().await;
34 if let Some(tx) = conns.get(&conn_id) {
35 let _ = tx.send(msg).await;
36 }
37 }
38
39 pub async fn notify_monitors(&self, pv_name: &str, payload: &NtPayload) {
41 let mut to_send: Vec<(u64, Vec<u8>)> = Vec::new();
42 {
43 let mut monitors = self.monitors.lock().await;
44 if let Some(list) = monitors.get_mut(pv_name) {
45 for sub in list.iter_mut() {
46 if !sub.running {
47 continue;
48 }
49 if sub.pipeline_enabled && sub.nfree == 0 {
50 continue;
51 }
52 let subcmd = 0x00;
53 if sub.pipeline_enabled && sub.nfree > 0 {
54 sub.nfree -= 1;
55 }
56 let msg = encode_monitor_data_response_payload(
57 sub.ioid,
58 subcmd,
59 payload,
60 sub.version,
61 sub.is_be,
62 );
63 to_send.push((sub.conn_id, msg));
64 }
65 }
66 }
67
68 for (conn_id, msg) in to_send {
69 self.send_msg(conn_id, msg).await;
70 debug!("Monitor update pv='{}' conn={}", pv_name, conn_id);
71 }
72 }
73
74 pub async fn send_monitor_update_for(
76 &self,
77 pv_name: &str,
78 conn_id: u64,
79 ioid: u32,
80 payload: &NtPayload,
81 ) {
82 let mut to_send: Option<(u64, Vec<u8>)> = None;
83 {
84 let mut monitors = self.monitors.lock().await;
85 if let Some(list) = monitors.get_mut(pv_name) {
86 if let Some(sub) = list
87 .iter_mut()
88 .find(|s| s.conn_id == conn_id && s.ioid == ioid)
89 {
90 if !sub.running {
91 return;
92 }
93 if sub.pipeline_enabled && sub.nfree == 0 {
94 return;
95 }
96 let subcmd = 0x00;
97 if sub.pipeline_enabled && sub.nfree > 0 {
98 sub.nfree -= 1;
99 }
100 let msg = encode_monitor_data_response_payload(
101 sub.ioid,
102 subcmd,
103 payload,
104 sub.version,
105 sub.is_be,
106 );
107 to_send = Some((sub.conn_id, msg));
108 }
109 }
110 }
111
112 if let Some((conn_id, msg)) = to_send {
113 self.send_msg(conn_id, msg).await;
114 }
115 }
116
117 pub async fn update_monitor_subscription(
119 &self,
120 conn_id: u64,
121 ioid: u32,
122 pv_name: &str,
123 running: bool,
124 nfree: Option<u32>,
125 pipeline_enabled: Option<bool>,
126 ) -> bool {
127 let mut monitors = self.monitors.lock().await;
128 if let Some(list) = monitors.get_mut(pv_name) {
129 if let Some(sub) = list
130 .iter_mut()
131 .find(|s| s.conn_id == conn_id && s.ioid == ioid)
132 {
133 sub.running = running;
134 if let Some(v) = nfree {
135 sub.nfree = v;
136 }
137 if let Some(enabled) = pipeline_enabled {
138 if enabled {
139 sub.pipeline_enabled = true;
140 }
141 }
142 return true;
143 }
144 }
145 false
146 }
147
148 pub async fn remove_monitor_subscription(
150 &self,
151 conn_id: u64,
152 ioid: u32,
153 pv_name: &str,
154 ) {
155 let mut monitors = self.monitors.lock().await;
156 if let Some(list) = monitors.get_mut(pv_name) {
157 list.retain(|s| s.conn_id != conn_id || s.ioid != ioid);
158 }
159 }
160
161 pub async fn cleanup_connection(&self, conn_id: u64) {
163 {
164 let mut monitors = self.monitors.lock().await;
165 for list in monitors.values_mut() {
166 list.retain(|s| s.conn_id != conn_id);
167 }
168 }
169 {
170 let mut conns = self.conns.lock().await;
171 conns.remove(&conn_id);
172 }
173 }
174}