1use std::net::{Ipv4Addr, Ipv6Addr};
2
3use tokio::io::{AsyncRead, AsyncWrite};
4
5use crate::services::dtx::codec::{DtxConnection, DtxError};
6use crate::services::dtx::types::{DtxMessage, DtxPayload, NSObject};
7
/// Event kind that `parse_network_message` decodes into an `InterfaceDetectionEvent`.
const MESSAGE_TYPE_INTERFACE_DETECTION: u64 = 0;
/// Event kind that `parse_network_message` decodes into a `ConnectionDetectionEvent`.
const MESSAGE_TYPE_CONNECTION_DETECTION: u64 = 1;
/// Event kind that `parse_network_message` decodes into a `ConnectionUpdateEvent`.
const MESSAGE_TYPE_CONNECTION_UPDATE: u64 = 2;
11
12#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
13pub struct SocketAddress {
14 pub family: u8,
15 pub port: u16,
16 pub address: String,
17 pub flow_info: Option<u32>,
18 pub scope_id: Option<u32>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
22pub struct InterfaceDetectionEvent {
23 pub interface_index: u64,
24 pub name: String,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
28pub struct ConnectionDetectionEvent {
29 pub local_address: SocketAddress,
30 pub remote_address: SocketAddress,
31 pub interface_index: u64,
32 pub pid: u64,
33 pub recv_buffer_size: u64,
34 pub recv_buffer_used: u64,
35 pub serial_number: u64,
36 pub kind: u64,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
40pub struct ConnectionUpdateEvent {
41 pub rx_packets: u64,
42 pub rx_bytes: u64,
43 pub tx_packets: u64,
44 pub tx_bytes: u64,
45 pub rx_dups: u64,
46 pub rx_ooo: u64,
47 pub tx_retx: u64,
48 pub min_rtt: u64,
49 pub avg_rtt: u64,
50 pub connection_serial: u64,
51 pub time: u64,
52}
53
54#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
55#[serde(tag = "type", rename_all = "snake_case")]
56pub enum NetworkMonitorEvent {
57 InterfaceDetection(InterfaceDetectionEvent),
58 ConnectionDetection(ConnectionDetectionEvent),
59 ConnectionUpdate(ConnectionUpdateEvent),
60}
61
62pub struct NetworkMonitorClient<S> {
63 conn: DtxConnection<S>,
64 channel_code: i32,
65}
66
67impl<S: AsyncRead + AsyncWrite + Unpin + Send> NetworkMonitorClient<S> {
68 pub async fn connect(stream: S) -> Result<Self, DtxError> {
69 let mut conn = DtxConnection::new(stream);
70 let channel_code = conn.request_channel(super::NETWORK_MONITOR_SVC).await?;
71 conn.method_call_async(channel_code, "startMonitoring", &[])
72 .await?;
73 Ok(Self { conn, channel_code })
74 }
75
76 pub async fn next_event(&mut self) -> Result<NetworkMonitorEvent, DtxError> {
77 loop {
78 let msg = self.conn.recv().await?;
79 if msg.expects_reply {
80 self.conn.send_ack(&msg).await?;
81 }
82 if let Some(event) = parse_network_message(&msg)? {
83 return Ok(event);
84 }
85 }
86 }
87
88 pub async fn stop(&mut self) -> Result<(), DtxError> {
89 self.conn
90 .method_call(self.channel_code, "stopMonitoring", &[])
91 .await?;
92 Ok(())
93 }
94}
95
96fn parse_network_message(msg: &DtxMessage) -> Result<Option<NetworkMonitorEvent>, DtxError> {
97 let Some((kind, args)) = extract_event_tuple(&msg.payload)? else {
98 return Ok(None);
99 };
100
101 let event = match kind {
102 MESSAGE_TYPE_INTERFACE_DETECTION => {
103 let Some(interface_index) = args.first().and_then(as_u64) else {
104 return Ok(None);
105 };
106 let Some(name) = args.get(1).and_then(as_string) else {
107 return Ok(None);
108 };
109 NetworkMonitorEvent::InterfaceDetection(InterfaceDetectionEvent {
110 interface_index,
111 name,
112 })
113 }
114 MESSAGE_TYPE_CONNECTION_DETECTION => {
115 if args.len() < 8 {
116 return Ok(None);
117 }
118 let Ok(local_address) = parse_socket_address(&args[0]) else {
119 return Ok(None);
120 };
121 let Ok(remote_address) = parse_socket_address(&args[1]) else {
122 return Ok(None);
123 };
124 let Some(interface_index) = as_u64(&args[2]) else {
125 return Ok(None);
126 };
127 let Some(pid) = as_u64(&args[3]) else {
128 return Ok(None);
129 };
130 let Some(recv_buffer_size) = as_u64(&args[4]) else {
131 return Ok(None);
132 };
133 let Some(recv_buffer_used) = as_u64(&args[5]) else {
134 return Ok(None);
135 };
136 let Some(serial_number) = as_u64(&args[6]) else {
137 return Ok(None);
138 };
139 let Some(kind) = as_u64(&args[7]) else {
140 return Ok(None);
141 };
142 NetworkMonitorEvent::ConnectionDetection(ConnectionDetectionEvent {
143 local_address,
144 remote_address,
145 interface_index,
146 pid,
147 recv_buffer_size,
148 recv_buffer_used,
149 serial_number,
150 kind,
151 })
152 }
153 MESSAGE_TYPE_CONNECTION_UPDATE => {
154 if args.len() < 11 {
155 return Ok(None);
156 }
157 let Some(rx_packets) = as_u64(&args[0]) else {
158 return Ok(None);
159 };
160 let Some(rx_bytes) = as_u64(&args[1]) else {
161 return Ok(None);
162 };
163 let Some(tx_packets) = as_u64(&args[2]) else {
164 return Ok(None);
165 };
166 let Some(tx_bytes) = as_u64(&args[3]) else {
167 return Ok(None);
168 };
169 let Some(rx_dups) = as_u64(&args[4]) else {
170 return Ok(None);
171 };
172 let Some(rx_ooo) = as_u64(&args[5]) else {
173 return Ok(None);
174 };
175 let Some(tx_retx) = as_u64(&args[6]) else {
176 return Ok(None);
177 };
178 let Some(min_rtt) = as_u64(&args[7]) else {
179 return Ok(None);
180 };
181 let Some(avg_rtt) = as_u64(&args[8]) else {
182 return Ok(None);
183 };
184 let Some(connection_serial) = as_u64(&args[9]) else {
185 return Ok(None);
186 };
187 let Some(time) = as_u64(&args[10]) else {
188 return Ok(None);
189 };
190 NetworkMonitorEvent::ConnectionUpdate(ConnectionUpdateEvent {
191 rx_packets,
192 rx_bytes,
193 tx_packets,
194 tx_bytes,
195 rx_dups,
196 rx_ooo,
197 tx_retx,
198 min_rtt,
199 avg_rtt,
200 connection_serial,
201 time,
202 })
203 }
204 _ => return Ok(None),
205 };
206
207 Ok(Some(event))
208}
209
210fn extract_event_tuple(payload: &DtxPayload) -> Result<Option<(u64, Vec<NSObject>)>, DtxError> {
211 match payload {
212 DtxPayload::MethodInvocation { selector, args } => {
213 if let Ok(kind) = selector.parse::<u64>() {
214 return Ok(Some((kind, args.clone())));
215 }
216 if let Some((kind, values)) = event_tuple_from_objects(args) {
217 return Ok(Some((kind, values)));
218 }
219 Ok(None)
220 }
221 DtxPayload::Response(value) => Ok(event_tuple_from_value(value)),
222 DtxPayload::Raw(bytes) => match super::unarchive_raw_payload(bytes) {
223 Some(value) => Ok(event_tuple_from_value(&value)),
224 None => Ok(None),
225 },
226 _ => Ok(None),
227 }
228}
229
230fn event_tuple_from_value(value: &NSObject) -> Option<(u64, Vec<NSObject>)> {
231 match value {
232 NSObject::Array(values) => event_tuple_from_objects(values),
233 _ => None,
234 }
235}
236
237fn event_tuple_from_objects(values: &[NSObject]) -> Option<(u64, Vec<NSObject>)> {
238 if values.len() < 2 {
239 return None;
240 }
241 let kind = as_u64(&values[0])?;
242 match &values[1] {
243 NSObject::Array(items) => Some((kind, items.clone())),
244 _ => Some((kind, values[1..].to_vec())),
245 }
246}
247
248fn parse_socket_address(value: &NSObject) -> Result<SocketAddress, DtxError> {
249 let data = match value {
250 NSObject::Data(bytes) => bytes,
251 other => {
252 return Err(DtxError::Protocol(format!(
253 "socket address expected NSData, got {other:?}"
254 )))
255 }
256 };
257
258 if data.len() < 4 {
259 return Err(DtxError::Protocol(
260 "socket address payload too short".into(),
261 ));
262 }
263
264 let length = data[0] as usize;
265 let family = data[1];
266 let port = u16::from_be_bytes([data[2], data[3]]);
267
268 match length {
269 0x10 if data.len() >= 16 => {
270 let address = Ipv4Addr::new(data[4], data[5], data[6], data[7]).to_string();
271 Ok(SocketAddress {
272 family,
273 port,
274 address,
275 flow_info: None,
276 scope_id: None,
277 })
278 }
279 0x1C if data.len() >= 28 => {
280 let flow_info = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
281 let mut address = [0u8; 16];
282 address.copy_from_slice(&data[8..24]);
283 let scope_id = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);
284 Ok(SocketAddress {
285 family,
286 port,
287 address: Ipv6Addr::from(address).to_string(),
288 flow_info: Some(flow_info),
289 scope_id: Some(scope_id),
290 })
291 }
292 _ => Err(DtxError::Protocol(format!(
293 "unsupported socket address length {length}"
294 ))),
295 }
296}
297
298fn as_u64(value: &NSObject) -> Option<u64> {
299 match value {
300 NSObject::Uint(value) => Some(*value),
301 NSObject::Int(value) if *value >= 0 => Some(*value as u64),
302 _ => None,
303 }
304}
305
306fn as_string(value: &NSObject) -> Option<String> {
307 match value {
308 NSObject::String(value) => Some(value.clone()),
309 _ => None,
310 }
311}
312
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::*;

    /// Builds a message whose payload is `Response([items...])`, with the same
    /// fixed header values in every test.
    fn response_message(items: Vec<NSObject>) -> DtxMessage {
        DtxMessage {
            identifier: 1,
            conversation_idx: 0,
            channel_code: 4,
            expects_reply: false,
            payload: DtxPayload::Response(NSObject::Array(items)),
        }
    }

    #[test]
    fn parses_interface_detection_payload() {
        let msg = response_message(vec![
            NSObject::Int(0),
            NSObject::Array(vec![NSObject::Int(3), NSObject::String("en0".into())]),
        ]);

        let expected = NetworkMonitorEvent::InterfaceDetection(InterfaceDetectionEvent {
            interface_index: 3,
            name: "en0".into(),
        });
        let event = parse_network_message(&msg).unwrap().unwrap();
        assert_eq!(event, expected);
    }

    #[test]
    fn parses_connection_detection_payload() {
        // 16-byte IPv4 layout: length, family, big-endian port, address, padding.
        let local = NSObject::Data(Bytes::from_static(&[
            0x10, 0x02, 0x00, 0x50, 127, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0,
        ]));
        let remote = NSObject::Data(Bytes::from_static(&[
            0x10, 0x02, 0x01, 0xbb, 8, 8, 8, 8, 0, 0, 0, 0, 0, 0, 0, 0,
        ]));
        let msg = response_message(vec![
            NSObject::Int(1),
            NSObject::Array(vec![
                local,
                remote,
                NSObject::Int(2),
                NSObject::Int(123),
                NSObject::Int(4096),
                NSObject::Int(512),
                NSObject::Int(99),
                NSObject::Int(1),
            ]),
        ]);

        let event = parse_network_message(&msg).unwrap().unwrap();
        let NetworkMonitorEvent::ConnectionDetection(ref detected) = event else {
            panic!("unexpected event: {event:?}");
        };
        assert_eq!(detected.local_address.address, "127.0.0.1");
        assert_eq!(detected.local_address.port, 80);
        assert_eq!(detected.remote_address.address, "8.8.8.8");
        assert_eq!(detected.remote_address.port, 443);
        assert_eq!(detected.pid, 123);
        assert_eq!(detected.serial_number, 99);
    }

    #[test]
    fn parses_connection_update_payload() {
        // Eleven counters with the values 1 through 11, in order.
        let counters: Vec<NSObject> = (1..=11).map(NSObject::Int).collect();
        let msg = response_message(vec![NSObject::Int(2), NSObject::Array(counters)]);

        let event = parse_network_message(&msg).unwrap().unwrap();
        let NetworkMonitorEvent::ConnectionUpdate(ref update) = event else {
            panic!("unexpected event: {event:?}");
        };
        assert_eq!(update.rx_packets, 1);
        assert_eq!(update.tx_bytes, 4);
        assert_eq!(update.connection_serial, 10);
        assert_eq!(update.time, 11);
    }
}