1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use bytes::BytesMut;
8use serde::Serialize;
9use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10
/// Protocol the recorder should try to decode on a proxied connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyProtocol {
    /// Handled by `StreamDecoder::decode_lockdown`.
    Lockdown,
    /// Handled by `StreamDecoder::decode_dtx`.
    Dtx,
    /// Handled by `StreamDecoder::decode_xpc`.
    Xpc,
    /// No decoding; `StreamDecoder::push` produces no events for it.
    Binary,
}

impl ProxyProtocol {
    /// Lowercase label used for the `protocol` field of emitted events.
    fn as_str(self) -> &'static str {
        match self {
            Self::Lockdown => "lockdown",
            Self::Dtx => "dtx",
            Self::Xpc => "xpc",
            Self::Binary => "binary",
        }
    }
}
29
/// Which way a chunk of proxied traffic is flowing.
#[derive(Debug, Clone, Copy)]
pub enum Direction {
    /// Bytes read from the local (host) side and forwarded to the device.
    HostToDevice,
    /// Bytes read from the device and forwarded to the local (host) side.
    DeviceToHost,
}

impl Direction {
    /// Arrow-style label used for the `direction` field of emitted events.
    fn as_str(self) -> &'static str {
        match self {
            Self::HostToDevice => "host->device",
            Self::DeviceToHost => "device->host",
        }
    }

    /// Name of the raw capture file that stores this direction's bytes.
    fn file_name(self) -> &'static str {
        match self {
            Self::HostToDevice => "host-to-device.bin",
            Self::DeviceToHost => "device-to-host.bin",
        }
    }
}
51
52#[derive(Debug, Serialize)]
53pub struct ProxyEvent {
54 pub timestamp_ms: u128,
55 pub direction: String,
56 pub protocol: String,
57 pub summary: String,
58 pub decoded: serde_json::Value,
59}
60
61#[derive(Debug, thiserror::Error)]
62pub enum DproxyError {
63 #[error("IO error: {0}")]
64 Io(#[from] std::io::Error),
65 #[error("serde error: {0}")]
66 Serde(#[from] serde_json::Error),
67 #[error("DTX decode error: {0}")]
68 Dtx(#[from] crate::services::dtx::DtxError),
69}
70
71pub struct ProxyRecorder {
72 output_dir: PathBuf,
73 events: File,
74 host_to_device_raw: File,
75 device_to_host_raw: File,
76 host_to_device_decoder: StreamDecoder,
77 device_to_host_decoder: StreamDecoder,
78}
79
80impl ProxyRecorder {
81 pub fn new(output_dir: impl AsRef<Path>, protocol: ProxyProtocol) -> Result<Self, DproxyError> {
82 let output_dir = output_dir.as_ref().to_path_buf();
83 fs::create_dir_all(&output_dir)?;
84
85 Ok(Self {
86 events: File::create(output_dir.join("events.ndjson"))?,
87 host_to_device_raw: File::create(output_dir.join(Direction::HostToDevice.file_name()))?,
88 device_to_host_raw: File::create(output_dir.join(Direction::DeviceToHost.file_name()))?,
89 host_to_device_decoder: StreamDecoder::new(protocol),
90 device_to_host_decoder: StreamDecoder::new(protocol),
91 output_dir,
92 })
93 }
94
95 pub fn output_dir(&self) -> &Path {
96 &self.output_dir
97 }
98
99 pub fn record_chunk(&mut self, direction: Direction, chunk: &[u8]) -> Result<(), DproxyError> {
100 if chunk.is_empty() {
101 return Ok(());
102 }
103
104 let events = match direction {
105 Direction::HostToDevice => {
106 self.host_to_device_raw.write_all(chunk)?;
107 self.host_to_device_decoder.push(direction, chunk)?
108 }
109 Direction::DeviceToHost => {
110 self.device_to_host_raw.write_all(chunk)?;
111 self.device_to_host_decoder.push(direction, chunk)?
112 }
113 };
114
115 self.write_events(events)
116 }
117
118 pub fn record_meta_event(
119 &mut self,
120 direction: Direction,
121 protocol: &str,
122 summary: impl Into<String>,
123 decoded: serde_json::Value,
124 ) -> Result<(), DproxyError> {
125 self.write_events(vec![ProxyEvent {
126 timestamp_ms: now_ms(),
127 direction: direction.as_str().to_string(),
128 protocol: protocol.to_string(),
129 summary: summary.into(),
130 decoded,
131 }])
132 }
133
134 fn write_events(&mut self, events: Vec<ProxyEvent>) -> Result<(), DproxyError> {
135 for event in events {
136 serde_json::to_writer(&mut self.events, &event)?;
137 self.events.write_all(b"\n")?;
138 eprintln!("[{}] {} {}", event.protocol, event.direction, event.summary);
139 }
140 self.events.flush()?;
141 Ok(())
142 }
143}
144
145pub async fn proxy_bidirectional<L, R>(
146 local: L,
147 remote: R,
148 recorder: ProxyRecorder,
149) -> Result<(), DproxyError>
150where
151 L: AsyncRead + AsyncWrite + Unpin,
152 R: AsyncRead + AsyncWrite + Unpin,
153{
154 let recorder = std::sync::Arc::new(tokio::sync::Mutex::new(recorder));
155 let (local_reader, local_writer) = tokio::io::split(local);
156 let (remote_reader, remote_writer) = tokio::io::split(remote);
157
158 tokio::try_join!(
159 pump(
160 local_reader,
161 remote_writer,
162 Direction::HostToDevice,
163 recorder.clone()
164 ),
165 pump(
166 remote_reader,
167 local_writer,
168 Direction::DeviceToHost,
169 recorder
170 ),
171 )?;
172
173 Ok(())
174}
175
176async fn pump<R, W>(
177 mut reader: R,
178 mut writer: W,
179 direction: Direction,
180 recorder: std::sync::Arc<tokio::sync::Mutex<ProxyRecorder>>,
181) -> Result<(), DproxyError>
182where
183 R: AsyncRead + Unpin,
184 W: AsyncWrite + Unpin,
185{
186 let mut buf = [0u8; 16 * 1024];
187 loop {
188 let read = reader.read(&mut buf).await?;
189 if read == 0 {
190 writer.shutdown().await?;
191 return Ok(());
192 }
193
194 {
195 let mut recorder = recorder.lock().await;
196 recorder.record_chunk(direction, &buf[..read])?;
197 }
198
199 writer.write_all(&buf[..read]).await?;
200 writer.flush().await?;
201 }
202}
203
204pub struct StreamDecoder {
205 protocol: ProxyProtocol,
206 buffer: BytesMut,
207 xpc_streams: HashMap<u32, BytesMut>,
208 xpc_preface_handled: bool,
209 dtx_broken: bool,
210}
211
212impl StreamDecoder {
213 pub fn new(protocol: ProxyProtocol) -> Self {
214 Self {
215 protocol,
216 buffer: BytesMut::new(),
217 xpc_streams: HashMap::new(),
218 xpc_preface_handled: false,
219 dtx_broken: false,
220 }
221 }
222
223 pub fn push(
224 &mut self,
225 direction: Direction,
226 chunk: &[u8],
227 ) -> Result<Vec<ProxyEvent>, DproxyError> {
228 if self.protocol == ProxyProtocol::Dtx && self.dtx_broken {
229 return Ok(Vec::new());
230 }
231
232 self.buffer.extend_from_slice(chunk);
233 match self.protocol {
234 ProxyProtocol::Lockdown => Ok(self.decode_lockdown(direction)),
235 ProxyProtocol::Dtx => Ok(self.decode_dtx(direction)),
236 ProxyProtocol::Xpc => Ok(self.decode_xpc(direction)),
237 ProxyProtocol::Binary => Ok(Vec::new()),
238 }
239 }
240
241 fn decode_lockdown(&mut self, direction: Direction) -> Vec<ProxyEvent> {
242 let mut events = Vec::new();
243 loop {
244 if self.buffer.len() < 4 {
245 break;
246 }
247 let len = u32::from_be_bytes(self.buffer[..4].try_into().unwrap()) as usize;
250 if self.buffer.len() < 4 + len {
251 break;
252 }
253
254 let _ = self.buffer.split_to(4);
255 let payload = self.buffer.split_to(len).freeze();
256 let decoded = plist::from_bytes::<plist::Value>(&payload)
257 .map(plist_to_json)
258 .unwrap_or_else(|_| serde_json::json!({"raw": hex::encode(payload)}));
259 events.push(ProxyEvent {
260 timestamp_ms: now_ms(),
261 direction: direction.as_str().to_string(),
262 protocol: self.protocol.as_str().to_string(),
263 summary: summarize_lockdown(&decoded),
264 decoded,
265 });
266 }
267 events
268 }
269
270 fn decode_dtx(&mut self, direction: Direction) -> Vec<ProxyEvent> {
271 let mut events = Vec::new();
272 loop {
273 match crate::services::dtx::decode_dtx_message_from_bytes(&self.buffer) {
274 Ok(Some((message, consumed))) => {
275 let _ = self.buffer.split_to(consumed);
276 let decoded = dtx_message_to_json(&message);
277 events.push(ProxyEvent {
278 timestamp_ms: now_ms(),
279 direction: direction.as_str().to_string(),
280 protocol: self.protocol.as_str().to_string(),
281 summary: summarize_dtx(&message),
282 decoded,
283 });
284 }
285 Ok(None) => break,
286 Err(err) => {
287 events.push(decoder_error_event(
288 direction,
289 self.protocol,
290 format!("DTX decode error: {err}"),
291 ));
292 self.buffer.clear();
293 self.dtx_broken = true;
294 break;
295 }
296 }
297 }
298 events
299 }
300
301 fn decode_xpc(&mut self, direction: Direction) -> Vec<ProxyEvent> {
302 let mut events = Vec::new();
303 loop {
304 if self.consume_xpc_preface() {
305 break;
306 }
307
308 let Some((stream_id, frame_type, payload, consumed)) = try_take_h2_frame(&self.buffer)
309 else {
310 break;
311 };
312 let _ = self.buffer.split_to(consumed);
313 if frame_type != 0x00 {
314 continue;
315 }
316
317 let stream_buffer = self.xpc_streams.entry(stream_id).or_default();
318 stream_buffer.extend_from_slice(&payload);
319
320 loop {
321 match try_take_xpc_message(stream_buffer) {
322 Ok(Some(message)) => {
323 let decoded = message
324 .body
325 .as_ref()
326 .map(xpc_value_to_json)
327 .unwrap_or(serde_json::Value::Null);
328 events.push(ProxyEvent {
329 timestamp_ms: now_ms(),
330 direction: direction.as_str().to_string(),
331 protocol: self.protocol.as_str().to_string(),
332 summary: summarize_xpc(stream_id, &message),
333 decoded,
334 });
335 }
336 Ok(None) => break,
337 Err(err) => {
338 events.push(decoder_error_event(direction, self.protocol, err));
339 stream_buffer.clear();
340 break;
341 }
342 }
343 }
344 }
345 events
346 }
347
348 fn consume_xpc_preface(&mut self) -> bool {
349 if self.xpc_preface_handled {
350 return false;
351 }
352
353 let preface = crate::xpc::h2_raw::H2_PREFACE;
354 if self.buffer.len() < preface.len() {
355 if preface.starts_with(self.buffer.as_ref()) {
356 return true;
357 }
358 self.xpc_preface_handled = true;
359 return false;
360 }
361
362 if self.buffer.starts_with(preface) {
363 let _ = self.buffer.split_to(preface.len());
364 }
365 self.xpc_preface_handled = true;
366 false
367 }
368}
369
370fn decoder_error_event(
371 direction: Direction,
372 protocol: ProxyProtocol,
373 summary: impl Into<String>,
374) -> ProxyEvent {
375 ProxyEvent {
376 timestamp_ms: now_ms(),
377 direction: direction.as_str().to_string(),
378 protocol: protocol.as_str().to_string(),
379 summary: summary.into(),
380 decoded: serde_json::Value::Null,
381 }
382}
383
/// Tries to read one HTTP/2 frame from the front of `buffer` without consuming it.
///
/// The 9-byte header is read as: bytes 0..3 big-endian payload length, byte 3 frame
/// type, bytes 5..9 stream id with the top bit masked off (byte 4 is not inspected).
///
/// Returns `(stream_id, frame_type, payload, total_frame_len)`, or `None` when the
/// buffer does not yet contain the whole frame.
fn try_take_h2_frame(buffer: &[u8]) -> Option<(u32, u8, Vec<u8>, usize)> {
    const HEADER_LEN: usize = 9;

    let header = buffer.get(..HEADER_LEN)?;
    let payload_len =
        (usize::from(header[0]) << 16) | (usize::from(header[1]) << 8) | usize::from(header[2]);
    let frame_end = HEADER_LEN + payload_len;
    let payload = buffer.get(HEADER_LEN..frame_end)?;

    let frame_type = header[3];
    let stream_id = u32::from_be_bytes([header[5] & 0x7f, header[6], header[7], header[8]]);
    Some((stream_id, frame_type, payload.to_vec(), frame_end))
}
397
398fn try_take_xpc_message(buffer: &mut BytesMut) -> Result<Option<crate::xpc::XpcMessage>, String> {
399 if buffer.len() < 24 {
400 return Ok(None);
401 }
402
403 let body_len = u64::from_le_bytes(
404 buffer[8..16]
405 .try_into()
406 .map_err(|_| "invalid XPC header".to_string())?,
407 ) as usize;
408 let total = 24usize
409 .checked_add(body_len)
410 .ok_or_else(|| "XPC message length overflow".to_string())?;
411 if buffer.len() < total {
412 return Ok(None);
413 }
414
415 let payload = buffer.split_to(total).freeze();
416 crate::xpc::message::decode_message(payload)
417 .map(Some)
418 .map_err(|err| err.to_string())
419}
420
421fn summarize_lockdown(decoded: &serde_json::Value) -> String {
422 decoded
423 .get("Request")
424 .or_else(|| decoded.get("Error"))
425 .or_else(|| decoded.get("Type"))
426 .map(|value| value.to_string().trim_matches('"').to_string())
427 .unwrap_or_else(|| "lockdown frame".into())
428}
429
430fn summarize_dtx(message: &crate::services::dtx::DtxMessage) -> String {
431 match &message.payload {
432 crate::services::dtx::DtxPayload::MethodInvocation { selector, .. } => format!(
433 "{}.{}{} c{} {}",
434 message.identifier,
435 message.conversation_idx,
436 if message.expects_reply { "e" } else { "" },
437 message.channel_code,
438 selector
439 ),
440 crate::services::dtx::DtxPayload::Response(value) => format!(
441 "{}.{} c{} response {:?}",
442 message.identifier, message.conversation_idx, message.channel_code, value
443 ),
444 crate::services::dtx::DtxPayload::Notification { name, .. } => format!(
445 "{}.{} c{} notify {}",
446 message.identifier, message.conversation_idx, message.channel_code, name
447 ),
448 crate::services::dtx::DtxPayload::Raw(bytes) => format!(
449 "{}.{} c{} raw {} bytes",
450 message.identifier,
451 message.conversation_idx,
452 message.channel_code,
453 bytes.len()
454 ),
455 crate::services::dtx::DtxPayload::RawWithAux { payload, .. } => format!(
456 "{}.{} c{} raw {} bytes",
457 message.identifier,
458 message.conversation_idx,
459 message.channel_code,
460 payload.len()
461 ),
462 crate::services::dtx::DtxPayload::Empty => format!(
463 "{}.{} c{} empty",
464 message.identifier, message.conversation_idx, message.channel_code
465 ),
466 }
467}
468
469fn summarize_xpc(stream_id: u32, message: &crate::xpc::XpcMessage) -> String {
470 let keys = message
471 .body
472 .as_ref()
473 .and_then(crate::xpc::XpcValue::as_dict)
474 .map(|dict| dict.keys().take(4).cloned().collect::<Vec<_>>().join(","))
475 .unwrap_or_else(|| "no-body".into());
476 format!(
477 "stream={} msg_id={} flags=0x{:08x} keys=[{}]",
478 stream_id, message.msg_id, message.flags, keys
479 )
480}
481
482fn dtx_message_to_json(message: &crate::services::dtx::DtxMessage) -> serde_json::Value {
483 let payload = match &message.payload {
484 crate::services::dtx::DtxPayload::MethodInvocation { selector, args } => {
485 serde_json::json!({
486 "type": "method",
487 "selector": selector,
488 "args": args.iter().map(nsobject_to_json).collect::<Vec<_>>(),
489 })
490 }
491 crate::services::dtx::DtxPayload::Response(value) => serde_json::json!({
492 "type": "response",
493 "value": nsobject_to_json(value),
494 }),
495 crate::services::dtx::DtxPayload::Notification { name, object } => serde_json::json!({
496 "type": "notification",
497 "name": name,
498 "object": nsobject_to_json(object),
499 }),
500 crate::services::dtx::DtxPayload::Raw(bytes) => serde_json::json!({
501 "type": "raw",
502 "bytes": hex::encode(bytes),
503 }),
504 crate::services::dtx::DtxPayload::RawWithAux { payload, aux } => serde_json::json!({
505 "type": "raw_with_aux",
506 "payload": hex::encode(payload),
507 "aux": aux.iter().map(nsobject_to_json).collect::<Vec<_>>(),
508 }),
509 crate::services::dtx::DtxPayload::Empty => serde_json::json!({"type": "empty"}),
510 };
511
512 serde_json::json!({
513 "identifier": message.identifier,
514 "conversation_idx": message.conversation_idx,
515 "channel_code": message.channel_code,
516 "expects_reply": message.expects_reply,
517 "payload": payload,
518 })
519}
520
521fn nsobject_to_json(value: &crate::services::dtx::NSObject) -> serde_json::Value {
522 match value {
523 crate::services::dtx::NSObject::Int(value) => serde_json::Value::from(*value),
524 crate::services::dtx::NSObject::Uint(value) => serde_json::Value::from(*value),
525 crate::services::dtx::NSObject::Double(value) => serde_json::Number::from_f64(*value)
526 .map(serde_json::Value::Number)
527 .unwrap_or(serde_json::Value::Null),
528 crate::services::dtx::NSObject::Bool(value) => serde_json::Value::Bool(*value),
529 crate::services::dtx::NSObject::String(value) => serde_json::Value::String(value.clone()),
530 crate::services::dtx::NSObject::Data(value) => {
531 serde_json::Value::String(hex::encode(value))
532 }
533 crate::services::dtx::NSObject::Array(values) => {
534 serde_json::Value::Array(values.iter().map(nsobject_to_json).collect())
535 }
536 crate::services::dtx::NSObject::Dict(values) => serde_json::Value::Object(
537 values
538 .iter()
539 .map(|(key, value)| (key.clone(), nsobject_to_json(value)))
540 .collect(),
541 ),
542 crate::services::dtx::NSObject::Null => serde_json::Value::Null,
543 }
544}
545
546fn xpc_value_to_json(value: &crate::xpc::XpcValue) -> serde_json::Value {
547 match value {
548 crate::xpc::XpcValue::Null => serde_json::Value::Null,
549 crate::xpc::XpcValue::Bool(value) => serde_json::Value::Bool(*value),
550 crate::xpc::XpcValue::Int64(value) => serde_json::Value::from(*value),
551 crate::xpc::XpcValue::Uint64(value) => serde_json::Value::from(*value),
552 crate::xpc::XpcValue::Double(value) => serde_json::Number::from_f64(*value)
553 .map(serde_json::Value::Number)
554 .unwrap_or(serde_json::Value::Null),
555 crate::xpc::XpcValue::Date(value) => serde_json::Value::from(*value),
556 crate::xpc::XpcValue::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
557 crate::xpc::XpcValue::String(value) => serde_json::Value::String(value.clone()),
558 crate::xpc::XpcValue::Uuid(bytes) => {
559 serde_json::Value::String(uuid::Uuid::from_bytes(*bytes).to_string())
560 }
561 crate::xpc::XpcValue::Array(values) => {
562 serde_json::Value::Array(values.iter().map(xpc_value_to_json).collect())
563 }
564 crate::xpc::XpcValue::Dictionary(values) => serde_json::Value::Object(
565 values
566 .iter()
567 .map(|(key, value)| (key.clone(), xpc_value_to_json(value)))
568 .collect(),
569 ),
570 crate::xpc::XpcValue::FileTransfer { msg_id, data } => serde_json::json!({
571 "msg_id": msg_id,
572 "data": xpc_value_to_json(data),
573 }),
574 }
575}
576
577fn plist_to_json(value: plist::Value) -> serde_json::Value {
578 match value {
579 plist::Value::String(value) => serde_json::Value::String(value),
580 plist::Value::Boolean(value) => serde_json::Value::Bool(value),
581 plist::Value::Integer(value) => value
582 .as_signed()
583 .map(serde_json::Value::from)
584 .or_else(|| value.as_unsigned().map(serde_json::Value::from))
585 .unwrap_or(serde_json::Value::Null),
586 plist::Value::Real(value) => serde_json::Number::from_f64(value)
587 .map(serde_json::Value::Number)
588 .unwrap_or(serde_json::Value::Null),
589 plist::Value::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
590 plist::Value::Array(values) => {
591 serde_json::Value::Array(values.into_iter().map(plist_to_json).collect())
592 }
593 plist::Value::Dictionary(values) => serde_json::Value::Object(
594 values
595 .into_iter()
596 .map(|(key, value)| (key, plist_to_json(value)))
597 .collect(),
598 ),
599 plist::Value::Date(value) => serde_json::Value::String(value.to_xml_format()),
600 plist::Value::Uid(value) => serde_json::Value::from(value.get()),
601 _ => serde_json::Value::Null,
602 }
603}
604
/// Current wall-clock time in milliseconds since the Unix epoch, or `0` if the system
/// clock reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
611
/// Test helper: builds an HTTP/2 frame with a 24-bit length, the given type, a zero
/// flags byte, and the stream id with its top bit cleared.
#[cfg(test)]
fn build_h2_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = payload.len();
    // `as u8` keeps the low byte, so these are the three big-endian length bytes.
    let length_field = [(len >> 16) as u8, (len >> 8) as u8, len as u8];
    let masked_stream_id = stream_id & 0x7fff_ffff;

    let mut frame = Vec::with_capacity(9 + len);
    frame.extend_from_slice(&length_field);
    frame.extend_from_slice(&[frame_type, 0]);
    frame.extend_from_slice(&masked_stream_id.to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}
625
/// Test helper: builds an HTTP/2 DATA frame (type `0x00`) for `stream_id`.
#[cfg(test)]
fn build_h2_data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
    const DATA_FRAME_TYPE: u8 = 0x00;
    build_h2_frame(stream_id, DATA_FRAME_TYPE, payload)
}
630
#[cfg(test)]
mod tests {
    use crate::xpc::XpcValue;
    use indexmap::IndexMap;

    use super::*;

    /// One length-prefixed XML plist must yield exactly one decoded lockdown event.
    #[test]
    fn lockdown_decoder_extracts_complete_frames() {
        let request = plist::Value::Dictionary(plist::Dictionary::from_iter([(
            "Request".to_string(),
            plist::Value::String("QueryType".into()),
        )]));
        let mut body = Vec::new();
        plist::to_writer_xml(&mut body, &request).unwrap();

        // Lockdown framing: 4-byte big-endian length followed by the plist bytes.
        let mut wire = Vec::new();
        wire.extend_from_slice(&(body.len() as u32).to_be_bytes());
        wire.extend_from_slice(&body);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Lockdown);
        let events = decoder.push(Direction::HostToDevice, &wire).unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].protocol, "lockdown");
        assert_eq!(events[0].decoded["Request"], "QueryType");
    }

    /// A DTX message delivered in two pieces is only reported once it is complete.
    #[test]
    fn dtx_decoder_reassembles_fragmented_messages() {
        let selector =
            crate::proto::nskeyedarchiver_encode::archive_string("_notifyOfPublishedCapabilities:");
        let wire = crate::services::dtx::encode_dtx(1, 0, 0, true, 2, &selector, &[]);
        let (head, tail) = wire.split_at(10);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Dtx);
        let after_head = decoder.push(Direction::HostToDevice, head).unwrap();
        assert!(after_head.is_empty());

        let events = decoder.push(Direction::HostToDevice, tail).unwrap();

        assert_eq!(events.len(), 1);
        assert!(events[0]
            .summary
            .contains("_notifyOfPublishedCapabilities:"));
    }

    /// Garbage input produces a single error event and disables further DTX decoding.
    #[test]
    fn dtx_decoder_reports_errors_without_aborting_recording() {
        let mut decoder = StreamDecoder::new(ProxyProtocol::Dtx);

        let garbage = [0u8; 32];
        let events = decoder.push(Direction::HostToDevice, &garbage).unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].summary.contains("DTX decode error: bad magic"));
        assert!(decoder.dtx_broken);
        assert!(decoder.buffer.is_empty());

        // A valid message after the failure is ignored and nothing is buffered.
        let selector = crate::proto::nskeyedarchiver_encode::archive_string("after-error");
        let valid = crate::services::dtx::encode_dtx(2, 0, 0, true, 2, &selector, &[]);
        let after_error = decoder.push(Direction::HostToDevice, &valid).unwrap();
        assert!(after_error.is_empty());
        assert!(decoder.buffer.is_empty());
    }

    /// An XPC message split across two DATA frames of one stream is reassembled.
    #[test]
    fn xpc_decoder_reassembles_messages_across_h2_frames() {
        let message = crate::xpc::XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET
                | crate::xpc::message::flags::DATA
                | crate::xpc::message::flags::REPLY,
            msg_id: 7,
            body: Some(XpcValue::Dictionary(IndexMap::from([(
                "result".to_string(),
                XpcValue::String("success".into()),
            )]))),
        };
        let encoded = crate::xpc::message::encode_message(&message).unwrap();

        let first_frame = build_h2_data_frame(3, &encoded[..12]);
        let second_frame = build_h2_data_frame(3, &encoded[12..]);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Xpc);
        let after_first = decoder.push(Direction::DeviceToHost, &first_frame).unwrap();
        assert!(after_first.is_empty());

        let events = decoder
            .push(Direction::DeviceToHost, &second_frame)
            .unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].protocol, "xpc");
        assert_eq!(events[0].decoded["result"], "success");
    }

    /// The HTTP/2 preface is skipped even when it arrives split across two chunks.
    #[test]
    fn xpc_decoder_skips_split_http2_client_preface() {
        let message = crate::xpc::XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET | crate::xpc::message::flags::DATA,
            msg_id: 9,
            body: Some(XpcValue::Dictionary(IndexMap::from([(
                "request".to_string(),
                XpcValue::String("ping".into()),
            )]))),
        };
        let encoded = crate::xpc::message::encode_message(&message).unwrap();

        let mut decoder = StreamDecoder::new(ProxyProtocol::Xpc);
        let preface = crate::xpc::h2_raw::H2_PREFACE;
        let split_at = 10;

        let after_partial_preface = decoder
            .push(Direction::HostToDevice, &preface[..split_at])
            .unwrap();
        assert!(after_partial_preface.is_empty());

        // Rest of the preface, then a non-DATA frame (type 0x04), then the DATA frame.
        let mut remainder = preface[split_at..].to_vec();
        remainder.extend_from_slice(&build_h2_frame(0, 0x04, &[]));
        remainder.extend_from_slice(&build_h2_data_frame(1, &encoded));

        let events = decoder.push(Direction::HostToDevice, &remainder).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].decoded["request"], "ping");
    }
}