1use crate::xpc::h2_raw::H2Framer;
8use crate::xpc::{XpcMessage, XpcValue};
9use indexmap::IndexMap;
10use tokio::io::{AsyncRead, AsyncWrite};
11
12pub const SERVICE_NAME: &str = "com.apple.RestoreRemoteServices.restoreserviced";
14
15service_error!(RestoreError);
16
17#[derive(Debug, Clone, PartialEq)]
19pub enum RestoreLifecycleEvent {
20 Progress {
22 operation: Option<String>,
24 progress: Option<u64>,
26 },
27 Status {
29 code: u64,
31 message: Option<String>,
33 log: Option<String>,
35 finished: bool,
37 },
38 Checkpoint {
40 name: Option<String>,
42 raw: IndexMap<String, XpcValue>,
44 },
45 DataRequest {
47 data_type: Option<String>,
49 data_port: Option<u64>,
51 async_request: bool,
53 raw: IndexMap<String, XpcValue>,
55 },
56 PreviousRestoreLog(String),
58 RestoredCrash {
60 backtrace: Vec<String>,
62 },
63 Unknown {
65 msg_type: Option<String>,
67 raw: IndexMap<String, XpcValue>,
69 },
70}
71
72impl RestoreLifecycleEvent {
73 pub fn from_xpc_dictionary(message: &IndexMap<String, XpcValue>) -> Self {
75 match message.get("MsgType").and_then(XpcValue::as_str) {
76 Some("ProgressMsg") => Self::Progress {
77 operation: xpc_string(message, "Operation"),
78 progress: xpc_u64(message, "Progress"),
79 },
80 Some("StatusMsg") => {
81 let code = xpc_u64(message, "Status").unwrap_or_default();
82 Self::Status {
83 code,
84 message: restore_status_message(code).map(ToString::to_string),
85 log: xpc_string(message, "Log"),
86 finished: code == 0,
87 }
88 }
89 Some("CheckpointMsg") => Self::Checkpoint {
90 name: xpc_string(message, "Checkpoint"),
91 raw: message.clone(),
92 },
93 Some("DataRequestMsg") | Some("AsyncDataRequestMsg") => Self::DataRequest {
94 data_type: xpc_string(message, "DataType"),
95 data_port: xpc_u64(message, "DataPort"),
96 async_request: matches!(
97 message.get("MsgType").and_then(XpcValue::as_str),
98 Some("AsyncDataRequestMsg")
99 ),
100 raw: message.clone(),
101 },
102 Some("PreviousRestoreLogMsg") => Self::PreviousRestoreLog(
103 xpc_string(message, "PreviousRestoreLog").unwrap_or_default(),
104 ),
105 Some("RestoredCrash") => Self::RestoredCrash {
106 backtrace: xpc_string_array(message, "RestoredBacktrace"),
107 },
108 other => Self::Unknown {
109 msg_type: other.map(ToString::to_string),
110 raw: message.clone(),
111 },
112 }
113 }
114}
115
/// Returns the fixed description for a restore status code, or `None` for codes that are
/// not in the table.
pub fn restore_status_message(status: u64) -> Option<&'static str> {
    // Codes 50 and 51 intentionally share one description.
    const KNOWN_STATUSES: [(u64, &str); 9] = [
        (0, "success"),
        (6, "disk failure"),
        (14, "fail"),
        (27, "failed to mount filesystems"),
        (50, "failed to load SEP firmware"),
        (51, "failed to load SEP firmware"),
        (53, "failed to recover FDR data"),
        (1015, "X-Gold Baseband Update Failed. Defective Unit?"),
        (0xFFFF_FFFF_FFFF_FFFF, "verification error"),
    ];

    KNOWN_STATUSES
        .iter()
        .find(|(code, _)| *code == status)
        .map(|(_, text)| *text)
}
130
131pub struct RestoreServiceClient<S> {
133 framer: H2Framer<S>,
134 next_msg_id: u64,
135 control_messages: crate::xpc::message::XpcMessageBuffer,
136}
137
138impl<S: AsyncRead + AsyncWrite + Unpin> RestoreServiceClient<S> {
139 pub async fn connect(stream: S) -> Result<Self, RestoreError> {
141 let mut framer = H2Framer::connect(stream)
142 .await
143 .map_err(|err| RestoreError::Protocol(format!("H2 error: {err}")))?;
144 bootstrap_remote_xpc(&mut framer).await?;
145 Ok(Self {
146 framer,
147 next_msg_id: 1,
148 control_messages: crate::xpc::message::XpcMessageBuffer::new(),
149 })
150 }
151
152 pub async fn enter_recovery(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
154 self.validate_command("recovery").await
155 }
156
157 pub async fn delay_recovery_image(
159 &mut self,
160 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
161 self.validate_command("delayrecoveryimage").await
162 }
163
164 pub async fn reboot(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
166 self.validate_command("reboot").await
167 }
168
169 pub async fn get_preflight_info(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
171 self.send_command("getpreflightinfo", None).await
172 }
173
174 pub async fn get_nonces(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
176 self.send_command("getnonces", None).await
177 }
178
179 pub async fn get_app_parameters(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
181 self.validate_command("getappparameters").await
182 }
183
184 pub async fn restore_lang(
186 &mut self,
187 language: impl Into<String>,
188 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
189 self.send_command("restorelang", Some(XpcValue::String(language.into())))
190 .await
191 }
192
193 pub async fn next_lifecycle_event(&mut self) -> Result<RestoreLifecycleEvent, RestoreError> {
195 let response = self.recv_control_message().await?;
196 let body = response_dict(response)?;
197 Ok(RestoreLifecycleEvent::from_xpc_dictionary(&body))
198 }
199
200 async fn validate_command(
201 &mut self,
202 command: &str,
203 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
204 let response = self.send_command(command, None).await?;
205 ensure_success(&response)?;
206 Ok(response)
207 }
208
209 async fn send_command(
210 &mut self,
211 command: &str,
212 argument: Option<XpcValue>,
213 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
214 let request = crate::xpc::message::encode_message(&XpcMessage {
215 flags: crate::xpc::message::flags::ALWAYS_SET
216 | crate::xpc::message::flags::DATA
217 | crate::xpc::message::flags::WANTING_REPLY,
218 msg_id: self.next_msg_id,
219 body: Some(build_command_request(command, argument)),
220 })
221 .map_err(|err| RestoreError::Protocol(format!("restore request encode failed: {err}")))?;
222 self.framer
223 .write_client_server(&request)
224 .await
225 .map_err(|err| RestoreError::Protocol(format!("restore request failed: {err}")))?;
226 self.next_msg_id += 1;
227 let response = self.recv_control_message().await?;
228 response_dict(response)
229 }
230
231 async fn recv_control_message(&mut self) -> Result<XpcMessage, RestoreError> {
232 loop {
233 if let Some(message) = self.try_take_pending_control_message()? {
234 if message.flags & crate::xpc::message::flags::FILE_TX_STREAM_REQUEST != 0 {
235 continue;
236 }
237 if message.body.is_none() {
238 continue;
239 }
240 if message
241 .body
242 .as_ref()
243 .and_then(XpcValue::as_dict)
244 .is_some_and(|dict| dict.is_empty())
245 {
246 continue;
247 }
248 return Ok(message);
249 }
250
251 let frame = self.framer.read_next_data_frame().await.map_err(|err| {
252 RestoreError::Protocol(format!("restore response read failed: {err}"))
253 })?;
254 if frame.is_end_stream() && frame.payload.is_empty() {
255 continue;
256 }
257 if !frame.is_remote_xpc_control_stream() {
258 continue;
259 }
260 self.control_messages.push(&frame.payload);
261 }
262 }
263
264 fn try_take_pending_control_message(&mut self) -> Result<Option<XpcMessage>, RestoreError> {
265 self.control_messages
266 .try_next()
267 .map_err(|err| RestoreError::Protocol(format!("restore response decode failed: {err}")))
268 }
269}
270
271fn build_command_request(command: &str, argument: Option<XpcValue>) -> XpcValue {
272 let mut dict = IndexMap::new();
273 dict.insert("command".to_string(), XpcValue::String(command.to_string()));
274 if let Some(argument) = argument {
275 dict.insert("argument".to_string(), argument);
276 }
277 XpcValue::Dictionary(dict)
278}
279
280fn response_dict(response: XpcMessage) -> Result<IndexMap<String, XpcValue>, RestoreError> {
281 response
282 .body
283 .and_then(|value| match value {
284 XpcValue::Dictionary(dict) => Some(dict),
285 _ => None,
286 })
287 .ok_or_else(|| RestoreError::Protocol("restore response missing dictionary body".into()))
288}
289
290fn xpc_string(values: &IndexMap<String, XpcValue>, key: &str) -> Option<String> {
291 values
292 .get(key)
293 .and_then(XpcValue::as_str)
294 .map(ToString::to_string)
295}
296
297fn xpc_u64(values: &IndexMap<String, XpcValue>, key: &str) -> Option<u64> {
298 match values.get(key)? {
299 XpcValue::Uint64(value) => Some(*value),
300 XpcValue::Int64(value) if *value >= 0 => Some(*value as u64),
301 _ => None,
302 }
303}
304
305fn xpc_string_array(values: &IndexMap<String, XpcValue>, key: &str) -> Vec<String> {
306 match values.get(key) {
307 Some(XpcValue::Array(items)) => items
308 .iter()
309 .filter_map(XpcValue::as_str)
310 .map(ToString::to_string)
311 .collect(),
312 _ => Vec::new(),
313 }
314}
315
316fn ensure_success(response: &IndexMap<String, XpcValue>) -> Result<(), RestoreError> {
317 match response.get("result").and_then(XpcValue::as_str) {
318 Some("success") => Ok(()),
319 Some(other) => Err(RestoreError::Protocol(format!(
320 "restore command failed with result '{other}': {}",
321 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
322 .unwrap_or_else(|_| "null".into())
323 ))),
324 None => Err(RestoreError::Protocol(format!(
325 "restore response missing result: {}",
326 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
327 .unwrap_or_else(|_| "null".into())
328 ))),
329 }
330}
331
332async fn bootstrap_remote_xpc<S>(framer: &mut H2Framer<S>) -> Result<(), RestoreError>
333where
334 S: AsyncRead + AsyncWrite + Unpin,
335{
336 framer
337 .write_client_server(
338 &crate::xpc::message::encode_message(&XpcMessage {
339 flags: crate::xpc::message::flags::ALWAYS_SET
340 | crate::xpc::message::flags::DATA_PRESENT,
341 msg_id: 0,
342 body: Some(XpcValue::Dictionary(IndexMap::new())),
343 })
344 .map_err(|err| {
345 RestoreError::Protocol(format!("remote XPC bootstrap encode step 1 failed: {err}"))
346 })?,
347 )
348 .await
349 .map_err(|err| {
350 RestoreError::Protocol(format!("remote XPC bootstrap step 1 failed: {err}"))
351 })?;
352
353 framer
354 .write_client_server(
355 &crate::xpc::message::encode_message(&XpcMessage {
356 flags: crate::xpc::message::flags::ALWAYS_SET | crate::xpc::message::flags::REPLY,
357 msg_id: 0,
358 body: None,
359 })
360 .map_err(|err| {
361 RestoreError::Protocol(format!("remote XPC bootstrap encode step 2 failed: {err}"))
362 })?,
363 )
364 .await
365 .map_err(|err| {
366 RestoreError::Protocol(format!("remote XPC bootstrap step 2 failed: {err}"))
367 })?;
368
369 framer
370 .write_server_client(
371 &crate::xpc::message::encode_message(&XpcMessage {
372 flags: crate::xpc::message::flags::ALWAYS_SET
373 | crate::xpc::message::flags::INIT_HANDSHAKE,
374 msg_id: 0,
375 body: None,
376 })
377 .map_err(|err| {
378 RestoreError::Protocol(format!("remote XPC bootstrap encode step 3 failed: {err}"))
379 })?,
380 )
381 .await
382 .map_err(|err| {
383 RestoreError::Protocol(format!("remote XPC bootstrap step 3 failed: {err}"))
384 })?;
385
386 Ok(())
387}
388
389pub fn xpc_value_to_json(value: &XpcValue) -> serde_json::Value {
391 match value {
392 XpcValue::Null => serde_json::Value::Null,
393 XpcValue::Bool(value) => serde_json::Value::Bool(*value),
394 XpcValue::Int64(value) => serde_json::Value::from(*value),
395 XpcValue::Uint64(value) => serde_json::Value::from(*value),
396 XpcValue::Double(value) => serde_json::Number::from_f64(*value)
397 .map(serde_json::Value::Number)
398 .unwrap_or(serde_json::Value::Null),
399 XpcValue::Date(value) => serde_json::Value::from(*value),
400 XpcValue::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
401 XpcValue::String(value) => serde_json::Value::String(value.clone()),
402 XpcValue::Uuid(bytes) => {
403 serde_json::Value::String(uuid::Uuid::from_bytes(*bytes).to_string())
404 }
405 XpcValue::Array(values) => {
406 serde_json::Value::Array(values.iter().map(xpc_value_to_json).collect())
407 }
408 XpcValue::Dictionary(values) => serde_json::Value::Object(
409 values
410 .iter()
411 .map(|(key, value)| (key.clone(), xpc_value_to_json(value)))
412 .collect(),
413 ),
414 XpcValue::FileTransfer { msg_id, data } => serde_json::json!({
415 "msg_id": msg_id,
416 "data": xpc_value_to_json(data),
417 }),
418 }
419}
420
421pub fn restore_lifecycle_event_to_json(event: &RestoreLifecycleEvent) -> serde_json::Value {
423 match event {
424 RestoreLifecycleEvent::Progress {
425 operation,
426 progress,
427 } => serde_json::json!({
428 "type": "progress",
429 "operation": operation,
430 "progress": progress,
431 }),
432 RestoreLifecycleEvent::Status {
433 code,
434 message,
435 log,
436 finished,
437 } => serde_json::json!({
438 "type": "status",
439 "code": code,
440 "message": message,
441 "log": log,
442 "finished": finished,
443 }),
444 RestoreLifecycleEvent::Checkpoint { name, raw } => serde_json::json!({
445 "type": "checkpoint",
446 "name": name,
447 "raw": xpc_value_to_json(&XpcValue::Dictionary(raw.clone())),
448 }),
449 RestoreLifecycleEvent::DataRequest {
450 data_type,
451 data_port,
452 async_request,
453 raw,
454 } => serde_json::json!({
455 "type": "data_request",
456 "data_type": data_type,
457 "data_port": data_port,
458 "async": async_request,
459 "raw": xpc_value_to_json(&XpcValue::Dictionary(raw.clone())),
460 }),
461 RestoreLifecycleEvent::PreviousRestoreLog(log) => serde_json::json!({
462 "type": "previous_restore_log",
463 "log": log,
464 }),
465 RestoreLifecycleEvent::RestoredCrash { backtrace } => serde_json::json!({
466 "type": "restored_crash",
467 "backtrace": backtrace,
468 }),
469 RestoreLifecycleEvent::Unknown { msg_type, raw } => serde_json::json!({
470 "type": "unknown",
471 "msg_type": msg_type,
472 "raw": xpc_value_to_json(&XpcValue::Dictionary(raw.clone())),
473 }),
474 }
475}
476
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use tokio::io::{AsyncRead, AsyncWrite};

    use super::*;

    // ---------------------------------------------------------------------------------
    // Pure unit tests (no I/O)
    // ---------------------------------------------------------------------------------

    #[test]
    fn builds_enter_recovery_command_request() {
        let request = build_command_request("recovery", None);
        let command = request
            .as_dict()
            .expect("restore requests should be dicts")
            .get("command")
            .and_then(XpcValue::as_str);
        assert_eq!(command, Some("recovery"));
    }

    #[test]
    fn builds_argumented_command_request() {
        let request = build_command_request("restorelang", Some(XpcValue::String("en".into())));
        let argument = request
            .as_dict()
            .expect("restore requests should be dicts")
            .get("argument")
            .and_then(XpcValue::as_str);
        assert_eq!(argument, Some("en"));
    }

    #[test]
    fn converts_xpc_values_to_json() {
        let mut dict = IndexMap::new();
        dict.insert("result".to_string(), XpcValue::String("success".into()));
        dict.insert(
            "nonce".to_string(),
            XpcValue::Data(Bytes::from_static(&[0x12, 0x34])),
        );

        let json = xpc_value_to_json(&XpcValue::Dictionary(dict));

        assert_eq!(json["result"], "success");
        assert_eq!(json["nonce"], "1234");
    }

    #[test]
    fn rejects_non_success_restore_result() {
        let mut response = IndexMap::new();
        response.insert("result".to_string(), XpcValue::String("failure".into()));
        response.insert("error".to_string(), XpcValue::String("denied".into()));

        let rendered = ensure_success(&response)
            .expect_err("non-success must fail")
            .to_string();

        assert!(rendered.contains("failure"));
        assert!(rendered.contains("denied"));
    }

    #[test]
    fn parses_restore_lifecycle_status_with_known_error() {
        let expected = RestoreLifecycleEvent::Status {
            code: 27,
            message: Some("failed to mount filesystems".to_string()),
            log: Some("mount failed".to_string()),
            finished: false,
        };

        let mut message = IndexMap::new();
        message.insert("MsgType".to_string(), XpcValue::String("StatusMsg".into()));
        message.insert("Status".to_string(), XpcValue::Uint64(27));
        message.insert(
            "Log".to_string(),
            XpcValue::String("mount failed".to_string()),
        );

        assert_eq!(
            RestoreLifecycleEvent::from_xpc_dictionary(&message),
            expected
        );
    }

    #[test]
    fn restore_lifecycle_event_json_includes_raw_checkpoint_payload() {
        let mut raw = IndexMap::new();
        raw.insert(
            "MsgType".to_string(),
            XpcValue::String("CheckpointMsg".to_string()),
        );
        raw.insert(
            "Checkpoint".to_string(),
            XpcValue::String("preflight".to_string()),
        );
        let event = RestoreLifecycleEvent::Checkpoint {
            name: Some("preflight".to_string()),
            raw,
        };

        let json = restore_lifecycle_event_to_json(&event);

        assert_eq!(json["type"], "checkpoint");
        assert_eq!(json["name"], "preflight");
        assert_eq!(json["raw"]["MsgType"], "CheckpointMsg");
    }

    #[test]
    fn restore_lifecycle_event_json_marks_async_data_requests() {
        let mut message = IndexMap::new();
        message.insert(
            "MsgType".to_string(),
            XpcValue::String("AsyncDataRequestMsg".to_string()),
        );
        message.insert(
            "DataType".to_string(),
            XpcValue::String("SystemImageData".to_string()),
        );
        message.insert("DataPort".to_string(), XpcValue::Uint64(12345));

        let json =
            restore_lifecycle_event_to_json(&RestoreLifecycleEvent::from_xpc_dictionary(&message));

        assert_eq!(json["type"], "data_request");
        assert_eq!(json["data_type"], "SystemImageData");
        assert_eq!(json["data_port"], 12345);
        assert_eq!(json["async"], true);
    }

    // ---------------------------------------------------------------------------------
    // End-to-end tests against a scripted peer on an in-memory duplex stream
    // ---------------------------------------------------------------------------------

    #[tokio::test]
    async fn next_lifecycle_event_reads_restore_status_message() {
        let (client, mut server) = tokio::io::duplex(16 * 1024);

        let server_task = tokio::spawn(async move {
            accept_client(&mut server).await;

            let mut status = IndexMap::new();
            status.insert("MsgType".to_string(), XpcValue::String("StatusMsg".into()));
            status.insert("Status".to_string(), XpcValue::Uint64(0));
            status.insert("Log".to_string(), XpcValue::String("done".into()));
            write_xpc_response(&mut server, 1, XpcValue::Dictionary(status)).await;
        });

        let mut client = connect_client(client).await;
        let event = client
            .next_lifecycle_event()
            .await
            .expect("status event should decode");

        assert_eq!(
            event,
            RestoreLifecycleEvent::Status {
                code: 0,
                message: Some("success".to_string()),
                log: Some("done".to_string()),
                finished: true,
            }
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn get_nonces_roundtrips_over_remote_xpc_stream() {
        let (client, mut server) = tokio::io::duplex(16 * 1024);

        let server_task = tokio::spawn(async move {
            accept_client(&mut server).await;

            let dict = read_command_dict(&mut server).await;
            assert_eq!(dict["command"].as_str(), Some("getnonces"));

            let mut nonces = IndexMap::new();
            nonces.insert(
                "ApNonce".to_string(),
                XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])),
            );
            nonces.insert(
                "SEPNonce".to_string(),
                XpcValue::Data(Bytes::from_static(&[0xCC, 0xDD])),
            );
            write_xpc_response(&mut server, 3, XpcValue::Dictionary(nonces)).await;
        });

        let mut client = connect_client(client).await;
        let response = client.get_nonces().await.expect("nonces should succeed");

        assert_eq!(
            response.get("ApNonce"),
            Some(&XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])))
        );
        assert_eq!(
            response.get("SEPNonce"),
            Some(&XpcValue::Data(Bytes::from_static(&[0xCC, 0xDD])))
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn get_nonces_skips_empty_dictionary_control_messages() {
        let (client, mut server) = tokio::io::duplex(16 * 1024);

        let server_task = tokio::spawn(async move {
            accept_client(&mut server).await;

            let dict = read_command_dict(&mut server).await;
            assert_eq!(dict["command"].as_str(), Some("getnonces"));

            // The empty dictionary must be ignored by the client; the second message is the
            // one it should return.
            write_xpc_response(&mut server, 1, XpcValue::Dictionary(IndexMap::new())).await;

            let mut nonces = IndexMap::new();
            nonces.insert(
                "ApNonce".to_string(),
                XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])),
            );
            write_xpc_response(&mut server, 1, XpcValue::Dictionary(nonces)).await;
        });

        let mut client = connect_client(client).await;
        let response = client.get_nonces().await.expect("nonces should succeed");

        assert_eq!(
            response.get("ApNonce"),
            Some(&XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])))
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn reboot_validates_success_response() {
        let (client, mut server) = tokio::io::duplex(16 * 1024);

        let server_task = tokio::spawn(async move {
            accept_client(&mut server).await;

            let dict = read_command_dict(&mut server).await;
            assert_eq!(dict["command"].as_str(), Some("reboot"));
            assert_eq!(dict.get("argument"), None);

            let mut reply = IndexMap::new();
            reply.insert("result".to_string(), XpcValue::String("success".into()));
            write_xpc_response(&mut server, 3, XpcValue::Dictionary(reply)).await;
        });

        let mut client = connect_client(client).await;
        let response = client.reboot().await.expect("reboot should succeed");

        assert_eq!(
            response.get("result").and_then(XpcValue::as_str),
            Some("success")
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn restore_lang_sends_language_argument() {
        let (client, mut server) = tokio::io::duplex(16 * 1024);

        let server_task = tokio::spawn(async move {
            accept_client(&mut server).await;

            let dict = read_command_dict(&mut server).await;
            assert_eq!(dict["command"].as_str(), Some("restorelang"));
            assert_eq!(dict["argument"].as_str(), Some("en"));

            let mut reply = IndexMap::new();
            reply.insert("language".to_string(), XpcValue::String("en".into()));
            write_xpc_response(&mut server, 3, XpcValue::Dictionary(reply)).await;
        });

        let mut client = connect_client(client).await;
        let response = client
            .restore_lang("en")
            .await
            .expect("restore lang should succeed");

        assert_eq!(
            response.get("language").and_then(XpcValue::as_str),
            Some("en")
        );

        server_task.await.unwrap();
    }

    // ---------------------------------------------------------------------------------
    // Scripted-peer helpers
    // ---------------------------------------------------------------------------------

    /// Client side of every end-to-end test: connect and fail the test on error.
    async fn connect_client<S>(stream: S) -> RestoreServiceClient<S>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        RestoreServiceClient::connect(stream)
            .await
            .expect("restore client should connect")
    }

    /// Server side of every end-to-end test: H2 handshake followed by the XPC bootstrap.
    async fn accept_client<S>(stream: &mut S)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        perform_h2_handshake(stream).await;
        perform_remote_xpc_bootstrap(stream).await;
    }

    /// Reads one request from stream 1 and returns a copy of its dictionary body.
    async fn read_command_dict<S>(stream: &mut S) -> IndexMap<String, XpcValue>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let request = read_xpc_request(stream, 1).await;
        let body = request.body.expect("restore request body");
        body.as_dict().expect("restore request dict").clone()
    }

    /// Consumes the client preface, SETTINGS and WINDOW_UPDATE, answers with an empty
    /// SETTINGS frame and expects the client's SETTINGS ack.
    async fn perform_h2_handshake<S>(stream: &mut S)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        use tokio::io::AsyncReadExt as _;

        let mut preface = [0u8; 24];
        stream.read_exact(&mut preface).await.unwrap();
        assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

        assert_eq!(read_raw_frame(stream).await.frame_type, 0x04);
        assert_eq!(read_raw_frame(stream).await.frame_type, 0x08);

        write_raw_frame(stream, 0x04, 0, 0, &[]).await;

        let ack = read_raw_frame(stream).await;
        assert_eq!(ack.frame_type, 0x04);
        assert_eq!(ack.flags, 0x01);
    }

    /// Plays the peer's part of the three bootstrap steps, answering each with an empty
    /// XPC message on the same stream.
    async fn perform_remote_xpc_bootstrap<S>(stream: &mut S)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        // Steps 1 and 2 arrive on stream 1 after its HEADERS frame.
        read_headers_frame(stream, 1).await;
        for _ in 0..2 {
            let _ = read_xpc_request(stream, 1).await;
            write_empty_xpc(stream, 1).await;
        }

        // Step 3 arrives on stream 3 after its HEADERS frame.
        read_headers_frame(stream, 3).await;
        let _ = read_xpc_request(stream, 3).await;
        write_empty_xpc(stream, 3).await;
    }

    /// Expects a HEADERS frame with flags `0x04` on `stream_id`.
    async fn read_headers_frame<S>(stream: &mut S, stream_id: u32)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let TestFrame {
            frame_type,
            flags,
            stream_id: actual_stream_id,
            ..
        } = read_raw_frame(stream).await;
        assert_eq!(frame_type, 0x01);
        assert_eq!(flags, 0x04);
        assert_eq!(actual_stream_id, stream_id);
    }

    /// Expects a DATA frame on `stream_id` and decodes its payload as one XPC message.
    async fn read_xpc_request<S>(stream: &mut S, stream_id: u32) -> XpcMessage
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let frame = read_raw_frame(stream).await;
        assert_eq!(frame.frame_type, 0x00);
        assert_eq!(frame.stream_id, stream_id);
        let payload = bytes::Bytes::from(frame.payload);
        crate::xpc::message::decode_message(payload).unwrap()
    }

    /// Encodes `message` and sends it as a flag-less DATA frame on `stream_id`.
    async fn write_xpc_message<S>(stream: &mut S, stream_id: u32, message: &XpcMessage)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let encoded = crate::xpc::message::encode_message(message).unwrap();
        write_raw_frame(stream, 0x00, 0, stream_id, &encoded).await;
    }

    /// Sends a body-less XPC message (id 0, `ALWAYS_SET` only).
    async fn write_empty_xpc<S>(stream: &mut S, stream_id: u32)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let message = XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        };
        write_xpc_message(stream, stream_id, &message).await;
    }

    /// Sends `body` as a reply message (id 1, `ALWAYS_SET | DATA | REPLY`).
    async fn write_xpc_response<S>(stream: &mut S, stream_id: u32, body: XpcValue)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        use crate::xpc::message::flags;

        let message = XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA | flags::REPLY,
            msg_id: 1,
            body: Some(body),
        };
        write_xpc_message(stream, stream_id, &message).await;
    }

    /// Writes one frame: 24-bit length, type, flags, 31-bit stream id, payload.
    async fn write_raw_frame<S>(
        stream: &mut S,
        frame_type: u8,
        flags: u8,
        stream_id: u32,
        payload: &[u8],
    ) where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        use tokio::io::AsyncWriteExt as _;

        let len = payload.len();
        let mut frame = Vec::with_capacity(9 + len);
        // The `as u8` casts keep only the low byte of each shifted length.
        frame.extend_from_slice(&[
            (len >> 16) as u8,
            (len >> 8) as u8,
            len as u8,
            frame_type,
            flags,
        ]);
        frame.extend_from_slice(&(stream_id & 0x7fff_ffff).to_be_bytes());
        frame.extend_from_slice(payload);

        stream.write_all(&frame).await.unwrap();
        stream.flush().await.unwrap();
    }

    /// Reads one frame header plus payload and splits it into a [`TestFrame`].
    async fn read_raw_frame<S>(stream: &mut S) -> TestFrame
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        use tokio::io::AsyncReadExt as _;

        let mut header = [0u8; 9];
        stream.read_exact(&mut header).await.unwrap();
        let [len_hi, len_mid, len_lo, frame_type, flags, id0, id1, id2, id3] = header;

        let len =
            (usize::from(len_hi) << 16) | (usize::from(len_mid) << 8) | usize::from(len_lo);
        let mut payload = vec![0u8; len];
        if !payload.is_empty() {
            stream.read_exact(&mut payload).await.unwrap();
        }

        TestFrame {
            frame_type,
            flags,
            // The top bit of the stream id is reserved and masked off.
            stream_id: u32::from_be_bytes([id0 & 0x7f, id1, id2, id3]),
            payload,
        }
    }

    /// Minimal decoded view of one frame as seen by the scripted peer.
    struct TestFrame {
        frame_type: u8,
        flags: u8,
        stream_id: u32,
        payload: Vec<u8>,
    }
}