1use std::collections::HashMap;
9
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Name of the installation proxy service.
///
/// NOTE(review): presumably handed to the lockdown layer when starting the service; nothing in
/// this file reads the constant — confirm against callers.
pub const SERVICE_NAME: &str = "com.apple.mobile.installation_proxy";
13
/// Errors produced while talking to the installation proxy service.
#[derive(Debug, thiserror::Error)]
pub enum IpError {
    /// Reading from or writing to the underlying stream failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A plist could not be serialized or parsed, or an incoming frame announced a length
    /// above the receive limit. Carries the stringified cause.
    #[error("plist error: {0}")]
    Plist(String),
    /// The response contained an `Error` entry. Despite the name, lookups and the
    /// uninstall/archive/restore/upgrade commands report failures through this variant too.
    #[error("install error: {0}")]
    Install(String),
}
23
/// Summary of one application entry returned by a `Browse` or `Lookup` request.
///
/// The string fields are empty when the corresponding key is missing from the entry or is not
/// a plist string.
#[derive(Debug, Clone)]
pub struct AppInfo {
    /// Value of `CFBundleIdentifier`; for lookups the key of the lookup result is used as a
    /// fallback when the entry itself lacks the identifier.
    pub bundle_id: String,
    /// Value of `CFBundleDisplayName`.
    pub display_name: String,
    /// Value of `CFBundleShortVersionString`.
    pub version: String,
    /// Value of `ApplicationType`.
    pub app_type: String,
    /// Value of `Path`.
    pub path: String,
    /// Every key/value pair of the entry, including the keys already surfaced above.
    pub extra: HashMap<String, plist::Value>,
}
34
/// Client for the installation proxy protocol running over the stream `S`.
///
/// Messages are exchanged as XML plists, each preceded by a 4-byte big-endian length.
pub struct InstallationProxy<S> {
    /// Transport used for all requests and responses.
    stream: S,
}
39
40impl<S: AsyncRead + AsyncWrite + Unpin> InstallationProxy<S> {
41 pub fn new(stream: S) -> Self {
42 Self { stream }
43 }
44
45 pub async fn install(&mut self, package_path: &str) -> Result<(), IpError> {
47 send_plist(
48 &mut self.stream,
49 &serde_json::json!({
50 "Command": "Install",
51 "PackagePath": package_path,
52 "ClientOptions": {},
53 }),
54 )
55 .await?;
56
57 self.wait_for_completion().await
58 }
59
60 pub async fn upgrade(&mut self, package_path: &str) -> Result<(), IpError> {
62 send_plist(
63 &mut self.stream,
64 &serde_json::json!({
65 "Command": "Upgrade",
66 "PackagePath": package_path,
67 "ClientOptions": {},
68 }),
69 )
70 .await?;
71
72 self.wait_for_completion().await
73 }
74
75 pub async fn list_user_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
77 self.browse("User", true, &[]).await
78 }
79
80 pub async fn list_user_apps_with_attributes(
82 &mut self,
83 return_attributes: &[&str],
84 ) -> Result<Vec<AppInfo>, IpError> {
85 self.browse("User", true, return_attributes).await
86 }
87
88 pub async fn list_all_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
90 self.browse("", true, &[]).await
91 }
92
93 pub async fn list_system_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
95 self.browse("System", false, &[]).await
96 }
97
98 pub async fn list_hidden_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
100 self.browse("Hidden", true, &[]).await
101 }
102
103 pub async fn list_file_sharing_apps(&mut self) -> Result<Vec<AppInfo>, IpError> {
105 let apps = self.list_all_apps().await?;
106 Ok(apps
107 .into_iter()
108 .filter(|app| {
109 app.extra
110 .get("UIFileSharingEnabled")
111 .and_then(plist::Value::as_boolean)
112 .unwrap_or(false)
113 })
114 .collect())
115 }
116
117 pub async fn uninstall(&mut self, bundle_id: &str) -> Result<(), IpError> {
119 self.send_bundle_identifier_command("Uninstall", bundle_id)
120 .await
121 }
122
123 pub async fn archive(&mut self, bundle_id: &str) -> Result<(), IpError> {
125 self.send_bundle_identifier_command("Archive", bundle_id)
126 .await
127 }
128
129 pub async fn restore(&mut self, bundle_id: &str) -> Result<(), IpError> {
131 self.send_bundle_identifier_command("Restore", bundle_id)
132 .await
133 }
134
135 async fn send_bundle_identifier_command(
136 &mut self,
137 command: &str,
138 bundle_id: &str,
139 ) -> Result<(), IpError> {
140 send_plist(
141 &mut self.stream,
142 &serde_json::json!({
143 "Command": command,
144 "ApplicationIdentifier": bundle_id,
145 "ClientOptions": {},
146 }),
147 )
148 .await?;
149
150 self.wait_for_completion().await
151 }
152
153 pub async fn lookup_app(&mut self, bundle_id: &str) -> Result<Option<AppInfo>, IpError> {
155 self.lookup_app_with_attributes(bundle_id, &[]).await
156 }
157
158 pub async fn lookup_app_with_attributes(
160 &mut self,
161 bundle_id: &str,
162 return_attributes: &[&str],
163 ) -> Result<Option<AppInfo>, IpError> {
164 let mut options = serde_json::json!({
165 "BundleIDs": [bundle_id],
166 });
167 if !return_attributes.is_empty() {
168 options["ReturnAttributes"] = serde_json::Value::Array(
169 return_attributes
170 .iter()
171 .map(|attr| serde_json::Value::String((*attr).to_string()))
172 .collect(),
173 );
174 }
175
176 let response = self.lookup(options).await?;
177
178 Ok(response
179 .into_iter()
180 .next()
181 .map(|(lookup_bundle_id, value)| {
182 parse_app_info_with_bundle_id(&lookup_bundle_id, value)
183 }))
184 }
185
186 async fn browse(
187 &mut self,
188 app_type: &str,
189 show_prohibited: bool,
190 return_attributes: &[&str],
191 ) -> Result<Vec<AppInfo>, IpError> {
192 let mut client_opts = serde_json::json!({});
193 if !app_type.is_empty() {
194 client_opts["ApplicationType"] = serde_json::Value::String(app_type.to_string());
195 }
196 if show_prohibited {
197 client_opts["ShowLaunchProhibitedApps"] = serde_json::Value::Bool(true);
198 }
199 if !return_attributes.is_empty() {
200 client_opts["ReturnAttributes"] = serde_json::Value::Array(
201 return_attributes
202 .iter()
203 .map(|attr| serde_json::Value::String((*attr).to_string()))
204 .collect(),
205 );
206 }
207
208 send_plist(
209 &mut self.stream,
210 &serde_json::json!({
211 "Command": "Browse",
212 "ClientOptions": client_opts,
213 }),
214 )
215 .await?;
216
217 let mut apps = Vec::new();
218 loop {
219 let data = recv_plist_raw(&mut self.stream).await?;
220 let resp: plist::Dictionary =
221 plist::from_bytes(&data).map_err(|e| IpError::Plist(e.to_string()))?;
222
223 for item in resp
224 .get("CurrentList")
225 .and_then(plist::Value::as_array)
226 .cloned()
227 .unwrap_or_default()
228 {
229 apps.push(parse_app_info(item));
230 }
231
232 if resp.get("Status").and_then(plist::Value::as_string) == Some("Complete") {
233 break;
234 }
235 }
236 Ok(apps)
237 }
238
239 async fn lookup(
240 &mut self,
241 client_options: serde_json::Value,
242 ) -> Result<HashMap<String, plist::Value>, IpError> {
243 send_plist(
244 &mut self.stream,
245 &serde_json::json!({
246 "Command": "Lookup",
247 "ClientOptions": client_options,
248 }),
249 )
250 .await?;
251
252 let data = recv_plist_raw(&mut self.stream).await?;
253 let mut dict: HashMap<String, plist::Value> =
254 plist::from_bytes(&data).map_err(|e| IpError::Plist(e.to_string()))?;
255 if let Some(e) = dict.get("Error") {
256 return Err(IpError::Install(format!("{e:?}")));
257 }
258
259 let result = dict
260 .remove("LookupResult")
261 .and_then(|value| value.into_dictionary())
262 .map(|items| items.into_iter().collect())
263 .unwrap_or_default();
264 Ok(result)
265 }
266
267 async fn wait_for_completion(&mut self) -> Result<(), IpError> {
268 loop {
269 let data = recv_plist_raw(&mut self.stream).await?;
270 let dict: HashMap<String, plist::Value> =
271 plist::from_bytes(&data).map_err(|e| IpError::Plist(e.to_string()))?;
272 if let Some(error) = dict.get("Error") {
273 let message = match dict.get("ErrorDescription").and_then(|v| v.as_string()) {
274 Some(description) => format!("{error:?}: {description}"),
275 None => format!("{error:?}"),
276 };
277 return Err(IpError::Install(message));
278 }
279 if dict.get("Status").and_then(|s| s.as_string()) == Some("Complete") {
280 return Ok(());
281 }
282 }
283 }
284}
285
286fn parse_app_info(val: plist::Value) -> AppInfo {
287 parse_app_info_with_bundle_id("", val)
288}
289
290fn parse_app_info_with_bundle_id(lookup_bundle_id: &str, val: plist::Value) -> AppInfo {
291 let dict = val.into_dictionary().unwrap_or_default();
292 let get_str = |k: &str| {
293 dict.get(k)
294 .and_then(|v| v.as_string())
295 .unwrap_or("")
296 .to_string()
297 };
298
299 let bundle_id = if lookup_bundle_id.is_empty() {
300 get_str("CFBundleIdentifier")
301 } else {
302 dict.get("CFBundleIdentifier")
303 .and_then(|v| v.as_string())
304 .unwrap_or(lookup_bundle_id)
305 .to_string()
306 };
307 let display_name = get_str("CFBundleDisplayName");
308 let version = get_str("CFBundleShortVersionString");
309 let app_type = get_str("ApplicationType");
310 let path = get_str("Path");
311
312 let extra = dict.into_iter().collect();
313
314 AppInfo {
315 bundle_id,
316 display_name,
317 version,
318 app_type,
319 path,
320 extra,
321 }
322}
323
324async fn send_plist<S>(stream: &mut S, value: &serde_json::Value) -> Result<(), IpError>
327where
328 S: AsyncWrite + Unpin,
329{
330 let plist_val = json_to_plist(value);
332 let mut buf = Vec::new();
333 plist::to_writer_xml(&mut buf, &plist_val).map_err(|e| IpError::Plist(e.to_string()))?;
334 stream.write_all(&(buf.len() as u32).to_be_bytes()).await?;
335 stream.write_all(&buf).await?;
336 stream.flush().await?;
337 Ok(())
338}
339
340async fn recv_plist_raw<S>(stream: &mut S) -> Result<Vec<u8>, IpError>
341where
342 S: AsyncRead + Unpin,
343{
344 let mut len_buf = [0u8; 4];
345 stream.read_exact(&mut len_buf).await?;
346 let len = u32::from_be_bytes(len_buf) as usize;
347 const MAX_PLIST_SIZE: usize = 4 * 1024 * 1024;
348 if len > MAX_PLIST_SIZE {
349 return Err(IpError::Plist(format!(
350 "plist length {len} exceeds maximum of {MAX_PLIST_SIZE}"
351 )));
352 }
353 let mut buf = vec![0u8; len];
354 stream.read_exact(&mut buf).await?;
355 Ok(buf)
356}
357
358fn json_to_plist(val: &serde_json::Value) -> plist::Value {
359 match val {
360 serde_json::Value::Null => plist::Value::String(String::new()),
361 serde_json::Value::Bool(b) => plist::Value::Boolean(*b),
362 serde_json::Value::Number(n) => {
363 if let Some(i) = n.as_i64() {
364 plist::Value::Integer(plist::Integer::from(i))
365 } else {
366 plist::Value::Real(n.as_f64().unwrap_or(0.0))
367 }
368 }
369 serde_json::Value::String(s) => plist::Value::String(s.clone()),
370 serde_json::Value::Array(arr) => {
371 plist::Value::Array(arr.iter().map(json_to_plist).collect())
372 }
373 serde_json::Value::Object(map) => {
374 let dict: plist::Dictionary = map
375 .iter()
376 .map(|(k, v)| (k.clone(), json_to_plist(v)))
377 .collect();
378 plist::Value::Dictionary(dict)
379 }
380 }
381}
382
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    use super::*;

    /// Stream that records everything written to it and fails every read with
    /// `UnexpectedEof`. Used to inspect a request without scripting a response.
    #[derive(Default)]
    struct RecordingStream {
        written: Vec<u8>,
    }

    impl AsyncRead for RecordingStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "test stream has no responses",
            )))
        }
    }

    impl AsyncWrite for RecordingStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            self.written.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Stream that replays pre-framed responses on read and records everything written.
    /// Reads fail with `UnexpectedEof` once the scripted bytes are exhausted.
    struct ResponseStream {
        written: Vec<u8>,
        read_buf: Vec<u8>,
        read_pos: usize,
    }

    impl ResponseStream {
        /// Concatenates `frames` into the byte sequence served to the reader.
        fn with_frames(frames: Vec<Vec<u8>>) -> Self {
            let read_buf = frames.into_iter().flatten().collect();
            Self {
                written: Vec::new(),
                read_buf,
                read_pos: 0,
            }
        }
    }

    impl AsyncRead for ResponseStream {
        fn poll_read(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let remaining = self.read_buf.len().saturating_sub(self.read_pos);
            if remaining == 0 {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "no more test data",
                )));
            }

            let to_copy = remaining.min(buf.remaining());
            let start = self.read_pos;
            let end = start + to_copy;
            buf.put_slice(&self.read_buf[start..end]);
            self.read_pos = end;
            Poll::Ready(Ok(()))
        }
    }

    impl AsyncWrite for ResponseStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            self.written.extend_from_slice(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }

        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }

    /// Serializes `value` as XML and prepends the 4-byte big-endian length.
    fn plist_frame(value: plist::Value) -> Vec<u8> {
        let mut buf = Vec::new();
        plist::to_writer_xml(&mut buf, &value).unwrap();
        let mut framed = Vec::with_capacity(buf.len() + 4);
        framed.extend_from_slice(&(buf.len() as u32).to_be_bytes());
        framed.extend_from_slice(&buf);
        framed
    }

    /// Builds a plist dictionary value from `(key, value)` pairs.
    fn dict_value<'a>(
        entries: impl IntoIterator<Item = (&'a str, plist::Value)>,
    ) -> plist::Value {
        plist::Value::Dictionary(plist::Dictionary::from_iter(
            entries
                .into_iter()
                .map(|(key, value)| (key.to_string(), value)),
        ))
    }

    /// Framed response consisting only of a `Status` entry.
    fn status_frame(status: &str) -> Vec<u8> {
        plist_frame(dict_value([(
            "Status",
            plist::Value::String(status.to_string()),
        )]))
    }

    /// Browse entry with identifier, display name and the file-sharing flag.
    fn app_entry(bundle_id: &str, display_name: &str, file_sharing: bool) -> plist::Value {
        dict_value([
            (
                "CFBundleIdentifier",
                plist::Value::String(bundle_id.to_string()),
            ),
            (
                "CFBundleDisplayName",
                plist::Value::String(display_name.to_string()),
            ),
            ("UIFileSharingEnabled", plist::Value::Boolean(file_sharing)),
        ])
    }

    /// Decodes the first length-prefixed request found in `written`.
    fn first_request(written: &[u8]) -> plist::Dictionary {
        let len = u32::from_be_bytes(written[..4].try_into().unwrap()) as usize;
        plist::from_bytes(&written[4..4 + len]).unwrap()
    }

    #[tokio::test]
    async fn list_system_apps_sends_system_filter() {
        let mut stream = RecordingStream::default();
        let err = {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.list_system_apps().await.unwrap_err()
        };

        assert!(matches!(err, IpError::Io(_)));

        let dict = first_request(&stream.written);
        let client_options = dict["ClientOptions"].as_dictionary().unwrap();
        assert_eq!(
            client_options["ApplicationType"].as_string(),
            Some("System")
        );
        assert!(!client_options.contains_key("ShowLaunchProhibitedApps"));
    }

    #[tokio::test]
    async fn list_hidden_apps_sends_hidden_filter() {
        let mut stream = RecordingStream::default();
        let err = {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.list_hidden_apps().await.unwrap_err()
        };

        assert!(matches!(err, IpError::Io(_)));

        let dict = first_request(&stream.written);
        let client_options = dict["ClientOptions"].as_dictionary().unwrap();
        assert_eq!(
            client_options["ApplicationType"].as_string(),
            Some("Hidden")
        );
        assert_eq!(
            client_options["ShowLaunchProhibitedApps"].as_boolean(),
            Some(true)
        );
    }

    #[tokio::test]
    async fn list_file_sharing_apps_filters_on_ui_file_sharing_enabled() {
        let responses = vec![
            plist_frame(dict_value([
                (
                    "Status",
                    plist::Value::String("BrowsingApplications".into()),
                ),
                (
                    "CurrentList",
                    plist::Value::Array(vec![
                        app_entry("com.example.Files", "Files", true),
                        app_entry("com.example.Hidden", "Hidden", false),
                    ]),
                ),
            ])),
            status_frame("Complete"),
        ];
        let mut stream = ResponseStream::with_frames(responses);
        let mut proxy = InstallationProxy::new(&mut stream);

        let apps = proxy.list_file_sharing_apps().await.unwrap();
        assert_eq!(apps.len(), 1);
        assert_eq!(apps[0].bundle_id, "com.example.Files");
    }

    #[tokio::test]
    async fn lookup_app_sends_lookup_command_with_bundle_ids() {
        let mut stream = RecordingStream::default();
        let err = {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.lookup_app("com.example.test").await.unwrap_err()
        };

        assert!(matches!(err, IpError::Io(_)));

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Lookup"));
        let client_options = dict["ClientOptions"].as_dictionary().unwrap();
        let bundle_ids = client_options["BundleIDs"].as_array().unwrap();
        assert_eq!(bundle_ids.len(), 1);
        assert_eq!(bundle_ids[0].as_string(), Some("com.example.test"));
    }

    #[tokio::test]
    async fn lookup_app_with_attributes_sends_return_attributes() {
        let mut stream = RecordingStream::default();
        let err = {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy
                .lookup_app_with_attributes("com.example.test", &["CFBundleVersion", "Path"])
                .await
                .unwrap_err()
        };

        assert!(matches!(err, IpError::Io(_)));

        let dict = first_request(&stream.written);
        let client_options = dict["ClientOptions"].as_dictionary().unwrap();
        let attrs = client_options["ReturnAttributes"].as_array().unwrap();
        assert_eq!(attrs.len(), 2);
        assert_eq!(attrs[0].as_string(), Some("CFBundleVersion"));
        assert_eq!(attrs[1].as_string(), Some("Path"));
    }

    #[tokio::test]
    async fn list_user_apps_with_attributes_sends_return_attributes() {
        let mut stream = RecordingStream::default();
        let err = {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy
                .list_user_apps_with_attributes(&[
                    "CFBundleIdentifier",
                    "ApplicationSINF",
                    "iTunesMetadata",
                ])
                .await
                .unwrap_err()
        };

        assert!(matches!(err, IpError::Io(_)));

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Browse"));

        let client_options = dict["ClientOptions"].as_dictionary().unwrap();
        assert_eq!(client_options["ApplicationType"].as_string(), Some("User"));
        assert_eq!(
            client_options["ShowLaunchProhibitedApps"].as_boolean(),
            Some(true)
        );
        let attrs = client_options["ReturnAttributes"].as_array().unwrap();
        assert_eq!(attrs.len(), 3);
        assert_eq!(attrs[0].as_string(), Some("CFBundleIdentifier"));
        assert_eq!(attrs[1].as_string(), Some("ApplicationSINF"));
        assert_eq!(attrs[2].as_string(), Some("iTunesMetadata"));
    }

    #[tokio::test]
    async fn install_sends_package_path_and_waits_for_completion() {
        let responses = vec![status_frame("Installing"), status_frame("Complete")];
        let mut stream = ResponseStream::with_frames(responses);
        {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.install("/PublicStaging/Example.ipa").await.unwrap();
        }

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Install"));
        assert_eq!(
            dict["PackagePath"].as_string(),
            Some("/PublicStaging/Example.ipa")
        );
        assert_eq!(
            dict["ClientOptions"].as_dictionary(),
            Some(&plist::Dictionary::new())
        );
    }

    #[tokio::test]
    async fn upgrade_sends_upgrade_command() {
        let responses = vec![status_frame("Upgrading"), status_frame("Complete")];
        let mut stream = ResponseStream::with_frames(responses);
        {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.upgrade("/PublicStaging/Example.ipa").await.unwrap();
        }

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Upgrade"));
    }

    #[tokio::test]
    async fn archive_sends_application_identifier() {
        let responses = vec![status_frame("Complete")];
        let mut stream = ResponseStream::with_frames(responses);
        {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.archive("com.example.test").await.unwrap();
        }

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Archive"));
        assert_eq!(
            dict["ApplicationIdentifier"].as_string(),
            Some("com.example.test")
        );
    }

    #[tokio::test]
    async fn restore_sends_application_identifier() {
        let responses = vec![status_frame("Complete")];
        let mut stream = ResponseStream::with_frames(responses);
        {
            let mut proxy = InstallationProxy::new(&mut stream);
            proxy.restore("com.example.test").await.unwrap();
        }

        let dict = first_request(&stream.written);
        assert_eq!(dict["Command"].as_string(), Some("Restore"));
        assert_eq!(
            dict["ApplicationIdentifier"].as_string(),
            Some("com.example.test")
        );
    }
}