1use std::net::{Ipv6Addr, SocketAddr};
4
5use tokio::io::{AsyncRead, AsyncWrite};
6use tokio::net::TcpStream;
7
8use crate::xpc::h2_raw::H2Framer;
9use crate::xpc::message::{flags, XpcMessage, XpcValue};
10use crate::xpc::rsd::{initialize_xpc_connection_on_framer, XpcConnection};
11use crate::xpc::XpcError;
12
13trait XpcStream: AsyncRead + AsyncWrite + Unpin + Send {}
14impl<T> XpcStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {}
15
16type DynStream = Box<dyn XpcStream>;
17
18pub struct XpcClient {
20 inner: XpcConnection<DynStream>,
21}
22
23impl XpcClient {
24 pub async fn connect(addr: Ipv6Addr, port: u16) -> Result<Self, XpcError> {
26 let sock_addr = SocketAddr::new(addr.into(), port);
27 let stream = TcpStream::connect(sock_addr).await?;
28 Self::connect_stream(stream).await
29 }
30
31 pub async fn connect_stream<S>(stream: S) -> Result<Self, XpcError>
33 where
34 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
35 {
36 let stream: DynStream = Box::new(stream);
37 let mut framer = H2Framer::connect(stream)
38 .await
39 .map_err(|e| XpcError::Tls(format!("H2: {e}")))?;
40 initialize_xpc_connection_on_framer(&mut framer).await?;
41 Ok(Self {
42 inner: XpcConnection::new(framer),
43 })
44 }
45
46 pub async fn call(&mut self, body: XpcValue) -> Result<XpcMessage, XpcError> {
48 let msg_id = self
49 .inner
50 .send_with_flags(body, flags::WANTING_REPLY)
51 .await?;
52 self.inner
53 .recv_reply_on_stream(crate::xpc::h2_raw::STREAM_SERVER_CLIENT, msg_id)
54 .await
55 }
56
57 pub async fn call_recv_client_server(
59 &mut self,
60 body: XpcValue,
61 ) -> Result<XpcMessage, XpcError> {
62 let msg_id = self
63 .inner
64 .send_with_flags(body, flags::WANTING_REPLY)
65 .await?;
66 self.inner
67 .recv_reply_on_stream(crate::xpc::h2_raw::STREAM_CLIENT_SERVER, msg_id)
68 .await
69 }
70
71 pub async fn send(&mut self, body: XpcValue) -> Result<(), XpcError> {
73 self.inner.send(body).await
74 }
75
76 pub async fn recv(&mut self) -> Result<XpcMessage, XpcError> {
78 self.inner.recv().await
79 }
80
81 pub async fn recv_client_server(&mut self) -> Result<XpcMessage, XpcError> {
83 self.inner.recv_client_server().await
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use bytes::Bytes;
90 use indexmap::IndexMap;
91 use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
92 use tokio::time::{timeout, Duration};
93
94 use super::*;
95 use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};
96
97 const FRAME_DATA: u8 = 0x00;
98 const FRAME_HEADERS: u8 = 0x01;
99 const FRAME_SETTINGS: u8 = 0x04;
100 const FLAG_END_HEADERS: u8 = 0x04;
101 const FLAG_SETTINGS_ACK: u8 = 0x01;
102 const STREAM_INIT: u32 = 0;
103 const STREAM_CLIENT_SERVER: u32 = 1;
104 const STREAM_SERVER_CLIENT: u32 = 3;
105
106 fn build_frame(frame_type: u8, flags: u8, stream_id: u32, payload: &[u8]) -> Vec<u8> {
107 let len = payload.len();
108 let mut out = Vec::with_capacity(9 + len);
109 out.push(((len >> 16) & 0xFF) as u8);
110 out.push(((len >> 8) & 0xFF) as u8);
111 out.push((len & 0xFF) as u8);
112 out.push(frame_type);
113 out.push(flags);
114 out.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
115 out.extend_from_slice(payload);
116 out
117 }
118
119 fn settings_frame() -> Vec<u8> {
120 build_frame(FRAME_SETTINGS, 0, STREAM_INIT, &[])
121 }
122
123 fn settings_ack_frame() -> Vec<u8> {
124 build_frame(FRAME_SETTINGS, FLAG_SETTINGS_ACK, STREAM_INIT, &[])
125 }
126
127 fn headers_frame(stream_id: u32) -> Vec<u8> {
128 build_frame(FRAME_HEADERS, FLAG_END_HEADERS, stream_id, &[])
129 }
130
131 fn data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
132 build_frame(FRAME_DATA, 0, stream_id, payload)
133 }
134
135 fn empty_message(flags: u32) -> Bytes {
136 encode_message(&XpcMessage {
137 flags,
138 msg_id: 0,
139 body: Some(XpcValue::Dictionary(IndexMap::new()))
140 .filter(|_| flags == flags::ALWAYS_SET),
141 })
142 .expect("message should encode")
143 }
144
145 #[tokio::test]
146 async fn connect_stream_bootstraps_remote_xpc_before_returning() {
147 let (client, mut server) = duplex(4096);
148
149 let msg1 = empty_message(flags::ALWAYS_SET);
150 let msg2 = encode_message(&XpcMessage {
151 flags: flags::ALWAYS_SET,
152 msg_id: 0,
153 body: None,
154 })
155 .expect("message should encode");
156 let msg3 = encode_message(&XpcMessage {
157 flags: flags::ALWAYS_SET,
158 msg_id: 0,
159 body: None,
160 })
161 .expect("message should encode");
162
163 let server_task = tokio::spawn(async move {
164 let mut preface = [0u8; 24];
165 server.read_exact(&mut preface).await.unwrap();
166 assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);
167
168 let mut settings = [0u8; 21];
169 server.read_exact(&mut settings).await.unwrap();
170 assert_eq!(settings[3], FRAME_SETTINGS);
171
172 let mut window_update = [0u8; 13];
173 server.read_exact(&mut window_update).await.unwrap();
174 assert_eq!(window_update[3], 0x08);
175
176 server.write_all(&settings_frame()).await.unwrap();
177 server.flush().await.unwrap();
178
179 let mut ack = [0u8; 9];
180 server.read_exact(&mut ack).await.unwrap();
181 assert_eq!(ack, settings_ack_frame().as_slice());
182
183 let mut cs_headers = [0u8; 9];
184 server.read_exact(&mut cs_headers).await.unwrap();
185 assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());
186
187 let mut cs_msg1_header = [0u8; 9];
188 server.read_exact(&mut cs_msg1_header).await.unwrap();
189 assert_eq!(cs_msg1_header[3], FRAME_DATA);
190 let cs_msg1_len = ((cs_msg1_header[0] as usize) << 16)
191 | ((cs_msg1_header[1] as usize) << 8)
192 | (cs_msg1_header[2] as usize);
193 let mut cs_msg1 = vec![0u8; cs_msg1_len];
194 server.read_exact(&mut cs_msg1).await.unwrap();
195 assert_eq!(cs_msg1, msg1);
196
197 server
198 .write_all(&data_frame(STREAM_CLIENT_SERVER, &msg2))
199 .await
200 .unwrap();
201 server.flush().await.unwrap();
202
203 let mut sc_headers = [0u8; 9];
204 server.read_exact(&mut sc_headers).await.unwrap();
205 assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());
206
207 let mut sc_msg2_header = [0u8; 9];
208 server.read_exact(&mut sc_msg2_header).await.unwrap();
209 assert_eq!(sc_msg2_header[3], FRAME_DATA);
210 let sc_msg2_len = ((sc_msg2_header[0] as usize) << 16)
211 | ((sc_msg2_header[1] as usize) << 8)
212 | (sc_msg2_header[2] as usize);
213 let mut sc_msg2 = vec![0u8; sc_msg2_len];
214 server.read_exact(&mut sc_msg2).await.unwrap();
215 assert_eq!(
216 decode_message_payload(&sc_msg2),
217 (flags::INIT_HANDSHAKE | flags::ALWAYS_SET, 0)
218 );
219
220 server
221 .write_all(&data_frame(STREAM_SERVER_CLIENT, &msg2))
222 .await
223 .unwrap();
224 server.flush().await.unwrap();
225
226 let mut cs_msg3_header = [0u8; 9];
227 server.read_exact(&mut cs_msg3_header).await.unwrap();
228 assert_eq!(cs_msg3_header[3], FRAME_DATA);
229 let cs_msg3_len = ((cs_msg3_header[0] as usize) << 16)
230 | ((cs_msg3_header[1] as usize) << 8)
231 | (cs_msg3_header[2] as usize);
232 let mut cs_msg3 = vec![0u8; cs_msg3_len];
233 server.read_exact(&mut cs_msg3).await.unwrap();
234 assert_eq!(
235 decode_message_payload(&cs_msg3),
236 (flags::ALWAYS_SET | 0x200, 0)
237 );
238
239 server
240 .write_all(&data_frame(STREAM_CLIENT_SERVER, &msg3))
241 .await
242 .unwrap();
243 server.flush().await.unwrap();
244 });
245
246 timeout(Duration::from_secs(1), XpcClient::connect_stream(client))
247 .await
248 .expect("connect timed out")
249 .expect("connect should succeed");
250
251 server_task.await.unwrap();
252 }
253
254 #[tokio::test]
255 async fn call_sets_wanting_reply_on_outgoing_request() {
256 let (client, mut server) = duplex(4096);
257
258 let empty = encode_message(&XpcMessage {
259 flags: flags::ALWAYS_SET,
260 msg_id: 0,
261 body: None,
262 })
263 .expect("message should encode");
264 let reply = encode_message(&XpcMessage {
265 flags: flags::ALWAYS_SET | flags::REPLY | flags::DATA,
266 msg_id: 1,
267 body: Some(XpcValue::Dictionary(IndexMap::new())),
268 })
269 .expect("message should encode");
270
271 let server_task = tokio::spawn(async move {
272 let mut preface = [0u8; 24];
273 server.read_exact(&mut preface).await.unwrap();
274 assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);
275
276 let mut settings = [0u8; 21];
277 server.read_exact(&mut settings).await.unwrap();
278 assert_eq!(settings[3], FRAME_SETTINGS);
279
280 let mut window_update = [0u8; 13];
281 server.read_exact(&mut window_update).await.unwrap();
282 assert_eq!(window_update[3], 0x08);
283
284 server.write_all(&settings_frame()).await.unwrap();
285 server.flush().await.unwrap();
286
287 let mut ack = [0u8; 9];
288 server.read_exact(&mut ack).await.unwrap();
289 assert_eq!(ack, settings_ack_frame().as_slice());
290
291 let mut cs_headers = [0u8; 9];
292 server.read_exact(&mut cs_headers).await.unwrap();
293 assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());
294
295 let mut cs_msg1_header = [0u8; 9];
296 server.read_exact(&mut cs_msg1_header).await.unwrap();
297 let cs_msg1_len = ((cs_msg1_header[0] as usize) << 16)
298 | ((cs_msg1_header[1] as usize) << 8)
299 | (cs_msg1_header[2] as usize);
300 let mut cs_msg1 = vec![0u8; cs_msg1_len];
301 server.read_exact(&mut cs_msg1).await.unwrap();
302 assert_eq!(
303 cs_msg1.as_slice(),
304 empty_message(flags::ALWAYS_SET).as_ref()
305 );
306
307 server
308 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
309 .await
310 .unwrap();
311 server.flush().await.unwrap();
312
313 let mut sc_headers = [0u8; 9];
314 server.read_exact(&mut sc_headers).await.unwrap();
315 assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());
316
317 let mut sc_msg2_header = [0u8; 9];
318 server.read_exact(&mut sc_msg2_header).await.unwrap();
319 let sc_msg2_len = ((sc_msg2_header[0] as usize) << 16)
320 | ((sc_msg2_header[1] as usize) << 8)
321 | (sc_msg2_header[2] as usize);
322 let mut sc_msg2 = vec![0u8; sc_msg2_len];
323 server.read_exact(&mut sc_msg2).await.unwrap();
324 assert_eq!(
325 decode_message_payload(&sc_msg2),
326 (flags::INIT_HANDSHAKE | flags::ALWAYS_SET, 0)
327 );
328
329 server
330 .write_all(&data_frame(STREAM_SERVER_CLIENT, &empty))
331 .await
332 .unwrap();
333 server.flush().await.unwrap();
334
335 let mut cs_msg3_header = [0u8; 9];
336 server.read_exact(&mut cs_msg3_header).await.unwrap();
337 let cs_msg3_len = ((cs_msg3_header[0] as usize) << 16)
338 | ((cs_msg3_header[1] as usize) << 8)
339 | (cs_msg3_header[2] as usize);
340 let mut cs_msg3 = vec![0u8; cs_msg3_len];
341 server.read_exact(&mut cs_msg3).await.unwrap();
342 assert_eq!(
343 decode_message_payload(&cs_msg3),
344 (flags::ALWAYS_SET | 0x200, 0)
345 );
346
347 server
348 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
349 .await
350 .unwrap();
351 server.flush().await.unwrap();
352
353 let mut request_header = [0u8; 9];
354 server.read_exact(&mut request_header).await.unwrap();
355 assert_eq!(request_header[3], FRAME_DATA);
356 let request_len = ((request_header[0] as usize) << 16)
357 | ((request_header[1] as usize) << 8)
358 | (request_header[2] as usize);
359 let mut request = vec![0u8; request_len];
360 server.read_exact(&mut request).await.unwrap();
361 assert_eq!(
362 decode_message_payload(&request),
363 (flags::ALWAYS_SET | flags::DATA | flags::WANTING_REPLY, 1)
364 );
365
366 server
367 .write_all(&data_frame(STREAM_SERVER_CLIENT, &reply))
368 .await
369 .unwrap();
370 server.flush().await.unwrap();
371 });
372
373 let mut client = timeout(Duration::from_secs(1), XpcClient::connect_stream(client))
374 .await
375 .expect("connect timed out")
376 .expect("connect should succeed");
377
378 let response = timeout(
379 Duration::from_secs(1),
380 client.call(XpcValue::Dictionary(IndexMap::new())),
381 )
382 .await
383 .expect("call timed out")
384 .expect("call should succeed");
385
386 assert_eq!(
387 response.flags,
388 flags::ALWAYS_SET | flags::REPLY | flags::DATA
389 );
390 assert_eq!(response.msg_id, 1);
391
392 server_task.await.unwrap();
393 }
394
395 #[tokio::test]
396 async fn call_recv_client_server_reads_reply_from_stream_1() {
397 let (client, mut server) = duplex(4096);
398
399 let empty = encode_message(&XpcMessage {
400 flags: flags::ALWAYS_SET,
401 msg_id: 0,
402 body: None,
403 })
404 .expect("message should encode");
405 let reply = encode_message(&XpcMessage {
406 flags: flags::ALWAYS_SET | flags::REPLY | flags::DATA,
407 msg_id: 1,
408 body: Some(XpcValue::Dictionary(IndexMap::from([(
409 "FileList".to_string(),
410 XpcValue::Array(vec![XpcValue::String("Documents".into())]),
411 )]))),
412 })
413 .expect("message should encode");
414
415 let server_task = tokio::spawn(async move {
416 let mut preface = [0u8; 24];
417 server.read_exact(&mut preface).await.unwrap();
418 assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);
419
420 let mut settings = [0u8; 21];
421 server.read_exact(&mut settings).await.unwrap();
422 assert_eq!(settings[3], FRAME_SETTINGS);
423
424 let mut window_update = [0u8; 13];
425 server.read_exact(&mut window_update).await.unwrap();
426 assert_eq!(window_update[3], 0x08);
427
428 server.write_all(&settings_frame()).await.unwrap();
429 server.flush().await.unwrap();
430
431 let mut ack = [0u8; 9];
432 server.read_exact(&mut ack).await.unwrap();
433 assert_eq!(ack, settings_ack_frame().as_slice());
434
435 let mut cs_headers = [0u8; 9];
436 server.read_exact(&mut cs_headers).await.unwrap();
437 assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());
438
439 let mut cs_msg1_header = [0u8; 9];
440 server.read_exact(&mut cs_msg1_header).await.unwrap();
441 let cs_msg1_len = ((cs_msg1_header[0] as usize) << 16)
442 | ((cs_msg1_header[1] as usize) << 8)
443 | (cs_msg1_header[2] as usize);
444 let mut cs_msg1 = vec![0u8; cs_msg1_len];
445 server.read_exact(&mut cs_msg1).await.unwrap();
446 assert_eq!(
447 cs_msg1.as_slice(),
448 empty_message(flags::ALWAYS_SET).as_ref()
449 );
450
451 server
452 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
453 .await
454 .unwrap();
455 server.flush().await.unwrap();
456
457 let mut sc_headers = [0u8; 9];
458 server.read_exact(&mut sc_headers).await.unwrap();
459 assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());
460
461 let mut sc_msg2_header = [0u8; 9];
462 server.read_exact(&mut sc_msg2_header).await.unwrap();
463 let sc_msg2_len = ((sc_msg2_header[0] as usize) << 16)
464 | ((sc_msg2_header[1] as usize) << 8)
465 | (sc_msg2_header[2] as usize);
466 let mut sc_msg2 = vec![0u8; sc_msg2_len];
467 server.read_exact(&mut sc_msg2).await.unwrap();
468 assert_eq!(
469 decode_message_payload(&sc_msg2),
470 (flags::INIT_HANDSHAKE | flags::ALWAYS_SET, 0)
471 );
472
473 server
474 .write_all(&data_frame(STREAM_SERVER_CLIENT, &empty))
475 .await
476 .unwrap();
477 server.flush().await.unwrap();
478
479 let mut cs_msg3_header = [0u8; 9];
480 server.read_exact(&mut cs_msg3_header).await.unwrap();
481 let cs_msg3_len = ((cs_msg3_header[0] as usize) << 16)
482 | ((cs_msg3_header[1] as usize) << 8)
483 | (cs_msg3_header[2] as usize);
484 let mut cs_msg3 = vec![0u8; cs_msg3_len];
485 server.read_exact(&mut cs_msg3).await.unwrap();
486 assert_eq!(
487 decode_message_payload(&cs_msg3),
488 (flags::ALWAYS_SET | 0x200, 0)
489 );
490
491 server
492 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
493 .await
494 .unwrap();
495 server.flush().await.unwrap();
496
497 let mut request_header = [0u8; 9];
498 server.read_exact(&mut request_header).await.unwrap();
499 assert_eq!(request_header[3], FRAME_DATA);
500 let request_len = ((request_header[0] as usize) << 16)
501 | ((request_header[1] as usize) << 8)
502 | (request_header[2] as usize);
503 let mut request = vec![0u8; request_len];
504 server.read_exact(&mut request).await.unwrap();
505 assert_eq!(
506 decode_message_payload(&request),
507 (flags::ALWAYS_SET | flags::DATA | flags::WANTING_REPLY, 1)
508 );
509
510 server
511 .write_all(&data_frame(STREAM_CLIENT_SERVER, &reply))
512 .await
513 .unwrap();
514 server.flush().await.unwrap();
515 });
516
517 let mut client = timeout(Duration::from_secs(1), XpcClient::connect_stream(client))
518 .await
519 .expect("connect timed out")
520 .expect("connect should succeed");
521
522 let response = timeout(
523 Duration::from_secs(1),
524 client.call_recv_client_server(XpcValue::Dictionary(IndexMap::new())),
525 )
526 .await
527 .expect("call timed out")
528 .expect("call should succeed");
529
530 assert_eq!(response.msg_id, 1);
531 let body = response.body.and_then(|value| match value {
532 XpcValue::Dictionary(dict) => Some(dict),
533 _ => None,
534 });
535 assert!(body.unwrap().contains_key("FileList"));
536
537 server_task.await.unwrap();
538 }
539
540 #[tokio::test]
541 async fn call_waits_for_matching_reply_id_and_buffers_early_replies() {
542 let (client, mut server) = duplex(4096);
543
544 let empty = encode_message(&XpcMessage {
545 flags: flags::ALWAYS_SET,
546 msg_id: 0,
547 body: None,
548 })
549 .expect("message should encode");
550 let early_reply = encode_message(&XpcMessage {
551 flags: flags::ALWAYS_SET | flags::REPLY | flags::DATA,
552 msg_id: 2,
553 body: Some(XpcValue::Dictionary(IndexMap::from([(
554 "kind".to_string(),
555 XpcValue::String("early".into()),
556 )]))),
557 })
558 .expect("message should encode");
559 let matching_reply = encode_message(&XpcMessage {
560 flags: flags::ALWAYS_SET | flags::REPLY | flags::DATA,
561 msg_id: 1,
562 body: Some(XpcValue::Dictionary(IndexMap::from([(
563 "kind".to_string(),
564 XpcValue::String("matching".into()),
565 )]))),
566 })
567 .expect("message should encode");
568
569 let server_task = tokio::spawn(async move {
570 let mut preface = [0u8; 24];
571 server.read_exact(&mut preface).await.unwrap();
572 assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);
573
574 let mut settings = [0u8; 21];
575 server.read_exact(&mut settings).await.unwrap();
576 assert_eq!(settings[3], FRAME_SETTINGS);
577
578 let mut window_update = [0u8; 13];
579 server.read_exact(&mut window_update).await.unwrap();
580 assert_eq!(window_update[3], 0x08);
581
582 server.write_all(&settings_frame()).await.unwrap();
583 server.flush().await.unwrap();
584
585 let mut ack = [0u8; 9];
586 server.read_exact(&mut ack).await.unwrap();
587 assert_eq!(ack, settings_ack_frame().as_slice());
588
589 let mut cs_headers = [0u8; 9];
590 server.read_exact(&mut cs_headers).await.unwrap();
591 assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());
592
593 let mut cs_msg1_header = [0u8; 9];
594 server.read_exact(&mut cs_msg1_header).await.unwrap();
595 let cs_msg1_len = ((cs_msg1_header[0] as usize) << 16)
596 | ((cs_msg1_header[1] as usize) << 8)
597 | (cs_msg1_header[2] as usize);
598 let mut cs_msg1 = vec![0u8; cs_msg1_len];
599 server.read_exact(&mut cs_msg1).await.unwrap();
600 assert_eq!(
601 cs_msg1.as_slice(),
602 empty_message(flags::ALWAYS_SET).as_ref()
603 );
604
605 server
606 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
607 .await
608 .unwrap();
609 server.flush().await.unwrap();
610
611 let mut sc_headers = [0u8; 9];
612 server.read_exact(&mut sc_headers).await.unwrap();
613 assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());
614
615 let mut sc_msg2_header = [0u8; 9];
616 server.read_exact(&mut sc_msg2_header).await.unwrap();
617 let sc_msg2_len = ((sc_msg2_header[0] as usize) << 16)
618 | ((sc_msg2_header[1] as usize) << 8)
619 | (sc_msg2_header[2] as usize);
620 let mut sc_msg2 = vec![0u8; sc_msg2_len];
621 server.read_exact(&mut sc_msg2).await.unwrap();
622 assert_eq!(
623 decode_message_payload(&sc_msg2),
624 (flags::INIT_HANDSHAKE | flags::ALWAYS_SET, 0)
625 );
626
627 server
628 .write_all(&data_frame(STREAM_SERVER_CLIENT, &empty))
629 .await
630 .unwrap();
631 server.flush().await.unwrap();
632
633 let mut cs_msg3_header = [0u8; 9];
634 server.read_exact(&mut cs_msg3_header).await.unwrap();
635 let cs_msg3_len = ((cs_msg3_header[0] as usize) << 16)
636 | ((cs_msg3_header[1] as usize) << 8)
637 | (cs_msg3_header[2] as usize);
638 let mut cs_msg3 = vec![0u8; cs_msg3_len];
639 server.read_exact(&mut cs_msg3).await.unwrap();
640 assert_eq!(
641 decode_message_payload(&cs_msg3),
642 (flags::ALWAYS_SET | 0x200, 0)
643 );
644
645 server
646 .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
647 .await
648 .unwrap();
649 server.flush().await.unwrap();
650
651 let mut request_header = [0u8; 9];
652 server.read_exact(&mut request_header).await.unwrap();
653 assert_eq!(request_header[3], FRAME_DATA);
654 let request_len = ((request_header[0] as usize) << 16)
655 | ((request_header[1] as usize) << 8)
656 | (request_header[2] as usize);
657 let mut request = vec![0u8; request_len];
658 server.read_exact(&mut request).await.unwrap();
659 assert_eq!(
660 decode_message_payload(&request),
661 (flags::ALWAYS_SET | flags::DATA | flags::WANTING_REPLY, 1)
662 );
663
664 server
665 .write_all(&data_frame(STREAM_SERVER_CLIENT, &early_reply))
666 .await
667 .unwrap();
668 server
669 .write_all(&data_frame(STREAM_SERVER_CLIENT, &matching_reply))
670 .await
671 .unwrap();
672 server.flush().await.unwrap();
673 });
674
675 let mut client = timeout(Duration::from_secs(1), XpcClient::connect_stream(client))
676 .await
677 .expect("connect timed out")
678 .expect("connect should succeed");
679
680 let response = timeout(
681 Duration::from_secs(1),
682 client.call(XpcValue::Dictionary(IndexMap::new())),
683 )
684 .await
685 .expect("call timed out")
686 .expect("call should succeed");
687
688 assert_eq!(response.msg_id, 1);
689 let response_kind = response
690 .body
691 .as_ref()
692 .and_then(XpcValue::as_dict)
693 .and_then(|dict| dict.get("kind"))
694 .and_then(XpcValue::as_str);
695 assert_eq!(response_kind, Some("matching"));
696
697 let buffered = client.recv().await.expect("early reply should be buffered");
698 assert_eq!(buffered.msg_id, 2);
699 let buffered_kind = buffered
700 .body
701 .as_ref()
702 .and_then(XpcValue::as_dict)
703 .and_then(|dict| dict.get("kind"))
704 .and_then(XpcValue::as_str);
705 assert_eq!(buffered_kind, Some("early"));
706
707 server_task.await.unwrap();
708 }
709
710 fn decode_message_payload(bytes: &[u8]) -> (u32, u64) {
711 let msg = crate::xpc::message::decode_message(Bytes::copy_from_slice(bytes)).unwrap();
712 (msg.flags, msg.msg_id)
713 }
714}