1use std::collections::HashMap;
15use std::net::{Ipv6Addr, SocketAddr};
16
17use bytes::Bytes;
18use tokio::net::TcpStream;
19
20use crate::xpc::h2_raw::H2Framer;
21use crate::xpc::message::{decode_message, flags, XpcMessage, XpcValue};
22use crate::xpc::XpcError;
23
/// Port number constant for RSD connections.
///
/// NOTE(review): nothing in this file reads the constant; presumably it is the
/// default `port` argument callers hand to [`handshake`] — confirm against callers.
pub const RSD_PORT: u16 = 58783;
25
/// Connection details for one service listed in an RSD handshake.
#[derive(Debug, Clone)]
pub struct ServiceDescriptor {
    /// Port number taken from the service's `Port` entry.
    pub port: u16,
}

/// Parsed contents of an RSD `Handshake` message.
#[derive(Debug, Clone)]
pub struct RsdHandshake {
    /// Value of `Properties.UniqueDeviceID` from the handshake body.
    pub udid: String,
    /// Advertised services, keyed by service name.
    pub services: HashMap<String, ServiceDescriptor>,
}

impl RsdHandshake {
    /// Returns the port advertised for `service`.
    ///
    /// The exact name is tried first; if it is absent, the lookup falls back to
    /// `"{service}.shim.remote"`. Returns `None` when neither entry exists.
    pub fn get_port(&self, service: &str) -> Option<u16> {
        self.services
            .get(service)
            .or_else(|| {
                let shim_name = format!("{service}.shim.remote");
                self.services.get(&shim_name)
            })
            .map(|descriptor| descriptor.port)
    }
}
49
50pub async fn handshake(addr: Ipv6Addr, port: u16) -> Result<RsdHandshake, XpcError> {
54 let sock_addr = SocketAddr::new(addr.into(), port);
55 let stream = TcpStream::connect(sock_addr).await?;
56 let mut framer = H2Framer::connect(stream)
57 .await
58 .map_err(|e| XpcError::Tls(format!("H2: {e}")))?;
59
60 read_rsd_handshake(&mut framer).await
61}
62
63pub async fn handshake_on_framer<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
66where
67 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
68{
69 read_rsd_handshake(framer).await
70}
71
72pub async fn initialize_xpc_connection_on_framer<S>(
77 framer: &mut H2Framer<S>,
78) -> Result<(), XpcError>
79where
80 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
81{
82 use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};
83
84 let msg1 = encode_message(&XpcMessage {
85 flags: flags::ALWAYS_SET,
86 msg_id: 0,
87 body: Some(XpcValue::Dictionary(indexmap::IndexMap::new())),
88 })?;
89 framer
90 .write_client_server(&msg1)
91 .await
92 .map_err(|e| XpcError::Tls(format!("xpc init write 1: {e}")))?;
93 discard_xpc_on_client_server(framer).await?;
94
95 let msg2 = encode_message(&XpcMessage {
96 flags: flags::INIT_HANDSHAKE | flags::ALWAYS_SET,
97 msg_id: 0,
98 body: None,
99 })?;
100 framer
101 .write_server_client(&msg2)
102 .await
103 .map_err(|e| XpcError::Tls(format!("xpc init write 2: {e}")))?;
104 discard_xpc_on_server_client(framer).await?;
105
106 let msg3 = encode_message(&XpcMessage {
107 flags: flags::ALWAYS_SET | 0x200,
108 msg_id: 0,
109 body: None,
110 })?;
111 framer
112 .write_client_server(&msg3)
113 .await
114 .map_err(|e| XpcError::Tls(format!("xpc init write 3: {e}")))?;
115 discard_xpc_on_client_server(framer).await?;
116
117 Ok(())
118}
119
120pub async fn queue_rsd_handshake_bootstrap_on_framer<S>(
123 framer: &mut H2Framer<S>,
124) -> Result<(), XpcError>
125where
126 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
127{
128 use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};
129
130 let msg1 = encode_message(&XpcMessage {
131 flags: flags::ALWAYS_SET,
132 msg_id: 0,
133 body: Some(XpcValue::Dictionary(indexmap::IndexMap::new())),
134 })?;
135 framer
136 .write_client_server(&msg1)
137 .await
138 .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 1: {e}")))?;
139
140 let msg2 = encode_message(&XpcMessage {
141 flags: flags::ALWAYS_SET | 0x200,
142 msg_id: 0,
143 body: None,
144 })?;
145 framer
146 .write_client_server(&msg2)
147 .await
148 .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 2: {e}")))?;
149
150 let msg3 = encode_message(&XpcMessage {
151 flags: flags::INIT_HANDSHAKE | flags::ALWAYS_SET,
152 msg_id: 0,
153 body: None,
154 })?;
155 framer
156 .write_server_client(&msg3)
157 .await
158 .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 3: {e}")))?;
159
160 Ok(())
161}
162
163async fn read_rsd_handshake<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
169where
170 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
171{
172 let mut last_err = None;
173 for _ in 0..6 {
174 let msg = read_xpc_on_client_server(framer).await?;
175 match parse_handshake_message(msg) {
176 Ok(handshake) => return Ok(handshake),
177 Err(err) => {
178 tracing::debug!("RSD: skipping non-handshake stream-1 message: {err}");
179 last_err = Some(err);
180 }
181 }
182 }
183 Err(last_err.unwrap_or_else(|| XpcError::Tls("RSD: no handshake message received".into())))
184}
185
186async fn read_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<XpcMessage, XpcError>
187where
188 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
189{
190 let (header, body) = read_raw_xpc_on_client_server(framer).await?;
191 let mut full = bytes::BytesMut::new();
192 full.extend_from_slice(&header);
193 full.extend_from_slice(&body);
194 decode_message(full.freeze())
195}
196
197async fn discard_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
198where
199 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
200{
201 let _ = read_raw_xpc_on_client_server(framer).await?;
202 Ok(())
203}
204
205async fn discard_xpc_on_server_client<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
206where
207 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
208{
209 let _ = read_raw_xpc_on_server_client(framer).await?;
210 Ok(())
211}
212
213async fn read_raw_xpc_on_client_server<S>(
214 framer: &mut H2Framer<S>,
215) -> Result<(Bytes, Bytes), XpcError>
216where
217 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
218{
219 let header = framer
220 .read_client_server(24)
221 .await
222 .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;
223 let body_len = u64::from_le_bytes(
224 header[8..16]
225 .try_into()
226 .map_err(|_| XpcError::Tls("bad header".into()))?,
227 ) as usize;
228 let body = if body_len > 0 {
229 framer
230 .read_client_server(body_len)
231 .await
232 .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
233 } else {
234 Bytes::new()
235 };
236 Ok((header, body))
237}
238
239async fn read_raw_xpc_on_server_client<S>(
240 framer: &mut H2Framer<S>,
241) -> Result<(Bytes, Bytes), XpcError>
242where
243 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
244{
245 let header = framer
246 .read_server_client(24)
247 .await
248 .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;
249 let body_len = u64::from_le_bytes(
250 header[8..16]
251 .try_into()
252 .map_err(|_| XpcError::Tls("bad header".into()))?,
253 ) as usize;
254 let body = if body_len > 0 {
255 framer
256 .read_server_client(body_len)
257 .await
258 .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
259 } else {
260 Bytes::new()
261 };
262 Ok((header, body))
263}
264
265pub fn parse_handshake_message_pub(msg: XpcMessage) -> Result<RsdHandshake, XpcError> {
267 parse_handshake_message(msg)
268}
269
270fn parse_handshake_message(msg: XpcMessage) -> Result<RsdHandshake, XpcError> {
271 let dict = msg
272 .body
273 .as_ref()
274 .and_then(|b| b.as_dict())
275 .ok_or_else(|| XpcError::Tls("RSD: expected XPC dict body".into()))?;
276 let message_type = dict
277 .get("MessageType")
278 .and_then(|v| v.as_str())
279 .ok_or_else(|| XpcError::Tls("RSD: missing Handshake MessageType".into()))?;
280 if message_type != "Handshake" {
281 return Err(XpcError::Tls(format!(
282 "RSD: unexpected MessageType {message_type:?}"
283 )));
284 }
285 let udid = dict
287 .get("Properties")
288 .and_then(|v| v.as_dict())
289 .and_then(|d| d.get("UniqueDeviceID"))
290 .and_then(|v| v.as_str())
291 .ok_or_else(|| XpcError::Tls("RSD: missing UniqueDeviceID".into()))?
292 .to_string();
293
294 let mut services = HashMap::new();
296 match dict.get("Services") {
297 Some(XpcValue::Dictionary(svc_map)) => {
298 tracing::debug!(
299 "RSD handshake for {} exposed {} services",
300 udid,
301 svc_map.len()
302 );
303 for (name, svc_val) in svc_map {
304 if let Some(svc_dict) = svc_val.as_dict() {
305 let port = svc_dict.get("Port").and_then(|p| match p {
307 XpcValue::String(s) => s.parse::<u16>().ok(),
308 XpcValue::Uint64(n) => Some(*n as u16),
309 _ => None,
310 });
311 if let Some(port) = port {
312 services.insert(name.clone(), ServiceDescriptor { port });
313 }
314 }
315 }
316 }
317 Some(other) => {
318 tracing::debug!("RSD Services has unexpected type: {:?}", other);
319 }
320 None => {
321 tracing::debug!("RSD handshake missing Services key");
322 }
323 }
324
325 Ok(RsdHandshake { udid, services })
326}
327
328pub struct XpcConnection<S> {
330 framer: H2Framer<S>,
331 msg_id: u64,
332}
333
334impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> XpcConnection<S> {
335 pub fn new(framer: H2Framer<S>) -> Self {
336 Self { framer, msg_id: 1 }
337 }
338
339 fn next_id(&mut self) -> u64 {
340 let id = self.msg_id;
341 self.msg_id += 1;
342 id
343 }
344
345 pub async fn send(&mut self, body: XpcValue) -> Result<(), XpcError> {
347 self.send_with_flags(body, 0).await
348 }
349
350 pub async fn send_with_flags(
353 &mut self,
354 body: XpcValue,
355 extra_flags: u32,
356 ) -> Result<(), XpcError> {
357 let id = self.next_id();
358 let msg = XpcMessage {
359 flags: flags::ALWAYS_SET | flags::DATA | extra_flags,
360 msg_id: id,
361 body: Some(body),
362 };
363 let bytes = crate::xpc::message::encode_message(&msg)?;
364 self.framer
365 .write_client_server(&bytes)
366 .await
367 .map_err(|e| XpcError::Tls(e.to_string()))
368 }
369
370 pub async fn recv(&mut self) -> Result<XpcMessage, XpcError> {
372 let header = self
373 .framer
374 .read_server_client(24)
375 .await
376 .map_err(|e| XpcError::Tls(e.to_string()))?;
377 let body_len = u64::from_le_bytes(
378 header[8..16]
379 .try_into()
380 .map_err(|_| XpcError::Tls("invalid header bytes".into()))?,
381 ) as usize;
382 let body = if body_len > 0 {
383 self.framer
384 .read_server_client(body_len)
385 .await
386 .map_err(|e| XpcError::Tls(e.to_string()))?
387 } else {
388 Bytes::new()
389 };
390 let mut full = bytes::BytesMut::new();
391 full.extend_from_slice(&header);
392 full.extend_from_slice(&body);
393 decode_message(full.freeze())
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use bytes::Bytes;
400 use indexmap::IndexMap;
401 use tokio::io::{AsyncReadExt, AsyncWriteExt};
402 use tokio::time::{timeout, Duration};
403
404 use super::*;
405 use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};
406
    // Frame type bytes (4th byte of the 9-byte frame header) used by the
    // scripted fake server in the tests below.
    const FRAME_DATA: u8 = 0x00;
    const FRAME_HEADERS: u8 = 0x01;
    const FRAME_SETTINGS: u8 = 0x04;
    // Frame flag bytes (5th byte of the frame header).
    const FLAG_END_HEADERS: u8 = 0x04;
    const FLAG_SETTINGS_ACK: u8 = 0x01;
    // Stream ids: 0 carries SETTINGS; the tests expect client-server traffic
    // on stream 1 and server-client traffic on stream 3.
    const STREAM_INIT: u32 = 0;
    const STREAM_CLIENT_SERVER: u32 = 1;
    const STREAM_SERVER_CLIENT: u32 = 3;
415
416 fn build_frame(frame_type: u8, flags: u8, stream_id: u32, payload: &[u8]) -> Vec<u8> {
417 let len = payload.len();
418 let mut out = Vec::with_capacity(9 + len);
419 out.push(((len >> 16) & 0xFF) as u8);
420 out.push(((len >> 8) & 0xFF) as u8);
421 out.push((len & 0xFF) as u8);
422 out.push(frame_type);
423 out.push(flags);
424 out.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
425 out.extend_from_slice(payload);
426 out
427 }
428
429 fn settings_frame() -> Vec<u8> {
430 build_frame(FRAME_SETTINGS, 0, STREAM_INIT, &[])
431 }
432
433 fn settings_ack_frame() -> Vec<u8> {
434 build_frame(FRAME_SETTINGS, FLAG_SETTINGS_ACK, STREAM_INIT, &[])
435 }
436
437 fn headers_frame(stream_id: u32) -> Vec<u8> {
438 build_frame(FRAME_HEADERS, FLAG_END_HEADERS, stream_id, &[])
439 }
440
441 fn data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
442 build_frame(FRAME_DATA, 0, stream_id, payload)
443 }
444
445 fn sample_handshake_xpc_message(message_type: Option<&str>) -> XpcMessage {
446 let mut properties = IndexMap::new();
447 properties.insert(
448 "UniqueDeviceID".to_string(),
449 XpcValue::String("00008150-00013DD00104401C".into()),
450 );
451
452 let mut service = IndexMap::new();
453 service.insert("Port".to_string(), XpcValue::String("12345".into()));
454
455 let mut services = IndexMap::new();
456 services.insert(
457 "com.apple.instruments.dtservicehub".to_string(),
458 XpcValue::Dictionary(service),
459 );
460
461 let mut body = IndexMap::new();
462 if let Some(message_type) = message_type {
463 body.insert(
464 "MessageType".to_string(),
465 XpcValue::String(message_type.into()),
466 );
467 }
468 body.insert("Properties".to_string(), XpcValue::Dictionary(properties));
469 body.insert("Services".to_string(), XpcValue::Dictionary(services));
470
471 XpcMessage {
472 flags: flags::ALWAYS_SET | flags::DATA,
473 msg_id: 0,
474 body: Some(XpcValue::Dictionary(body)),
475 }
476 }
477
478 fn sample_handshake_message() -> Bytes {
479 encode_message(&sample_handshake_xpc_message(Some("Handshake")))
480 .expect("synthetic RSD message should encode")
481 }
482
483 #[test]
484 fn parse_handshake_message_rejects_missing_or_wrong_message_type() {
485 let missing = parse_handshake_message(sample_handshake_xpc_message(None));
486 assert!(missing.is_err());
487
488 let wrong = parse_handshake_message(sample_handshake_xpc_message(Some("NotHandshake")));
489 assert!(wrong.is_err());
490 }
491
492 #[test]
493 fn parse_handshake_message_accepts_valid_handshake() {
494 let handshake =
495 parse_handshake_message(sample_handshake_xpc_message(Some("Handshake"))).unwrap();
496
497 assert_eq!(handshake.udid, "00008150-00013DD00104401C");
498 assert_eq!(
499 handshake.get_port("com.apple.instruments.dtservicehub"),
500 Some(12345)
501 );
502 }
503
    /// `handshake_on_framer` must not write anything after the connection
    /// setup, and must parse a handshake delivered on stream 1.
    ///
    /// The spawned task plays the device side of an in-memory duplex pipe and
    /// follows a fixed script; the order of its reads and writes is the test.
    #[tokio::test]
    async fn handshake_on_framer_reads_stream_1_without_xpc_init() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            // Connection setup sent by `H2Framer::connect`: 24-byte preface,
            // then a 21-byte SETTINGS frame, then a 13-byte frame of type 0x08.
            let mut preface = [0u8; 24];
            server.read_exact(&mut preface).await.unwrap();
            assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

            let mut settings = [0u8; 21];
            server.read_exact(&mut settings).await.unwrap();
            assert_eq!(settings[3], FRAME_SETTINGS);

            let mut window_update = [0u8; 13];
            server.read_exact(&mut window_update).await.unwrap();
            assert_eq!(window_update[3], 0x08);

            // Server SETTINGS; the client is expected to acknowledge it.
            server.write_all(&settings_frame()).await.unwrap();
            server.flush().await.unwrap();

            let mut ack = [0u8; 9];
            server.read_exact(&mut ack).await.unwrap();
            assert_eq!(ack, settings_ack_frame().as_slice());

            // The client must stay silent now: reading one more byte has to
            // time out, proving no XPC init messages were written.
            assert!(timeout(Duration::from_millis(100), async {
                let mut extra = [0u8; 1];
                server.read_exact(&mut extra).await
            })
            .await
            .is_err());

            // Open both streams, then deliver the handshake on stream 1.
            server
                .write_all(&headers_frame(STREAM_CLIENT_SERVER))
                .await
                .unwrap();
            server
                .write_all(&headers_frame(STREAM_SERVER_CLIENT))
                .await
                .unwrap();
            server
                .write_all(&data_frame(
                    STREAM_CLIENT_SERVER,
                    &sample_handshake_message(),
                ))
                .await
                .unwrap();
            server.flush().await.unwrap();
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        // The outer timeout turns a stalled exchange into a test failure.
        let handshake = timeout(Duration::from_secs(1), handshake_on_framer(&mut framer))
            .await
            .expect("handshake timed out")
            .unwrap();

        assert_eq!(handshake.udid, "00008150-00013DD00104401C");
        assert_eq!(
            handshake.get_port("com.apple.instruments.dtservicehub"),
            Some(12345)
        );

        // Surface any assertion failure raised inside the server task.
        server_task.await.unwrap();
    }
568
    /// `initialize_xpc_connection_on_framer` must alternate write and read:
    /// stream 1, stream 3, stream 1 — waiting for a reply after every write.
    ///
    /// The fake server only sends each reply after it has received the
    /// corresponding request, so a client that wrote ahead or read from the
    /// wrong stream would stall and trip the one-second timeout.
    #[tokio::test]
    async fn initialize_xpc_connection_consumes_step_responses_in_reference_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        // Body-less XPC message used as the server's reply to every step.
        let empty = encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        })
        .unwrap();

        let server_task = tokio::spawn(async move {
            // Connection setup sent by `H2Framer::connect`.
            let mut preface = [0u8; 24];
            server.read_exact(&mut preface).await.unwrap();
            assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

            let mut settings = [0u8; 21];
            server.read_exact(&mut settings).await.unwrap();
            assert_eq!(settings[3], FRAME_SETTINGS);

            let mut window_update = [0u8; 13];
            server.read_exact(&mut window_update).await.unwrap();
            assert_eq!(window_update[3], 0x08);

            server.write_all(&settings_frame()).await.unwrap();
            server.flush().await.unwrap();

            let mut ack = [0u8; 9];
            server.read_exact(&mut ack).await.unwrap();
            assert_eq!(ack, settings_ack_frame().as_slice());

            // Step 1 request: HEADERS opening stream 1, then a DATA frame on it.
            let mut cs_headers = [0u8; 9];
            server.read_exact(&mut cs_headers).await.unwrap();
            assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());

            let mut cs_msg1_header = [0u8; 9];
            server.read_exact(&mut cs_msg1_header).await.unwrap();
            assert_eq!(cs_msg1_header[3], FRAME_DATA);
            // Stream id: bytes 5..9, big-endian, top bit masked off.
            assert_eq!(
                u32::from_be_bytes([
                    cs_msg1_header[5] & 0x7F,
                    cs_msg1_header[6],
                    cs_msg1_header[7],
                    cs_msg1_header[8]
                ]),
                STREAM_CLIENT_SERVER
            );
            // Payload length: bytes 0..3, big-endian 24-bit.
            let msg1_len = ((cs_msg1_header[0] as usize) << 16)
                | ((cs_msg1_header[1] as usize) << 8)
                | (cs_msg1_header[2] as usize);
            let mut cs_msg1 = vec![0u8; msg1_len];
            server.read_exact(&mut cs_msg1).await.unwrap();

            // Step 1 reply on stream 1.
            server
                .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();

            // Step 2 request: HEADERS opening stream 3, then a DATA frame on it.
            let mut sc_headers = [0u8; 9];
            server.read_exact(&mut sc_headers).await.unwrap();
            assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());

            let mut sc_msg2_header = [0u8; 9];
            server.read_exact(&mut sc_msg2_header).await.unwrap();
            assert_eq!(sc_msg2_header[3], FRAME_DATA);
            assert_eq!(
                u32::from_be_bytes([
                    sc_msg2_header[5] & 0x7F,
                    sc_msg2_header[6],
                    sc_msg2_header[7],
                    sc_msg2_header[8]
                ]),
                STREAM_SERVER_CLIENT
            );
            let msg2_len = ((sc_msg2_header[0] as usize) << 16)
                | ((sc_msg2_header[1] as usize) << 8)
                | (sc_msg2_header[2] as usize);
            let mut sc_msg2 = vec![0u8; msg2_len];
            server.read_exact(&mut sc_msg2).await.unwrap();

            // Step 2 reply on stream 3.
            server
                .write_all(&data_frame(STREAM_SERVER_CLIENT, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();

            // Step 3 request: a DATA frame on the already-open stream 1.
            let mut cs_msg3_header = [0u8; 9];
            server.read_exact(&mut cs_msg3_header).await.unwrap();
            assert_eq!(cs_msg3_header[3], FRAME_DATA);
            assert_eq!(
                u32::from_be_bytes([
                    cs_msg3_header[5] & 0x7F,
                    cs_msg3_header[6],
                    cs_msg3_header[7],
                    cs_msg3_header[8]
                ]),
                STREAM_CLIENT_SERVER
            );
            let msg3_len = ((cs_msg3_header[0] as usize) << 16)
                | ((cs_msg3_header[1] as usize) << 8)
                | (cs_msg3_header[2] as usize);
            let mut cs_msg3 = vec![0u8; msg3_len];
            server.read_exact(&mut cs_msg3).await.unwrap();

            // Step 3 reply on stream 1.
            server
                .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        // The outer timeout turns a stalled exchange into a test failure.
        timeout(
            Duration::from_secs(1),
            initialize_xpc_connection_on_framer(&mut framer),
        )
        .await
        .expect("bootstrap timed out")
        .unwrap();

        // Surface any assertion failure raised inside the server task.
        server_task.await.unwrap();
    }
692
    /// `queue_rsd_handshake_bootstrap_on_framer` must write its three messages
    /// without waiting for replies, in this order: empty dictionary on
    /// stream 1, `ALWAYS_SET | 0x200` on stream 1, then
    /// `INIT_HANDSHAKE | ALWAYS_SET` on stream 3.
    ///
    /// The fake server never replies after the SETTINGS exchange; it only
    /// decodes what arrives and checks flags and bodies.
    #[tokio::test]
    async fn queue_rsd_handshake_bootstrap_matches_pymobiledevice3_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            // Connection setup sent by `H2Framer::connect`.
            let mut preface = [0u8; 24];
            server.read_exact(&mut preface).await.unwrap();
            assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

            let mut settings = [0u8; 21];
            server.read_exact(&mut settings).await.unwrap();
            assert_eq!(settings[3], FRAME_SETTINGS);

            let mut window_update = [0u8; 13];
            server.read_exact(&mut window_update).await.unwrap();
            assert_eq!(window_update[3], 0x08);

            server.write_all(&settings_frame()).await.unwrap();
            server.flush().await.unwrap();

            let mut ack = [0u8; 9];
            server.read_exact(&mut ack).await.unwrap();
            assert_eq!(ack, settings_ack_frame().as_slice());

            // Message 1: HEADERS opening stream 1, then the empty dictionary.
            let mut cs_headers = [0u8; 9];
            server.read_exact(&mut cs_headers).await.unwrap();
            assert_eq!(cs_headers, headers_frame(STREAM_CLIENT_SERVER).as_slice());

            let mut cs_msg1_header = [0u8; 9];
            server.read_exact(&mut cs_msg1_header).await.unwrap();
            assert_eq!(cs_msg1_header[3], FRAME_DATA);
            // Payload length: bytes 0..3, big-endian 24-bit.
            let cs_msg1_len = ((cs_msg1_header[0] as usize) << 16)
                | ((cs_msg1_header[1] as usize) << 8)
                | (cs_msg1_header[2] as usize);
            let mut cs_msg1 = vec![0u8; cs_msg1_len];
            server.read_exact(&mut cs_msg1).await.unwrap();
            let decoded1 = decode_message(Bytes::from(cs_msg1)).unwrap();
            assert_eq!(decoded1.flags, flags::ALWAYS_SET);
            assert_eq!(
                decoded1.body,
                Some(XpcValue::Dictionary(IndexMap::<String, XpcValue>::new()))
            );

            // Message 2: next DATA frame, expected to be body-less with 0x200.
            let mut cs_msg2_header = [0u8; 9];
            server.read_exact(&mut cs_msg2_header).await.unwrap();
            assert_eq!(cs_msg2_header[3], FRAME_DATA);
            let cs_msg2_len = ((cs_msg2_header[0] as usize) << 16)
                | ((cs_msg2_header[1] as usize) << 8)
                | (cs_msg2_header[2] as usize);
            let mut cs_msg2 = vec![0u8; cs_msg2_len];
            server.read_exact(&mut cs_msg2).await.unwrap();
            let decoded2 = decode_message(Bytes::from(cs_msg2)).unwrap();
            assert_eq!(decoded2.flags, flags::ALWAYS_SET | 0x200);
            assert!(decoded2.body.is_none());

            // Message 3: HEADERS opening stream 3, then the handshake-init marker.
            let mut sc_headers = [0u8; 9];
            server.read_exact(&mut sc_headers).await.unwrap();
            assert_eq!(sc_headers, headers_frame(STREAM_SERVER_CLIENT).as_slice());

            let mut sc_msg3_header = [0u8; 9];
            server.read_exact(&mut sc_msg3_header).await.unwrap();
            assert_eq!(sc_msg3_header[3], FRAME_DATA);
            let sc_msg3_len = ((sc_msg3_header[0] as usize) << 16)
                | ((sc_msg3_header[1] as usize) << 8)
                | (sc_msg3_header[2] as usize);
            let mut sc_msg3 = vec![0u8; sc_msg3_len];
            server.read_exact(&mut sc_msg3).await.unwrap();
            let decoded3 = decode_message(Bytes::from(sc_msg3)).unwrap();
            assert_eq!(decoded3.flags, flags::INIT_HANDSHAKE | flags::ALWAYS_SET);
            assert!(decoded3.body.is_none());
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        // The outer timeout turns a stalled exchange into a test failure.
        timeout(
            Duration::from_secs(1),
            queue_rsd_handshake_bootstrap_on_framer(&mut framer),
        )
        .await
        .expect("queued bootstrap timed out")
        .unwrap();

        // Surface any assertion failure raised inside the server task.
        server_task.await.unwrap();
    }
776}