1use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4
5use crate::error::{Error, Result};
6
7pub(crate) const SYNC_DATA_MAX: usize = 64 * 1024;
8
9const SYNC_MSG_MAX: usize = 256 * 1024;
11
12#[derive(Debug, Clone)]
18pub struct RemoteStat {
19 pub mode: u32,
21 pub size: u64,
23 pub mtime: u64,
25 pub dev: u64,
27 pub ino: u64,
29 pub nlink: u32,
31 pub uid: u32,
33 pub gid: u32,
35 pub atime: u64,
37 pub ctime: u64,
39}
40
41impl RemoteStat {
42 #[must_use]
46 pub fn exists(&self) -> bool {
47 self.mode != 0 || self.size != 0 || self.mtime != 0
48 }
49
50 fn from_v1(mode: u32, size: u32, mtime: u32) -> Self {
51 Self {
52 mode,
53 size: size as u64,
54 mtime: mtime as u64,
55 dev: 0, ino: 0, nlink: 0, uid: 0, gid: 0, atime: 0, ctime: 0,
56 }
57 }
58
59 #[must_use]
61 pub fn is_file(&self) -> bool {
62 (self.mode & 0o170000) == 0o100000
63 }
64
65 #[must_use]
67 pub fn is_dir(&self) -> bool {
68 (self.mode & 0o170000) == 0o040000
69 }
70}
71
72pub(crate) async fn sync_send(
75 w: &mut (impl AsyncWrite + Unpin),
76 tag: &[u8; 4],
77 data: &[u8],
78) -> Result<()> {
79 let mut pkt = Vec::with_capacity(8 + data.len());
80 pkt.extend_from_slice(tag);
81 pkt.extend_from_slice(&(data.len() as u32).to_le_bytes());
82 pkt.extend_from_slice(data);
83 w.write_all(&pkt).await?;
84 w.flush().await?;
85 Ok(())
86}
87
88pub(crate) async fn sync_read_header(
89 r: &mut (impl AsyncRead + Unpin),
90) -> Result<([u8; 4], u32)> {
91 let mut buf = [0u8; 8];
92 r.read_exact(&mut buf).await?;
93 let tag: [u8; 4] = buf[..4].try_into().unwrap();
94 let len = u32::from_le_bytes(buf[4..8].try_into().unwrap());
95 Ok((tag, len))
96}
97
98async fn read_sync_fail(r: &mut (impl AsyncRead + Unpin), len: u32) -> Error {
99 let msg_len = (len as usize).min(SYNC_MSG_MAX);
100 let mut msg = vec![0u8; msg_len];
101 match r.read_exact(&mut msg).await {
102 Ok(_) => Error::Adb(String::from_utf8_lossy(&msg).to_string()),
103 Err(e) => e.into(),
104 }
105}
106
107pub(crate) async fn stat_v1_sync(
110 stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
111 remote_path: &str,
112) -> Result<RemoteStat> {
113 sync_send(stream, b"STAT", remote_path.as_bytes()).await?;
114
115 let mut resp = [0u8; 16]; stream.read_exact(&mut resp).await?;
117
118 if &resp[..4] != b"STAT" {
119 return Err(Error::Protocol(format!(
120 "expected STAT response, got {:?}",
121 String::from_utf8_lossy(&resp[..4])
122 )));
123 }
124
125 let mode = u32::from_le_bytes(resp[4..8].try_into().unwrap());
126 let size = u32::from_le_bytes(resp[8..12].try_into().unwrap());
127 let mtime = u32::from_le_bytes(resp[12..16].try_into().unwrap());
128
129 Ok(RemoteStat::from_v1(mode, size, mtime))
130}
131
132const STAT2_RESP_LEN: usize = 72;
135
136pub(crate) async fn stat2_sync(
137 stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
138 remote_path: &str,
139) -> Result<RemoteStat> {
140 sync_send(stream, b"STA2", remote_path.as_bytes()).await?;
141
142 let mut resp = [0u8; STAT2_RESP_LEN];
143 stream.read_exact(&mut resp).await?;
144
145 let tag = &resp[..4];
146 if tag != b"STA2" {
147 return Err(Error::Protocol(format!(
148 "expected STA2 response, got {:?}",
149 String::from_utf8_lossy(tag)
150 )));
151 }
152
153 let error = u32::from_le_bytes(resp[4..8].try_into().unwrap());
154 if error != 0 {
155 return Err(Error::Adb(format!("STA2 error: {error}")));
156 }
157
158 let dev = u64::from_le_bytes(resp[8..16].try_into().unwrap());
159 let ino = u64::from_le_bytes(resp[16..24].try_into().unwrap());
160 let mode = u32::from_le_bytes(resp[24..28].try_into().unwrap());
161 let nlink = u32::from_le_bytes(resp[28..32].try_into().unwrap());
162 let uid = u32::from_le_bytes(resp[32..36].try_into().unwrap());
163 let gid = u32::from_le_bytes(resp[36..40].try_into().unwrap());
164 let size = u64::from_le_bytes(resp[40..48].try_into().unwrap());
165 let atime = u64::from_le_bytes(resp[48..56].try_into().unwrap());
166 let mtime = u64::from_le_bytes(resp[56..64].try_into().unwrap());
167 let ctime = u64::from_le_bytes(resp[64..72].try_into().unwrap());
168
169 Ok(RemoteStat { mode, size, mtime, dev, ino, nlink, uid, gid, atime, ctime })
170}
171
172#[derive(Debug, Clone)]
176pub struct DirEntry {
177 pub name: String,
179 pub mode: u32,
181 pub size: u32,
183 pub mtime: u32,
185}
186
187pub(crate) async fn list_sync(
188 stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
189 remote_path: &str,
190) -> Result<Vec<DirEntry>> {
191 sync_send(stream, b"LIST", remote_path.as_bytes()).await?;
192
193 let mut entries = Vec::new();
194 loop {
195 let mut header = [0u8; 20];
198 stream.read_exact(&mut header).await?;
199
200 let tag: [u8; 4] = header[..4].try_into().unwrap();
201 match &tag {
202 b"DENT" => {
203 let mode = u32::from_le_bytes(header[4..8].try_into().unwrap());
204 let size = u32::from_le_bytes(header[8..12].try_into().unwrap());
205 let mtime = u32::from_le_bytes(header[12..16].try_into().unwrap());
206 let name_len = u32::from_le_bytes(header[16..20].try_into().unwrap()) as usize;
207
208 if name_len > SYNC_DATA_MAX {
209 return Err(Error::Protocol("DENT name too long".into()));
210 }
211 let mut name_buf = vec![0u8; name_len];
212 if name_len > 0 {
213 stream.read_exact(&mut name_buf).await?;
214 }
215 let name = String::from_utf8_lossy(&name_buf).to_string();
216
217 if name != "." && name != ".." {
218 entries.push(DirEntry { name, mode, size, mtime });
219 }
220 }
221 b"DONE" => break,
222 b"FAIL" => {
223 let msg_len = u32::from_le_bytes(header[4..8].try_into().unwrap());
224 return Err(read_sync_fail(stream, msg_len).await);
225 }
226 _ => {
227 return Err(Error::Protocol(format!(
228 "unexpected LIST tag: {:?}",
229 String::from_utf8_lossy(&tag)
230 )));
231 }
232 }
233 }
234
235 Ok(entries)
236}
237
238pub(crate) async fn pull_sync(
241 stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
242 remote_path: &str,
243 writer: &mut (impl AsyncWrite + Unpin),
244) -> Result<u64> {
245 sync_send(stream, b"RECV", remote_path.as_bytes()).await?;
246
247 let mut total: u64 = 0;
248 let mut buf = vec![0u8; SYNC_DATA_MAX];
249 loop {
250 let (tag, len) = sync_read_header(stream).await?;
251 match &tag {
252 b"DATA" => {
253 let mut remaining = len as usize;
254 while remaining > 0 {
255 let n = remaining.min(buf.len());
256 stream.read_exact(&mut buf[..n]).await?;
257 writer.write_all(&buf[..n]).await?;
258 remaining -= n;
259 total += n as u64;
260 }
261 }
262 b"DONE" => break,
263 b"FAIL" => return Err(read_sync_fail(stream, len).await),
264 _ => {
265 return Err(Error::Protocol(format!(
266 "unexpected sync tag: {:?}",
267 String::from_utf8_lossy(&tag)
268 )))
269 }
270 }
271 }
272
273 sync_send(stream, b"QUIT", &[]).await?;
274 Ok(total)
275}
276
277pub(crate) async fn push_sync(
280 stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
281 remote_path: &str,
282 mode: u32,
283 mtime: u32,
284 reader: &mut (impl AsyncRead + Unpin),
285) -> Result<()> {
286 let header = format!("{remote_path},{mode}");
287 sync_send(stream, b"SEND", header.as_bytes()).await?;
288
289 let mut buf = vec![0u8; 8 + SYNC_DATA_MAX];
290 loop {
291 let n = read_fill(reader, &mut buf[8..]).await?;
292 if n == 0 {
293 break;
294 }
295 buf[..4].copy_from_slice(b"DATA");
296 buf[4..8].copy_from_slice(&(n as u32).to_le_bytes());
297 stream.write_all(&buf[..8 + n]).await?;
298 }
299
300 let mut done = [0u8; 8];
301 done[..4].copy_from_slice(b"DONE");
302 done[4..8].copy_from_slice(&mtime.to_le_bytes());
303 stream.write_all(&done).await?;
304 stream.flush().await?;
305
306 let (tag, len) = sync_read_header(stream).await?;
307 match &tag {
308 b"OKAY" => Ok(()),
309 b"FAIL" => Err(read_sync_fail(stream, len).await),
310 _ => Err(Error::Protocol(format!(
311 "unexpected sync response: {:?}",
312 String::from_utf8_lossy(&tag)
313 ))),
314 }
315}
316
317async fn read_fill(r: &mut (impl AsyncRead + Unpin), buf: &mut [u8]) -> std::io::Result<usize> {
319 let mut pos = 0;
320 while pos < buf.len() {
321 match r.read(&mut buf[pos..]).await? {
322 0 => break,
323 n => pos += n,
324 }
325 }
326 Ok(pos)
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332 use std::io::Cursor;
333
334 struct MockStream {
336 read: Cursor<Vec<u8>>,
337 written: Vec<u8>,
338 }
339
340 impl MockStream {
341 fn from_response(data: Vec<u8>) -> Self {
342 Self {
343 read: Cursor::new(data),
344 written: Vec::new(),
345 }
346 }
347 }
348
349 impl AsyncRead for MockStream {
350 fn poll_read(
351 mut self: std::pin::Pin<&mut Self>,
352 _cx: &mut std::task::Context<'_>,
353 buf: &mut tokio::io::ReadBuf<'_>,
354 ) -> std::task::Poll<std::io::Result<()>> {
355 let pos = self.read.position() as usize;
356 let inner = self.read.get_ref();
357 let remaining = &inner[pos..];
358 let n = remaining.len().min(buf.remaining());
359 buf.put_slice(&remaining[..n]);
360 self.read.set_position((pos + n) as u64);
361 std::task::Poll::Ready(Ok(()))
362 }
363 }
364
365 impl AsyncWrite for MockStream {
366 fn poll_write(
367 mut self: std::pin::Pin<&mut Self>,
368 _cx: &mut std::task::Context<'_>,
369 buf: &[u8],
370 ) -> std::task::Poll<std::io::Result<usize>> {
371 self.written.extend_from_slice(buf);
372 std::task::Poll::Ready(Ok(buf.len()))
373 }
374
375 fn poll_flush(
376 self: std::pin::Pin<&mut Self>,
377 _cx: &mut std::task::Context<'_>,
378 ) -> std::task::Poll<std::io::Result<()>> {
379 std::task::Poll::Ready(Ok(()))
380 }
381
382 fn poll_shutdown(
383 self: std::pin::Pin<&mut Self>,
384 _cx: &mut std::task::Context<'_>,
385 ) -> std::task::Poll<std::io::Result<()>> {
386 std::task::Poll::Ready(Ok(()))
387 }
388 }
389
390 #[tokio::test]
391 async fn sync_send_roundtrip() {
392 let mut buf = Vec::new();
393 sync_send(&mut buf, b"RECV", b"/sdcard/test.txt").await.unwrap();
394
395 assert_eq!(&buf[..4], b"RECV");
396 let len = u32::from_le_bytes(buf[4..8].try_into().unwrap());
397 assert_eq!(len, 16);
398 assert_eq!(&buf[8..], b"/sdcard/test.txt");
399 }
400
401 #[tokio::test]
402 async fn sync_header_roundtrip() {
403 let bytes: Vec<u8> = b"DATA"
404 .iter()
405 .chain(&100u32.to_le_bytes())
406 .copied()
407 .collect();
408 let mut cur = Cursor::new(bytes);
409 let (tag, len) = sync_read_header(&mut cur).await.unwrap();
410 assert_eq!(&tag, b"DATA");
411 assert_eq!(len, 100);
412 }
413
414 #[tokio::test]
415 async fn pull_sync_single_chunk() {
416 let payload = b"hello world";
417 let mut wire = Vec::new();
418 wire.extend_from_slice(b"DATA");
419 wire.extend_from_slice(&(payload.len() as u32).to_le_bytes());
420 wire.extend_from_slice(payload);
421 wire.extend_from_slice(b"DONE");
422 wire.extend_from_slice(&0u32.to_le_bytes());
423
424 let mut stream = MockStream::from_response(wire);
425 let mut output = Vec::new();
426 let n = pull_sync(&mut stream, "/sdcard/test.txt", &mut output).await.unwrap();
427
428 assert_eq!(n, 11);
429 assert_eq!(&output, b"hello world");
430 assert_eq!(&stream.written[..4], b"RECV");
431 }
432
433 #[tokio::test]
434 async fn pull_sync_multiple_chunks() {
435 let mut wire = Vec::new();
436 for chunk in [b"aaa".as_slice(), b"bbb"] {
437 wire.extend_from_slice(b"DATA");
438 wire.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
439 wire.extend_from_slice(chunk);
440 }
441 wire.extend_from_slice(b"DONE");
442 wire.extend_from_slice(&0u32.to_le_bytes());
443
444 let mut stream = MockStream::from_response(wire);
445 let mut output = Vec::new();
446 let n = pull_sync(&mut stream, "/test", &mut output).await.unwrap();
447
448 assert_eq!(n, 6);
449 assert_eq!(&output, b"aaabbb");
450 }
451
452 #[tokio::test]
453 async fn pull_sync_fail() {
454 let msg = b"file not found";
455 let mut wire = Vec::new();
456 wire.extend_from_slice(b"FAIL");
457 wire.extend_from_slice(&(msg.len() as u32).to_le_bytes());
458 wire.extend_from_slice(msg);
459
460 let mut stream = MockStream::from_response(wire);
461 let mut output = Vec::new();
462 let err = pull_sync(&mut stream, "/nope", &mut output).await.unwrap_err();
463 assert!(matches!(err, Error::Adb(m) if m == "file not found"));
464 }
465
466 #[tokio::test]
467 async fn push_sync_roundtrip() {
468 let response: Vec<u8> = b"OKAY"
469 .iter()
470 .chain(&0u32.to_le_bytes())
471 .copied()
472 .collect();
473 let mut stream = MockStream::from_response(response);
474
475 let data = b"file contents";
476 let mut reader = Cursor::new(data.as_slice());
477 push_sync(&mut stream, "/sdcard/out.txt", 0o644, 1000, &mut reader).await.unwrap();
478
479 assert_eq!(&stream.written[..4], b"SEND");
480 let w = &stream.written;
481 let tail = &w[w.len() - 8..];
482 assert_eq!(&tail[..4], b"DONE");
483 assert_eq!(u32::from_le_bytes(tail[4..8].try_into().unwrap()), 1000);
484 }
485
486 #[tokio::test]
487 async fn read_fill_partial() {
488 let data = b"hello";
489 let mut reader = Cursor::new(data.as_slice());
490 let mut buf = [0u8; 10];
491 let n = read_fill(&mut reader, &mut buf).await.unwrap();
492 assert_eq!(n, 5);
493 assert_eq!(&buf[..5], b"hello");
494 }
495
496 #[tokio::test]
499 async fn stat_v1_existing_file() {
500 let mut wire = Vec::new();
501 wire.extend_from_slice(b"STAT");
502 wire.extend_from_slice(&0o100644u32.to_le_bytes());
503 wire.extend_from_slice(&1024u32.to_le_bytes());
504 wire.extend_from_slice(&1700000000u32.to_le_bytes());
505
506 let mut stream = MockStream::from_response(wire);
507 let st = stat_v1_sync(&mut stream, "/sdcard/test.txt").await.unwrap();
508
509 assert!(st.exists());
510 assert!(st.is_file());
511 assert!(!st.is_dir());
512 assert_eq!(st.size, 1024);
513 assert_eq!(st.mtime, 1700000000);
514 }
515
516 #[tokio::test]
517 async fn stat_v1_directory() {
518 let mut wire = Vec::new();
519 wire.extend_from_slice(b"STAT");
520 wire.extend_from_slice(&0o040755u32.to_le_bytes());
521 wire.extend_from_slice(&4096u32.to_le_bytes());
522 wire.extend_from_slice(&1700000000u32.to_le_bytes());
523
524 let mut stream = MockStream::from_response(wire);
525 let st = stat_v1_sync(&mut stream, "/sdcard").await.unwrap();
526
527 assert!(st.exists());
528 assert!(!st.is_file());
529 assert!(st.is_dir());
530 }
531
532 #[tokio::test]
533 async fn stat_v1_nonexistent() {
534 let mut wire = Vec::new();
535 wire.extend_from_slice(b"STAT");
536 wire.extend_from_slice(&0u32.to_le_bytes());
537 wire.extend_from_slice(&0u32.to_le_bytes());
538 wire.extend_from_slice(&0u32.to_le_bytes());
539
540 let mut stream = MockStream::from_response(wire);
541 let st = stat_v1_sync(&mut stream, "/nonexistent").await.unwrap();
542
543 assert!(!st.exists());
544 }
545
546 #[tokio::test]
549 async fn stat2_existing_file() {
550 let mut wire = Vec::new();
551 wire.extend_from_slice(b"STA2");
552 wire.extend_from_slice(&0u32.to_le_bytes()); wire.extend_from_slice(&1u64.to_le_bytes()); wire.extend_from_slice(&12345u64.to_le_bytes()); wire.extend_from_slice(&0o100644u32.to_le_bytes()); wire.extend_from_slice(&1u32.to_le_bytes()); wire.extend_from_slice(&1000u32.to_le_bytes()); wire.extend_from_slice(&1000u32.to_le_bytes()); wire.extend_from_slice(&5_000_000_000u64.to_le_bytes()); wire.extend_from_slice(&1700000000u64.to_le_bytes()); wire.extend_from_slice(&1700000001u64.to_le_bytes()); wire.extend_from_slice(&1700000002u64.to_le_bytes()); let mut stream = MockStream::from_response(wire);
565 let st = stat2_sync(&mut stream, "/sdcard/large.bin").await.unwrap();
566
567 assert!(st.exists());
568 assert!(st.is_file());
569 assert_eq!(st.size, 5_000_000_000);
570 assert_eq!(st.mtime, 1700000001);
571 assert_eq!(st.uid, 1000);
572 assert_eq!(st.ino, 12345);
573 }
574
575 #[tokio::test]
576 async fn stat2_error_returns_err() {
577 let mut wire = Vec::new();
578 wire.extend_from_slice(b"STA2");
579 wire.extend_from_slice(&1u32.to_le_bytes()); wire.extend_from_slice(&[0u8; 64]); let mut stream = MockStream::from_response(wire);
583 assert!(stat2_sync(&mut stream, "/nope").await.is_err());
584 }
585
586 fn make_dent(name: &str, mode: u32, size: u32, mtime: u32) -> Vec<u8> {
589 let mut buf = Vec::new();
590 buf.extend_from_slice(b"DENT");
591 buf.extend_from_slice(&mode.to_le_bytes());
592 buf.extend_from_slice(&size.to_le_bytes());
593 buf.extend_from_slice(&mtime.to_le_bytes());
594 buf.extend_from_slice(&(name.len() as u32).to_le_bytes());
595 buf.extend_from_slice(name.as_bytes());
596 buf
597 }
598
599 fn make_done() -> Vec<u8> {
600 let mut buf = Vec::new();
601 buf.extend_from_slice(b"DONE");
602 buf.extend_from_slice(&[0u8; 16]); buf
604 }
605
606 #[tokio::test]
607 async fn list_sync_entries() {
608 let mut wire = Vec::new();
609 wire.extend(make_dent(".", 0o040755, 0, 0));
610 wire.extend(make_dent("..", 0o040755, 0, 0));
611 wire.extend(make_dent("hello.txt", 0o100644, 42, 1700000000));
612 wire.extend(make_dent("subdir", 0o040755, 4096, 1700000000));
613 wire.extend(make_done());
614
615 let mut stream = MockStream::from_response(wire);
616 let entries = list_sync(&mut stream, "/sdcard").await.unwrap();
617
618 assert_eq!(entries.len(), 2); assert_eq!(entries[0].name, "hello.txt");
620 assert_eq!(entries[0].size, 42);
621 assert_eq!(entries[1].name, "subdir");
622 }
623
624 #[tokio::test]
625 async fn list_sync_empty_dir() {
626 let wire = make_done();
627 let mut stream = MockStream::from_response(wire);
628 let entries = list_sync(&mut stream, "/empty").await.unwrap();
629 assert!(entries.is_empty());
630 }
631}