1use crate::xpc::h2_raw::H2Framer;
2use crate::xpc::{XpcMessage, XpcValue};
3use bytes::BytesMut;
4use indexmap::IndexMap;
5use tokio::io::{AsyncRead, AsyncWrite};
6
/// Service identifier string for the restore service this module's client talks to.
pub const SERVICE_NAME: &str = "com.apple.RestoreRemoteServices.restoreserviced";
8
9#[derive(Debug, thiserror::Error)]
10pub enum RestoreError {
11 #[error("IO error: {0}")]
12 Io(#[from] std::io::Error),
13 #[error("plist error: {0}")]
14 Plist(String),
15 #[error("protocol error: {0}")]
16 Protocol(String),
17}
18
19pub struct RestoreServiceClient<S> {
20 framer: H2Framer<S>,
21 next_msg_id: u64,
22 pending_control_data: BytesMut,
23}
24
25impl<S: AsyncRead + AsyncWrite + Unpin> RestoreServiceClient<S> {
26 pub async fn connect(stream: S) -> Result<Self, RestoreError> {
27 let mut framer = H2Framer::connect(stream)
28 .await
29 .map_err(|err| RestoreError::Protocol(format!("H2 error: {err}")))?;
30 bootstrap_remote_xpc(&mut framer).await?;
31 Ok(Self {
32 framer,
33 next_msg_id: 1,
34 pending_control_data: BytesMut::new(),
35 })
36 }
37
38 pub async fn enter_recovery(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
39 self.validate_command("recovery").await
40 }
41
42 pub async fn delay_recovery_image(
43 &mut self,
44 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
45 self.validate_command("delayrecoveryimage").await
46 }
47
48 pub async fn reboot(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
49 self.validate_command("reboot").await
50 }
51
52 pub async fn get_preflight_info(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
53 self.send_command("getpreflightinfo", None).await
54 }
55
56 pub async fn get_nonces(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
57 self.send_command("getnonces", None).await
58 }
59
60 pub async fn get_app_parameters(&mut self) -> Result<IndexMap<String, XpcValue>, RestoreError> {
61 self.validate_command("getappparameters").await
62 }
63
64 pub async fn restore_lang(
65 &mut self,
66 language: impl Into<String>,
67 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
68 self.send_command("restorelang", Some(XpcValue::String(language.into())))
69 .await
70 }
71
72 async fn validate_command(
73 &mut self,
74 command: &str,
75 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
76 let response = self.send_command(command, None).await?;
77 ensure_success(&response)?;
78 Ok(response)
79 }
80
81 async fn send_command(
82 &mut self,
83 command: &str,
84 argument: Option<XpcValue>,
85 ) -> Result<IndexMap<String, XpcValue>, RestoreError> {
86 let request = crate::xpc::message::encode_message(&XpcMessage {
87 flags: crate::xpc::message::flags::ALWAYS_SET
88 | crate::xpc::message::flags::DATA
89 | crate::xpc::message::flags::WANTING_REPLY,
90 msg_id: self.next_msg_id,
91 body: Some(build_command_request(command, argument)),
92 })
93 .map_err(|err| RestoreError::Protocol(format!("restore request encode failed: {err}")))?;
94 self.framer
95 .write_client_server(&request)
96 .await
97 .map_err(|err| RestoreError::Protocol(format!("restore request failed: {err}")))?;
98 self.next_msg_id += 1;
99 let response = self.recv_control_message().await?;
100 response_dict(response)
101 }
102
103 async fn recv_control_message(&mut self) -> Result<XpcMessage, RestoreError> {
104 loop {
105 if let Some(message) = self.try_take_pending_control_message()? {
106 if message.flags & crate::xpc::message::flags::FILE_TX_STREAM_REQUEST != 0 {
107 continue;
108 }
109 if message.body.is_none() {
110 continue;
111 }
112 if message
113 .body
114 .as_ref()
115 .and_then(XpcValue::as_dict)
116 .is_some_and(|dict| dict.is_empty())
117 {
118 continue;
119 }
120 return Ok(message);
121 }
122
123 let frame = self.framer.read_next_data_frame().await.map_err(|err| {
124 RestoreError::Protocol(format!("restore response read failed: {err}"))
125 })?;
126 self.pending_control_data.extend_from_slice(&frame.payload);
127 }
128 }
129
130 fn try_take_pending_control_message(&mut self) -> Result<Option<XpcMessage>, RestoreError> {
131 if self.pending_control_data.len() < 24 {
132 return Ok(None);
133 }
134
135 let body_len = u64::from_le_bytes(
136 self.pending_control_data[8..16]
137 .try_into()
138 .map_err(|_| RestoreError::Protocol("invalid XPC response header".into()))?,
139 ) as usize;
140 let total_len = 24usize
141 .checked_add(body_len)
142 .ok_or_else(|| RestoreError::Protocol("XPC response length overflow".into()))?;
143 if self.pending_control_data.len() < total_len {
144 return Ok(None);
145 }
146
147 let payload = self.pending_control_data.split_to(total_len).freeze();
148 let message = crate::xpc::message::decode_message(payload).map_err(|err| {
149 RestoreError::Protocol(format!("restore response decode failed: {err}"))
150 })?;
151 Ok(Some(message))
152 }
153}
154
155fn build_command_request(command: &str, argument: Option<XpcValue>) -> XpcValue {
156 let mut dict = IndexMap::new();
157 dict.insert("command".to_string(), XpcValue::String(command.to_string()));
158 if let Some(argument) = argument {
159 dict.insert("argument".to_string(), argument);
160 }
161 XpcValue::Dictionary(dict)
162}
163
164fn response_dict(response: XpcMessage) -> Result<IndexMap<String, XpcValue>, RestoreError> {
165 response
166 .body
167 .and_then(|value| match value {
168 XpcValue::Dictionary(dict) => Some(dict),
169 _ => None,
170 })
171 .ok_or_else(|| RestoreError::Protocol("restore response missing dictionary body".into()))
172}
173
174fn ensure_success(response: &IndexMap<String, XpcValue>) -> Result<(), RestoreError> {
175 match response.get("result").and_then(XpcValue::as_str) {
176 Some("success") => Ok(()),
177 Some(other) => Err(RestoreError::Protocol(format!(
178 "restore command failed with result '{other}': {}",
179 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
180 .unwrap_or_else(|_| "null".into())
181 ))),
182 None => Err(RestoreError::Protocol(format!(
183 "restore response missing result: {}",
184 serde_json::to_string(&xpc_value_to_json(&XpcValue::Dictionary(response.clone())))
185 .unwrap_or_else(|_| "null".into())
186 ))),
187 }
188}
189
190async fn bootstrap_remote_xpc<S>(framer: &mut H2Framer<S>) -> Result<(), RestoreError>
191where
192 S: AsyncRead + AsyncWrite + Unpin,
193{
194 framer
195 .write_client_server(
196 &crate::xpc::message::encode_message(&XpcMessage {
197 flags: crate::xpc::message::flags::ALWAYS_SET
198 | crate::xpc::message::flags::DATA_PRESENT,
199 msg_id: 0,
200 body: Some(XpcValue::Dictionary(IndexMap::new())),
201 })
202 .map_err(|err| {
203 RestoreError::Protocol(format!("remote XPC bootstrap encode step 1 failed: {err}"))
204 })?,
205 )
206 .await
207 .map_err(|err| {
208 RestoreError::Protocol(format!("remote XPC bootstrap step 1 failed: {err}"))
209 })?;
210
211 framer
212 .write_client_server(
213 &crate::xpc::message::encode_message(&XpcMessage {
214 flags: crate::xpc::message::flags::ALWAYS_SET | crate::xpc::message::flags::REPLY,
215 msg_id: 0,
216 body: None,
217 })
218 .map_err(|err| {
219 RestoreError::Protocol(format!("remote XPC bootstrap encode step 2 failed: {err}"))
220 })?,
221 )
222 .await
223 .map_err(|err| {
224 RestoreError::Protocol(format!("remote XPC bootstrap step 2 failed: {err}"))
225 })?;
226
227 framer
228 .write_server_client(
229 &crate::xpc::message::encode_message(&XpcMessage {
230 flags: crate::xpc::message::flags::ALWAYS_SET
231 | crate::xpc::message::flags::INIT_HANDSHAKE,
232 msg_id: 0,
233 body: None,
234 })
235 .map_err(|err| {
236 RestoreError::Protocol(format!("remote XPC bootstrap encode step 3 failed: {err}"))
237 })?,
238 )
239 .await
240 .map_err(|err| {
241 RestoreError::Protocol(format!("remote XPC bootstrap step 3 failed: {err}"))
242 })?;
243
244 Ok(())
245}
246
247pub fn xpc_value_to_json(value: &XpcValue) -> serde_json::Value {
248 match value {
249 XpcValue::Null => serde_json::Value::Null,
250 XpcValue::Bool(value) => serde_json::Value::Bool(*value),
251 XpcValue::Int64(value) => serde_json::Value::from(*value),
252 XpcValue::Uint64(value) => serde_json::Value::from(*value),
253 XpcValue::Double(value) => serde_json::Number::from_f64(*value)
254 .map(serde_json::Value::Number)
255 .unwrap_or(serde_json::Value::Null),
256 XpcValue::Date(value) => serde_json::Value::from(*value),
257 XpcValue::Data(bytes) => serde_json::Value::String(hex::encode(bytes)),
258 XpcValue::String(value) => serde_json::Value::String(value.clone()),
259 XpcValue::Uuid(bytes) => {
260 serde_json::Value::String(uuid::Uuid::from_bytes(*bytes).to_string())
261 }
262 XpcValue::Array(values) => {
263 serde_json::Value::Array(values.iter().map(xpc_value_to_json).collect())
264 }
265 XpcValue::Dictionary(values) => serde_json::Value::Object(
266 values
267 .iter()
268 .map(|(key, value)| (key.clone(), xpc_value_to_json(value)))
269 .collect(),
270 ),
271 XpcValue::FileTransfer { msg_id, data } => serde_json::json!({
272 "msg_id": msg_id,
273 "data": xpc_value_to_json(data),
274 }),
275 }
276}
277
#[cfg(test)]
mod tests {
    //! Unit tests for the request/response helpers plus end-to-end tests that drive the client
    //! against a hand-written server speaking raw H2 frames over an in-memory duplex pipe.

    use bytes::Bytes;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    use super::*;

    /// Buffer size of the in-memory duplex pipe shared by client and fake server.
    const DUPLEX_BUF: usize = 16 * 1024;

    /// One raw H2 frame as read by the fake server.
    struct TestFrame {
        frame_type: u8,
        flags: u8,
        stream_id: u32,
        payload: Vec<u8>,
    }

    /// Shorthand for an `XpcValue::Data` holding the given static bytes.
    fn data(bytes: &'static [u8]) -> XpcValue {
        XpcValue::Data(Bytes::from_static(bytes))
    }

    #[test]
    fn builds_enter_recovery_command_request() {
        let request = build_command_request("recovery", None);
        let command = request
            .as_dict()
            .expect("restore requests should be dicts")
            .get("command")
            .and_then(XpcValue::as_str);
        assert_eq!(command, Some("recovery"));
    }

    #[test]
    fn builds_argumented_command_request() {
        let request = build_command_request("restorelang", Some(XpcValue::String("en".into())));
        let argument = request
            .as_dict()
            .expect("restore requests should be dicts")
            .get("argument")
            .and_then(XpcValue::as_str);
        assert_eq!(argument, Some("en"));
    }

    #[test]
    fn converts_xpc_values_to_json() {
        let value = XpcValue::Dictionary(IndexMap::from([
            ("result".to_string(), XpcValue::String("success".into())),
            ("nonce".to_string(), data(&[0x12, 0x34])),
        ]));

        let json = xpc_value_to_json(&value);
        assert_eq!(json["result"], "success");
        assert_eq!(json["nonce"], "1234");
    }

    #[test]
    fn rejects_non_success_restore_result() {
        let response = IndexMap::from([
            ("result".to_string(), XpcValue::String("failure".into())),
            ("error".to_string(), XpcValue::String("denied".into())),
        ]);

        let message = ensure_success(&response)
            .expect_err("non-success must fail")
            .to_string();
        assert!(message.contains("failure"));
        assert!(message.contains("denied"));
    }

    #[tokio::test]
    async fn get_nonces_roundtrips_over_remote_xpc_stream() {
        let (client_io, mut server_io) = tokio::io::duplex(DUPLEX_BUF);

        let server_task = tokio::spawn(async move {
            let request = accept_command(&mut server_io).await;
            assert_eq!(request["command"].as_str(), Some("getnonces"));

            let reply = XpcValue::Dictionary(IndexMap::from([
                ("ApNonce".to_string(), data(&[0xAA, 0xBB])),
                ("SEPNonce".to_string(), data(&[0xCC, 0xDD])),
            ]));
            write_xpc_response(&mut server_io, 3, reply).await;
        });

        let mut client = RestoreServiceClient::connect(client_io)
            .await
            .expect("restore client should connect");
        let response = client.get_nonces().await.expect("nonces should succeed");

        assert_eq!(response.get("ApNonce"), Some(&data(&[0xAA, 0xBB])));
        assert_eq!(response.get("SEPNonce"), Some(&data(&[0xCC, 0xDD])));

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn get_nonces_skips_empty_dictionary_control_messages() {
        let (client_io, mut server_io) = tokio::io::duplex(DUPLEX_BUF);

        let server_task = tokio::spawn(async move {
            let request = accept_command(&mut server_io).await;
            assert_eq!(request["command"].as_str(), Some("getnonces"));

            // Both messages in this test are written on stream 1: first an empty dictionary
            // the client must skip, then the real reply.
            write_xpc_response(&mut server_io, 1, XpcValue::Dictionary(IndexMap::new())).await;
            let reply = XpcValue::Dictionary(IndexMap::from([(
                "ApNonce".to_string(),
                data(&[0xAA, 0xBB]),
            )]));
            write_xpc_response(&mut server_io, 1, reply).await;
        });

        let mut client = RestoreServiceClient::connect(client_io)
            .await
            .expect("restore client should connect");
        let response = client.get_nonces().await.expect("nonces should succeed");

        assert_eq!(response.get("ApNonce"), Some(&data(&[0xAA, 0xBB])));

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn reboot_validates_success_response() {
        let (client_io, mut server_io) = tokio::io::duplex(DUPLEX_BUF);

        let server_task = tokio::spawn(async move {
            let request = accept_command(&mut server_io).await;
            assert_eq!(request["command"].as_str(), Some("reboot"));
            assert_eq!(request.get("argument"), None);

            let reply = XpcValue::Dictionary(IndexMap::from([(
                "result".to_string(),
                XpcValue::String("success".into()),
            )]));
            write_xpc_response(&mut server_io, 3, reply).await;
        });

        let mut client = RestoreServiceClient::connect(client_io)
            .await
            .expect("restore client should connect");
        let response = client.reboot().await.expect("reboot should succeed");

        assert_eq!(
            response.get("result").and_then(XpcValue::as_str),
            Some("success")
        );

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn restore_lang_sends_language_argument() {
        let (client_io, mut server_io) = tokio::io::duplex(DUPLEX_BUF);

        let server_task = tokio::spawn(async move {
            let request = accept_command(&mut server_io).await;
            assert_eq!(request["command"].as_str(), Some("restorelang"));
            assert_eq!(request["argument"].as_str(), Some("en"));

            let reply = XpcValue::Dictionary(IndexMap::from([(
                "language".to_string(),
                XpcValue::String("en".into()),
            )]));
            write_xpc_response(&mut server_io, 3, reply).await;
        });

        let mut client = RestoreServiceClient::connect(client_io)
            .await
            .expect("restore client should connect");
        let response = client
            .restore_lang("en")
            .await
            .expect("restore lang should succeed");

        assert_eq!(
            response.get("language").and_then(XpcValue::as_str),
            Some("en")
        );

        server_task.await.unwrap();
    }

    /// Plays the server side of both handshakes, then reads the first command request from
    /// stream 1 and returns a copy of its dictionary body.
    async fn accept_command<S>(stream: &mut S) -> IndexMap<String, XpcValue>
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        perform_h2_handshake(stream).await;
        perform_remote_xpc_bootstrap(stream).await;

        let request = read_xpc_request(stream, 1).await;
        let body = request.body.expect("restore request body");
        let dict = body.as_dict().expect("restore request dict").clone();
        dict
    }

    /// Server side of the H2 setup: expects the preface, a SETTINGS frame and a WINDOW_UPDATE
    /// frame, answers with an empty SETTINGS frame and expects the client's SETTINGS ack.
    async fn perform_h2_handshake<S>(stream: &mut S)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let mut preface = [0u8; 24];
        stream.read_exact(&mut preface).await.unwrap();
        assert_eq!(&preface, crate::xpc::h2_raw::H2_PREFACE);

        let client_settings = read_raw_frame(stream).await;
        assert_eq!(client_settings.frame_type, 0x04);

        let window_update = read_raw_frame(stream).await;
        assert_eq!(window_update.frame_type, 0x08);

        write_raw_frame(stream, 0x04, 0, 0, &[]).await;

        let ack = read_raw_frame(stream).await;
        assert_eq!(ack.frame_type, 0x04);
        assert_eq!(ack.flags, 0x01);
    }

    /// Server side of the RemoteXPC bootstrap: consumes the client's three bootstrap messages
    /// (two on stream 1, one on stream 3) and answers each with a body-less message.
    async fn perform_remote_xpc_bootstrap<S>(stream: &mut S)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        read_headers_frame(stream, 1).await;
        read_xpc_request(stream, 1).await;
        write_empty_xpc(stream, 1).await;

        read_xpc_request(stream, 1).await;
        write_empty_xpc(stream, 1).await;

        read_headers_frame(stream, 3).await;
        read_xpc_request(stream, 3).await;
        write_empty_xpc(stream, 3).await;
    }

    /// Reads one frame and asserts it is a HEADERS frame with flags 0x04 on `stream_id`.
    async fn read_headers_frame<S>(stream: &mut S, stream_id: u32)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let headers = read_raw_frame(stream).await;
        assert_eq!(headers.frame_type, 0x01);
        assert_eq!(headers.flags, 0x04);
        assert_eq!(headers.stream_id, stream_id);
    }

    /// Reads one DATA frame on `stream_id` and decodes its payload as a single XPC message.
    async fn read_xpc_request<S>(stream: &mut S, stream_id: u32) -> XpcMessage
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let data_frame = read_raw_frame(stream).await;
        assert_eq!(data_frame.frame_type, 0x00);
        assert_eq!(data_frame.stream_id, stream_id);
        crate::xpc::message::decode_message(bytes::Bytes::from(data_frame.payload)).unwrap()
    }

    /// Writes a body-less XPC message (message id 0) in a DATA frame on `stream_id`.
    async fn write_empty_xpc<S>(stream: &mut S, stream_id: u32)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let encoded = crate::xpc::message::encode_message(&XpcMessage {
            flags: crate::xpc::message::flags::ALWAYS_SET,
            msg_id: 0,
            body: None,
        })
        .unwrap();
        write_raw_frame(stream, 0x00, 0, stream_id, &encoded).await;
    }

    /// Writes `body` as a reply message (message id fixed to 1) in a DATA frame on `stream_id`.
    async fn write_xpc_response<S>(stream: &mut S, stream_id: u32, body: XpcValue)
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        use crate::xpc::message::flags;

        let encoded = crate::xpc::message::encode_message(&XpcMessage {
            flags: flags::ALWAYS_SET | flags::DATA | flags::REPLY,
            msg_id: 1,
            body: Some(body),
        })
        .unwrap();
        write_raw_frame(stream, 0x00, 0, stream_id, &encoded).await;
    }

    /// Serializes one H2 frame (9-byte header followed by `payload`) and flushes it.
    async fn write_raw_frame<S>(
        stream: &mut S,
        frame_type: u8,
        flags: u8,
        stream_id: u32,
        payload: &[u8],
    ) where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let len = payload.len();
        let mut frame = Vec::with_capacity(9 + len);
        // 24-bit big-endian length: the low three bytes of the length.
        frame.extend_from_slice(&(len as u32).to_be_bytes()[1..]);
        frame.extend_from_slice(&[frame_type, flags]);
        // The top bit of the stream id field is cleared.
        frame.extend_from_slice(&(stream_id & 0x7fff_ffff).to_be_bytes());
        frame.extend_from_slice(payload);

        stream.write_all(&frame).await.unwrap();
        stream.flush().await.unwrap();
    }

    /// Reads one H2 frame: a 9-byte header, then as many payload bytes as the header announces.
    async fn read_raw_frame<S>(stream: &mut S) -> TestFrame
    where
        S: AsyncRead + AsyncWrite + Unpin,
    {
        let mut header = [0u8; 9];
        stream.read_exact(&mut header).await.unwrap();

        let len = u32::from_be_bytes([0, header[0], header[1], header[2]]) as usize;
        let mut payload = vec![0u8; len];
        if len > 0 {
            stream.read_exact(&mut payload).await.unwrap();
        }

        TestFrame {
            frame_type: header[3],
            flags: header[4],
            stream_id: u32::from_be_bytes([header[5] & 0x7f, header[6], header[7], header[8]]),
            payload,
        }
    }
}