1use crate::xpc::h2_raw::H2Framer;
2use crate::xpc::{XpcMessage, XpcValue};
3use bytes::BytesMut;
4use indexmap::IndexMap;
5use tokio::io::{AsyncRead, AsyncWrite};
6
7pub const SERVICE_NAME: &str = "com.apple.RestoreRemoteServices.restoreserviced";
8
9service_error!(RestoreError);
10
11pub struct RestoreServiceClient<S> {
12 framer: H2Framer<S>,
13 next_msg_id: u64,
14 pending_control_data: BytesMut,
15}
16
17impl<S: AsyncRead + AsyncWrite + Unpin> RestoreServiceClient<S> {
18 pub async fn connect(stream: S) -> Result<Self, RestoreError> {
19 let mut framer = H2Framer::connect(stream)
20 .await
21 .map_err(|err| RestoreError::Protocol(format!("H2 error: {err}")))?;
22 bootstrap_remote_xpc(&mut framer).await?;
23 Ok(Self {
24 framer,
25 next_msg_id: 1,
26 pending_control_data: BytesMut::new(),
27 })
28 }
29
30 pub async fn enter_recovery(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
31 self.validate_command("recovery").await
32 }
33
34 pub async fn delay_recovery_image(
35 &mut self,
36 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
37 self.validate_command("delayrecoveryimage").await
38 }
39
40 pub async fn reboot(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
41 self.validate_command("reboot").await
42 }
43
44 pub async fn get_preflight_info(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
45 self.send_command("getpreflightinfo", None).await
46 }
47
48 pub async fn get_nonces(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
49 self.send_command("getnonces", None).await
50 }
51
52 pub async fn get_app_parameters(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
53 self.validate_command("getappparameters").await
54 }
55
56 pub async fn restore_lang(
57 &mut self,
58 language: impl Into<String>,
59 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
60 self.send_command("restorelang", Some(XpcValue::String(language.into())))
61 .await
62 }
63
64 async fn validate_command(
65 &mut self,
66 command: &str,
67 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
68 let response = self.send_command(command, None).await?;
69 ensure_success(&response)?;
70 Ok(response)
71 }
72
73 async fn send_command(
74 &mut self,
75 command: &str,
76 argument: Option<XpcValue>,
77 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
78 let request = crate::xpc::message::encode_message(&XpcMessage {
79 flags: crate::xpc::message::flags::ALWAYS_SET
80 | crate::xpc::message::flags::DATA
81 | crate::xpc::message::flags::WANTING_REPLY,
82 msg_id: self.next_msg_id,
83 body: Some(build_command_request(command, argument)),
84 })
85 .map_err(|err| RestoreError::Protocol(format!("restore request encode failed: {err}")))?;
86 self.framer
87 .write_client_server(&request)
88 .await
89 .map_err(|err| RestoreError::Protocol(format!("restore request failed: {err}")))?;
90 self.next_msg_id += 1;
91 let response = self.recv_control_message().await?;
92 response_dict(response)
93 }
94
95 async fn recv_control_message(&mut self) -> Result<XpcMessage, RestoreError> {
96 loop {
97 if let Some(message) = self.try_take_pending_control_message()? {
98 if message.flags & crate::xpc::message::flags::FILE_TX_STREAM_REQUEST != 0 {
99 continue;
100 }
101 if message.body.is_none() {
102 continue;
103 }
104 if message
105 .body
106 .as_ref()
107 .and_then(XpcValue::as_dict)
108 .is_some_and(|dict| dict.is_empty())
109 {
110 continue;
111 }
112 return Ok(message);
113 }
114
115 let frame = self.framer.read_next_data_frame().await.map_err(|err| {
116 RestoreError::Protocol(format!("restore response read failed: {err}"))
117 })?;
118 self.pending_control_data.extend_from_slice(&frame.payload);
119 }
120 }
121
122 fn try_take_pending_control_message(&mut self) -> Result<Option<XpcMessage>, RestoreError> {
123 if self.pending_control_data.len() < 24 {
124 return Ok(None);
125 }
126
127 let body_len = u64::from_le_bytes(
128 self.pending_control_data[8..16]
129 .try_into()
130 .map_err(|_| RestoreError::Protocol("invalid XPC response header".into()))?,
131 ) as usize;
132 let total_len = 24usize
133 .checked_add(body_len)
134 .ok_or_else(|| RestoreError::Protocol("XPC response length overflow".into()))?;
135 if self.pending_control_data.len() < total_len {
136 return Ok(None);
137 }
138
139 let payload = self.pending_control_data.split_to(total_len).freeze();
140 let message = crate::xpc::message::decode_message(payload).map_err(|err| {
141 RestoreError::Protocol(format!("restore response decode failed: {err}"))
142 })?;
143 Ok(Some(message))
144 }
145}
146
147fn build_command_request(command: &str, argument: Option<XpcValue>) -> XpcValue {
148 let mut dict = IndexMap::new();
149 dict.insert("command".to_string(), XpcValue::String(command.to_string()));
150 if let Some(argument) = argument {
151 dict.insert("argument".to_string(), argument);
152 }
153 XpcValue::Dictionary(dict)
154}
155
156fn response_dict(response: XpcMessage) -> Result<IndexMap<String, XpcValue>, RestoreError> {
157 response
158 .body
159 .and_then(|value| match value {
160 XpcValue::Dictionary(dict) => Some(dict),
161 _ => None,
162 })
163 .ok_or_else(|| RestoreError::Protocol("restore response missing dictionary body".into()))
164}
165
166fn ensure_success(response: &IndexMap<String, XpcValue>) -> Result<(), RestoreError> {
167 match response.get("result").and_then(XpcValue::as_str) {
168 Some("success") => Ok(()),
169 Some(other) => Err(RestoreError::Protocol(format!(
170 "restore command failed with result '{other}': {}",
171 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
172 .unwrap_or_else(|_| "null".into())
173 ))),
174 None => Err(RestoreError::Protocol(format!(
175 "restore response missing result: {}",
176 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
177 .unwrap_or_else(|_| "null".into())
178 ))),
179 }
180}
181
182async fn bootstrap_remote_xpc<S>(framer: &mut H2Framer<S>) -> Result<(), RestoreError>
183where
184 S: AsyncRead + AsyncWrite + Unpin,
185{
186 framer
187 .write_client_server(
188 &crate::xpc::message::encode_message(&XpcMessage {
189 flags: crate::xpc::message::flags::ALWAYS_SET
190 | crate::xpc::message::flags::DATA_PRESENT,
191 msg_id: 0,
192 body: Some(XpcValue::Dictionary(IndexMap::new())),
193 })
194 .map_err(|err| {
195 RestoreError::Protocol(format!("remote XPC bootstrap encode step 1 failed: {err}"))
196 })?,
197 )
198 .await
199 .map_err(|err| {
200 RestoreError::Protocol(format!("remote XPC bootstrap step 1 failed: {err}"))
201 })?;
202
203 framer
204 .write_client_server(
205 &crate::xpc::message::encode_message(&XpcMessage {
206 flags: crate::xpc::message::flags::ALWAYS_SET | crate::xpc::message::flags::REPLY,
207 msg_id: 0,
208 body: None,
209 })
210 .map_err(|err| {
211 RestoreError::Protocol(format!("remote XPC bootstrap encode step 2 failed: {err}"))
212 })?,
213 )
214 .await
215 .map_err(|err| {
216 RestoreError::Protocol(format!("remote XPC bootstrap step 2 failed: {err}"))
217 })?;
218
219 framer
220 .write_server_client(
221 &crate::xpc::message::encode_message(&XpcMessage {
222 flags: crate::xpc::message::flags::ALWAYS_SET
223 | crate::xpc::message::flags::INIT_HANDSHAKE,
224 msg_id: 0,
225 body: None,
226 })
227 .map_err(|err| {
228 RestoreError::Protocol(format!("remote XPC bootstrap encode step 3 failed: {err}"))
229 })?,
230 )
231 .await
232 .map_err(|err| {
233 RestoreError::Protocol(format!("remote XPC bootstrap step 3 failed: {err}"))
234 })?;
235
236 Ok(())
237}
238
239pub fn xpc_value_to_json(value: &XpcValue) -> serde_json::Value {
240 match value {
241 XpcValue::Null => serde_json::Value::Null,
242 XpcValue::Bool(value) => serde_json::Value::Bool(*value),
243 XpcValue::Int64(value) => serde_json::Value::from(*value),
244 XpcValue::Uint64(value) => serde_json::Value::from(*value),
245 XpcValue::Double(value) => serde_json::Number::from_f64(*value)
246 .map(serde_json::Value::Number)
247 .unwrap_or(serde_json::Value::Null),
248 XpcValue::Date(value) => serde_json::Value::from(*value),
249 XpcValue::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
250 XpcValue::String(value) => serde_json::Value::String(value.clone()),
251 XpcValue::Uuid(bytes) => {
252 serde_json::Value::String(uuid::Uuid::from_bytes(*bytes).to_string())
253 }
254 XpcValue::Array(values) => {
255 serde_json::Value::Array(values.iter().map(xpc_value_to_json).collect())
256 }
257 XpcValue::Dictionary(values) => serde_json::Value::Object(
258 values
259 .iter()
260 .map(|(key, value)| (key.clone(), xpc_value_to_json(value)))
261 .collect(),
262 ),
263 XpcValue::FileTransfer { msg_id, data } => serde_json::json!({
264 "msg_id": msg_id,
265 "data": xpc_value_to_json(data),
266 }),
267 }
268}
269
270#[cfg(test)]
271mod tests {
272 use bytes::Bytes;
273 use tokio::io::{AsyncRead, AsyncWrite};
274
275 use super::*;
276
277 #[test]
278 fn builds_enter_recovery_command_request() {
279 let request = build_command_request("recovery", None);
280 let dict = request.as_dict().expect("restore requests should be dicts");
281 assert_eq!(
282 dict.get("command").and_then(XpcValue::as_str),
283 Some("recovery")
284 );
285 }
286
287 #[test]
288 fn builds_argumented_command_request() {
289 let request = build_command_request("restorelang", Some(XpcValue::String("en".into())));
290 let dict = request.as_dict().expect("restore requests should be dicts");
291 assert_eq!(dict.get("argument").and_then(XpcValue::as_str), Some("en"));
292 }
293
294 #[test]
295 fn converts_xpc_values_to_json() {
296 let value = XpcValue::Dictionary(IndexMap::from([
297 ("result".to_string(), XpcValue::String("success".into())),
298 (
299 "nonce".to_string(),
300 XpcValue::Data(Bytes::from_static(&[0x12, 0x34])),
301 ),
302 ]));
303
304 let json = xpc_value_to_json(&value);
305 assert_eq!(json["result"], "success");
306 assert_eq!(json["nonce"], "1234");
307 }
308
309 #[test]
310 fn rejects_non_success_restore_result() {
311 let response = IndexMap::from([
312 ("result".to_string(), XpcValue::String("failure".into())),
313 ("error".to_string(), XpcValue::String("denied".into())),
314 ]);
315
316 let err = ensure_success(&response).expect_err("non-success must fail");
317 assert!(err.to_string().contains("failure"));
318 assert!(err.to_string().contains("denied"));
319 }
320
321 #[tokio::test]
322 async fn get_nonces_roundtrips_over_remote_xpc_stream() {
323 let (client, mut server) = tokio::io::duplex(16 * 1024);
324
325 let server_task = tokio::spawn(async move {
326 perform_h2_handshake(&mut server).await;
327 perform_remote_xpc_bootstrap(&mut server).await;
328
329 let request = read_xpc_request(&mut server, 1).await;
330 let dict = request
331 .body
332 .expect("restore request body")
333 .as_dict()
334 .expect("restore request dict")
335 .clone();
336 assert_eq!(dict["command"].as_str(), Some("getnonces"));
337
338 write_xpc_response(
339 &mut server,
340 3,
341 XpcValue::Dictionary(IndexMap::from([
342 (
343 "ApNonce".to_string(),
344 XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])),
345 ),
346 (
347 "SEPNonce".to_string(),
348 XpcValue::Data(Bytes::from_static(&[0xCC, 0xDD])),
349 ),
350 ])),
351 )
352 .await;
353 });
354
355 let mut client = RestoreServiceClient::connect(client)
356 .await
357 .expect("restore client should connect");
358 let response = client.get_nonces().await.expect("nonces should succeed");
359
360 assert_eq!(
361 response.get("ApNonce"),
362 Some(&XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])))
363 );
364 assert_eq!(
365 response.get("SEPNonce"),
366 Some(&XpcValue::Data(Bytes::from_static(&[0xCC, 0xDD])))
367 );
368
369 server_task.await.unwrap();
370 }
371
372 #[tokio::test]
373 async fn get_nonces_skips_empty_dictionary_control_messages() {
374 let (client, mut server) = tokio::io::duplex(16 * 1024);
375
376 let server_task = tokio::spawn(async move {
377 perform_h2_handshake(&mut server).await;
378 perform_remote_xpc_bootstrap(&mut server).await;
379
380 let request = read_xpc_request(&mut server, 1).await;
381 let dict = request
382 .body
383 .expect("restore request body")
384 .as_dict()
385 .expect("restore request dict")
386 .clone();
387 assert_eq!(dict["command"].as_str(), Some("getnonces"));
388
389 write_xpc_response(&mut server, 1, XpcValue::Dictionary(IndexMap::new())).await;
390 write_xpc_response(
391 &mut server,
392 1,
393 XpcValue::Dictionary(IndexMap::from([(
394 "ApNonce".to_string(),
395 XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])),
396 )])),
397 )
398 .await;
399 });
400
401 let mut client = RestoreServiceClient::connect(client)
402 .await
403 .expect("restore client should connect");
404 let response = client.get_nonces().await.expect("nonces should succeed");
405
406 assert_eq!(
407 response.get("ApNonce"),
408 Some(&XpcValue::Data(Bytes::from_static(&[0xAA, 0xBB])))
409 );
410
411 server_task.await.unwrap();
412 }
413
414 #[tokio::test]
415 async fn reboot_validates_success_response() {
416 let (client, mut server) = tokio::io::duplex(16 * 1024);
417
418 let server_task = tokio::spawn(async move {
419 perform_h2_handshake(&mut server).await;
420 perform_remote_xpc_bootstrap(&mut server).await;
421
422 let request = read_xpc_request(&mut server, 1).await;
423 let dict = request
424 .body
425 .expect("restore request body")
426 .as_dict()
427 .expect("restore request dict")
428 .clone();
429 assert_eq!(dict["command"].as_str(), Some("reboot"));
430 assert_eq!(dict.get("argument"), None);
431
432 write_xpc_response(
433 &mut server,
434 3,
435 XpcValue::Dictionary(IndexMap::from([(
436 "result".to_string(),
437 XpcValue::String("success".into()),
438 )])),
439 )
440 .await;
441 });
442
443 let mut client = RestoreServiceClient::connect(client)
444 .await
445 .expect("restore client should connect");
446 let response = client.reboot().await.expect("reboot should succeed");
447
448 assert_eq!(
449 response.get("result").and_then(XpcValue::as_str),
450 Some("success")
451 );
452
453 server_task.await.unwrap();
454 }
455
456 #[tokio::test]
457 async fn restore_lang_sends_language_argument() {
458 let (client, mut server) = tokio::io::duplex(16 * 1024);
459
460 let server_task = tokio::spawn(async move {
461 perform_h2_handshake(&mut server).await;
462 perform_remote_xpc_bootstrap(&mut server).await;
463
464 let request = read_xpc_request(&mut server, 1).await;
465 let dict = request
466 .body
467 .expect("restore request body")
468 .as_dict()
469 .expect("restore request dict")
470 .clone();
471 assert_eq!(dict["command"].as_str(), Some("restorelang"));
472 assert_eq!(dict["argument"].as_str(), Some("en"));
473
474 write_xpc_response(
475 &mut server,
476 3,
477 XpcValue::Dictionary(IndexMap::from([(
478 "language".to_string(),
479 XpcValue::String("en".into()),
480 )])),
481 )
482 .await;
483 });
484
485 let mut client = RestoreServiceClient::connect(client)
486 .await
487 .expect("restore client should connect");
488 let response = client
489 .restore_lang("en")
490 .await
491 .expect("restore lang should succeed");
492
493 assert_eq!(
494 response.get("language").and_then(XpcValue::as_str),
495 Some("en")
496 );
497
498 server_task.await.unwrap();
499 }
500
501 async fn perform_h2_handshake<S>(stream: &mut S)
502 where
503 S: AsyncRead + AsyncWrite + Unpin,
504 {
505 let mut preface = [0u8; 24];
506 tokio::io::AsyncReadExt::read_exact(stream, &mut preface)
507 .await
508 .unwrap();
509 assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);
510
511 let settings = read_raw_frame(stream).await;
512 assert_eq!(settings.frame_type, 0x04);
513
514 let window_update = read_raw_frame(stream).await;
515 assert_eq!(window_update.frame_type, 0x08);
516
517 write_raw_frame(stream, 0x04, 0, 0, &[]).await;
518
519 let settings_ack = read_raw_frame(stream).await;
520 assert_eq!(settings_ack.frame_type, 0x04);
521 assert_eq!(settings_ack.flags, 0x01);
522 }
523
524 async fn perform_remote_xpc_bootstrap<S>(stream: &mut S)
525 where
526 S: AsyncRead + AsyncWrite + Unpin,
527 {
528 read_headers_frame(stream, 1).await;
529 let _ = read_xpc_request(stream, 1).await;
530 write_empty_xpc(stream, 1).await;
531
532 let _ = read_xpc_request(stream, 1).await;
533 write_empty_xpc(stream, 1).await;
534
535 read_headers_frame(stream, 3).await;
536 let _ = read_xpc_request(stream, 3).await;
537 write_empty_xpc(stream, 3).await;
538 }
539
540 async fn read_headers_frame<S>(stream: &mut S, stream_id: u32)
541 where
542 S: AsyncRead + AsyncWrite + Unpin,
543 {
544 let frame = read_raw_frame(stream).await;
545 assert_eq!(frame.frame_type, 0x01);
546 assert_eq!(frame.flags, 0x04);
547 assert_eq!(frame.stream_id, stream_id);
548 }
549
550 async fn read_xpc_request<S>(stream: &mut S, stream_id: u32) -> XpcMessage
551 where
552 S: AsyncRead + AsyncWrite + Unpin,
553 {
554 let frame = read_raw_frame(stream).await;
555 assert_eq!(frame.frame_type, 0x00);
556 assert_eq!(frame.stream_id, stream_id);
557 crate::xpc::message::decode_message(bytes::Bytes::from(frame.payload)).unwrap()
558 }
559
560 async fn write_empty_xpc<S>(stream: &mut S, stream_id: u32)
561 where
562 S: AsyncRead + AsyncWrite + Unpin,
563 {
564 write_raw_frame(
565 stream,
566 0x00,
567 0,
568 stream_id,
569 &crate::xpc::message::encode_message(&XpcMessage {
570 flags: crate::xpc::message::flags::ALWAYS_SET,
571 msg_id: 0,
572 body: None,
573 })
574 .unwrap(),
575 )
576 .await;
577 }
578
579 async fn write_xpc_response<S>(stream: &mut S, stream_id: u32, body: XpcValue)
580 where
581 S: AsyncRead + AsyncWrite + Unpin,
582 {
583 write_raw_frame(
584 stream,
585 0x00,
586 0,
587 stream_id,
588 &crate::xpc::message::encode_message(&XpcMessage {
589 flags: crate::xpc::message::flags::ALWAYS_SET
590 | crate::xpc::message::flags::DATA
591 | crate::xpc::message::flags::REPLY,
592 msg_id: 1,
593 body: Some(body),
594 })
595 .unwrap(),
596 )
597 .await;
598 }
599
600 async fn write_raw_frame<S>(
601 stream: &mut S,
602 frame_type: u8,
603 flags: u8,
604 stream_id: u32,
605 payload: &[u8],
606 ) where
607 S: AsyncRead + AsyncWrite + Unpin,
608 {
609 let len = payload.len();
610 let mut frame = Vec::with_capacity(9 + len);
611 frame.push(((len >> 16) & 0xff) as u8);
612 frame.push(((len >> 8) & 0xff) as u8);
613 frame.push((len & 0xff) as u8);
614 frame.push(frame_type);
615 frame.push(flags);
616 frame.extend_from_slice(&(stream_id & 0x7fff_ffff).to_be_bytes());
617 frame.extend_from_slice(payload);
618 tokio::io::AsyncWriteExt::write_all(stream, &frame)
619 .await
620 .unwrap();
621 tokio::io::AsyncWriteExt::flush(stream).await.unwrap();
622 }
623
624 async fn read_raw_frame<S>(stream: &mut S) -> TestFrame
625 where
626 S: AsyncRead + AsyncWrite + Unpin,
627 {
628 let mut header = [0u8; 9];
629 tokio::io::AsyncReadExt::read_exact(stream, &mut header)
630 .await
631 .unwrap();
632 let len = ((header[0] as usize) << 16) | ((header[1] as usize) << 8) | header[2] as usize;
633 let mut payload = vec![0u8; len];
634 if len > 0 {
635 tokio::io::AsyncReadExt::read_exact(stream, &mut payload)
636 .await
637 .unwrap();
638 }
639 TestFrame {
640 frame_type: header[3],
641 flags: header[4],
642 stream_id: u32::from_be_bytes([header[5] & 0x7f, header[6], header[7], header[8]]),
643 payload,
644 }
645 }
646
647 struct TestFrame {
648 frame_type: u8,
649 flags: u8,
650 stream_id: u32,
651 payload: Vec<u8>,
652 }
653}