1use std::collections::HashMap;
15#[cfg(feature = "mdns")]
16use std::net::{Ipv6Addr, SocketAddr};
17
18#[cfg(feature = "tunnel")]
19use bytes::Bytes;
20#[cfg(feature = "mdns")]
21use tokio::net::TcpStream;
22
23#[cfg(feature = "tunnel")]
24use crate::xpc::h2_raw::H2Framer;
25#[cfg(feature = "tunnel")]
26use crate::xpc::message::{decode_message, flags, XpcMessage, XpcValue};
27#[cfg(any(feature = "tunnel", feature = "mdns"))]
28use crate::xpc::XpcError;
29
/// Port number constant for RSD connections.
///
/// Nothing in this part of the file reads it; callers pass the port to
/// `handshake` explicitly.
// NOTE(review): presumably the well-known RemoteServiceDiscovery TCP port — confirm against callers.
pub const RSD_PORT: u16 = 58783;
31
/// A single service entry taken from the `Services` dictionary of an RSD handshake.
#[derive(Debug, Clone)]
pub struct ServiceDescriptor {
    /// Port number advertised for the service.
    pub port: u16,
}

/// Device identity and service table extracted from an RSD `Handshake` message.
#[derive(Debug, Clone)]
pub struct RsdHandshake {
    /// Value of `Properties.UniqueDeviceID` in the handshake body.
    pub udid: String,
    /// Advertised services, keyed by service name.
    pub services: HashMap<String, ServiceDescriptor>,
}

impl RsdHandshake {
    /// Looks up the port for `service`.
    ///
    /// An entry stored under the exact name wins; otherwise the entry stored
    /// under `"{service}.shim.remote"` is used. Returns `None` when neither
    /// key is present.
    pub fn get_port(&self, service: &str) -> Option<u16> {
        self.services
            .get(service)
            // Only build the shim key when the exact name is absent.
            .or_else(|| self.services.get(&format!("{service}.shim.remote")))
            .map(|descriptor| descriptor.port)
    }
}
55
#[cfg(feature = "mdns")]
/// Opens a TCP connection to `addr:port`, sets up an [`H2Framer`] on it and
/// reads the RSD handshake from the client-server stream.
///
/// # Errors
///
/// Propagates the TCP connect error (via `?`), reports a framer setup failure
/// as `XpcError::Tls("H2: …")`, and forwards any error from reading or parsing
/// the handshake.
// NOTE(review): this item is gated on `mdns`, but `H2Framer` and
// `read_rsd_handshake` are only visible here under `tunnel` — confirm that the
// `mdns` feature enables `tunnel`.
pub async fn handshake(addr: Ipv6Addr, port: u16) -> Result<RsdHandshake, XpcError> {
    let stream = TcpStream::connect(SocketAddr::from((addr, port))).await?;

    let mut framer = match H2Framer::connect(stream).await {
        Ok(framer) => framer,
        Err(e) => return Err(XpcError::Tls(format!("H2: {e}"))),
    };

    read_rsd_handshake(&mut framer).await
}
69
#[cfg(feature = "tunnel")]
/// Reads the RSD handshake from an already-connected framer.
///
/// This is a public entry point around `read_rsd_handshake`; it writes nothing
/// itself and only consumes messages from the client-server stream.
///
/// # Errors
///
/// Returns whatever `read_rsd_handshake` returns: a read/decode failure, or the
/// last parse error if no handshake message was found.
pub async fn handshake_on_framer<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    read_rsd_handshake(framer).await
}
79
#[cfg(feature = "tunnel")]
/// Runs the three-step XPC initialisation exchange, waiting for (and
/// discarding) one reply after each step.
///
/// 1. `ALWAYS_SET` with an empty dictionary body on the client-server stream,
///    then one reply is read from that stream.
/// 2. `INIT_HANDSHAKE | ALWAYS_SET` with no body on the server-client stream,
///    then one reply is read from that stream.
/// 3. `ALWAYS_SET | 0x200` with no body on the client-server stream, then one
///    reply is read from that stream.
///
/// All three messages use `msg_id` 0.
///
/// # Errors
///
/// Encoding errors are propagated; write failures are reported as
/// `XpcError::Tls("xpc init write N: …")`; read failures come from the
/// discard helpers.
pub async fn initialize_xpc_connection_on_framer<S>(
    framer: &mut H2Framer<S>,
) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // Step 1: empty dictionary on the client-server stream.
    let empty_dict = XpcMessage {
        flags: flags::ALWAYS_SET,
        msg_id: 0,
        body: Some(XpcValue::Dictionary(indexmap::IndexMap::new())),
    };
    let step1 = encode_message(&empty_dict)?;
    if let Err(e) = framer.write_client_server(&step1).await {
        return Err(XpcError::Tls(format!("xpc init write 1: {e}")));
    }
    discard_xpc_on_client_server(framer).await?;

    // Step 2: body-less handshake marker on the server-client stream.
    let init_handshake = XpcMessage {
        flags: flags::INIT_HANDSHAKE | flags::ALWAYS_SET,
        msg_id: 0,
        body: None,
    };
    let step2 = encode_message(&init_handshake)?;
    if let Err(e) = framer.write_server_client(&step2).await {
        return Err(XpcError::Tls(format!("xpc init write 2: {e}")));
    }
    discard_xpc_on_server_client(framer).await?;

    // Step 3: body-less message carrying the extra 0x200 flag bit.
    // NOTE(review): 0x200 has no named constant in `flags`; its meaning is not
    // visible from this file.
    let flagged = XpcMessage {
        flags: flags::ALWAYS_SET | 0x200,
        msg_id: 0,
        body: None,
    };
    let step3 = encode_message(&flagged)?;
    if let Err(e) = framer.write_client_server(&step3).await {
        return Err(XpcError::Tls(format!("xpc init write 3: {e}")));
    }
    discard_xpc_on_client_server(framer).await?;

    Ok(())
}
128
#[cfg(feature = "tunnel")]
/// Writes the three RSD bootstrap messages back-to-back without reading any
/// reply.
///
/// Order on the wire:
/// 1. `ALWAYS_SET` with an empty dictionary body, client-server stream.
/// 2. `ALWAYS_SET | 0x200` with no body, client-server stream.
/// 3. `INIT_HANDSHAKE | ALWAYS_SET` with no body, server-client stream.
///
/// All three messages use `msg_id` 0.
///
/// # Errors
///
/// Encoding errors are propagated; write failures are reported as
/// `XpcError::Tls("rsd bootstrap write N: …")`.
pub async fn queue_rsd_handshake_bootstrap_on_framer<S>(
    framer: &mut H2Framer<S>,
) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // 1: empty dictionary on the client-server stream.
    let empty_dict = XpcMessage {
        flags: flags::ALWAYS_SET,
        msg_id: 0,
        body: Some(XpcValue::Dictionary(indexmap::IndexMap::new())),
    };
    let first = encode_message(&empty_dict)?;
    if let Err(e) = framer.write_client_server(&first).await {
        return Err(XpcError::Tls(format!("rsd bootstrap write 1: {e}")));
    }

    // 2: body-less message with the extra 0x200 bit, same stream.
    // NOTE(review): 0x200 has no named constant in `flags`; its meaning is not
    // visible from this file.
    let flagged = XpcMessage {
        flags: flags::ALWAYS_SET | 0x200,
        msg_id: 0,
        body: None,
    };
    let second = encode_message(&flagged)?;
    if let Err(e) = framer.write_client_server(&second).await {
        return Err(XpcError::Tls(format!("rsd bootstrap write 2: {e}")));
    }

    // 3: body-less handshake marker on the server-client stream.
    let init_handshake = XpcMessage {
        flags: flags::INIT_HANDSHAKE | flags::ALWAYS_SET,
        msg_id: 0,
        body: None,
    };
    let third = encode_message(&init_handshake)?;
    if let Err(e) = framer.write_server_client(&third).await {
        return Err(XpcError::Tls(format!("rsd bootstrap write 3: {e}")));
    }

    Ok(())
}
172
#[cfg(feature = "tunnel")]
/// Reads XPC messages from the client-server stream until one of them parses
/// as an RSD handshake.
///
/// At most six messages are inspected. Messages that decode but are not a
/// handshake are logged at debug level and skipped.
///
/// # Errors
///
/// A read or decode failure aborts immediately. If every inspected message
/// fails to parse as a handshake, the parse error of the last one is returned.
async fn read_rsd_handshake<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    // Upper bound on how many messages are inspected before giving up.
    const MAX_MESSAGES: usize = 6;

    let mut last_err = None;
    let mut inspected = 0;
    while inspected < MAX_MESSAGES {
        inspected += 1;

        let candidate = read_xpc_on_client_server(framer).await?;
        let err = match parse_handshake_message(candidate) {
            Ok(handshake) => return Ok(handshake),
            Err(err) => err,
        };
        tracing::debug!("RSD: skipping non-handshake stream-1 message: {err}");
        last_err = Some(err);
    }

    match last_err {
        Some(err) => Err(err),
        // Only reachable if the loop body never ran.
        None => Err(XpcError::Tls("RSD: no handshake message received".into())),
    }
}
196
#[cfg(feature = "tunnel")]
/// Reads one XPC message from the client-server stream and decodes it.
///
/// The raw header and body are joined back into one contiguous buffer because
/// `decode_message` is handed the complete wire message.
async fn read_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<XpcMessage, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let (header, body) = read_raw_xpc_on_client_server(framer).await?;
    let wire: Vec<u8> = [&header[..], &body[..]].concat();
    decode_message(Bytes::from(wire))
}
208
#[cfg(feature = "tunnel")]
/// Reads one raw XPC message from the client-server stream and throws it away.
///
/// The message is not decoded; only read errors are reported.
async fn discard_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    read_raw_xpc_on_client_server(framer)
        .await
        .map(|_discarded| ())
}
217
#[cfg(feature = "tunnel")]
/// Reads one raw XPC message from the server-client stream and throws it away.
///
/// The message is not decoded; only read errors are reported.
async fn discard_xpc_on_server_client<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    read_raw_xpc_on_server_client(framer)
        .await
        .map(|_discarded| ())
}
226
#[cfg(feature = "tunnel")]
/// Reads one undecoded XPC message from the client-server stream.
///
/// A 24-byte header is read first; bytes 8..16 of it hold the body length as a
/// little-endian `u64`. The body is then read only if that length is non-zero.
///
/// Returns `(header, body)`; `body` is empty when the header announces a
/// zero-length body.
///
/// # Errors
///
/// * `XpcError::Tls("read header: …")` / `"read body: …"` when the framer read fails.
/// * `XpcError::Tls("bad header")` when the header is too short to contain the
///   length field.
/// * `XpcError::Tls("body length exceeds usize")` when the announced length
///   cannot be represented on this target.
async fn read_raw_xpc_on_client_server<S>(
    framer: &mut H2Framer<S>,
) -> Result<(Bytes, Bytes), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let header = framer
        .read_client_server(24)
        .await
        .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;

    // Checked slicing: a short header becomes an error instead of a panic.
    let len_bytes = header
        .get(8..16)
        .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
        .ok_or_else(|| XpcError::Tls("bad header".into()))?;

    // The length comes from the peer; refuse values that would be silently
    // truncated by a plain `as usize` cast on narrower targets.
    let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
        .map_err(|_| XpcError::Tls("body length exceeds usize".into()))?;

    let body = if body_len > 0 {
        framer
            .read_client_server(body_len)
            .await
            .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
    } else {
        Bytes::new()
    };
    Ok((header, body))
}
253
#[cfg(feature = "tunnel")]
/// Reads one undecoded XPC message from the server-client stream.
///
/// A 24-byte header is read first; bytes 8..16 of it hold the body length as a
/// little-endian `u64`. The body is then read only if that length is non-zero.
///
/// Returns `(header, body)`; `body` is empty when the header announces a
/// zero-length body.
///
/// # Errors
///
/// * `XpcError::Tls("read header: …")` / `"read body: …"` when the framer read fails.
/// * `XpcError::Tls("bad header")` when the header is too short to contain the
///   length field.
/// * `XpcError::Tls("body length exceeds usize")` when the announced length
///   cannot be represented on this target.
async fn read_raw_xpc_on_server_client<S>(
    framer: &mut H2Framer<S>,
) -> Result<(Bytes, Bytes), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let header = framer
        .read_server_client(24)
        .await
        .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;

    // Checked slicing: a short header becomes an error instead of a panic.
    let len_bytes = header
        .get(8..16)
        .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
        .ok_or_else(|| XpcError::Tls("bad header".into()))?;

    // The length comes from the peer; refuse values that would be silently
    // truncated by a plain `as usize` cast on narrower targets.
    let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
        .map_err(|_| XpcError::Tls("body length exceeds usize".into()))?;

    let body = if body_len > 0 {
        framer
            .read_server_client(body_len)
            .await
            .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
    } else {
        Bytes::new()
    };
    Ok((header, body))
}
280
#[cfg(feature = "tunnel")]
/// Extracts an [`RsdHandshake`] from a decoded XPC message.
///
/// The message body must be a dictionary whose `MessageType` is the string
/// `"Handshake"` and whose `Properties.UniqueDeviceID` is a string. The
/// optional `Services` dictionary is turned into the service table: each
/// entry that is a dictionary with a usable `Port` (a string that parses as
/// `u16`, or a `Uint64` that fits in `u16`) is kept; every other entry is
/// skipped. A missing or non-dictionary `Services` value yields an empty
/// table and a debug log line.
///
/// # Errors
///
/// Returns `XpcError::Tls` when the body is not a dictionary, `MessageType` is
/// missing or not `"Handshake"`, or `UniqueDeviceID` is missing.
fn parse_handshake_message(msg: XpcMessage) -> Result<RsdHandshake, XpcError> {
    let dict = msg
        .body
        .as_ref()
        .and_then(|b| b.as_dict())
        .ok_or_else(|| XpcError::Tls("RSD: expected XPC dict body".into()))?;

    let message_type = dict
        .get("MessageType")
        .and_then(|v| v.as_str())
        .ok_or_else(|| XpcError::Tls("RSD: missing Handshake MessageType".into()))?;
    if message_type != "Handshake" {
        return Err(XpcError::Tls(format!(
            "RSD: unexpected MessageType {message_type:?}"
        )));
    }

    let udid = dict
        .get("Properties")
        .and_then(|v| v.as_dict())
        .and_then(|d| d.get("UniqueDeviceID"))
        .and_then(|v| v.as_str())
        .ok_or_else(|| XpcError::Tls("RSD: missing UniqueDeviceID".into()))?
        .to_string();

    let mut services = HashMap::new();
    match dict.get("Services") {
        Some(XpcValue::Dictionary(svc_map)) => {
            tracing::debug!(
                "RSD handshake for {} exposed {} services",
                udid,
                svc_map.len()
            );
            for (name, svc_val) in svc_map {
                let port = svc_val
                    .as_dict()
                    .and_then(|svc_dict| svc_dict.get("Port"))
                    .and_then(|p| match p {
                        XpcValue::String(s) => s.parse::<u16>().ok(),
                        // Checked conversion: an out-of-range value must not be
                        // truncated into a different, valid-looking port.
                        XpcValue::Uint64(n) => u16::try_from(*n).ok(),
                        _ => None,
                    });
                if let Some(port) = port {
                    services.insert(name.clone(), ServiceDescriptor { port });
                }
            }
        }
        Some(other) => {
            tracing::debug!("RSD Services has unexpected type: {:?}", other);
        }
        None => {
            tracing::debug!("RSD handshake missing Services key");
        }
    }

    Ok(RsdHandshake { udid, services })
}
339
#[cfg(feature = "tunnel")]
/// An XPC message channel layered on an [`H2Framer`].
///
/// Outgoing messages are written to the client-server stream and incoming
/// messages are read from the server-client stream. Each sent message gets the
/// next value of an internal counter as its `msg_id`.
pub struct XpcConnection<S> {
    framer: H2Framer<S>,
    /// `msg_id` that the next sent message will carry.
    msg_id: u64,
}

#[cfg(feature = "tunnel")]
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> XpcConnection<S> {
    /// Wraps `framer`; the first message sent will use `msg_id` 1.
    pub fn new(framer: H2Framer<S>) -> Self {
        Self { framer, msg_id: 1 }
    }

    /// Returns the current message id and advances the counter by one.
    fn next_id(&mut self) -> u64 {
        let id = self.msg_id;
        self.msg_id += 1;
        id
    }

    /// Sends `body` with the default flags (`ALWAYS_SET | DATA`).
    pub async fn send(&mut self, body: XpcValue) -> Result<(), XpcError> {
        self.send_with_flags(body, 0).await
    }

    /// Sends `body` with `ALWAYS_SET | DATA | extra_flags` on the
    /// client-server stream.
    ///
    /// A message id is consumed even if encoding or writing subsequently fails.
    ///
    /// # Errors
    ///
    /// Encoding errors are propagated; a write failure is reported as
    /// `XpcError::Tls` carrying the underlying error text.
    pub async fn send_with_flags(
        &mut self,
        body: XpcValue,
        extra_flags: u32,
    ) -> Result<(), XpcError> {
        let id = self.next_id();
        let msg = XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA | extra_flags,
            msg_id: id,
            body: Some(body),
        };
        let bytes = crate::xpc::message::encode_message(&msg)?;
        self.framer
            .write_client_server(&bytes)
            .await
            .map_err(|e| XpcError::Tls(e.to_string()))
    }

    /// Reads and decodes one XPC message from the server-client stream.
    ///
    /// A 24-byte header is read first; bytes 8..16 of it hold the body length
    /// as a little-endian `u64`. Header and body are then handed to
    /// `decode_message` as one contiguous buffer.
    ///
    /// # Errors
    ///
    /// Returns `XpcError::Tls` when a framer read fails, when the header is too
    /// short to contain the length field, or when the announced body length
    /// cannot be represented as `usize`; decode errors are propagated.
    pub async fn recv(&mut self) -> Result<XpcMessage, XpcError> {
        let header = self
            .framer
            .read_server_client(24)
            .await
            .map_err(|e| XpcError::Tls(e.to_string()))?;

        // Checked slicing: a short header becomes an error instead of a panic.
        let len_bytes = header
            .get(8..16)
            .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
            .ok_or_else(|| XpcError::Tls("invalid header bytes".into()))?;

        // The length comes from the peer; refuse values that a plain
        // `as usize` cast would silently truncate on narrower targets.
        let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
            .map_err(|_| XpcError::Tls("body length exceeds usize".into()))?;

        let body = if body_len > 0 {
            self.framer
                .read_server_client(body_len)
                .await
                .map_err(|e| XpcError::Tls(e.to_string()))?
        } else {
            Bytes::new()
        };

        // Final size is known, so allocate once.
        let mut full = bytes::BytesMut::with_capacity(header.len() + body.len());
        full.extend_from_slice(&header);
        full.extend_from_slice(&body);
        decode_message(full.freeze())
    }
}
410
#[cfg(test)]
#[cfg(feature = "tunnel")]
mod tests {
    // Tests drive the client code against a hand-rolled fake peer connected
    // through an in-memory duplex pipe. The fake peer speaks just enough raw
    // HTTP/2 framing to satisfy `H2Framer::connect` and to inspect what the
    // client writes.

    use bytes::Bytes;
    use indexmap::IndexMap;
    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
    use tokio::time::{timeout, Duration};

    use super::*;
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // HTTP/2 frame type codes and flag bits used by the fake peer.
    const FRAME_DATA: u8 = 0x00;
    const FRAME_HEADERS: u8 = 0x01;
    const FRAME_SETTINGS: u8 = 0x04;
    const FRAME_WINDOW_UPDATE: u8 = 0x08;
    const FLAG_END_HEADERS: u8 = 0x04;
    const FLAG_SETTINGS_ACK: u8 = 0x01;

    // Stream ids: 0 for connection-level frames, 1 and 3 for the two XPC streams.
    const STREAM_INIT: u32 = 0;
    const STREAM_CLIENT_SERVER: u32 = 1;
    const STREAM_SERVER_CLIENT: u32 = 3;

    /// Serialises one frame: 9-byte header (24-bit big-endian length, type,
    /// flags, 31-bit stream id) followed by `payload`.
    fn build_frame(frame_type: u8, frame_flags: u8, stream_id: u32, payload: &[u8]) -> Vec<u8> {
        let len = payload.len();
        let mut out = Vec::with_capacity(9 + len);
        out.push(((len >> 16) & 0xFF) as u8);
        out.push(((len >> 8) & 0xFF) as u8);
        out.push((len & 0xFF) as u8);
        out.push(frame_type);
        out.push(frame_flags);
        out.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Empty SETTINGS frame on stream 0.
    fn settings_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, 0, STREAM_INIT, &[])
    }

    /// Empty SETTINGS frame with the ACK flag on stream 0.
    fn settings_ack_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, FLAG_SETTINGS_ACK, STREAM_INIT, &[])
    }

    /// Empty HEADERS frame with END_HEADERS on `stream_id`.
    fn headers_frame(stream_id: u32) -> Vec<u8> {
        build_frame(FRAME_HEADERS, FLAG_END_HEADERS, stream_id, &[])
    }

    /// DATA frame without flags carrying `payload` on `stream_id`.
    fn data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
        build_frame(FRAME_DATA, 0, stream_id, payload)
    }

    /// Plays the peer side of the connection preamble.
    ///
    /// Consumes the client's preface, its 21-byte SETTINGS frame and its
    /// 13-byte WINDOW_UPDATE frame, replies with an empty SETTINGS frame, and
    /// checks that the client acknowledges it.
    async fn complete_h2_preamble(server: &mut DuplexStream) {
        let mut preface = [0u8; 24];
        server.read_exact(&mut preface).await.unwrap();
        assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

        let mut settings = [0u8; 21];
        server.read_exact(&mut settings).await.unwrap();
        assert_eq!(settings[3], FRAME_SETTINGS);

        let mut window_update = [0u8; 13];
        server.read_exact(&mut window_update).await.unwrap();
        assert_eq!(window_update[3], FRAME_WINDOW_UPDATE);

        server.write_all(&settings_frame()).await.unwrap();
        server.flush().await.unwrap();

        let mut ack = [0u8; 9];
        server.read_exact(&mut ack).await.unwrap();
        assert_eq!(ack, settings_ack_frame().as_slice());
    }

    /// Reads one frame and asserts it is exactly the empty HEADERS frame for
    /// `stream_id`.
    async fn expect_headers_frame(server: &mut DuplexStream, stream_id: u32) {
        let mut frame = [0u8; 9];
        server.read_exact(&mut frame).await.unwrap();
        assert_eq!(frame, headers_frame(stream_id).as_slice());
    }

    /// Reads one frame, asserts it is a DATA frame on `stream_id`, and returns
    /// its payload.
    async fn read_data_frame(server: &mut DuplexStream, stream_id: u32) -> Vec<u8> {
        let mut header = [0u8; 9];
        server.read_exact(&mut header).await.unwrap();
        assert_eq!(header[3], FRAME_DATA);
        assert_eq!(
            // Top bit of the stream id field is reserved; mask it off.
            u32::from_be_bytes([header[5] & 0x7F, header[6], header[7], header[8]]),
            stream_id
        );
        let len = (usize::from(header[0]) << 16)
            | (usize::from(header[1]) << 8)
            | usize::from(header[2]);
        let mut payload = vec![0u8; len];
        server.read_exact(&mut payload).await.unwrap();
        payload
    }

    /// Builds a handshake-shaped XPC message with one service on port "12345".
    ///
    /// `message_type` controls the `MessageType` key: `None` omits it.
    fn sample_handshake_xpc_message(message_type: Option<&str>) -> XpcMessage {
        let mut properties = IndexMap::new();
        properties.insert(
            "UniqueDeviceID".to_string(),
            XpcValue::String("00008150-00013DD00104401C".into()),
        );

        let mut service = IndexMap::new();
        service.insert("Port".to_string(), XpcValue::String("12345".into()));

        let mut services = IndexMap::new();
        services.insert(
            "com.apple.instruments.dtservicehub".to_string(),
            XpcValue::Dictionary(service),
        );

        let mut body = IndexMap::new();
        if let Some(message_type) = message_type {
            body.insert(
                "MessageType".to_string(),
                XpcValue::String(message_type.into()),
            );
        }
        body.insert("Properties".to_string(), XpcValue::Dictionary(properties));
        body.insert("Services".to_string(), XpcValue::Dictionary(services));

        XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA,
            msg_id: 0,
            body: Some(XpcValue::Dictionary(body)),
        }
    }

    /// Wire encoding of a valid sample handshake.
    fn sample_handshake_message() -> Bytes {
        encode_message(&sample_handshake_xpc_message(Some("Handshake")))
            .expect("synthetic RSD message should encode")
    }

    #[test]
    fn parse_handshake_message_rejects_missing_or_wrong_message_type() {
        let missing = parse_handshake_message(sample_handshake_xpc_message(None));
        assert!(missing.is_err());

        let wrong = parse_handshake_message(sample_handshake_xpc_message(Some("NotHandshake")));
        assert!(wrong.is_err());
    }

    #[test]
    fn parse_handshake_message_accepts_valid_handshake() {
        let handshake =
            parse_handshake_message(sample_handshake_xpc_message(Some("Handshake"))).unwrap();

        assert_eq!(handshake.udid, "00008150-00013DD00104401C");
        assert_eq!(
            handshake.get_port("com.apple.instruments.dtservicehub"),
            Some(12345)
        );
    }

    #[tokio::test]
    async fn handshake_on_framer_reads_stream_1_without_xpc_init() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            complete_h2_preamble(&mut server).await;

            // The client must not write anything further on its own: a read
            // of even one byte has to time out.
            assert!(timeout(Duration::from_millis(100), async {
                let mut extra = [0u8; 1];
                server.read_exact(&mut extra).await
            })
            .await
            .is_err());

            server
                .write_all(&headers_frame(STREAM_CLIENT_SERVER))
                .await
                .unwrap();
            server
                .write_all(&headers_frame(STREAM_SERVER_CLIENT))
                .await
                .unwrap();
            server
                .write_all(&data_frame(
                    STREAM_CLIENT_SERVER,
                    &sample_handshake_message(),
                ))
                .await
                .unwrap();
            server.flush().await.unwrap();
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        let handshake = timeout(Duration::from_secs(1), handshake_on_framer(&mut framer))
            .await
            .expect("handshake timed out")
            .unwrap();

        assert_eq!(handshake.udid, "00008150-00013DD00104401C");
        assert_eq!(
            handshake.get_port("com.apple.instruments.dtservicehub"),
            Some(12345)
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn initialize_xpc_connection_consumes_step_responses_in_reference_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        // Body-less reply the fake peer sends after each client step.
        let empty = encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        })
        .unwrap();

        let server_task = tokio::spawn(async move {
            complete_h2_preamble(&mut server).await;

            // Step 1: client opens stream 1, sends a message, waits for a reply on it.
            expect_headers_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let _step1 = read_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            server
                .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();

            // Step 2: client opens stream 3, sends a message, waits for a reply on it.
            expect_headers_frame(&mut server, STREAM_SERVER_CLIENT).await;
            let _step2 = read_data_frame(&mut server, STREAM_SERVER_CLIENT).await;
            server
                .write_all(&data_frame(STREAM_SERVER_CLIENT, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();

            // Step 3: back on stream 1, which is already open (no HEADERS expected).
            let _step3 = read_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            server
                .write_all(&data_frame(STREAM_CLIENT_SERVER, &empty))
                .await
                .unwrap();
            server.flush().await.unwrap();
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        timeout(
            Duration::from_secs(1),
            initialize_xpc_connection_on_framer(&mut framer),
        )
        .await
        .expect("bootstrap timed out")
        .unwrap();

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn queue_rsd_handshake_bootstrap_matches_pymobiledevice3_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            complete_h2_preamble(&mut server).await;

            // Message 1: empty dictionary on stream 1.
            expect_headers_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let raw1 = read_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let decoded1 = decode_message(Bytes::from(raw1)).unwrap();
            assert_eq!(decoded1.flags, flags::ALWAYS_SET);
            assert_eq!(
                decoded1.body,
                Some(XpcValue::Dictionary(IndexMap::<String, XpcValue>::new()))
            );

            // Message 2: body-less, extra 0x200 flag, still on stream 1.
            let raw2 = read_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let decoded2 = decode_message(Bytes::from(raw2)).unwrap();
            assert_eq!(decoded2.flags, flags::ALWAYS_SET | 0x200);
            assert!(decoded2.body.is_none());

            // Message 3: body-less handshake marker on the newly opened stream 3.
            expect_headers_frame(&mut server, STREAM_SERVER_CLIENT).await;
            let raw3 = read_data_frame(&mut server, STREAM_SERVER_CLIENT).await;
            let decoded3 = decode_message(Bytes::from(raw3)).unwrap();
            assert_eq!(decoded3.flags, flags::INIT_HANDSHAKE | flags::ALWAYS_SET);
            assert!(decoded3.body.is_none());
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        timeout(
            Duration::from_secs(1),
            queue_rsd_handshake_bootstrap_on_framer(&mut framer),
        )
        .await
        .expect("queued bootstrap timed out")
        .unwrap();

        server_task.await.unwrap();
    }
}