1pub mod adb;
6pub mod shell;
7
8#[cfg(test)]
9pub mod test;
10
11use log::{debug, trace, warn};
12use once_cell::sync::Lazy;
13use regex::Regex;
14use std::collections::BTreeMap;
15use std::convert::TryFrom;
16use std::io;
17use std::iter::FromIterator;
18use std::num::{ParseIntError, TryFromIntError};
19use std::path::{Component, Path};
20use std::str::{FromStr, Utf8Error};
21use std::time::SystemTime;
22use thiserror::Error;
23use tokio::fs::File;
24use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
25use tokio::net::TcpStream;
26pub use unix_path::{Path as UnixPath, PathBuf as UnixPathBuf};
27use uuid::Uuid;
28use walkdir::WalkDir;
29
30use crate::adb::{DeviceSerial, SyncCommand};
31
32pub type Result<T> = std::result::Result<T, DeviceError>;
33
34static SYNC_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"[^A-Za-z0-9_@%+=:,./-]").unwrap());
35
36#[derive(Debug, Clone, Copy, PartialEq, Default)]
37pub enum AndroidStorageInput {
38 #[default]
39 Auto,
40 App,
41 Internal,
42 Sdcard,
43}
44
45impl FromStr for AndroidStorageInput {
46 type Err = DeviceError;
47
48 fn from_str(s: &str) -> Result<Self> {
49 match s {
50 "auto" => Ok(AndroidStorageInput::Auto),
51 "app" => Ok(AndroidStorageInput::App),
52 "internal" => Ok(AndroidStorageInput::Internal),
53 "sdcard" => Ok(AndroidStorageInput::Sdcard),
54 _ => Err(DeviceError::InvalidStorage),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Copy, PartialEq)]
60pub enum AndroidStorage {
61 App,
62 Internal,
63 Sdcard,
64}
65
66#[derive(Debug, Error)]
67pub enum DeviceError {
68 #[error("{0}")]
69 Adb(String),
70 #[error(transparent)]
71 FromInt(#[from] TryFromIntError),
72 #[error("Invalid storage")]
73 InvalidStorage,
74 #[error(transparent)]
75 Io(#[from] io::Error),
76 #[error("Missing package")]
77 MissingPackage,
78 #[error("Multiple Android devices online")]
79 MultipleDevices,
80 #[error(transparent)]
81 ParseInt(#[from] ParseIntError),
82 #[error("Unknown Android device with serial '{0}'")]
83 UnknownDevice(String),
84 #[error(transparent)]
85 Utf8(#[from] Utf8Error),
86 #[error(transparent)]
87 WalkDir(#[from] walkdir::Error),
88}
89
90fn encode_message(payload: &str) -> Result<String> {
91 let hex_length = u16::try_from(payload.len()).map(|len| format!("{:0>4X}", len))?;
92
93 Ok(format!("{}{}", hex_length, payload))
94}
95
96fn parse_device_info(line: &str) -> Option<DeviceInfo> {
97 let mut pairs = line.split_whitespace();
99 let serial = pairs.next();
100 let state = pairs.next();
101 if let (Some(serial), Some("device")) = (serial, state) {
102 let info: BTreeMap<String, String> = pairs
103 .filter_map(|pair| {
104 let mut kv = pair.split(':');
105 if let (Some(k), Some(v), None) = (kv.next(), kv.next(), kv.next()) {
106 Some((k.to_owned(), v.to_owned()))
107 } else {
108 None
109 }
110 })
111 .collect();
112
113 Some(DeviceInfo {
114 serial: serial.to_owned(),
115 info,
116 })
117 } else {
118 None
119 }
120}
121
122async fn read_length<R: AsyncRead + Unpin>(stream: &mut R) -> Result<usize> {
124 let mut bytes: [u8; 4] = [0; 4];
125 stream.read_exact(&mut bytes).await?;
126
127 let response = std::str::from_utf8(&bytes)?;
128
129 Ok(usize::from_str_radix(response, 16)?)
130}
131
132async fn read_length_little_endian<R: AsyncRead + Unpin>(reader: &mut R) -> Result<usize> {
134 let mut bytes: [u8; 4] = [0; 4];
135 reader.read_exact(&mut bytes).await?;
136
137 let n: usize = (bytes[0] as usize)
138 + ((bytes[1] as usize) << 8)
139 + ((bytes[2] as usize) << 16)
140 + ((bytes[3] as usize) << 24);
141
142 Ok(n)
143}
144
145async fn write_length_little_endian<W: AsyncWrite + Unpin>(
147 writer: &mut W,
148 n: usize,
149) -> Result<usize> {
150 let mut bytes = [0; 4];
151 bytes[0] = (n & 0xFF) as u8;
152 bytes[1] = ((n >> 8) & 0xFF) as u8;
153 bytes[2] = ((n >> 16) & 0xFF) as u8;
154 bytes[3] = ((n >> 24) & 0xFF) as u8;
155
156 writer.write(&bytes[..]).await.map_err(DeviceError::Io)
157}
158
159async fn read_response(
160 stream: &mut TcpStream,
161 has_output: bool,
162 has_length: bool,
163) -> Result<Vec<u8>> {
164 let mut bytes: [u8; 1024] = [0; 1024];
165
166 stream.read_exact(&mut bytes[0..4]).await?;
167
168 if !bytes.starts_with(SyncCommand::Okay.code()) {
169 let n = bytes.len().min(read_length(stream).await?);
170 stream.read_exact(&mut bytes[0..n]).await?;
171
172 let message = std::str::from_utf8(&bytes[0..n]).map(|s| format!("adb error: {}", s))?;
173
174 return Err(DeviceError::Adb(message));
175 }
176
177 let mut response = Vec::new();
178
179 if has_output {
180 stream.read_to_end(&mut response).await?;
181
182 if response.starts_with(SyncCommand::Okay.code()) {
183 response = response.split_off(4);
186 }
187
188 if response.starts_with(SyncCommand::Fail.code()) {
189 response = response.split_off(8);
192
193 let message = std::str::from_utf8(&response).map(|s| format!("adb error: {}", s))?;
194
195 return Err(DeviceError::Adb(message));
196 }
197
198 if has_length {
199 if response.len() >= 4 {
200 let message = response.split_off(4);
201 let slice: &mut &[u8] = &mut &*response;
202
203 let n = read_length(slice).await?;
204 if n != message.len() {
205 warn!(
206 "adb server response contained hexstring len {} but remaining message length is {}",
207 n,
208 message.len()
209 );
210 }
211
212 trace!(
213 "adb server response was {:?}",
214 std::str::from_utf8(&message)?
215 );
216
217 return Ok(message);
218 } else {
219 return Err(DeviceError::Adb(format!(
220 "adb server response did not contain expected hexstring length: {:?}",
221 std::str::from_utf8(&response)?
222 )));
223 }
224 }
225 }
226
227 Ok(response)
228}
229
230#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
232pub struct DeviceInfo {
233 pub serial: DeviceSerial,
234 pub info: BTreeMap<String, String>,
235}
236
237#[derive(Debug, Clone, PartialEq)]
240pub struct Host {
241 pub host: Option<String>,
243 pub port: Option<u16>,
245}
246
247impl Default for Host {
248 fn default() -> Host {
249 Host {
250 host: Some("localhost".to_string()),
251 port: Some(5037),
252 }
253 }
254}
255
256impl Host {
257 pub async fn device_or_default<T: AsRef<str>>(
262 self,
263 device_serial: Option<&T>,
264 storage: AndroidStorageInput,
265 ) -> Result<Device> {
266 let serials: Vec<String> = self
267 .devices::<Vec<_>>()
268 .await?
269 .into_iter()
270 .map(|d| d.serial)
271 .collect();
272
273 if let Some(ref serial) = device_serial
274 .map(|v| v.as_ref().to_owned())
275 .or_else(|| std::env::var("ANDROID_SERIAL").ok())
276 {
277 if !serials.contains(serial) {
278 return Err(DeviceError::UnknownDevice(serial.clone()));
279 }
280
281 return Device::new(self, serial.to_owned(), storage).await;
282 }
283
284 if serials.len() > 1 {
285 return Err(DeviceError::MultipleDevices);
286 }
287
288 if let Some(ref serial) = serials.first() {
289 return Device::new(self, serial.to_owned().to_string(), storage).await;
290 }
291
292 Err(DeviceError::Adb("No Android devices are online".to_owned()))
293 }
294
295 pub async fn connect(&self) -> Result<TcpStream> {
296 let stream = TcpStream::connect(format!(
297 "{}:{}",
298 self.host.clone().unwrap_or_else(|| "localhost".to_owned()),
299 self.port.unwrap_or(5037)
300 ))
301 .await?;
302 Ok(stream)
303 }
304
305 pub async fn execute_command(
306 &self,
307 command: &str,
308 has_output: bool,
309 has_length: bool,
310 ) -> Result<String> {
311 let mut stream = self.connect().await?;
312
313 stream
314 .write_all(encode_message(command)?.as_bytes())
315 .await?;
316 let bytes = read_response(&mut stream, has_output, has_length).await?;
317 let response = std::str::from_utf8(&bytes)?;
320
321 Ok(response.to_owned())
322 }
323
324 pub async fn execute_host_command(
325 &self,
326 host_command: &str,
327 has_length: bool,
328 has_output: bool,
329 ) -> Result<String> {
330 self.execute_command(&format!("host:{}", host_command), has_output, has_length)
331 .await
332 }
333
334 pub async fn features<B: FromIterator<String>>(&self) -> Result<B> {
335 let features = self.execute_host_command("features", true, true).await?;
336 Ok(features.split(',').map(|x| x.to_owned()).collect())
337 }
338
339 pub async fn devices<B: FromIterator<DeviceInfo>>(&self) -> Result<B> {
340 let response = self.execute_host_command("devices-l", true, true).await?;
341
342 let infos: B = response.lines().filter_map(parse_device_info).collect();
343
344 Ok(infos)
345 }
346}
347
348#[derive(Debug, Clone)]
350pub struct Device {
351 pub host: Host,
353
354 pub serial: DeviceSerial,
356
357 pub run_as_package: Option<String>,
358
359 pub storage: AndroidStorage,
360
361 pub tempfile: UnixPathBuf,
363}
364
365#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
366pub struct RemoteDirEntry {
367 pub depth: usize,
368 pub metadata: RemoteMetadata,
369 pub name: String,
370}
371
372#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
373pub enum RemoteMetadata {
374 RemoteFile(RemoteFileMetadata),
375 RemoteDir,
376 RemoteSymlink,
377}
378#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
379pub struct RemoteFileMetadata {
380 pub mode: usize,
381 pub size: usize,
382}
383
384impl Device {
385 pub async fn new(
386 host: Host,
387 serial: DeviceSerial,
388 storage: AndroidStorageInput,
389 ) -> Result<Device> {
390 let mut device = Device {
391 host,
392 serial,
393 run_as_package: None,
394 storage: AndroidStorage::App,
395 tempfile: UnixPathBuf::from("/data/local/tmp"),
396 };
397 device
398 .tempfile
399 .push(Uuid::new_v4().as_hyphenated().to_string());
400
401 device.storage = match storage {
402 AndroidStorageInput::App => AndroidStorage::App,
403 AndroidStorageInput::Internal => AndroidStorage::Internal,
404 AndroidStorageInput::Sdcard => AndroidStorage::Sdcard,
405 AndroidStorageInput::Auto => AndroidStorage::Sdcard,
406 };
407
408 Ok(device)
409 }
410
411 pub async fn clear_app_data(&self, package: &str) -> Result<bool> {
412 self.execute_host_shell_command(&format!("pm clear {}", package))
413 .await
414 .map(|v| v.contains("Success"))
415 }
416
417 pub async fn create_dir(&self, path: &UnixPath) -> Result<()> {
418 debug!("Creating {}", path.display());
419
420 let enable_run_as = self.enable_run_as_for_path(path);
421 self.execute_host_shell_command_as(&format!("mkdir -p {}", path.display()), enable_run_as)
422 .await?;
423
424 Ok(())
425 }
426
427 pub async fn chmod(&self, path: &UnixPath, mask: &str, recursive: bool) -> Result<()> {
428 let enable_run_as = self.enable_run_as_for_path(path);
429
430 let recursive = match recursive {
431 true => " -R",
432 false => "",
433 };
434
435 self.execute_host_shell_command_as(
436 &format!("chmod {} {} {}", recursive, mask, path.display()),
437 enable_run_as,
438 )
439 .await?;
440
441 Ok(())
442 }
443
444 pub async fn execute_host_command(
445 &self,
446 command: &str,
447 has_output: bool,
448 has_length: bool,
449 ) -> Result<Vec<u8>> {
450 let mut stream = self.host.connect().await?;
451
452 let switch_command = format!("host:transport:{}", self.serial);
453 trace!("execute_host_command: >> {:?}", &switch_command);
454 stream
455 .write_all(encode_message(&switch_command)?.as_bytes())
456 .await?;
457 let _bytes = read_response(&mut stream, false, false).await?;
458 trace!("execute_host_command: << {:?}", _bytes);
459 trace!("execute_host_command: >> {:?}", &command);
462 stream
463 .write_all(encode_message(command)?.as_bytes())
464 .await?;
465 let bytes = read_response(&mut stream, has_output, has_length).await?;
466 trace!("execute_host_command: << {:?}", bstr::BStr::new(&bytes));
467
468 Ok(bytes)
469 }
470
471 pub async fn execute_host_command_to_string(
472 &self,
473 command: &str,
474 has_output: bool,
475 has_length: bool,
476 ) -> Result<String> {
477 let bytes = self
478 .execute_host_command(command, has_output, has_length)
479 .await?;
480
481 let response = std::str::from_utf8(&bytes)?;
482
483 Ok(response.replace("\r\n", "\n"))
485 }
486
487 pub fn enable_run_as_for_path(&self, path: &UnixPath) -> bool {
488 match &self.run_as_package {
489 Some(package) => {
490 let mut p = UnixPathBuf::from("/data/data/");
491 p.push(package);
492 path.starts_with(p)
493 }
494 None => false,
495 }
496 }
497
498 pub async fn execute_host_shell_command(&self, shell_command: &str) -> Result<String> {
499 self.execute_host_shell_command_as(shell_command, false)
500 .await
501 }
502
503 pub async fn execute_host_exec_out_command(&self, shell_command: &str) -> Result<Vec<u8>> {
504 self.execute_host_command(&format!("exec:{}", shell_command), true, false)
505 .await
506 }
507
508 pub async fn execute_host_shell_command_as(
509 &self,
510 shell_command: &str,
511 enable_run_as: bool,
512 ) -> Result<String> {
513 if shell_command.starts_with("su") {
515 return self
516 .execute_host_command_to_string(&format!("shell:{}", shell_command), true, false)
517 .await;
518 }
519
520 let has_outer_quotes = shell_command.starts_with('"') && shell_command.ends_with('"')
521 || shell_command.starts_with('\'') && shell_command.ends_with('\'');
522
523 if enable_run_as {
525 let run_as_package = self
526 .run_as_package
527 .as_ref()
528 .ok_or(DeviceError::MissingPackage)?;
529
530 if has_outer_quotes {
531 return self
532 .execute_host_command_to_string(
533 &format!("shell:run-as {} {}", run_as_package, shell_command),
534 true,
535 false,
536 )
537 .await;
538 }
539
540 if SYNC_REGEX.is_match(shell_command) {
541 let arg: &str = &shell_command.replace('\'', "'\"'\"'")[..];
542 return self
543 .execute_host_command_to_string(
544 &format!("shell:run-as {} {}", run_as_package, arg),
545 true,
546 false,
547 )
548 .await;
549 }
550
551 return self
552 .execute_host_command_to_string(
553 &format!("shell:run-as {} \"{}\"", run_as_package, shell_command),
554 true,
555 false,
556 )
557 .await;
558 }
559
560 self.execute_host_command_to_string(&format!("shell:{}", shell_command), true, false)
561 .await
562 }
563
564 pub async fn is_app_installed(&self, package: &str) -> Result<bool> {
565 self.execute_host_shell_command(&format!("pm path {}", package))
566 .await
567 .map(|v| v.contains("package:"))
568 }
569
570 pub async fn launch<T: AsRef<str>>(
571 &self,
572 package: &str,
573 activity: &str,
574 am_start_args: &[T],
575 ) -> Result<bool> {
576 let mut am_start = format!("am start -W -n {}/{}", package, activity);
577
578 for arg in am_start_args {
579 am_start.push(' ');
580 if SYNC_REGEX.is_match(arg.as_ref()) {
581 am_start.push_str(&format!("\"{}\"", &shell::escape(arg.as_ref())));
582 } else {
583 am_start.push_str(&shell::escape(arg.as_ref()));
584 };
585 }
586
587 self.execute_host_shell_command(&am_start)
588 .await
589 .map(|v| v.contains("Complete"))
590 }
591
592 pub async fn force_stop(&self, package: &str) -> Result<()> {
593 debug!("Force stopping Android package: {}", package);
594 self.execute_host_shell_command(&format!("am force-stop {}", package))
595 .await
596 .and(Ok(()))
597 }
598
599 pub async fn forward_port(&self, local: u16, remote: u16) -> Result<u16> {
600 let command = format!(
601 "host-serial:{}:forward:tcp:{};tcp:{}",
602 self.serial, local, remote
603 );
604 let response = self.host.execute_command(&command, true, false).await?;
605
606 if local == 0 {
607 Ok(response.parse::<u16>()?)
608 } else {
609 Ok(local)
610 }
611 }
612
613 pub async fn kill_forward_port(&self, local: u16) -> Result<()> {
614 let command = format!("host-serial:{}:killforward:tcp:{}", self.serial, local);
615 self.execute_host_command(&command, true, false)
616 .await
617 .and(Ok(()))
618 }
619
620 pub async fn kill_forward_all_ports(&self) -> Result<()> {
621 let command = format!("host-serial:{}:killforward-all", self.serial);
622 self.execute_host_command(&command, false, false)
623 .await
624 .and(Ok(()))
625 }
626
627 pub async fn reverse_port(&self, remote: u16, local: u16) -> Result<u16> {
628 let command = format!("reverse:forward:tcp:{};tcp:{}", remote, local);
629 let response = self
630 .execute_host_command_to_string(&command, true, false)
631 .await?;
632
633 if remote == 0 {
634 Ok(response.parse::<u16>()?)
635 } else {
636 Ok(remote)
637 }
638 }
639
640 pub async fn kill_reverse_port(&self, remote: u16) -> Result<()> {
641 let command = format!("reverse:killforward:tcp:{}", remote);
642 self.execute_host_command(&command, true, true)
643 .await
644 .and(Ok(()))
645 }
646
647 pub async fn kill_reverse_all_ports(&self) -> Result<()> {
648 let command = "reverse:killforward-all".to_owned();
649 self.execute_host_command(&command, false, false)
650 .await
651 .and(Ok(()))
652 }
653
654 pub async fn list_dir(&self, src: &UnixPath) -> Result<Vec<RemoteDirEntry>> {
655 let src = src.to_path_buf();
656 let mut queue = vec![(src.clone(), 0, "".to_string())];
657
658 let mut listings = Vec::new();
659
660 while let Some((next, depth, prefix)) = queue.pop() {
661 for listing in self.list_dir_flat(&next, depth, prefix).await? {
662 if listing.metadata == RemoteMetadata::RemoteDir {
663 let mut child = src.clone();
664 child.push(listing.name.clone());
665 queue.push((child, depth + 1, listing.name.clone()));
666 }
667
668 listings.push(listing);
669 }
670 }
671
672 Ok(listings)
673 }
674
675 async fn list_dir_flat(
676 &self,
677 src: &UnixPath,
678 depth: usize,
679 prefix: String,
680 ) -> Result<Vec<RemoteDirEntry>> {
681 let mut stream = self.host.connect().await?;
683
684 let message = encode_message(&format!("host:transport:{}", self.serial))?;
686 stream.write_all(message.as_bytes()).await?;
687 let _bytes = read_response(&mut stream, false, true).await?;
688
689 let message = encode_message("sync:")?;
691 stream.write_all(message.as_bytes()).await?;
692 let _bytes = read_response(&mut stream, false, true).await?;
693
694 stream.write_all(SyncCommand::List.code()).await?;
696 let args_ = format!("{}", src.display());
697 let args = args_.as_bytes();
698 write_length_little_endian(&mut stream, args.len()).await?;
699 stream.write_all(args).await?;
700
701 let mut buf = [0; 64 * 1024];
703
704 let mut listings = Vec::new();
705
706 loop {
708 stream.read_exact(&mut buf[0..4]).await?;
709
710 if &buf[0..4] == SyncCommand::Dent.code() {
711 let mode = read_length_little_endian(&mut stream).await?;
721 let size = read_length_little_endian(&mut stream).await?;
722 let _time = read_length_little_endian(&mut stream).await?;
723 let name_length = read_length_little_endian(&mut stream).await?;
724 stream.read_exact(&mut buf[0..name_length]).await?;
725
726 let mut name = std::str::from_utf8(&buf[0..name_length])?.to_owned();
727
728 if name == "." || name == ".." {
729 continue;
730 }
731
732 if !prefix.is_empty() {
733 name = format!("{}/{}", prefix, &name);
734 }
735
736 let file_type = (mode >> 13) & 0b111;
737 let metadata = match file_type {
738 0b010 => RemoteMetadata::RemoteDir,
739 0b100 => RemoteMetadata::RemoteFile(RemoteFileMetadata {
740 mode: mode & 0b111111111,
741 size,
742 }),
743 0b101 => RemoteMetadata::RemoteSymlink,
744 _ => return Err(DeviceError::Adb(format!("Invalid file mode {}", file_type))),
745 };
746
747 listings.push(RemoteDirEntry {
748 name,
749 depth,
750 metadata,
751 });
752 } else if &buf[0..4] == SyncCommand::Done.code() {
753 break;
755 } else if &buf[0..4] == SyncCommand::Fail.code() {
756 let n = buf.len().min(read_length_little_endian(&mut stream).await?);
757
758 stream.read_exact(&mut buf[0..n]).await?;
759
760 let message = std::str::from_utf8(&buf[0..n])
761 .map(|s| format!("adb error: {}", s))
762 .unwrap_or_else(|_| "adb error was not utf-8".into());
763
764 return Err(DeviceError::Adb(message));
765 } else {
766 return Err(DeviceError::Adb("FAIL (unknown)".to_owned()));
767 }
768 }
769
770 Ok(listings)
771 }
772
773 pub async fn path_exists(&self, path: &UnixPath, enable_run_as: bool) -> Result<bool> {
774 self.execute_host_shell_command_as(format!("ls {}", path.display()).as_str(), enable_run_as)
775 .await
776 .map(|path| !path.contains("No such file or directory"))
777 }
778
779 pub async fn pull<W: AsyncWrite + Unpin>(&self, src: &UnixPath, buffer: &mut W) -> Result<()> {
780 let mut stream = self.host.connect().await?;
782
783 let message = encode_message(&format!("host:transport:{}", self.serial))?;
785 stream.write_all(message.as_bytes()).await?;
786 let _bytes = read_response(&mut stream, false, true).await?;
787
788 let message = encode_message("sync:")?;
790 stream.write_all(message.as_bytes()).await?;
791 let _bytes = read_response(&mut stream, false, true).await?;
792
793 stream.write_all(SyncCommand::Recv.code()).await?;
795 let args_string = format!("{}", src.display());
796 let args = args_string.as_bytes();
797 write_length_little_endian(&mut stream, args.len()).await?;
798 stream.write_all(args).await?;
799
800 let mut buf = [0; 64 * 1024];
802
803 loop {
805 stream.read_exact(&mut buf[0..4]).await?;
806
807 if &buf[0..4] == SyncCommand::Data.code() {
808 let len = read_length_little_endian(&mut stream).await?;
809 stream.read_exact(&mut buf[0..len]).await?;
810 buffer.write_all(&buf[0..len]).await?;
811 } else if &buf[0..4] == SyncCommand::Done.code() {
812 break;
814 } else if &buf[0..4] == SyncCommand::Fail.code() {
815 let n = buf.len().min(read_length_little_endian(&mut stream).await?);
816
817 stream.read_exact(&mut buf[0..n]).await?;
818
819 let message = std::str::from_utf8(&buf[0..n])
820 .map(|s| format!("adb error: {}", s))
821 .unwrap_or_else(|_| "adb error was not utf-8".into());
822
823 return Err(DeviceError::Adb(message));
824 } else {
825 return Err(DeviceError::Adb("FAIL (unknown)".to_owned()));
826 }
827 }
828
829 Ok(())
830 }
831
832 pub async fn pull_dir(&self, src: &UnixPath, dest_dir: &Path) -> Result<()> {
833 let src = src.to_path_buf();
834 let dest_dir = dest_dir.to_path_buf();
835
836 for entry in self.list_dir(&src).await? {
837 match entry.metadata {
838 RemoteMetadata::RemoteSymlink => {} RemoteMetadata::RemoteDir => {
840 let mut d = dest_dir.clone();
841 d.push(&entry.name);
842
843 std::fs::create_dir_all(&d)?;
844 }
845 RemoteMetadata::RemoteFile(_) => {
846 let mut s = src.clone();
847 s.push(&entry.name);
848 let mut d = dest_dir.clone();
849 d.push(&entry.name);
850
851 self.pull(&s, &mut File::create(d).await?).await?;
852 }
853 }
854 }
855
856 Ok(())
857 }
858
859 pub async fn push<R: AsyncRead + Unpin>(
860 &self,
861 buffer: &mut R,
862 dest: &UnixPath,
863 mode: u32,
864 ) -> Result<()> {
865 let enable_run_as = self.enable_run_as_for_path(&dest.to_path_buf());
874 let dest1 = match enable_run_as {
875 true => self.tempfile.as_path(),
876 false => UnixPath::new(dest),
877 };
878
879 let mut current = dest.parent();
891 let mut leaf: Option<&UnixPath> = None;
892 let mut root: Option<&UnixPath> = None;
893
894 while let Some(path) = current {
895 if self.path_exists(path, enable_run_as).await? {
896 break;
897 }
898 if leaf.is_none() {
899 leaf = Some(path);
900 }
901 root = Some(path);
902 current = path.parent();
903 }
904
905 if let Some(path) = leaf {
906 self.create_dir(path).await?;
907 }
908
909 if let Some(path) = root {
910 self.chmod(path, "777", true).await?;
911 }
912
913 let mut stream = self.host.connect().await?;
914
915 let message = encode_message(&format!("host:transport:{}", self.serial))?;
916 stream.write_all(message.as_bytes()).await?;
917 let _bytes = read_response(&mut stream, false, true).await?;
918
919 let message = encode_message("sync:")?;
920 stream.write_all(message.as_bytes()).await?;
921 let _bytes = read_response(&mut stream, false, true).await?;
922
923 stream.write_all(SyncCommand::Send.code()).await?;
924 let args_ = format!("{},{}", dest1.display(), mode);
925 let args = args_.as_bytes();
926 write_length_little_endian(&mut stream, args.len()).await?;
927 stream.write_all(args).await?;
928
929 let mut buf = [0; 32 * 1024];
932
933 loop {
934 let len = buffer.read(&mut buf).await?;
935
936 if len == 0 {
937 break;
938 }
939
940 stream.write_all(SyncCommand::Data.code()).await?;
941 write_length_little_endian(&mut stream, len).await?;
942 stream.write_all(&buf[0..len]).await?;
943 }
944
945 let time: u32 = ((SystemTime::now().duration_since(SystemTime::UNIX_EPOCH))
952 .unwrap()
953 .as_secs()
954 & 0xFFFF_FFFF) as u32;
955
956 stream.write_all(SyncCommand::Done.code()).await?;
957 write_length_little_endian(&mut stream, time as usize).await?;
958
959 stream.read_exact(&mut buf[0..4]).await?;
961
962 if buf.starts_with(SyncCommand::Okay.code()) {
963 if enable_run_as {
964 let result = self
966 .execute_host_shell_command_as(
967 format!("cp -aR {} {}", dest1.display(), dest.display()).as_str(),
968 enable_run_as,
969 )
970 .await;
971 if self.remove(dest1).await.is_err() {
972 warn!("Failed to remove {}", dest1.display());
973 }
974 result?;
975 }
976 Ok(())
977 } else if buf.starts_with(SyncCommand::Fail.code()) {
978 if enable_run_as && self.remove(dest1).await.is_err() {
979 warn!("Failed to remove {}", dest1.display());
980 }
981 let n = buf.len().min(read_length_little_endian(&mut stream).await?);
982
983 stream.read_exact(&mut buf[0..n]).await?;
984
985 let message = std::str::from_utf8(&buf[0..n])
986 .map(|s| format!("adb error: {}", s))
987 .unwrap_or_else(|_| "adb error was not utf-8".into());
988
989 Err(DeviceError::Adb(message))
990 } else {
991 if self.remove(dest1).await.is_err() {
992 warn!("Failed to remove {}", dest1.display());
993 }
994 Err(DeviceError::Adb("FAIL (unknown)".to_owned()))
995 }
996 }
997
998 pub async fn push_dir(&self, source: &Path, dest_dir: &UnixPath, mode: u32) -> Result<()> {
999 debug!("Pushing {} to {}", source.display(), dest_dir.display());
1000
1001 let walker = WalkDir::new(source).follow_links(false).into_iter();
1002
1003 for entry in walker {
1004 let entry = entry?;
1005 let path = entry.path();
1006
1007 if !entry.metadata()?.is_file() {
1008 continue;
1009 }
1010
1011 let mut file = File::open(path).await?;
1012
1013 let tail = path
1014 .strip_prefix(source)
1015 .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
1016
1017 let dest = append_components(dest_dir, tail)?;
1018 self.push(&mut file, &dest, mode).await?;
1019 }
1020
1021 Ok(())
1022 }
1023
1024 pub async fn remove(&self, path: &UnixPath) -> Result<()> {
1025 debug!("Deleting {}", path.display());
1026
1027 self.execute_host_shell_command_as(
1028 &format!("rm -rf {}", path.display()),
1029 self.enable_run_as_for_path(path),
1030 )
1031 .await?;
1032
1033 Ok(())
1034 }
1035}
1036
1037pub(crate) fn append_components(
1038 base: &UnixPath,
1039 tail: &Path,
1040) -> std::result::Result<UnixPathBuf, io::Error> {
1041 let mut buf = base.to_path_buf();
1042
1043 for component in tail.components() {
1044 if let Component::Normal(segment) = component {
1045 let utf8 = segment.to_str().ok_or_else(|| {
1046 io::Error::new(
1047 io::ErrorKind::Other,
1048 "Could not represent path segment as UTF-8",
1049 )
1050 })?;
1051 buf.push(utf8);
1052 } else {
1053 return Err(io::Error::new(
1054 io::ErrorKind::Other,
1055 "Unexpected path component".to_owned(),
1056 ));
1057 }
1058 }
1059
1060 Ok(buf)
1061}