1use std::collections::HashMap;
9
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Identifier string of the installation proxy service.
///
/// NOTE(review): presumably the name handed to the service-start request when
/// opening the stream given to [`InstallationProxy::new`]; nothing in this file
/// reads the constant, so confirm against the caller.
pub const SERVICE_NAME: &str = "com.apple.mobile.installation_proxy";
13
/// Errors produced while talking to the installation proxy service.
#[derive(Debug, thiserror::Error)]
pub enum IpError {
    /// Reading from or writing to the underlying stream failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A plist could not be serialized or deserialized.
    #[error("plist error: {0}")]
    Plist(#[from] plist::Error),
    /// The framing of a message was invalid, e.g. an incoming frame announced
    /// a length above the accepted maximum.
    #[error("protocol error: {0}")]
    Protocol(String),
    /// A response carried an `Error` entry; the payload is its debug
    /// rendering, optionally followed by the `ErrorDescription` text.
    #[error("install error: {0}")]
    Install(String),
}
25
/// Summary of one application entry as reported by the service.
///
/// The string fields are empty when the corresponding key is missing from the
/// entry or does not hold a string.
#[derive(Debug, Clone)]
pub struct AppInfo {
    /// Value of `CFBundleIdentifier`; for lookups, the lookup key is used when
    /// the entry itself does not carry the identifier.
    pub bundle_id: String,
    /// Value of `CFBundleDisplayName`.
    pub display_name: String,
    /// Value of `CFBundleShortVersionString`.
    pub version: String,
    /// Value of `ApplicationType`.
    pub app_type: String,
    /// Value of `Path`.
    pub path: String,
    /// Every key/value pair of the entry, including the ones surfaced above.
    pub extra: HashMap<String, plist::Value>,
}
36
/// Client for the installation proxy protocol, generic over its transport.
pub struct InstallationProxy<S> {
    /// Stream over which length-prefixed plist messages are exchanged.
    stream: S,
}
41
42impl<S: AsyncRead + AsyncWrite + Unpin> InstallationProxy<S> {
43 pub fn new(stream: S) -> Self {
44 Self { stream }
45 }
46
47 pub async fn install(&mut self, package_path: &str) -> Result<(), IpError> {
49 send_plist(
50 &mut self.stream,
51 &serde_json::json!({
52 "Command": "Install",
53 "PackagePath": package_path,
54 "ClientOptions": {},
55 }),
56 )
57 .await?;
58
59 self.wait_for_completion().await
60 }
61
62 pub async fn upgrade(&mut self, package_path: &str) -> Result<(), IpError> {
64 send_plist(
65 &mut self.stream,
66 &serde_json::json!({
67 "Command": "Upgrade",
68 "PackagePath": package_path,
69 "ClientOptions": {},
70 }),
71 )
72 .await?;
73
74 self.wait_for_completion().await
75 }
76
77 pub async fn list_user_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
79 self.browse("User", true, &[]).await
80 }
81
82 pub async fn list_user_apps_with_attributes(
84 &mut self,
85 return_attributes: &[&str],
86 ) -> Result<Vec<AppInfo>, IpError> {
87 self.browse("User", true, return_attributes).await
88 }
89
90 pub async fn list_all_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
92 self.browse("", true, &[]).await
93 }
94
95 pub async fn list_system_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
97 self.browse("System", false, &[]).await
98 }
99
100 pub async fn list_hidden_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
102 self.browse("Hidden", true, &[]).await
103 }
104
105 pub async fn list_file_sharing_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
107 let apps = self.list_all_apps().await?;
108 Ok(apps
109 .into_iter()
110 .filter(|app| {
111 app.extra
112 .get("UIFileSharingEnabled")
113 .and_then(plist::Value::as_boolean)
114 .unwrap_or(false)
115 })
116 .collect())
117 }
118
119 pub async fn uninstall(&mut self, bundle_id: &str) -> Result<(), IpError> {
121 self.send_bundle_identifier_command("Uninstall", bundle_id)
122 .await
123 }
124
125 pub async fn archive(&mut self, bundle_id: &str) -> Result<(), IpError> {
127 self.send_bundle_identifier_command("Archive", bundle_id)
128 .await
129 }
130
131 pub async fn restore(&mut self, bundle_id: &str) -> Result<(), IpError> {
133 self.send_bundle_identifier_command("Restore", bundle_id)
134 .await
135 }
136
137 async fn send_bundle_identifier_command(
138 &mut self,
139 command: &str,
140 bundle_id: &str,
141 ) -> Result<(), IpError> {
142 send_plist(
143 &mut self.stream,
144 &serde_json::json!({
145 "Command": command,
146 "ApplicationIdentifier": bundle_id,
147 "ClientOptions": {},
148 }),
149 )
150 .await?;
151
152 self.wait_for_completion().await
153 }
154
155 pub async fn lookup_app(&mut self, bundle_id: &str) -> Result<Option<AppInfo>, IpError> {
157 self.lookup_app_with_attributes(bundle_id, &[]).await
158 }
159
160 pub async fn lookup_app_with_attributes(
162 &mut self,
163 bundle_id: &str,
164 return_attributes: &[&str],
165 ) -> Result<Option<AppInfo>, IpError> {
166 let mut options = serde_json::json!({
167 "BundleIDs": [bundle_id],
168 });
169 if !return_attributes.is_empty() {
170 options["ReturnAttributes"] = serde_json::Value::Array(
171 return_attributes
172 .iter()
173 .map(|attr| serde_json::Value::String((*attr).to_string()))
174 .collect(),
175 );
176 }
177
178 let response = self.lookup(options).await?;
179
180 Ok(response
181 .into_iter()
182 .next()
183 .map(|(lookup_bundle_id, value)| {
184 parse_app_info_with_bundle_id(&lookup_bundle_id, value)
185 }))
186 }
187
188 async fn browse(
189 &mut self,
190 app_type: &str,
191 show_prohibited: bool,
192 return_attributes: &[&str],
193 ) -> Result<Vec<AppInfo>, IpError> {
194 let mut client_opts = serde_json::json!({});
195 if !app_type.is_empty() {
196 client_opts["ApplicationType"] = serde_json::Value::String(app_type.to_string());
197 }
198 if show_prohibited {
199 client_opts["ShowLaunchProhibitedApps"] = serde_json::Value::Bool(true);
200 }
201 if !return_attributes.is_empty() {
202 client_opts["ReturnAttributes"] = serde_json::Value::Array(
203 return_attributes
204 .iter()
205 .map(|attr| serde_json::Value::String((*attr).to_string()))
206 .collect(),
207 );
208 }
209
210 send_plist(
211 &mut self.stream,
212 &serde_json::json!({
213 "Command": "Browse",
214 "ClientOptions": client_opts,
215 }),
216 )
217 .await?;
218
219 let mut apps = Vec::new();
220 loop {
221 let data = recv_plist_raw(&mut self.stream).await?;
222 let resp: plist::Dictionary = plist::from_bytes(&data)?;
223
224 for item in resp
225 .get("CurrentList")
226 .and_then(plist::Value::as_array)
227 .cloned()
228 .unwrap_or_default()
229 {
230 apps.push(parse_app_info(item));
231 }
232
233 if resp.get("Status").and_then(plist::Value::as_string) == Some("Complete") {
234 break;
235 }
236 }
237 Ok(apps)
238 }
239
240 async fn lookup(
241 &mut self,
242 client_options: serde_json::Value,
243 ) -> Result<HashMap<String, plist::Value>, IpError> {
244 send_plist(
245 &mut self.stream,
246 &serde_json::json!({
247 "Command": "Lookup",
248 "ClientOptions": client_options,
249 }),
250 )
251 .await?;
252
253 let data = recv_plist_raw(&mut self.stream).await?;
254 let mut dict: HashMap<String, plist::Value> = plist::from_bytes(&data)?;
255 if let Some(e) = dict.get("Error") {
256 return Err(IpError::Install(format!("{e:?}")));
257 }
258
259 let result = dict
260 .remove("LookupResult")
261 .and_then(|value| value.into_dictionary())
262 .map(|items| items.into_iter().collect())
263 .unwrap_or_default();
264 Ok(result)
265 }
266
267 async fn wait_for_completion(&mut self) -> Result<(), IpError> {
268 loop {
269 let data = recv_plist_raw(&mut self.stream).await?;
270 let dict: HashMap<String, plist::Value> = plist::from_bytes(&data)?;
271 if let Some(error) = dict.get("Error") {
272 let message = match dict.get("ErrorDescription").and_then(|v| v.as_string()) {
273 Some(description) => format!("{error:?}: {description}"),
274 None => format!("{error:?}"),
275 };
276 return Err(IpError::Install(message));
277 }
278 if dict.get("Status").and_then(|s| s.as_string()) == Some("Complete") {
279 return Ok(());
280 }
281 }
282 }
283}
284
285fn parse_app_info(val: plist::Value) -> AppInfo {
286 parse_app_info_with_bundle_id("", val)
287}
288
289fn parse_app_info_with_bundle_id(lookup_bundle_id: &str, val: plist::Value) -> AppInfo {
290 let dict = val.into_dictionary().unwrap_or_default();
291 let get_str = |k: &str| {
292 dict.get(k)
293 .and_then(|v| v.as_string())
294 .unwrap_or("")
295 .to_string()
296 };
297
298 let bundle_id = if lookup_bundle_id.is_empty() {
299 get_str("CFBundleIdentifier")
300 } else {
301 dict.get("CFBundleIdentifier")
302 .and_then(|v| v.as_string())
303 .unwrap_or(lookup_bundle_id)
304 .to_string()
305 };
306 let display_name = get_str("CFBundleDisplayName");
307 let version = get_str("CFBundleShortVersionString");
308 let app_type = get_str("ApplicationType");
309 let path = get_str("Path");
310
311 let extra = dict.into_iter().collect();
312
313 AppInfo {
314 bundle_id,
315 display_name,
316 version,
317 app_type,
318 path,
319 extra,
320 }
321}
322
323async fn send_plist<S>(stream: &mut S, value: &serde_json::Value) -> Result<(), IpError>
326where
327 S: AsyncWrite + Unpin,
328{
329 let plist_val = json_to_plist(value);
331 let mut buf = Vec::new();
332 plist::to_writer_xml(&mut buf, &plist_val)?;
333 stream.write_all(&(buf.len() as u32).to_be_bytes()).await?;
334 stream.write_all(&buf).await?;
335 stream.flush().await?;
336 Ok(())
337}
338
339async fn recv_plist_raw<S>(stream: &mut S) -> Result<Vec<u8>, IpError>
340where
341 S: AsyncRead + Unpin,
342{
343 let mut len_buf = [0u8; 4];
344 stream.read_exact(&mut len_buf).await?;
345 let len = u32::from_be_bytes(len_buf) as usize;
346 const MAX_PLIST_SIZE: usize = 4 * 1024 * 1024;
347 if len > MAX_PLIST_SIZE {
348 return Err(IpError::Protocol(format!(
349 "plist length {len} exceeds maximum of {MAX_PLIST_SIZE}"
350 )));
351 }
352 let mut buf = vec![0u8; len];
353 stream.read_exact(&mut buf).await?;
354 Ok(buf)
355}
356
357fn json_to_plist(val: &serde_json::Value) -> plist::Value {
358 match val {
359 serde_json::Value::Null => plist::Value::String(String::new()),
360 serde_json::Value::Bool(b) => plist::Value::Boolean(*b),
361 serde_json::Value::Number(n) => {
362 if let Some(i) = n.as_i64() {
363 plist::Value::Integer(plist::Integer::from(i))
364 } else {
365 plist::Value::Real(n.as_f64().unwrap_or(0.0))
366 }
367 }
368 serde_json::Value::String(s) => plist::Value::String(s.clone()),
369 serde_json::Value::Array(arr) => {
370 plist::Value::Array(arr.iter().map(json_to_plist).collect())
371 }
372 serde_json::Value::Object(map) => {
373 let dict: plist::Dictionary = map
374 .iter()
375 .map(|(k, v)| (k.clone(), json_to_plist(v)))
376 .collect();
377 plist::Value::Dictionary(dict)
378 }
379 }
380}
381
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    use super::*;

    /// Stream double that records everything written and fails every read.
    #[derive(Default)]
    struct RecordingStream {
        written: Vec<u8>,
    }

    impl AsyncRead for RecordingStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let eof = std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "test stream has no responses",
            );
            Poll::Ready(Err(eof))
        }
    }

    impl AsyncWrite for RecordingStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            self.get_mut().written.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Stream double that records writes and replays canned response bytes.
    struct ResponseStream {
        written: Vec<u8>,
        read_buf: Vec<u8>,
        read_pos: usize,
    }

    impl ResponseStream {
        /// Queues `frames` back to back as the bytes handed out by reads.
        fn with_frames(frames: Vec<Vec<u8>>) -> Self {
            Self {
                written: Vec::new(),
                read_buf: frames.concat(),
                read_pos: 0,
            }
        }
    }

    impl AsyncRead for ResponseStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let this = self.get_mut();
            let pending = &this.read_buf[this.read_pos..];
            if pending.is_empty() {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "no more test data",
                )));
            }

            // Hand out as much as the caller's buffer can take.
            let count = pending.len().min(buf.remaining());
            buf.put_slice(&pending[..count]);
            this.read_pos += count;
            Poll::Ready(Ok(()))
        }
    }

    impl AsyncWrite for ResponseStream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            self.get_mut().written.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Encodes `value` as an XML plist behind a 4-byte big-endian length.
    fn plist_frame(value: plist::Value) -> Vec<u8> {
        let mut xml = Vec::new();
        plist::to_writer_xml(&mut xml, &value).unwrap();
        let prefix = (xml.len() as u32).to_be_bytes();
        [prefix.as_slice(), xml.as_slice()].concat()
    }

    /// Frame holding a dictionary with only a `Status` entry.
    fn status_frame(status: &str) -> Vec<u8> {
        let mut dict = plist::Dictionary::new();
        dict.insert("Status".to_string(), plist::Value::String(status.into()));
        plist_frame(plist::Value::Dictionary(dict))
    }

    /// Application entry as it appears inside a `CurrentList`.
    fn app_entry(bundle_id: &str, display_name: &str, file_sharing: bool) -> plist::Value {
        let mut entry = plist::Dictionary::new();
        entry.insert(
            "CFBundleIdentifier".to_string(),
            plist::Value::String(bundle_id.into()),
        );
        entry.insert(
            "CFBundleDisplayName".to_string(),
            plist::Value::String(display_name.into()),
        );
        entry.insert(
            "UIFileSharingEnabled".to_string(),
            plist::Value::Boolean(file_sharing),
        );
        plist::Value::Dictionary(entry)
    }

    /// Decodes the first frame found in `written` into a dictionary.
    fn first_request(written: &[u8]) -> plist::Dictionary {
        let (prefix, rest) = written.split_at(4);
        let len = u32::from_be_bytes(prefix.try_into().unwrap()) as usize;
        plist::from_bytes(&rest[..len]).unwrap()
    }

    /// Borrows the `ClientOptions` dictionary of a decoded request.
    fn client_options(request: &plist::Dictionary) -> &plist::Dictionary {
        request["ClientOptions"].as_dictionary().unwrap()
    }

    #[tokio::test]
    async fn list_system_apps_sends_system_filter() {
        let mut stream = RecordingStream::default();
        let err = InstallationProxy::new(&mut stream)
            .list_system_apps()
            .await
            .unwrap_err();

        assert!(matches!(err, IpError::Io(_)));

        let request = first_request(&stream.written);
        let options = client_options(&request);
        assert_eq!(options["ApplicationType"].as_string(), Some("System"));
        assert!(!options.contains_key("ShowLaunchProhibitedApps"));
    }

    #[tokio::test]
    async fn list_hidden_apps_sends_hidden_filter() {
        let mut stream = RecordingStream::default();
        let err = InstallationProxy::new(&mut stream)
            .list_hidden_apps()
            .await
            .unwrap_err();

        assert!(matches!(err, IpError::Io(_)));

        let request = first_request(&stream.written);
        let options = client_options(&request);
        assert_eq!(options["ApplicationType"].as_string(), Some("Hidden"));
        assert_eq!(
            options["ShowLaunchProhibitedApps"].as_boolean(),
            Some(true)
        );
    }

    #[tokio::test]
    async fn list_file_sharing_apps_filters_on_ui_file_sharing_enabled() {
        let mut browsing = plist::Dictionary::new();
        browsing.insert(
            "Status".to_string(),
            plist::Value::String("BrowsingApplications".into()),
        );
        browsing.insert(
            "CurrentList".to_string(),
            plist::Value::Array(vec![
                app_entry("com.example.Files", "Files", true),
                app_entry("com.example.Hidden", "Hidden", false),
            ]),
        );

        let mut stream = ResponseStream::with_frames(vec![
            plist_frame(plist::Value::Dictionary(browsing)),
            status_frame("Complete"),
        ]);
        let mut proxy = InstallationProxy::new(&mut stream);

        let apps = proxy.list_file_sharing_apps().await.unwrap();
        assert_eq!(apps.len(), 1);
        assert_eq!(apps[0].bundle_id, "com.example.Files");
    }

    #[tokio::test]
    async fn lookup_app_sends_lookup_command_with_bundle_ids() {
        let mut stream = RecordingStream::default();
        let err = InstallationProxy::new(&mut stream)
            .lookup_app("com.example.test")
            .await
            .unwrap_err();

        assert!(matches!(err, IpError::Io(_)));

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Lookup"));

        let bundle_ids: Vec<_> = client_options(&request)["BundleIDs"]
            .as_array()
            .unwrap()
            .iter()
            .map(plist::Value::as_string)
            .collect();
        assert_eq!(bundle_ids, [Some("com.example.test")]);
    }

    #[tokio::test]
    async fn lookup_app_with_attributes_sends_return_attributes() {
        let mut stream = RecordingStream::default();
        let err = InstallationProxy::new(&mut stream)
            .lookup_app_with_attributes("com.example.test", &["CFBundleVersion", "Path"])
            .await
            .unwrap_err();

        assert!(matches!(err, IpError::Io(_)));

        let request = first_request(&stream.written);
        let attrs: Vec<_> = client_options(&request)["ReturnAttributes"]
            .as_array()
            .unwrap()
            .iter()
            .map(plist::Value::as_string)
            .collect();
        assert_eq!(attrs, [Some("CFBundleVersion"), Some("Path")]);
    }

    #[tokio::test]
    async fn list_user_apps_with_attributes_sends_return_attributes() {
        let mut stream = RecordingStream::default();
        let err = InstallationProxy::new(&mut stream)
            .list_user_apps_with_attributes(&[
                "CFBundleIdentifier",
                "ApplicationSINF",
                "iTunesMetadata",
            ])
            .await
            .unwrap_err();

        assert!(matches!(err, IpError::Io(_)));

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Browse"));

        let options = client_options(&request);
        assert_eq!(options["ApplicationType"].as_string(), Some("User"));
        assert_eq!(
            options["ShowLaunchProhibitedApps"].as_boolean(),
            Some(true)
        );

        let attrs: Vec<_> = options["ReturnAttributes"]
            .as_array()
            .unwrap()
            .iter()
            .map(plist::Value::as_string)
            .collect();
        assert_eq!(
            attrs,
            [
                Some("CFBundleIdentifier"),
                Some("ApplicationSINF"),
                Some("iTunesMetadata"),
            ]
        );
    }

    #[tokio::test]
    async fn install_sends_package_path_and_waits_for_completion() {
        let mut stream = ResponseStream::with_frames(vec![
            status_frame("Installing"),
            status_frame("Complete"),
        ]);
        InstallationProxy::new(&mut stream)
            .install("/PublicStaging/Example.ipa")
            .await
            .unwrap();

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Install"));
        assert_eq!(
            request["PackagePath"].as_string(),
            Some("/PublicStaging/Example.ipa")
        );
        assert_eq!(
            request["ClientOptions"].as_dictionary(),
            Some(&plist::Dictionary::new())
        );
    }

    #[tokio::test]
    async fn upgrade_sends_upgrade_command() {
        let mut stream = ResponseStream::with_frames(vec![
            status_frame("Upgrading"),
            status_frame("Complete"),
        ]);
        InstallationProxy::new(&mut stream)
            .upgrade("/PublicStaging/Example.ipa")
            .await
            .unwrap();

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Upgrade"));
    }

    #[tokio::test]
    async fn archive_sends_application_identifier() {
        let mut stream = ResponseStream::with_frames(vec![status_frame("Complete")]);
        InstallationProxy::new(&mut stream)
            .archive("com.example.test")
            .await
            .unwrap();

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Archive"));
        assert_eq!(
            request["ApplicationIdentifier"].as_string(),
            Some("com.example.test")
        );
    }

    #[tokio::test]
    async fn restore_sends_application_identifier() {
        let mut stream = ResponseStream::with_frames(vec![status_frame("Complete")]);
        InstallationProxy::new(&mut stream)
            .restore("com.example.test")
            .await
            .unwrap();

        let request = first_request(&stream.written);
        assert_eq!(request["Command"].as_string(), Some("Restore"));
        assert_eq!(
            request["ApplicationIdentifier"].as_string(),
            Some("com.example.test")
        );
    }
}