Skip to main content

ios_core/services/dproxy/
mod.rs

1use std::collections::HashMap;
2use std::fs::{self, File};
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use bytes::BytesMut;
8use serde::Serialize;
9use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
10
/// Wire protocol that a [`StreamDecoder`] attempts to decode from a recorded stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProxyProtocol {
    /// Length-prefixed plist frames.
    Lockdown,
    /// DTX messages.
    Dtx,
    /// XPC messages carried inside HTTP/2 DATA frames.
    Xpc,
    /// Opaque bytes; no events are decoded for this protocol.
    Binary,
}

impl ProxyProtocol {
    /// Returns the lowercase label stored in the `protocol` field of decoded events.
    fn as_str(self) -> &'static str {
        match self {
            Self::Lockdown => "lockdown",
            Self::Dtx => "dtx",
            Self::Xpc => "xpc",
            Self::Binary => "binary",
        }
    }
}
29
/// Which way a chunk of traffic travelled through the proxy.
///
/// Derives `PartialEq`/`Eq` (like [`ProxyProtocol`]) so callers and tests can compare
/// directions directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// Bytes read from the local (host) side and forwarded to the device.
    HostToDevice,
    /// Bytes read from the device and forwarded to the local (host) side.
    DeviceToHost,
}

impl Direction {
    /// Returns the label stored in the `direction` field of recorded events.
    fn as_str(self) -> &'static str {
        match self {
            Self::HostToDevice => "host->device",
            Self::DeviceToHost => "device->host",
        }
    }

    /// Returns the file name of the raw byte dump for this direction.
    fn file_name(self) -> &'static str {
        match self {
            Self::HostToDevice => "host-to-device.bin",
            Self::DeviceToHost => "device-to-host.bin",
        }
    }
}
51
52#[derive(Debug, Serialize)]
53pub struct ProxyEvent {
54    pub timestamp_ms: u128,
55    pub direction: String,
56    pub protocol: String,
57    pub summary: String,
58    pub decoded: serde_json::Value,
59}
60
61#[derive(Debug, thiserror::Error)]
62pub enum DproxyError {
63    #[error("IO error: {0}")]
64    Io(#[from] std::io::Error),
65    #[error("serde error: {0}")]
66    Serde(#[from] serde_json::Error),
67    #[error("DTX decode error: {0}")]
68    Dtx(#[from] crate::services::dtx::DtxError),
69}
70
71pub struct ProxyRecorder {
72    output_dir: PathBuf,
73    events: File,
74    host_to_device_raw: File,
75    device_to_host_raw: File,
76    host_to_device_decoder: StreamDecoder,
77    device_to_host_decoder: StreamDecoder,
78}
79
80impl ProxyRecorder {
81    pub fn new(output_dir: impl AsRef<Path>, protocol: ProxyProtocol) -> Result<Self, DproxyError> {
82        let output_dir = output_dir.as_ref().to_path_buf();
83        fs::create_dir_all(&output_dir)?;
84
85        Ok(Self {
86            events: File::create(output_dir.join("events.ndjson"))?,
87            host_to_device_raw: File::create(output_dir.join(Direction::HostToDevice.file_name()))?,
88            device_to_host_raw: File::create(output_dir.join(Direction::DeviceToHost.file_name()))?,
89            host_to_device_decoder: StreamDecoder::new(protocol),
90            device_to_host_decoder: StreamDecoder::new(protocol),
91            output_dir,
92        })
93    }
94
95    pub fn output_dir(&self) -> &Path {
96        &self.output_dir
97    }
98
99    pub fn record_chunk(&mut self, direction: Direction, chunk: &[u8]) -> Result<(), DproxyError> {
100        if chunk.is_empty() {
101            return Ok(());
102        }
103
104        let events = match direction {
105            Direction::HostToDevice => {
106                self.host_to_device_raw.write_all(chunk)?;
107                self.host_to_device_decoder.push(direction, chunk)?
108            }
109            Direction::DeviceToHost => {
110                self.device_to_host_raw.write_all(chunk)?;
111                self.device_to_host_decoder.push(direction, chunk)?
112            }
113        };
114
115        self.write_events(events)
116    }
117
118    pub fn record_meta_event(
119        &mut self,
120        direction: Direction,
121        protocol: &str,
122        summary: impl Into<String>,
123        decoded: serde_json::Value,
124    ) -> Result<(), DproxyError> {
125        self.write_events(vec![ProxyEvent {
126            timestamp_ms: now_ms(),
127            direction: direction.as_str().to_string(),
128            protocol: protocol.to_string(),
129            summary: summary.into(),
130            decoded,
131        }])
132    }
133
134    fn write_events(&mut self, events: Vec<ProxyEvent>) -> Result<(), DproxyError> {
135        for event in events {
136            serde_json::to_writer(&mut self.events, &event)?;
137            self.events.write_all(b"\n")?;
138            eprintln!("[{}] {} {}", event.protocol, event.direction, event.summary);
139        }
140        self.events.flush()?;
141        Ok(())
142    }
143}
144
145pub async fn proxy_bidirectional<L, R>(
146    local: L,
147    remote: R,
148    recorder: ProxyRecorder,
149) -> Result<(), DproxyError>
150where
151    L: AsyncRead + AsyncWrite + Unpin,
152    R: AsyncRead + AsyncWrite + Unpin,
153{
154    let recorder = std::sync::Arc::new(tokio::sync::Mutex::new(recorder));
155    let (local_reader, local_writer) = tokio::io::split(local);
156    let (remote_reader, remote_writer) = tokio::io::split(remote);
157
158    tokio::try_join!(
159        pump(
160            local_reader,
161            remote_writer,
162            Direction::HostToDevice,
163            recorder.clone()
164        ),
165        pump(
166            remote_reader,
167            local_writer,
168            Direction::DeviceToHost,
169            recorder
170        ),
171    )?;
172
173    Ok(())
174}
175
176async fn pump<R, W>(
177    mut reader: R,
178    mut writer: W,
179    direction: Direction,
180    recorder: std::sync::Arc<tokio::sync::Mutex<ProxyRecorder>>,
181) -> Result<(), DproxyError>
182where
183    R: AsyncRead + Unpin,
184    W: AsyncWrite + Unpin,
185{
186    let mut buf = [0u8; 16 * 1024];
187    loop {
188        let read = reader.read(&mut buf).await?;
189        if read == 0 {
190            writer.shutdown().await?;
191            return Ok(());
192        }
193
194        {
195            let mut recorder = recorder.lock().await;
196            recorder.record_chunk(direction, &buf[..read])?;
197        }
198
199        writer.write_all(&buf[..read]).await?;
200        writer.flush().await?;
201    }
202}
203
204pub struct StreamDecoder {
205    protocol: ProxyProtocol,
206    buffer: BytesMut,
207    xpc_streams: HashMap<u32, BytesMut>,
208    xpc_preface_handled: bool,
209    dtx_broken: bool,
210}
211
212impl StreamDecoder {
213    pub fn new(protocol: ProxyProtocol) -> Self {
214        Self {
215            protocol,
216            buffer: BytesMut::new(),
217            xpc_streams: HashMap::new(),
218            xpc_preface_handled: false,
219            dtx_broken: false,
220        }
221    }
222
223    pub fn push(
224        &mut self,
225        direction: Direction,
226        chunk: &[u8],
227    ) -> Result<Vec<ProxyEvent>, DproxyError> {
228        if self.protocol == ProxyProtocol::Dtx && self.dtx_broken {
229            return Ok(Vec::new());
230        }
231
232        self.buffer.extend_from_slice(chunk);
233        match self.protocol {
234            ProxyProtocol::Lockdown => Ok(self.decode_lockdown(direction)),
235            ProxyProtocol::Dtx => Ok(self.decode_dtx(direction)),
236            ProxyProtocol::Xpc => Ok(self.decode_xpc(direction)),
237            ProxyProtocol::Binary => Ok(Vec::new()),
238        }
239    }
240
241    fn decode_lockdown(&mut self, direction: Direction) -> Vec<ProxyEvent> {
242        let mut events = Vec::new();
243        loop {
244            if self.buffer.len() < 4 {
245                break;
246            }
247            // Safety: self.buffer.len() >= 4 is checked above, so [..4] is exactly 4 bytes
248            // and try_into::<[u8; 4]>() is infallible.
249            let len = u32::from_be_bytes(self.buffer[..4].try_into().unwrap()) as usize;
250            if self.buffer.len() < 4 + len {
251                break;
252            }
253
254            let _ = self.buffer.split_to(4);
255            let payload = self.buffer.split_to(len).freeze();
256            let decoded = plist::from_bytes::<plist::Value>(&payload)
257                .map(plist_to_json)
258                .unwrap_or_else(|_| serde_json::json!({"raw": hex::encode(payload)}));
259            events.push(ProxyEvent {
260                timestamp_ms: now_ms(),
261                direction: direction.as_str().to_string(),
262                protocol: self.protocol.as_str().to_string(),
263                summary: summarize_lockdown(&decoded),
264                decoded,
265            });
266        }
267        events
268    }
269
270    fn decode_dtx(&mut self, direction: Direction) -> Vec<ProxyEvent> {
271        let mut events = Vec::new();
272        loop {
273            match crate::services::dtx::decode_dtx_message_from_bytes(&self.buffer) {
274                Ok(Some((message, consumed))) => {
275                    let _ = self.buffer.split_to(consumed);
276                    let decoded = dtx_message_to_json(&message);
277                    events.push(ProxyEvent {
278                        timestamp_ms: now_ms(),
279                        direction: direction.as_str().to_string(),
280                        protocol: self.protocol.as_str().to_string(),
281                        summary: summarize_dtx(&message),
282                        decoded,
283                    });
284                }
285                Ok(None) => break,
286                Err(err) => {
287                    events.push(decoder_error_event(
288                        direction,
289                        self.protocol,
290                        format!("DTX decode error: {err}"),
291                    ));
292                    self.buffer.clear();
293                    self.dtx_broken = true;
294                    break;
295                }
296            }
297        }
298        events
299    }
300
301    fn decode_xpc(&mut self, direction: Direction) -> Vec<ProxyEvent> {
302        let mut events = Vec::new();
303        loop {
304            if self.consume_xpc_preface() {
305                break;
306            }
307
308            let Some((stream_id, frame_type, payload, consumed)) = try_take_h2_frame(&self.buffer)
309            else {
310                break;
311            };
312            let _ = self.buffer.split_to(consumed);
313            if frame_type != 0x00 {
314                continue;
315            }
316
317            let stream_buffer = self.xpc_streams.entry(stream_id).or_default();
318            stream_buffer.extend_from_slice(&payload);
319
320            loop {
321                match try_take_xpc_message(stream_buffer) {
322                    Ok(Some(message)) => {
323                        let decoded = message
324                            .body
325                            .as_ref()
326                            .map(xpc_value_to_json)
327                            .unwrap_or(serde_json::Value::Null);
328                        events.push(ProxyEvent {
329                            timestamp_ms: now_ms(),
330                            direction: direction.as_str().to_string(),
331                            protocol: self.protocol.as_str().to_string(),
332                            summary: summarize_xpc(stream_id, &message),
333                            decoded,
334                        });
335                    }
336                    Ok(None) => break,
337                    Err(err) => {
338                        events.push(decoder_error_event(direction, self.protocol, err));
339                        stream_buffer.clear();
340                        break;
341                    }
342                }
343            }
344        }
345        events
346    }
347
348    fn consume_xpc_preface(&mut self) -> bool {
349        if self.xpc_preface_handled {
350            return false;
351        }
352
353        let preface = crate::xpc::h2_raw::H2_PREFACE;
354        if self.buffer.len() < preface.len() {
355            if preface.starts_with(self.buffer.as_ref()) {
356                return true;
357            }
358            self.xpc_preface_handled = true;
359            return false;
360        }
361
362        if self.buffer.starts_with(preface) {
363            let _ = self.buffer.split_to(preface.len());
364        }
365        self.xpc_preface_handled = true;
366        false
367    }
368}
369
370fn decoder_error_event(
371    direction: Direction,
372    protocol: ProxyProtocol,
373    summary: impl Into<String>,
374) -> ProxyEvent {
375    ProxyEvent {
376        timestamp_ms: now_ms(),
377        direction: direction.as_str().to_string(),
378        protocol: protocol.as_str().to_string(),
379        summary: summary.into(),
380        decoded: serde_json::Value::Null,
381    }
382}
383
/// Parses one HTTP/2 frame from the front of `buffer` without consuming it.
///
/// Returns `(stream_id, frame_type, payload, total_len)`, where `total_len` is the
/// number of bytes the caller should drop (9-byte header plus the declared payload
/// length). Returns `None` while the header or payload is still incomplete.
///
/// For DATA frames (type `0x00`) with the PADDED flag (`0x08`), the leading Pad
/// Length byte and the trailing padding are removed so only application data is
/// returned. A padded DATA frame whose pad length exceeds the available bytes is
/// malformed and yields an empty payload. Payloads of other frame types are
/// returned unmodified.
fn try_take_h2_frame(buffer: &[u8]) -> Option<(u32, u8, Vec<u8>, usize)> {
    const HEADER_LEN: usize = 9;
    const FRAME_TYPE_DATA: u8 = 0x00;
    const FLAG_PADDED: u8 = 0x08;

    let header = buffer.get(..HEADER_LEN)?;
    // 24-bit big-endian payload length.
    let len =
        (usize::from(header[0]) << 16) | (usize::from(header[1]) << 8) | usize::from(header[2]);
    let total = HEADER_LEN + len;
    let body = buffer.get(HEADER_LEN..total)?;

    let frame_type = header[3];
    let flags = header[4];
    // The top bit of the stream identifier is reserved and must be ignored.
    let stream_id = u32::from_be_bytes([header[5] & 0x7f, header[6], header[7], header[8]]);

    let payload: &[u8] = if frame_type == FRAME_TYPE_DATA && (flags & FLAG_PADDED) != 0 {
        // Layout: pad_len (1 byte) | data | padding (pad_len bytes).
        match body.split_first() {
            Some((&pad_len, rest)) => match rest.len().checked_sub(usize::from(pad_len)) {
                Some(data_len) => &rest[..data_len],
                None => &[],
            },
            None => &[],
        }
    } else {
        body
    };

    Some((stream_id, frame_type, payload.to_vec(), total))
}
397
398fn try_take_xpc_message(buffer: &mut BytesMut) -> Result<Option<crate::xpc::XpcMessage>, String> {
399    if buffer.len() < 24 {
400        return Ok(None);
401    }
402
403    let body_len = u64::from_le_bytes(
404        buffer[8..16]
405            .try_into()
406            .map_err(|_| "invalid XPC header".to_string())?,
407    ) as usize;
408    let total = 24usize
409        .checked_add(body_len)
410        .ok_or_else(|| "XPC message length overflow".to_string())?;
411    if buffer.len() < total {
412        return Ok(None);
413    }
414
415    let payload = buffer.split_to(total).freeze();
416    crate::xpc::message::decode_message(payload)
417        .map(Some)
418        .map_err(|err| err.to_string())
419}
420
421fn summarize_lockdown(decoded: &serde_json::Value) -> String {
422    decoded
423        .get("Request")
424        .or_else(|| decoded.get("Error"))
425        .or_else(|| decoded.get("Type"))
426        .map(|value| value.to_string().trim_matches('"').to_string())
427        .unwrap_or_else(|| "lockdown frame".into())
428}
429
430fn summarize_dtx(message: &crate::services::dtx::DtxMessage) -> String {
431    match &message.payload {
432        crate::services::dtx::DtxPayload::MethodInvocation { selector, .. } => format!(
433            "{}.{}{} c{} {}",
434            message.identifier,
435            message.conversation_idx,
436            if message.expects_reply { "e" } else { "" },
437            message.channel_code,
438            selector
439        ),
440        crate::services::dtx::DtxPayload::Response(value) => format!(
441            "{}.{} c{} response {:?}",
442            message.identifier, message.conversation_idx, message.channel_code, value
443        ),
444        crate::services::dtx::DtxPayload::Notification { name, .. } => format!(
445            "{}.{} c{} notify {}",
446            message.identifier, message.conversation_idx, message.channel_code, name
447        ),
448        crate::services::dtx::DtxPayload::Raw(bytes) => format!(
449            "{}.{} c{} raw {} bytes",
450            message.identifier,
451            message.conversation_idx,
452            message.channel_code,
453            bytes.len()
454        ),
455        crate::services::dtx::DtxPayload::RawWithAux { payload, .. } => format!(
456            "{}.{} c{} raw {} bytes",
457            message.identifier,
458            message.conversation_idx,
459            message.channel_code,
460            payload.len()
461        ),
462        crate::services::dtx::DtxPayload::Empty => format!(
463            "{}.{} c{} empty",
464            message.identifier, message.conversation_idx, message.channel_code
465        ),
466    }
467}
468
469fn summarize_xpc(stream_id: u32, message: &crate::xpc::XpcMessage) -> String {
470    let keys = message
471        .body
472        .as_ref()
473        .and_then(crate::xpc::XpcValue::as_dict)
474        .map(|dict| dict.keys().take(4).cloned().collect::<Vec<_>>().join(","))
475        .unwrap_or_else(|| "no-body".into());
476    format!(
477        "stream={} msg_id={} flags=0x{:08x} keys=[{}]",
478        stream_id, message.msg_id, message.flags, keys
479    )
480}
481
482fn dtx_message_to_json(message: &crate::services::dtx::DtxMessage) -> serde_json::Value {
483    let payload = match &message.payload {
484        crate::services::dtx::DtxPayload::MethodInvocation { selector, args } => {
485            serde_json::json!({
486                "type": "method",
487                "selector": selector,
488                "args": args.iter().map(nsobject_to_json).collect::<Vec<_>>(),
489            })
490        }
491        crate::services::dtx::DtxPayload::Response(value) => serde_json::json!({
492            "type": "response",
493            "value": nsobject_to_json(value),
494        }),
495        crate::services::dtx::DtxPayload::Notification { name, object } => serde_json::json!({
496            "type": "notification",
497            "name": name,
498            "object": nsobject_to_json(object),
499        }),
500        crate::services::dtx::DtxPayload::Raw(bytes) => serde_json::json!({
501            "type": "raw",
502            "bytes": hex::encode(bytes),
503        }),
504        crate::services::dtx::DtxPayload::RawWithAux { payload, aux } => serde_json::json!({
505            "type": "raw_with_aux",
506            "payload": hex::encode(payload),
507            "aux": aux.iter().map(nsobject_to_json).collect::<Vec<_>>(),
508        }),
509        crate::services::dtx::DtxPayload::Empty => serde_json::json!({"type": "empty"}),
510    };
511
512    serde_json::json!({
513        "identifier": message.identifier,
514        "conversation_idx": message.conversation_idx,
515        "channel_code": message.channel_code,
516        "expects_reply": message.expects_reply,
517        "payload": payload,
518    })
519}
520
521fn nsobject_to_json(value: &crate::services::dtx::NSObject) -> serde_json::Value {
522    match value {
523        crate::services::dtx::NSObject::Int(value) => serde_json::Value::from(*value),
524        crate::services::dtx::NSObject::Uint(value) => serde_json::Value::from(*value),
525        crate::services::dtx::NSObject::Double(value) => serde_json::Number::from_f64(*value)
526            .map(serde_json::Value::Number)
527            .unwrap_or(serde_json::Value::Null),
528        crate::services::dtx::NSObject::Bool(value) => serde_json::Value::Bool(*value),
529        crate::services::dtx::NSObject::String(value) => serde_json::Value::String(value.clone()),
530        crate::services::dtx::NSObject::Data(value) => {
531            serde_json::Value::String(hex::encode(value))
532        }
533        crate::services::dtx::NSObject::Array(values) => {
534            serde_json::Value::Array(values.iter().map(nsobject_to_json).collect())
535        }
536        crate::services::dtx::NSObject::Dict(values) => serde_json::Value::Object(
537            values
538                .iter()
539                .map(|(key, value)| (key.clone(), nsobject_to_json(value)))
540                .collect(),
541        ),
542        crate::services::dtx::NSObject::Null => serde_json::Value::Null,
543    }
544}
545
546fn xpc_value_to_json(value: &crate::xpc::XpcValue) -> serde_json::Value {
547    match value {
548        crate::xpc::XpcValue::Null => serde_json::Value::Null,
549        crate::xpc::XpcValue::Bool(value) => serde_json::Value::Bool(*value),
550        crate::xpc::XpcValue::Int64(value) => serde_json::Value::from(*value),
551        crate::xpc::XpcValue::Uint64(value) => serde_json::Value::from(*value),
552        crate::xpc::XpcValue::Double(value) => serde_json::Number::from_f64(*value)
553            .map(serde_json::Value::Number)
554            .unwrap_or(serde_json::Value::Null),
555        crate::xpc::XpcValue::Date(value) => serde_json::Value::from(*value),
556        crate::xpc::XpcValue::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
557        crate::xpc::XpcValue::String(value) => serde_json::Value::String(value.clone()),
558        crate::xpc::XpcValue::Uuid(bytes) => {
559            serde_json::Value::String(uuid::Uuid::from_bytes(*bytes).to_string())
560        }
561        crate::xpc::XpcValue::Array(values) => {
562            serde_json::Value::Array(values.iter().map(xpc_value_to_json).collect())
563        }
564        crate::xpc::XpcValue::Dictionary(values) => serde_json::Value::Object(
565            values
566                .iter()
567                .map(|(key, value)| (key.clone(), xpc_value_to_json(value)))
568                .collect(),
569        ),
570        crate::xpc::XpcValue::FileTransfer { msg_id, data } => serde_json::json!({
571            "msg_id": msg_id,
572            "data": xpc_value_to_json(data),
573        }),
574    }
575}
576
577fn plist_to_json(value: plist::Value) -> serde_json::Value {
578    match value {
579        plist::Value::String(value) => serde_json::Value::String(value),
580        plist::Value::Boolean(value) => serde_json::Value::Bool(value),
581        plist::Value::Integer(value) => value
582            .as_signed()
583            .map(serde_json::Value::from)
584            .or_else(|| value.as_unsigned().map(serde_json::Value::from))
585            .unwrap_or(serde_json::Value::Null),
586        plist::Value::Real(value) => serde_json::Number::from_f64(value)
587            .map(serde_json::Value::Number)
588            .unwrap_or(serde_json::Value::Null),
589        plist::Value::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
590        plist::Value::Array(values) => {
591            serde_json::Value::Array(values.into_iter().map(plist_to_json).collect())
592        }
593        plist::Value::Dictionary(values) => serde_json::Value::Object(
594            values
595                .into_iter()
596                .map(|(key, value)| (key, plist_to_json(value)))
597                .collect(),
598        ),
599        plist::Value::Date(value) => serde_json::Value::String(value.to_xml_format()),
600        plist::Value::Uid(value) => serde_json::Value::from(value.get()),
601        _ => serde_json::Value::Null,
602    }
603}
604
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
611
/// Test helper: serializes one HTTP/2 frame with all flags cleared.
///
/// Only the low 24 bits of the payload length and the low 31 bits of `stream_id`
/// are encoded.
#[cfg(test)]
fn build_h2_frame(stream_id: u32, frame_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = payload.len();
    let flags = 0u8;

    let mut frame = Vec::with_capacity(9 + len);
    // 24-bit big-endian length, then type and flags.
    frame.extend_from_slice(&[(len >> 16) as u8, (len >> 8) as u8, len as u8]);
    frame.extend_from_slice(&[frame_type, flags]);
    // Stream identifier with the reserved top bit cleared.
    frame.extend_from_slice(&(stream_id & 0x7fff_ffff).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}
625
/// Test helper: serializes an HTTP/2 DATA frame (type `0x00`) carrying `payload`.
#[cfg(test)]
fn build_h2_data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
    const FRAME_TYPE_DATA: u8 = 0x00;
    build_h2_frame(stream_id, FRAME_TYPE_DATA, payload)
}
630
#[cfg(test)]
mod tests {
    use crate::xpc::XpcValue;
    use indexmap::IndexMap;

    use super::*;

    /// A single length-prefixed XML plist is decoded into exactly one lockdown event.
    #[test]
    fn lockdown_decoder_extracts_complete_frames() {
        let request = plist::Value::Dictionary(plist::Dictionary::from_iter([(
            "Request".to_string(),
            plist::Value::String("QueryType".into()),
        )]));
        let mut body = Vec::new();
        plist::to_writer_xml(&mut body, &request).unwrap();

        // 4-byte big-endian length prefix followed by the plist body.
        let mut frame = (body.len() as u32).to_be_bytes().to_vec();
        frame.extend_from_slice(&body);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Lockdown);
        let events = decoder.push(Direction::HostToDevice, &frame).unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].protocol, "lockdown");
        assert_eq!(events[0].decoded["Request"], "QueryType");
    }

    /// A DTX message split across two chunks only produces an event once complete.
    #[test]
    fn dtx_decoder_reassembles_fragmented_messages() {
        let selector =
            crate::proto::nskeyedarchiver_encode::archive_string("_notifyOfPublishedCapabilities:");
        let message = crate::services::dtx::encode_dtx(1, 0, 0, true, 2, &selector, &[]);
        let (head, tail) = message.split_at(10);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Dtx);
        let early = decoder.push(Direction::HostToDevice, head).unwrap();
        assert!(early.is_empty());

        let events = decoder.push(Direction::HostToDevice, tail).unwrap();

        assert_eq!(events.len(), 1);
        assert!(events[0]
            .summary
            .contains("_notifyOfPublishedCapabilities:"));
    }

    /// A DTX decode error yields one error event and makes the decoder ignore later input.
    #[test]
    fn dtx_decoder_reports_errors_without_aborting_recording() {
        let mut decoder = StreamDecoder::new(ProxyProtocol::Dtx);

        let garbage = [0u8; 32];
        let events = decoder.push(Direction::HostToDevice, &garbage).unwrap();
        assert_eq!(events.len(), 1);
        assert!(events[0].summary.contains("DTX decode error: bad magic"));
        assert!(decoder.dtx_broken);
        assert!(decoder.buffer.is_empty());

        // A well-formed message after the failure is dropped without being buffered.
        let selector = crate::proto::nskeyedarchiver_encode::archive_string("after-error");
        let valid = crate::services::dtx::encode_dtx(2, 0, 0, true, 2, &selector, &[]);
        let after_error = decoder.push(Direction::HostToDevice, &valid).unwrap();
        assert!(after_error.is_empty());
        assert!(decoder.buffer.is_empty());
    }

    /// An XPC message spread over two DATA frames of one stream is reassembled.
    #[test]
    fn xpc_decoder_reassembles_messages_across_h2_frames() {
        let encoded = crate::xpc::message::encode_message(&crate::xpc::XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET
                | crate::xpc::message::flags::DATA
                | crate::xpc::message::flags::REPLY,
            msg_id: 7,
            body: Some(XpcValue::Dictionary(IndexMap::from([(
                "result".to_string(),
                XpcValue::String("success".into()),
            )]))),
        })
        .unwrap();

        let first_frame = build_h2_data_frame(3, &encoded[..12]);
        let second_frame = build_h2_data_frame(3, &encoded[12..]);

        let mut decoder = StreamDecoder::new(ProxyProtocol::Xpc);
        let early = decoder.push(Direction::DeviceToHost, &first_frame).unwrap();
        assert!(early.is_empty());

        let events = decoder
            .push(Direction::DeviceToHost, &second_frame)
            .unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].protocol, "xpc");
        assert_eq!(events[0].decoded["result"], "success");
    }

    /// The connection preface is skipped even when it arrives split across chunks.
    #[test]
    fn xpc_decoder_skips_split_http2_client_preface() {
        let encoded = crate::xpc::message::encode_message(&crate::xpc::XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET | crate::xpc::message::flags::DATA,
            msg_id: 9,
            body: Some(XpcValue::Dictionary(IndexMap::from([(
                "request".to_string(),
                XpcValue::String("ping".into()),
            )]))),
        })
        .unwrap();

        let mut decoder = StreamDecoder::new(ProxyProtocol::Xpc);
        let preface = crate::xpc::h2_raw::H2_PREFACE;
        let split_at = 10;

        let early = decoder
            .push(Direction::HostToDevice, &preface[..split_at])
            .unwrap();
        assert!(early.is_empty());

        // Rest of the preface, an empty frame of type 0x04, then the DATA frame.
        let mut remainder = preface[split_at..].to_vec();
        remainder.extend_from_slice(&build_h2_frame(0, 0x04, &[]));
        remainder.extend_from_slice(&build_h2_data_frame(1, &encoded));

        let events = decoder.push(Direction::HostToDevice, &remainder).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].decoded["request"], "ping");
    }
}