1use std::collections::HashMap;
15#[cfg(feature = "tunnel")]
16use std::collections::VecDeque;
17#[cfg(feature = "mdns")]
18use std::net::{Ipv6Addr, SocketAddr};
19
20#[cfg(feature = "tunnel")]
21use bytes::Bytes;
22#[cfg(feature = "mdns")]
23use tokio::net::TcpStream;
24
25#[cfg(feature = "tunnel")]
26use crate::xpc::h2_raw::H2Framer;
27#[cfg(feature = "tunnel")]
28use crate::xpc::message::{decode_message, flags, XpcMessage, XpcValue};
29#[cfg(any(feature = "tunnel", feature = "mdns"))]
30use crate::xpc::XpcError;
31
/// Port number associated with RSD connections.
///
/// NOTE(review): presumably the default port a caller passes to `handshake`;
/// confirm against call sites.
pub const RSD_PORT: u16 = 58_783;
33
/// Describes one service entry advertised in an RSD handshake.
#[derive(Clone, Debug)]
pub struct ServiceDescriptor {
    /// Port number taken from the service's `Port` entry in the handshake.
    pub port: u16,
}
39
40#[derive(Debug, Clone)]
42pub struct RsdHandshake {
43 pub udid: String,
44 pub services: HashMap<String, ServiceDescriptor>,
45}
46
47impl RsdHandshake {
48 pub fn get_port(&self, service: &str) -> Option<u16> {
50 if let Some(s) = self.services.get(service) {
51 return Some(s.port);
52 }
53 let shim = format!("{service}.shim.remote");
54 self.services.get(&shim).map(|s| s.port)
55 }
56}
57
/// Connects to `addr`:`port` over TCP and reads the RSD handshake.
///
/// The function opens a TCP connection, establishes an [`H2Framer`] on it and
/// then waits for the handshake message via `read_rsd_handshake`.
///
/// It is compiled only when both the `mdns` and `tunnel` features are enabled:
/// `H2Framer` and `read_rsd_handshake` exist only under `tunnel`, so gating on
/// `mdns` alone would break an `mdns`-only build.
///
/// # Errors
///
/// Returns an error when the TCP connection fails (converted through `?`),
/// when the framer cannot be established (reported as `XpcError::Tls` with an
/// `H2: ` prefix), or when no handshake can be read.
#[cfg(all(feature = "mdns", feature = "tunnel"))]
pub async fn handshake(addr: Ipv6Addr, port: u16) -> Result<RsdHandshake, XpcError> {
    let sock_addr = SocketAddr::new(addr.into(), port);
    let stream = TcpStream::connect(sock_addr).await?;
    let mut framer = H2Framer::connect(stream)
        .await
        .map_err(|e| XpcError::Tls(format!("H2: {e}")))?;

    read_rsd_handshake(&mut framer).await
}
71
/// Reads the RSD handshake from an already-connected framer.
///
/// This is a public wrapper around `read_rsd_handshake`; it only reads from
/// the framer and sends nothing itself.
///
/// # Errors
///
/// Propagates whatever error `read_rsd_handshake` reports.
#[cfg(feature = "tunnel")]
pub async fn handshake_on_framer<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let handshake = read_rsd_handshake(framer).await?;
    Ok(handshake)
}
81
/// Runs the three-step XPC initialisation exchange on `framer`.
///
/// Each step writes one message and then reads and discards one message from
/// the same stream before moving on:
///
/// 1. client-server stream: `ALWAYS_SET` with an empty dictionary body;
/// 2. server-client stream: `INIT_HANDSHAKE | ALWAYS_SET` with no body;
/// 3. client-server stream: `ALWAYS_SET | 0x200` with no body.
///
/// # Errors
///
/// Returns an error if a message cannot be encoded, if a write fails
/// (reported as `XpcError::Tls` with an `xpc init write N: ` prefix), or if
/// reading a response fails.
#[cfg(feature = "tunnel")]
pub async fn initialize_xpc_connection_on_framer<S>(
    framer: &mut H2Framer<S>,
) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // All three messages carry message id 0; only flags and body differ.
    let encode = |message_flags, body| {
        encode_message(&XpcMessage {
            flags: message_flags,
            msg_id: 0,
            body,
        })
    };

    // Step 1: empty dictionary on the client-server stream.
    let empty_dict = Some(XpcValue::Dictionary(indexmap::IndexMap::new()));
    let first = encode(flags::ALWAYS_SET, empty_dict)?;
    framer
        .write_client_server(&first)
        .await
        .map_err(|e| XpcError::Tls(format!("xpc init write 1: {e}")))?;
    discard_xpc_on_client_server(framer).await?;

    // Step 2: handshake-init message on the server-client stream.
    let second = encode(flags::INIT_HANDSHAKE | flags::ALWAYS_SET, None)?;
    framer
        .write_server_client(&second)
        .await
        .map_err(|e| XpcError::Tls(format!("xpc init write 2: {e}")))?;
    discard_xpc_on_server_client(framer).await?;

    // Step 3: body-less message with flag bit 0x200 on the client-server stream.
    let third = encode(flags::ALWAYS_SET | 0x200, None)?;
    framer
        .write_client_server(&third)
        .await
        .map_err(|e| XpcError::Tls(format!("xpc init write 3: {e}")))?;
    discard_xpc_on_client_server(framer).await?;

    Ok(())
}
130
/// Writes the three RSD bootstrap messages without waiting for any reply.
///
/// The messages are sent in this order:
///
/// 1. client-server stream: `ALWAYS_SET` with an empty dictionary body;
/// 2. client-server stream: `ALWAYS_SET | 0x200` with no body;
/// 3. server-client stream: `INIT_HANDSHAKE | ALWAYS_SET` with no body.
///
/// Unlike `initialize_xpc_connection_on_framer`, nothing is read back here.
///
/// # Errors
///
/// Returns an error if a message cannot be encoded or if a write fails
/// (reported as `XpcError::Tls` with an `rsd bootstrap write N: ` prefix).
#[cfg(feature = "tunnel")]
pub async fn queue_rsd_handshake_bootstrap_on_framer<S>(
    framer: &mut H2Framer<S>,
) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    // Every bootstrap message uses message id 0.
    let encode = |message_flags, body| {
        encode_message(&XpcMessage {
            flags: message_flags,
            msg_id: 0,
            body,
        })
    };

    let opening = encode(
        flags::ALWAYS_SET,
        Some(XpcValue::Dictionary(indexmap::IndexMap::new())),
    )?;
    framer
        .write_client_server(&opening)
        .await
        .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 1: {e}")))?;

    let follow_up = encode(flags::ALWAYS_SET | 0x200, None)?;
    framer
        .write_client_server(&follow_up)
        .await
        .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 2: {e}")))?;

    let reply_init = encode(flags::INIT_HANDSHAKE | flags::ALWAYS_SET, None)?;
    framer
        .write_server_client(&reply_init)
        .await
        .map_err(|e| XpcError::Tls(format!("rsd bootstrap write 3: {e}")))?;

    Ok(())
}
174
/// Reads messages from the client-server stream until one parses as a handshake.
///
/// At most six messages are examined. Messages that do not parse as a
/// handshake are logged at debug level and skipped.
///
/// # Errors
///
/// A read or decode failure is returned immediately. If all examined messages
/// fail to parse, the parse error of the last one is returned.
#[cfg(feature = "tunnel")]
async fn read_rsd_handshake<S>(framer: &mut H2Framer<S>) -> Result<RsdHandshake, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    // Upper bound on how many messages are inspected before giving up.
    const MAX_MESSAGES: usize = 6;

    let mut most_recent_failure = None;
    for _ in 0..MAX_MESSAGES {
        let candidate = read_xpc_on_client_server(framer).await?;
        let err = match parse_handshake_message(candidate) {
            Ok(handshake) => return Ok(handshake),
            Err(err) => err,
        };
        tracing::debug!("RSD: skipping non-handshake stream-1 message: {err}");
        most_recent_failure = Some(err);
    }

    match most_recent_failure {
        Some(err) => Err(err),
        None => Err(XpcError::Tls("RSD: no handshake message received".into())),
    }
}
198
/// Reads one XPC message from the client-server stream and decodes it.
///
/// The raw header and body are joined into one buffer and passed to
/// `decode_message`.
///
/// # Errors
///
/// Propagates read errors from `read_raw_xpc_on_client_server` and decode
/// errors from `decode_message`.
#[cfg(feature = "tunnel")]
async fn read_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<XpcMessage, XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let (header, body) = read_raw_xpc_on_client_server(framer).await?;

    // The decoder expects header and body as one contiguous buffer.
    let mut wire = bytes::BytesMut::with_capacity(header.len() + body.len());
    wire.extend_from_slice(&header);
    wire.extend_from_slice(&body);

    decode_message(wire.freeze())
}
210
/// Reads one raw XPC message from the client-server stream and drops it.
///
/// # Errors
///
/// Propagates any error from `read_raw_xpc_on_client_server`.
#[cfg(feature = "tunnel")]
async fn discard_xpc_on_client_server<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    read_raw_xpc_on_client_server(framer)
        .await
        .map(|_discarded| ())
}
219
/// Reads one raw XPC message from the server-client stream and drops it.
///
/// # Errors
///
/// Propagates any error from `read_raw_xpc_on_server_client`.
#[cfg(feature = "tunnel")]
async fn discard_xpc_on_server_client<S>(framer: &mut H2Framer<S>) -> Result<(), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    read_raw_xpc_on_server_client(framer)
        .await
        .map(|_discarded| ())
}
228
/// Reads the raw header and body of one XPC message from the client-server stream.
///
/// A 24-byte header is read first. Bytes 8..16 of the header are interpreted
/// as a little-endian `u64` body length, and that many further bytes are read
/// as the body. A zero length yields an empty body without a second read.
///
/// Returns `(header, body)`.
///
/// # Errors
///
/// Returns `XpcError::Tls` when a read fails, when the header is too short to
/// contain the length field, or when the declared length does not fit in
/// `usize` on this target.
#[cfg(feature = "tunnel")]
async fn read_raw_xpc_on_client_server<S>(
    framer: &mut H2Framer<S>,
) -> Result<(Bytes, Bytes), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let header = framer
        .read_client_server(24)
        .await
        .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;

    // Checked slicing: a short header becomes an error instead of a panic.
    let len_bytes = header
        .get(8..16)
        .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
        .ok_or_else(|| XpcError::Tls("bad header".into()))?;

    // The length comes from the peer; reject values that would be silently
    // truncated by a plain cast on targets where `usize` is under 64 bits.
    let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
        .map_err(|_| XpcError::Tls("bad header".into()))?;

    let body = if body_len > 0 {
        framer
            .read_client_server(body_len)
            .await
            .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
    } else {
        Bytes::new()
    };
    Ok((header, body))
}
255
/// Reads the raw header and body of one XPC message from the server-client stream.
///
/// A 24-byte header is read first. Bytes 8..16 of the header are interpreted
/// as a little-endian `u64` body length, and that many further bytes are read
/// as the body. A zero length yields an empty body without a second read.
///
/// Returns `(header, body)`.
///
/// # Errors
///
/// Returns `XpcError::Tls` when a read fails, when the header is too short to
/// contain the length field, or when the declared length does not fit in
/// `usize` on this target.
#[cfg(feature = "tunnel")]
async fn read_raw_xpc_on_server_client<S>(
    framer: &mut H2Framer<S>,
) -> Result<(Bytes, Bytes), XpcError>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let header = framer
        .read_server_client(24)
        .await
        .map_err(|e| XpcError::Tls(format!("read header: {e}")))?;

    // Checked slicing: a short header becomes an error instead of a panic.
    let len_bytes = header
        .get(8..16)
        .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
        .ok_or_else(|| XpcError::Tls("bad header".into()))?;

    // The length comes from the peer; reject values that would be silently
    // truncated by a plain cast on targets where `usize` is under 64 bits.
    let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
        .map_err(|_| XpcError::Tls("bad header".into()))?;

    let body = if body_len > 0 {
        framer
            .read_server_client(body_len)
            .await
            .map_err(|e| XpcError::Tls(format!("read body: {e}")))?
    } else {
        Bytes::new()
    };
    Ok((header, body))
}
282
/// Extracts an [`RsdHandshake`] from a decoded XPC message.
///
/// The message body must be a dictionary whose `MessageType` is the string
/// `"Handshake"` and whose `Properties` dictionary contains a string
/// `UniqueDeviceID`. The optional `Services` dictionary is scanned for
/// entries that are dictionaries with a usable `Port` value.
///
/// A `Port` may be a string that parses as `u16` or a `Uint64` that fits in
/// `u16`. Services with a missing, malformed or out-of-range port are skipped
/// rather than failing the whole handshake.
///
/// # Errors
///
/// Returns `XpcError::Tls` when the body is not a dictionary, when
/// `MessageType` is missing or not `"Handshake"`, or when `UniqueDeviceID` is
/// missing.
#[cfg(feature = "tunnel")]
fn parse_handshake_message(msg: XpcMessage) -> Result<RsdHandshake, XpcError> {
    let dict = msg
        .body
        .as_ref()
        .and_then(|b| b.as_dict())
        .ok_or_else(|| XpcError::Tls("RSD: expected XPC dict body".into()))?;
    let message_type = dict
        .get("MessageType")
        .and_then(|v| v.as_str())
        .ok_or_else(|| XpcError::Tls("RSD: missing Handshake MessageType".into()))?;
    if message_type != "Handshake" {
        return Err(XpcError::Tls(format!(
            "RSD: unexpected MessageType {message_type:?}"
        )));
    }
    let udid = dict
        .get("Properties")
        .and_then(|v| v.as_dict())
        .and_then(|d| d.get("UniqueDeviceID"))
        .and_then(|v| v.as_str())
        .ok_or_else(|| XpcError::Tls("RSD: missing UniqueDeviceID".into()))?
        .to_string();

    let mut services = HashMap::new();
    match dict.get("Services") {
        Some(XpcValue::Dictionary(svc_map)) => {
            tracing::debug!(
                "RSD handshake for {} exposed {} services",
                udid,
                svc_map.len()
            );
            for (name, svc_val) in svc_map {
                let Some(svc_dict) = svc_val.as_dict() else {
                    continue;
                };
                let port = svc_dict.get("Port").and_then(|p| match p {
                    XpcValue::String(s) => s.parse::<u16>().ok(),
                    // A plain cast would wrap values above 65535 into a
                    // different, valid-looking port. Reject them instead,
                    // matching what the string branch does.
                    XpcValue::Uint64(n) => u16::try_from(*n).ok(),
                    _ => None,
                });
                if let Some(port) = port {
                    services.insert(name.clone(), ServiceDescriptor { port });
                }
            }
        }
        Some(other) => {
            tracing::debug!("RSD Services has unexpected type: {:?}", other);
        }
        None => {
            tracing::debug!("RSD handshake missing Services key");
        }
    }

    Ok(RsdHandshake { udid, services })
}
341
/// XPC message connection layered on top of an [`H2Framer`].
///
/// Keeps a counter for outgoing message ids and per-stream queues of messages
/// that were received but not yet handed to a caller.
#[cfg(feature = "tunnel")]
pub struct XpcConnection<S> {
    /// Underlying framer used for all reads and writes.
    framer: H2Framer<S>,
    /// Id assigned to the next outgoing message.
    msg_id: u64,
    /// Messages read from a stream while waiting for a different reply,
    /// keyed by stream id and kept in arrival order.
    pending_messages: HashMap<u32, VecDeque<XpcMessage>>,
}
349
#[cfg(feature = "tunnel")]
impl<S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin> XpcConnection<S> {
    /// Wraps `framer` in a connection whose first outgoing message id is 1 and
    /// which has no buffered messages.
    pub fn new(framer: H2Framer<S>) -> Self {
        Self {
            framer,
            msg_id: 1,
            pending_messages: HashMap::new(),
        }
    }

    /// Returns the current message id and advances the counter by one.
    fn next_id(&mut self) -> u64 {
        let id = self.msg_id;
        self.msg_id += 1;
        id
    }

    /// Sends `body` with no extra flags and discards the assigned message id.
    ///
    /// # Errors
    ///
    /// Same as [`Self::send_with_flags`].
    pub async fn send(&mut self, body: XpcValue) -> Result<(), XpcError> {
        self.send_with_flags(body, 0).await.map(|_| ())
    }

    /// Sends `body` on the client-server stream and returns the message id used.
    ///
    /// The message flags are `ALWAYS_SET | DATA | extra_flags`. A fresh message
    /// id is consumed even if encoding or writing subsequently fails.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be encoded or if the write fails
    /// (reported as `XpcError::Tls`).
    pub async fn send_with_flags(
        &mut self,
        body: XpcValue,
        extra_flags: u32,
    ) -> Result<u64, XpcError> {
        let id = self.next_id();
        let msg = XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA | extra_flags,
            msg_id: id,
            body: Some(body),
        };
        let bytes = crate::xpc::message::encode_message(&msg)?;
        self.framer
            .write_client_server(&bytes)
            .await
            .map_err(|e| XpcError::Tls(e.to_string()))?;
        Ok(id)
    }

    /// Receives the next message on the server-client stream.
    ///
    /// A message buffered earlier for that stream is returned first, if any.
    pub async fn recv(&mut self) -> Result<XpcMessage, XpcError> {
        self.recv_on_stream(crate::xpc::h2_raw::STREAM_SERVER_CLIENT)
            .await
    }

    /// Receives the next message on the client-server stream.
    ///
    /// A message buffered earlier for that stream is returned first, if any.
    pub async fn recv_client_server(&mut self) -> Result<XpcMessage, XpcError> {
        self.recv_on_stream(crate::xpc::h2_raw::STREAM_CLIENT_SERVER)
            .await
    }

    /// Returns the oldest buffered message for `stream_id`, or reads a new one
    /// when the buffer for that stream is empty.
    async fn recv_on_stream(&mut self, stream_id: u32) -> Result<XpcMessage, XpcError> {
        if let Some(message) = self.pop_next_pending_message(stream_id) {
            return Ok(message);
        }
        self.recv_fresh_on_stream(stream_id).await
    }

    /// Waits for the message on `stream_id` whose id equals `msg_id`.
    ///
    /// Buffered messages are searched first. Otherwise messages are read from
    /// the stream until one matches; every non-matching message is buffered
    /// for later retrieval. The number of messages buffered this way is not
    /// bounded.
    ///
    /// # Errors
    ///
    /// Returns the first read or decode error encountered.
    pub async fn recv_reply_on_stream(
        &mut self,
        stream_id: u32,
        msg_id: u64,
    ) -> Result<XpcMessage, XpcError> {
        if let Some(message) = self.take_pending_message(stream_id, msg_id) {
            return Ok(message);
        }

        loop {
            let message = self.recv_fresh_on_stream(stream_id).await?;
            if message.msg_id == msg_id {
                return Ok(message);
            }
            self.push_pending_message(stream_id, message);
        }
    }

    /// Reads and decodes one message directly from `stream_id`, bypassing the
    /// pending-message buffer.
    ///
    /// A 24-byte header is read first; bytes 8..16 hold the little-endian
    /// `u64` body length.
    ///
    /// # Errors
    ///
    /// Returns `XpcError::Tls` when a read fails, when the header is too short
    /// to contain the length field, or when the declared length does not fit
    /// in `usize`. Decode errors from `decode_message` are propagated.
    async fn recv_fresh_on_stream(&mut self, stream_id: u32) -> Result<XpcMessage, XpcError> {
        let header = self
            .framer
            .read_stream(stream_id, 24)
            .await
            .map_err(|e| XpcError::Tls(e.to_string()))?;

        // Checked slicing: a short header becomes an error instead of a panic.
        let len_bytes = header
            .get(8..16)
            .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
            .ok_or_else(|| XpcError::Tls("invalid header bytes".into()))?;

        // The length comes from the peer; reject values that a plain cast
        // would silently truncate where `usize` is narrower than 64 bits.
        let body_len = usize::try_from(u64::from_le_bytes(len_bytes))
            .map_err(|_| XpcError::Tls("invalid header bytes".into()))?;

        let body = if body_len > 0 {
            self.framer
                .read_stream(stream_id, body_len)
                .await
                .map_err(|e| XpcError::Tls(e.to_string()))?
        } else {
            Bytes::new()
        };

        // The decoder expects header and body as one contiguous buffer.
        let mut full = bytes::BytesMut::with_capacity(header.len() + body.len());
        full.extend_from_slice(&header);
        full.extend_from_slice(&body);
        decode_message(full.freeze())
    }

    /// Appends `message` to the buffer for `stream_id`.
    fn push_pending_message(&mut self, stream_id: u32, message: XpcMessage) {
        self.pending_messages
            .entry(stream_id)
            .or_default()
            .push_back(message);
    }

    /// Removes and returns the oldest buffered message for `stream_id`.
    fn pop_next_pending_message(&mut self, stream_id: u32) -> Option<XpcMessage> {
        self.pending_messages.get_mut(&stream_id)?.pop_front()
    }

    /// Removes and returns the first buffered message for `stream_id` whose id
    /// equals `msg_id`, leaving the other buffered messages in order.
    fn take_pending_message(&mut self, stream_id: u32, msg_id: u64) -> Option<XpcMessage> {
        let pending = self.pending_messages.get_mut(&stream_id)?;
        let index = pending
            .iter()
            .position(|message| message.msg_id == msg_id)?;
        pending.remove(index)
    }
}
474
#[cfg(test)]
#[cfg(feature = "tunnel")]
mod tests {
    //! Tests that drive the public handshake/bootstrap functions against a
    //! hand-written peer on the other end of an in-memory duplex pipe.

    use bytes::Bytes;
    use indexmap::IndexMap;
    use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream};
    use tokio::time::{timeout, Duration};

    use super::*;
    use crate::xpc::message::{encode_message, flags, XpcMessage, XpcValue};

    const FRAME_DATA: u8 = 0x00;
    const FRAME_HEADERS: u8 = 0x01;
    const FRAME_SETTINGS: u8 = 0x04;
    const FRAME_WINDOW_UPDATE: u8 = 0x08;
    const FLAG_END_HEADERS: u8 = 0x04;
    const FLAG_SETTINGS_ACK: u8 = 0x01;
    const STREAM_INIT: u32 = 0;
    const STREAM_CLIENT_SERVER: u32 = 1;
    const STREAM_SERVER_CLIENT: u32 = 3;

    /// Serialises one frame: 24-bit big-endian length, type, flags, 31-bit
    /// stream id, then the payload.
    fn build_frame(frame_type: u8, flags: u8, stream_id: u32, payload: &[u8]) -> Vec<u8> {
        let len = payload.len();
        let mut out = Vec::with_capacity(9 + len);
        out.push(((len >> 16) & 0xFF) as u8);
        out.push(((len >> 8) & 0xFF) as u8);
        out.push((len & 0xFF) as u8);
        out.push(frame_type);
        out.push(flags);
        out.extend_from_slice(&(stream_id & 0x7FFF_FFFF).to_be_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Empty SETTINGS frame on stream 0.
    fn settings_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, 0, STREAM_INIT, &[])
    }

    /// SETTINGS frame with the ACK flag set.
    fn settings_ack_frame() -> Vec<u8> {
        build_frame(FRAME_SETTINGS, FLAG_SETTINGS_ACK, STREAM_INIT, &[])
    }

    /// Empty HEADERS frame with END_HEADERS set for `stream_id`.
    fn headers_frame(stream_id: u32) -> Vec<u8> {
        build_frame(FRAME_HEADERS, FLAG_END_HEADERS, stream_id, &[])
    }

    /// DATA frame carrying `payload` on `stream_id`.
    fn data_frame(stream_id: u32, payload: &[u8]) -> Vec<u8> {
        build_frame(FRAME_DATA, 0, stream_id, payload)
    }

    /// Plays the peer's side of the connection preamble.
    ///
    /// Consumes the client's preface, its 21-byte SETTINGS frame and its
    /// 13-byte WINDOW_UPDATE frame, replies with an empty SETTINGS frame, and
    /// then expects the client's SETTINGS ACK.
    async fn serve_h2_preamble(server: &mut DuplexStream) {
        let mut preface = [0u8; 24];
        server.read_exact(&mut preface).await.unwrap();
        assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

        let mut settings = [0u8; 21];
        server.read_exact(&mut settings).await.unwrap();
        assert_eq!(settings[3], FRAME_SETTINGS);

        let mut window_update = [0u8; 13];
        server.read_exact(&mut window_update).await.unwrap();
        assert_eq!(window_update[3], FRAME_WINDOW_UPDATE);

        server.write_all(&settings_frame()).await.unwrap();
        server.flush().await.unwrap();

        let mut ack = [0u8; 9];
        server.read_exact(&mut ack).await.unwrap();
        assert_eq!(ack, settings_ack_frame().as_slice());
    }

    /// Reads one 9-byte frame and asserts it is the empty HEADERS frame for
    /// `stream_id`.
    async fn expect_headers_frame(server: &mut DuplexStream, stream_id: u32) {
        let mut frame = [0u8; 9];
        server.read_exact(&mut frame).await.unwrap();
        assert_eq!(frame, headers_frame(stream_id).as_slice());
    }

    /// Reads one DATA frame, asserts it belongs to `stream_id`, and returns
    /// its payload.
    async fn expect_data_frame(server: &mut DuplexStream, stream_id: u32) -> Vec<u8> {
        let mut header = [0u8; 9];
        server.read_exact(&mut header).await.unwrap();
        assert_eq!(header[3], FRAME_DATA);
        assert_eq!(
            u32::from_be_bytes([header[5] & 0x7F, header[6], header[7], header[8]]),
            stream_id
        );

        let len =
            ((header[0] as usize) << 16) | ((header[1] as usize) << 8) | (header[2] as usize);
        let mut payload = vec![0u8; len];
        server.read_exact(&mut payload).await.unwrap();
        payload
    }

    /// Writes `payload` as a single DATA frame on `stream_id` and flushes.
    async fn send_data_frame(server: &mut DuplexStream, stream_id: u32, payload: &[u8]) {
        server
            .write_all(&data_frame(stream_id, payload))
            .await
            .unwrap();
        server.flush().await.unwrap();
    }

    /// Builds a handshake-shaped message with one service on port 12345.
    ///
    /// `message_type` controls the `MessageType` entry; `None` omits it.
    fn sample_handshake_xpc_message(message_type: Option<&str>) -> XpcMessage {
        let mut properties = IndexMap::new();
        properties.insert(
            "UniqueDeviceID".to_string(),
            XpcValue::String("00008150-00013DD00104401C".into()),
        );

        let mut service = IndexMap::new();
        service.insert("Port".to_string(), XpcValue::String("12345".into()));

        let mut services = IndexMap::new();
        services.insert(
            "com.apple.instruments.dtservicehub".to_string(),
            XpcValue::Dictionary(service),
        );

        let mut body = IndexMap::new();
        if let Some(message_type) = message_type {
            body.insert(
                "MessageType".to_string(),
                XpcValue::String(message_type.into()),
            );
        }
        body.insert("Properties".to_string(), XpcValue::Dictionary(properties));
        body.insert("Services".to_string(), XpcValue::Dictionary(services));

        XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA,
            msg_id: 0,
            body: Some(XpcValue::Dictionary(body)),
        }
    }

    /// Encoded form of a valid sample handshake message.
    fn sample_handshake_message() -> Bytes {
        encode_message(&sample_handshake_xpc_message(Some("Handshake")))
            .expect("synthetic RSD message should encode")
    }

    #[test]
    fn parse_handshake_message_rejects_missing_or_wrong_message_type() {
        let missing = parse_handshake_message(sample_handshake_xpc_message(None));
        assert!(missing.is_err());

        let wrong = parse_handshake_message(sample_handshake_xpc_message(Some("NotHandshake")));
        assert!(wrong.is_err());
    }

    #[test]
    fn parse_handshake_message_accepts_valid_handshake() {
        let handshake =
            parse_handshake_message(sample_handshake_xpc_message(Some("Handshake"))).unwrap();

        assert_eq!(handshake.udid, "00008150-00013DD00104401C");
        assert_eq!(
            handshake.get_port("com.apple.instruments.dtservicehub"),
            Some(12345)
        );
    }

    #[tokio::test]
    async fn handshake_on_framer_reads_stream_1_without_xpc_init() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            serve_h2_preamble(&mut server).await;

            // The client must stay silent after the preamble: a read that
            // completes within the timeout means it sent something extra.
            assert!(timeout(Duration::from_millis(100), async {
                let mut extra = [0u8; 1];
                server.read_exact(&mut extra).await
            })
            .await
            .is_err());

            server
                .write_all(&headers_frame(STREAM_CLIENT_SERVER))
                .await
                .unwrap();
            server
                .write_all(&headers_frame(STREAM_SERVER_CLIENT))
                .await
                .unwrap();
            server
                .write_all(&data_frame(
                    STREAM_CLIENT_SERVER,
                    &sample_handshake_message(),
                ))
                .await
                .unwrap();
            server.flush().await.unwrap();
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        let handshake = timeout(Duration::from_secs(1), handshake_on_framer(&mut framer))
            .await
            .expect("handshake timed out")
            .unwrap();

        assert_eq!(handshake.udid, "00008150-00013DD00104401C");
        assert_eq!(
            handshake.get_port("com.apple.instruments.dtservicehub"),
            Some(12345)
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn initialize_xpc_connection_consumes_step_responses_in_reference_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        let empty = encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        })
        .unwrap();

        let server_task = tokio::spawn(async move {
            serve_h2_preamble(&mut server).await;

            // Step 1: request on the client-server stream, then our reply.
            expect_headers_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let _msg1 = expect_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            send_data_frame(&mut server, STREAM_CLIENT_SERVER, &empty).await;

            // Step 2: request on the server-client stream, then our reply.
            expect_headers_frame(&mut server, STREAM_SERVER_CLIENT).await;
            let _msg2 = expect_data_frame(&mut server, STREAM_SERVER_CLIENT).await;
            send_data_frame(&mut server, STREAM_SERVER_CLIENT, &empty).await;

            // Step 3: the client-server stream is already open, so no HEADERS.
            let _msg3 = expect_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            send_data_frame(&mut server, STREAM_CLIENT_SERVER, &empty).await;
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        timeout(
            Duration::from_secs(1),
            initialize_xpc_connection_on_framer(&mut framer),
        )
        .await
        .expect("bootstrap timed out")
        .unwrap();

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn queue_rsd_handshake_bootstrap_matches_pymobiledevice3_order() {
        let (client, mut server) = tokio::io::duplex(4096);

        let server_task = tokio::spawn(async move {
            serve_h2_preamble(&mut server).await;

            expect_headers_frame(&mut server, STREAM_CLIENT_SERVER).await;

            let payload1 = expect_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let decoded1 = decode_message(Bytes::from(payload1)).unwrap();
            assert_eq!(decoded1.flags, flags::ALWAYS_SET);
            assert_eq!(
                decoded1.body,
                Some(XpcValue::Dictionary(IndexMap::<String, XpcValue>::new()))
            );

            let payload2 = expect_data_frame(&mut server, STREAM_CLIENT_SERVER).await;
            let decoded2 = decode_message(Bytes::from(payload2)).unwrap();
            assert_eq!(decoded2.flags, flags::ALWAYS_SET | 0x200);
            assert!(decoded2.body.is_none());

            expect_headers_frame(&mut server, STREAM_SERVER_CLIENT).await;

            let payload3 = expect_data_frame(&mut server, STREAM_SERVER_CLIENT).await;
            let decoded3 = decode_message(Bytes::from(payload3)).unwrap();
            assert_eq!(decoded3.flags, flags::INIT_HANDSHAKE | flags::ALWAYS_SET);
            assert!(decoded3.body.is_none());
        });

        let mut framer = H2Framer::connect(client).await.unwrap();
        timeout(
            Duration::from_secs(1),
            queue_rsd_handshake_bootstrap_on_framer(&mut framer),
        )
        .await
        .expect("queued bootstrap timed out")
        .unwrap();

        server_task.await.unwrap();
    }
}