Skip to main content

ios_core/xpc/
client.rs

1//! XPC client: high-level wrapper around XpcConnection for service calls.
2
3use std::net::{Ipv6Addr, SocketAddr};
4
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpStream;
7
8use crate::xpc::h2_raw::H2Framer;
9use crate::xpc::message::{flags, XpcMessage, XpcValue};
10use crate::xpc::rsd::{initialize_xpc_connection_on_framer, XpcConnection};
11use crate::xpc::XpcError;
12
13trait XpcStream: AsyncRead + AsyncWrite + Unpin + Send {}
14impl<T> XpcStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {}
15
16type DynStream = Box<dyn XpcStream>;
17
18/// High-level XPC client for iOS 17+ services.
19pub struct XpcClient {
20    inner: XpcConnection<DynStream>,
21}
22
23impl XpcClient {
24    /// Connect to an XPC service at the given IPv6 address and port.
25    pub async fn connect(addr: Ipv6Addr, port: u16) -> Result<Self, XpcError> {
26        let sock_addr = SocketAddr::new(addr.into(), port);
27        let stream = TcpStream::connect(sock_addr).await?;
28        Self::connect_stream(stream).await
29    }
30
31    /// Connect to an XPC service over an already-established stream.
32    pub async fn connect_stream<S>(stream: S) -> Result<Self, XpcError>
33    where
34        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
35    {
36        let stream: DynStream = Box::new(stream);
37        let mut framer = H2Framer::connect(stream)
38            .await
39            .map_err(|e| XpcError::Tls(format!("H2: {e}")))?;
40        initialize_xpc_connection_on_framer(&mut framer).await?;
41        Ok(Self {
42            inner: XpcConnection::new(framer),
43        })
44    }
45
46    /// Send an XPC dictionary and receive the response.
47    pub async fn call(&mut self, body: XpcValue) -> Result<XpcMessage, XpcError> {
48        let msg_id = self
49            .inner
50            .send_with_flags(body, flags::WANTING_REPLY)
51            .await?;
52        self.inner
53            .recv_reply_on_stream(crate::xpc::h2_raw::STREAM_SERVER_CLIENT, msg_id)
54            .await
55    }
56
57    /// Send an XPC dictionary and receive the response from stream 1.
58    pub async fn call_recv_client_server(
59        &mut self,
60        body: XpcValue,
61    ) -> Result<XpcMessage, XpcError> {
62        let msg_id = self
63            .inner
64            .send_with_flags(body, flags::WANTING_REPLY)
65            .await?;
66        self.inner
67            .recv_reply_on_stream(crate::xpc::h2_raw::STREAM_CLIENT_SERVER, msg_id)
68            .await
69    }
70
71    /// Send without waiting for a response.
72    pub async fn send(&mut self, body: XpcValue) -> Result<(), XpcError> {
73        self.inner.send(body).await
74    }
75
76    /// Receive the next XPC message.
77    pub async fn recv(&mut self) -> Result<XpcMessage, XpcError> {
78        self.inner.recv().await
79    }
80
81    /// Receive the next XPC message from stream 1.
82    pub async fn recv_client_server(&mut self) -> Result<XpcMessage, XpcError> {
83        self.inner.recv_client_server().await
84    }
85}
86
#[cfg(test)]
mod tests {
    //! End-to-end tests for [`XpcClient`] against a scripted in-memory peer.
    //!
    //! Each test wires the client to one half of a `tokio::io::duplex` pipe
    //! and drives the other half from a spawned task that plays the server:
    //! it checks the exact frames the client writes and feeds back replies.

    use bytes::Bytes;
    use indexmap::IndexMap;
    use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
    use tokio::time::{timeout, Duration};

    use super::*;
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // HTTP/2 frame type codes.
    const FRAME_DATA: u8 = 0x00;
    const FRAME_HEADERS: u8 = 0x01;
    const FRAME_SETTINGS: u8 = 0x04;
    const FRAME_WINDOW_UPDATE: u8 = 0x08;

    // HTTP/2 frame flag bits.
    const FLAG_END_HEADERS: u8 = 0x04;
    const FLAG_SETTINGS_ACK: u8 = 0x01;

    // Stream identifiers used by the scripted peer.
    const STREAM_INIT: u32 = 0;
    const STREAM_CLIENT_SERVER: u32 = 1;
    const STREAM_SERVER_CLIENT: u32 = 3;

    /// Size of the fixed frame header: 3-byte length, type, flags, 4-byte stream id.
    const FRAME_HEADER_LEN: usize = 9;

    /// Upper bound for any single client-side await in these tests.
    const IO_DEADLINE: Duration = Duration::from_secs(1);

    // ---------------------------------------------------------------------
    // Frame construction
    // ---------------------------------------------------------------------

    /// Serializes one frame: 24-bit big-endian length, type, flags, 31-bit
    /// stream id, then the payload.
    fn build_frame(frame_type: u8, frame_flags: u8, stream_id: u32, payload: &[u8]) -> Vec<u8> {
        let len = payload.len();
        // The length field is only 24 bits wide; refuse to silently truncate.
        assert!(len <= 0x00FF_FFFF, "payload too large for a single frame");
        let len_be = (len as u32).to_be_bytes();

        let mut out = Vec::with_capacity(FRAME_HEADER_LEN + len);
        out.extend_from_slice(&len_be[1..]);
        out.push(frame_type);
        out.push(frame_flags);
        // The top bit of the stream-id word is reserved and always sent as zero.
        out.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Empty SETTINGS frame on stream 0.
    fn settings_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, 0, STREAM_INIT, &[])
    }

    /// SETTINGS acknowledgement frame on stream 0.
    fn settings_ack_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, FLAG_SETTINGS_ACK, STREAM_INIT, &[])
    }

    /// Empty HEADERS frame with END_HEADERS set, opening `stream_id`.
    fn headers_frame(stream_id: u32) -> Vec<u8> {
        build_frame(FRAME_HEADERS, FLAG_END_HEADERS, stream_id, &[])
    }

    /// DATA frame carrying `payload` on `stream_id`.
    fn data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
        build_frame(FRAME_DATA, 0, stream_id, payload)
    }

    // ---------------------------------------------------------------------
    // XPC message construction / inspection
    // ---------------------------------------------------------------------

    /// Encodes a message with id 0 and the given flags. An empty dictionary
    /// body is attached only when the flags are exactly `ALWAYS_SET`;
    /// otherwise the message has no body.
    fn empty_message(message_flags: u32) -> Bytes {
        let body = if message_flags == flags::ALWAYS_SET {
            Some(XpcValue::Dictionary(IndexMap::new()))
        } else {
            None
        };
        encode_message(&XpcMessage {
            flags: message_flags,
            msg_id: 0,
            body,
        })
        .expect("message should encode")
    }

    /// Encodes the body-less `ALWAYS_SET` message (id 0) that the scripted
    /// peer sends back at each handshake step.
    fn bodyless_message() -> Bytes {
        encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        })
        .expect("message should encode")
    }

    /// Encodes a reply (`ALWAYS_SET | REPLY | DATA`) with the given id and body.
    fn reply_message(msg_id: u64, body: XpcValue) -> Bytes {
        encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET | flags::REPLY | flags::DATA,
            msg_id,
            body: Some(body),
        })
        .expect("message should encode")
    }

    /// Builds a one-entry dictionary `{ "kind": <kind> }`.
    fn kind_dict(kind: &str) -> XpcValue {
        XpcValue::Dictionary(IndexMap::from([(
            "kind".to_string(),
            XpcValue::String(kind.into()),
        )]))
    }

    /// Reads the `"kind"` string out of a message's dictionary body, if any.
    fn body_kind(message: &XpcMessage) -> Option<&str> {
        message
            .body
            .as_ref()
            .and_then(XpcValue::as_dict)
            .and_then(|dict| dict.get("kind"))
            .and_then(XpcValue::as_str)
    }

    /// Decodes an XPC message and returns its `(flags, msg_id)` pair.
    fn decode_message_payload(bytes: &[u8]) -> (u32, u64) {
        let msg = crate::xpc::message::decode_message(Bytes::copy_from_slice(bytes)).unwrap();
        (msg.flags, msg.msg_id)
    }

    // ---------------------------------------------------------------------
    // Scripted-server helpers
    // ---------------------------------------------------------------------

    /// Reads exactly one 9-byte frame header from the client.
    async fn read_frame_header(server: &mut DuplexStream) -> [u8; FRAME_HEADER_LEN] {
        let mut header = [0u8; FRAME_HEADER_LEN];
        server.read_exact(&mut header).await.unwrap();
        header
    }

    /// Reads one frame from the client, asserts it is a DATA frame, and
    /// returns its payload.
    async fn read_data_frame(server: &mut DuplexStream) -> Vec<u8> {
        let header = read_frame_header(server).await;
        assert_eq!(header[3], FRAME_DATA);
        let len = (usize::from(header[0]) << 16)
            | (usize::from(header[1]) << 8)
            | usize::from(header[2]);
        let mut payload = vec![0u8; len];
        server.read_exact(&mut payload).await.unwrap();
        payload
    }

    /// Writes one DATA frame to the client and flushes.
    async fn write_data_frame(server: &mut DuplexStream, stream_id: u32, payload: &[u8]) {
        server
            .write_all(&data_frame(stream_id, payload))
            .await
            .unwrap();
        server.flush().await.unwrap();
    }

    /// Plays the server side of everything `XpcClient::connect_stream` does,
    /// asserting each frame the client sends along the way.
    async fn serve_handshake(server: &mut DuplexStream) {
        let empty = bodyless_message();

        // Client connection preface.
        let mut preface = [0u8; 24];
        server.read_exact(&mut preface).await.unwrap();
        assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

        // Client SETTINGS frame (21 bytes on the wire, header included).
        let mut settings = [0u8; 21];
        server.read_exact(&mut settings).await.unwrap();
        assert_eq!(settings[3], FRAME_SETTINGS);

        // Client WINDOW_UPDATE frame (13 bytes on the wire, header included).
        let mut window_update = [0u8; 13];
        server.read_exact(&mut window_update).await.unwrap();
        assert_eq!(window_update[3], FRAME_WINDOW_UPDATE);

        // Our SETTINGS, which the client must acknowledge.
        server.write_all(&settings_frame()).await.unwrap();
        server.flush().await.unwrap();
        let ack = read_frame_header(server).await;
        assert_eq!(ack, settings_ack_frame().as_slice());

        // Client opens stream 1 and sends an empty-dictionary message on it.
        let cs_headers = read_frame_header(server).await;
        assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());
        let cs_msg1 = read_data_frame(server).await;
        assert_eq!(
            cs_msg1.as_slice(),
            empty_message(flags::ALWAYS_SET).as_ref()
        );
        write_data_frame(server, STREAM_CLIENT_SERVER, &empty).await;

        // Client opens stream 3 and sends the INIT_HANDSHAKE message on it.
        let sc_headers = read_frame_header(server).await;
        assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());
        let sc_msg2 = read_data_frame(server).await;
        assert_eq!(
            decode_message_payload(&sc_msg2),
            (flags::INIT_HANDSHAKE | flags::ALWAYS_SET, 0)
        );
        write_data_frame(server, STREAM_SERVER_CLIENT, &empty).await;

        // Final client message of the bootstrap.
        // NOTE(review): 0x200 has no named constant visible here — confirm
        // its meaning against `flags` and name it if one exists.
        let cs_msg3 = read_data_frame(server).await;
        assert_eq!(
            decode_message_payload(&cs_msg3),
            (flags::ALWAYS_SET | 0x200, 0)
        );
        write_data_frame(server, STREAM_CLIENT_SERVER, &empty).await;
    }

    /// Reads the client's first post-handshake request and asserts that it is
    /// flagged `ALWAYS_SET | DATA | WANTING_REPLY` with message id 1.
    async fn expect_first_request(server: &mut DuplexStream) {
        let request = read_data_frame(server).await;
        assert_eq!(
            decode_message_payload(&request),
            (flags::ALWAYS_SET | flags::DATA | flags::WANTING_REPLY, 1)
        );
    }

    /// Runs `XpcClient::connect_stream` under the test deadline.
    async fn connect_client(stream: DuplexStream) -> XpcClient {
        timeout(IO_DEADLINE, XpcClient::connect_stream(stream))
            .await
            .expect("connect timed out")
            .expect("connect should succeed")
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn connect_stream_bootstraps_remote_xpc_before_returning() {
        let (client, mut server) = duplex(4096);

        let server_task = tokio::spawn(async move {
            serve_handshake(&mut server).await;
        });

        connect_client(client).await;

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn call_sets_wanting_reply_on_outgoing_request() {
        let (client, mut server) = duplex(4096);

        let reply = reply_message(1, XpcValue::Dictionary(IndexMap::new()));

        let server_task = tokio::spawn(async move {
            serve_handshake(&mut server).await;
            expect_first_request(&mut server).await;
            write_data_frame(&mut server, STREAM_SERVER_CLIENT, &reply).await;
        });

        let mut client = connect_client(client).await;

        let response = timeout(
            IO_DEADLINE,
            client.call(XpcValue::Dictionary(IndexMap::new())),
        )
        .await
        .expect("call timed out")
        .expect("call should succeed");

        assert_eq!(
            response.flags,
            flags::ALWAYS_SET | flags::REPLY | flags::DATA
        );
        assert_eq!(response.msg_id, 1);

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn call_recv_client_server_reads_reply_from_stream_1() {
        let (client, mut server) = duplex(4096);

        let reply = reply_message(
            1,
            XpcValue::Dictionary(IndexMap::from([(
                "FileList".to_string(),
                XpcValue::Array(vec![XpcValue::String("Documents".into())]),
            )])),
        );

        let server_task = tokio::spawn(async move {
            serve_handshake(&mut server).await;
            expect_first_request(&mut server).await;
            // The reply goes out on stream 1, not on the usual stream 3.
            write_data_frame(&mut server, STREAM_CLIENT_SERVER, &reply).await;
        });

        let mut client = connect_client(client).await;

        let response = timeout(
            IO_DEADLINE,
            client.call_recv_client_server(XpcValue::Dictionary(IndexMap::new())),
        )
        .await
        .expect("call timed out")
        .expect("call should succeed");

        assert_eq!(response.msg_id, 1);
        match response.body {
            Some(XpcValue::Dictionary(dict)) => assert!(dict.contains_key("FileList")),
            _ => panic!("reply should carry a dictionary body"),
        }

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn call_waits_for_matching_reply_id_and_buffers_early_replies() {
        let (client, mut server) = duplex(4096);

        let early_reply = reply_message(2, kind_dict("early"));
        let matching_reply = reply_message(1, kind_dict("matching"));

        let server_task = tokio::spawn(async move {
            serve_handshake(&mut server).await;
            expect_first_request(&mut server).await;
            // Deliver a reply for a different id first; `call` must skip it.
            write_data_frame(&mut server, STREAM_SERVER_CLIENT, &early_reply).await;
            write_data_frame(&mut server, STREAM_SERVER_CLIENT, &matching_reply).await;
        });

        let mut client = connect_client(client).await;

        let response = timeout(
            IO_DEADLINE,
            client.call(XpcValue::Dictionary(IndexMap::new())),
        )
        .await
        .expect("call timed out")
        .expect("call should succeed");

        assert_eq!(response.msg_id, 1);
        assert_eq!(body_kind(&response), Some("matching"));

        // The skipped reply must still be retrievable afterwards.
        let buffered = timeout(IO_DEADLINE, client.recv())
            .await
            .expect("recv timed out")
            .expect("early reply should be buffered");
        assert_eq!(buffered.msg_id, 2);
        assert_eq!(body_kind(&buffered), Some("early"));

        server_task.await.unwrap();
    }
}