Skip to main content

adb_wire/
sync.rs

1//! ADB sync protocol (binary-framed file transfer and stat).
2
3use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4
5use crate::error::{Error, Result};
6
7pub(crate) const SYNC_DATA_MAX: usize = 64 * 1024;
8
9/// Maximum size for error messages from the sync protocol (256 KiB).
10const SYNC_MSG_MAX: usize = 256 * 1024;
11
12/// File metadata returned by [`AdbWire::stat`](crate::AdbWire::stat).
13///
14/// When the device supports STAT2, all fields are populated from the extended
15/// response. On older devices only `mode`, `size`, and `mtime` are set (the
16/// rest default to zero).
17#[derive(Debug, Clone)]
18pub struct RemoteStat {
19    /// File mode (type + permissions), e.g. `0o100644` for a regular file.
20    pub mode: u32,
21    /// File size in bytes.
22    pub size: u64,
23    /// Last modification time as seconds since the Unix epoch.
24    pub mtime: u64,
25    /// Device ID (STAT2 only).
26    pub dev: u64,
27    /// Inode number (STAT2 only).
28    pub ino: u64,
29    /// Number of hard links (STAT2 only).
30    pub nlink: u32,
31    /// Owner user ID (STAT2 only).
32    pub uid: u32,
33    /// Owner group ID (STAT2 only).
34    pub gid: u32,
35    /// Last access time as seconds since the Unix epoch (STAT2 only).
36    pub atime: u64,
37    /// Status change time as seconds since the Unix epoch (STAT2 only).
38    pub ctime: u64,
39}
40
41impl RemoteStat {
42    /// Returns `true` if the stat result represents an existing file or directory.
43    ///
44    /// A zeroed-out stat means the path does not exist.
45    #[must_use]
46    pub fn exists(&self) -> bool {
47        self.mode != 0 || self.size != 0 || self.mtime != 0
48    }
49
50    fn from_v1(mode: u32, size: u32, mtime: u32) -> Self {
51        Self {
52            mode,
53            size: size as u64,
54            mtime: mtime as u64,
55            dev: 0, ino: 0, nlink: 0, uid: 0, gid: 0, atime: 0, ctime: 0,
56        }
57    }
58
59    /// Returns `true` if this is a regular file.
60    #[must_use]
61    pub fn is_file(&self) -> bool {
62        (self.mode & 0o170000) == 0o100000
63    }
64
65    /// Returns `true` if this is a directory.
66    #[must_use]
67    pub fn is_dir(&self) -> bool {
68        (self.mode & 0o170000) == 0o040000
69    }
70}
71
72// -- Low-level sync helpers ---------------------------------------------------
73
74pub(crate) async fn sync_send(
75    w: &mut (impl AsyncWrite + Unpin),
76    tag: &[u8; 4],
77    data: &[u8],
78) -> Result<()> {
79    let mut pkt = Vec::with_capacity(8 + data.len());
80    pkt.extend_from_slice(tag);
81    pkt.extend_from_slice(&(data.len() as u32).to_le_bytes());
82    pkt.extend_from_slice(data);
83    w.write_all(&pkt).await?;
84    w.flush().await?;
85    Ok(())
86}
87
88pub(crate) async fn sync_read_header(
89    r: &mut (impl AsyncRead + Unpin),
90) -> Result<([u8; 4], u32)> {
91    let mut buf = [0u8; 8];
92    r.read_exact(&mut buf).await?;
93    let tag: [u8; 4] = buf[..4].try_into().unwrap();
94    let len = u32::from_le_bytes(buf[4..8].try_into().unwrap());
95    Ok((tag, len))
96}
97
98async fn read_sync_fail(r: &mut (impl AsyncRead + Unpin), len: u32) -> Error {
99    let msg_len = (len as usize).min(SYNC_MSG_MAX);
100    let mut msg = vec![0u8; msg_len];
101    match r.read_exact(&mut msg).await {
102        Ok(_) => Error::Adb(String::from_utf8_lossy(&msg).to_string()),
103        Err(e) => e.into(),
104    }
105}
106
107// -- STAT ---------------------------------------------------------------------
108
109pub(crate) async fn stat_v1_sync(
110    stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
111    remote_path: &str,
112) -> Result<RemoteStat> {
113    sync_send(stream, b"STAT", remote_path.as_bytes()).await?;
114
115    let mut resp = [0u8; 16]; // "STAT" + mode(4) + size(4) + mtime(4)
116    stream.read_exact(&mut resp).await?;
117
118    if &resp[..4] != b"STAT" {
119        return Err(Error::Protocol(format!(
120            "expected STAT response, got {:?}",
121            String::from_utf8_lossy(&resp[..4])
122        )));
123    }
124
125    let mode = u32::from_le_bytes(resp[4..8].try_into().unwrap());
126    let size = u32::from_le_bytes(resp[8..12].try_into().unwrap());
127    let mtime = u32::from_le_bytes(resp[12..16].try_into().unwrap());
128
129    Ok(RemoteStat::from_v1(mode, size, mtime))
130}
131
132/// STAT2 response: tag(4) + error(4) + dev(8) + ino(8) + mode(4) + nlink(4) +
133///                 uid(4) + gid(4) + size(8) + atime(8) + mtime(8) + ctime(8) = 72 bytes
134const STAT2_RESP_LEN: usize = 72;
135
136pub(crate) async fn stat2_sync(
137    stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
138    remote_path: &str,
139) -> Result<RemoteStat> {
140    sync_send(stream, b"STA2", remote_path.as_bytes()).await?;
141
142    let mut resp = [0u8; STAT2_RESP_LEN];
143    stream.read_exact(&mut resp).await?;
144
145    let tag = &resp[..4];
146    if tag != b"STA2" {
147        return Err(Error::Protocol(format!(
148            "expected STA2 response, got {:?}",
149            String::from_utf8_lossy(tag)
150        )));
151    }
152
153    let error = u32::from_le_bytes(resp[4..8].try_into().unwrap());
154    if error != 0 {
155        return Err(Error::Adb(format!("STA2 error: {error}")));
156    }
157
158    let dev   = u64::from_le_bytes(resp[8..16].try_into().unwrap());
159    let ino   = u64::from_le_bytes(resp[16..24].try_into().unwrap());
160    let mode  = u32::from_le_bytes(resp[24..28].try_into().unwrap());
161    let nlink = u32::from_le_bytes(resp[28..32].try_into().unwrap());
162    let uid   = u32::from_le_bytes(resp[32..36].try_into().unwrap());
163    let gid   = u32::from_le_bytes(resp[36..40].try_into().unwrap());
164    let size  = u64::from_le_bytes(resp[40..48].try_into().unwrap());
165    let atime = u64::from_le_bytes(resp[48..56].try_into().unwrap());
166    let mtime = u64::from_le_bytes(resp[56..64].try_into().unwrap());
167    let ctime = u64::from_le_bytes(resp[64..72].try_into().unwrap());
168
169    Ok(RemoteStat { mode, size, mtime, dev, ino, nlink, uid, gid, atime, ctime })
170}
171
172// -- LIST (directory listing) --------------------------------------------------
173
174/// Entry from a remote directory listing.
175#[derive(Debug, Clone)]
176pub struct DirEntry {
177    /// File name (without path).
178    pub name: String,
179    /// File mode (type + permissions).
180    pub mode: u32,
181    /// File size in bytes (32-bit, from sync v1 protocol).
182    pub size: u32,
183    /// Last modification time as seconds since the Unix epoch.
184    pub mtime: u32,
185}
186
187pub(crate) async fn list_sync(
188    stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
189    remote_path: &str,
190) -> Result<Vec<DirEntry>> {
191    sync_send(stream, b"LIST", remote_path.as_bytes()).await?;
192
193    let mut entries = Vec::new();
194    loop {
195        // DENT header: tag(4) + mode(4) + size(4) + mtime(4) + namelen(4) = 20 bytes
196        // DONE uses the same 20-byte layout.
197        let mut header = [0u8; 20];
198        stream.read_exact(&mut header).await?;
199
200        let tag: [u8; 4] = header[..4].try_into().unwrap();
201        match &tag {
202            b"DENT" => {
203                let mode = u32::from_le_bytes(header[4..8].try_into().unwrap());
204                let size = u32::from_le_bytes(header[8..12].try_into().unwrap());
205                let mtime = u32::from_le_bytes(header[12..16].try_into().unwrap());
206                let name_len = u32::from_le_bytes(header[16..20].try_into().unwrap()) as usize;
207
208                if name_len > SYNC_DATA_MAX {
209                    return Err(Error::Protocol("DENT name too long".into()));
210                }
211                let mut name_buf = vec![0u8; name_len];
212                if name_len > 0 {
213                    stream.read_exact(&mut name_buf).await?;
214                }
215                let name = String::from_utf8_lossy(&name_buf).to_string();
216
217                if name != "." && name != ".." {
218                    entries.push(DirEntry { name, mode, size, mtime });
219                }
220            }
221            b"DONE" => break,
222            b"FAIL" => {
223                let msg_len = u32::from_le_bytes(header[4..8].try_into().unwrap());
224                return Err(read_sync_fail(stream, msg_len).await);
225            }
226            _ => {
227                return Err(Error::Protocol(format!(
228                    "unexpected LIST tag: {:?}",
229                    String::from_utf8_lossy(&tag)
230                )));
231            }
232        }
233    }
234
235    Ok(entries)
236}
237
238// -- RECV (pull) --------------------------------------------------------------
239
240pub(crate) async fn pull_sync(
241    stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
242    remote_path: &str,
243    writer: &mut (impl AsyncWrite + Unpin),
244) -> Result<u64> {
245    sync_send(stream, b"RECV", remote_path.as_bytes()).await?;
246
247    let mut total: u64 = 0;
248    let mut buf = vec![0u8; SYNC_DATA_MAX];
249    loop {
250        let (tag, len) = sync_read_header(stream).await?;
251        match &tag {
252            b"DATA" => {
253                let mut remaining = len as usize;
254                while remaining > 0 {
255                    let n = remaining.min(buf.len());
256                    stream.read_exact(&mut buf[..n]).await?;
257                    writer.write_all(&buf[..n]).await?;
258                    remaining -= n;
259                    total += n as u64;
260                }
261            }
262            b"DONE" => break,
263            b"FAIL" => return Err(read_sync_fail(stream, len).await),
264            _ => {
265                return Err(Error::Protocol(format!(
266                    "unexpected sync tag: {:?}",
267                    String::from_utf8_lossy(&tag)
268                )))
269            }
270        }
271    }
272
273    sync_send(stream, b"QUIT", &[]).await?;
274    Ok(total)
275}
276
277// -- SEND (push) --------------------------------------------------------------
278
279pub(crate) async fn push_sync(
280    stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
281    remote_path: &str,
282    mode: u32,
283    mtime: u32,
284    reader: &mut (impl AsyncRead + Unpin),
285) -> Result<()> {
286    let header = format!("{remote_path},{mode}");
287    sync_send(stream, b"SEND", header.as_bytes()).await?;
288
289    let mut buf = vec![0u8; 8 + SYNC_DATA_MAX];
290    loop {
291        let n = read_fill(reader, &mut buf[8..]).await?;
292        if n == 0 {
293            break;
294        }
295        buf[..4].copy_from_slice(b"DATA");
296        buf[4..8].copy_from_slice(&(n as u32).to_le_bytes());
297        stream.write_all(&buf[..8 + n]).await?;
298    }
299
300    let mut done = [0u8; 8];
301    done[..4].copy_from_slice(b"DONE");
302    done[4..8].copy_from_slice(&mtime.to_le_bytes());
303    stream.write_all(&done).await?;
304    stream.flush().await?;
305
306    let (tag, len) = sync_read_header(stream).await?;
307    match &tag {
308        b"OKAY" => Ok(()),
309        b"FAIL" => Err(read_sync_fail(stream, len).await),
310        _ => Err(Error::Protocol(format!(
311            "unexpected sync response: {:?}",
312            String::from_utf8_lossy(&tag)
313        ))),
314    }
315}
316
317/// Read until `buf` is full or EOF. Returns bytes read.
318async fn read_fill(r: &mut (impl AsyncRead + Unpin), buf: &mut [u8]) -> std::io::Result<usize> {
319    let mut pos = 0;
320    while pos < buf.len() {
321        match r.read(&mut buf[pos..]).await? {
322            0 => break,
323            n => pos += n,
324        }
325    }
326    Ok(pos)
327}
328
329#[cfg(test)]
330mod tests {
331    use super::*;
332    use std::io::Cursor;
333
334    /// Mock stream with separate read/write buffers.
335    struct MockStream {
336        read: Cursor<Vec<u8>>,
337        written: Vec<u8>,
338    }
339
340    impl MockStream {
341        fn from_response(data: Vec<u8>) -> Self {
342            Self {
343                read: Cursor::new(data),
344                written: Vec::new(),
345            }
346        }
347    }
348
349    impl AsyncRead for MockStream {
350        fn poll_read(
351            mut self: std::pin::Pin<&mut Self>,
352            _cx: &mut std::task::Context<'_>,
353            buf: &mut tokio::io::ReadBuf<'_>,
354        ) -> std::task::Poll<std::io::Result<()>> {
355            let pos = self.read.position() as usize;
356            let inner = self.read.get_ref();
357            let remaining = &inner[pos..];
358            let n = remaining.len().min(buf.remaining());
359            buf.put_slice(&remaining[..n]);
360            self.read.set_position((pos + n) as u64);
361            std::task::Poll::Ready(Ok(()))
362        }
363    }
364
365    impl AsyncWrite for MockStream {
366        fn poll_write(
367            mut self: std::pin::Pin<&mut Self>,
368            _cx: &mut std::task::Context<'_>,
369            buf: &[u8],
370        ) -> std::task::Poll<std::io::Result<usize>> {
371            self.written.extend_from_slice(buf);
372            std::task::Poll::Ready(Ok(buf.len()))
373        }
374
375        fn poll_flush(
376            self: std::pin::Pin<&mut Self>,
377            _cx: &mut std::task::Context<'_>,
378        ) -> std::task::Poll<std::io::Result<()>> {
379            std::task::Poll::Ready(Ok(()))
380        }
381
382        fn poll_shutdown(
383            self: std::pin::Pin<&mut Self>,
384            _cx: &mut std::task::Context<'_>,
385        ) -> std::task::Poll<std::io::Result<()>> {
386            std::task::Poll::Ready(Ok(()))
387        }
388    }
389
390    #[tokio::test]
391    async fn sync_send_roundtrip() {
392        let mut buf = Vec::new();
393        sync_send(&mut buf, b"RECV", b"/sdcard/test.txt").await.unwrap();
394
395        assert_eq!(&buf[..4], b"RECV");
396        let len = u32::from_le_bytes(buf[4..8].try_into().unwrap());
397        assert_eq!(len, 16);
398        assert_eq!(&buf[8..], b"/sdcard/test.txt");
399    }
400
401    #[tokio::test]
402    async fn sync_header_roundtrip() {
403        let bytes: Vec<u8> = b"DATA"
404            .iter()
405            .chain(&100u32.to_le_bytes())
406            .copied()
407            .collect();
408        let mut cur = Cursor::new(bytes);
409        let (tag, len) = sync_read_header(&mut cur).await.unwrap();
410        assert_eq!(&tag, b"DATA");
411        assert_eq!(len, 100);
412    }
413
414    #[tokio::test]
415    async fn pull_sync_single_chunk() {
416        let payload = b"hello world";
417        let mut wire = Vec::new();
418        wire.extend_from_slice(b"DATA");
419        wire.extend_from_slice(&(payload.len() as u32).to_le_bytes());
420        wire.extend_from_slice(payload);
421        wire.extend_from_slice(b"DONE");
422        wire.extend_from_slice(&0u32.to_le_bytes());
423
424        let mut stream = MockStream::from_response(wire);
425        let mut output = Vec::new();
426        let n = pull_sync(&mut stream, "/sdcard/test.txt", &mut output).await.unwrap();
427
428        assert_eq!(n, 11);
429        assert_eq!(&output, b"hello world");
430        assert_eq!(&stream.written[..4], b"RECV");
431    }
432
433    #[tokio::test]
434    async fn pull_sync_multiple_chunks() {
435        let mut wire = Vec::new();
436        for chunk in [b"aaa".as_slice(), b"bbb"] {
437            wire.extend_from_slice(b"DATA");
438            wire.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
439            wire.extend_from_slice(chunk);
440        }
441        wire.extend_from_slice(b"DONE");
442        wire.extend_from_slice(&0u32.to_le_bytes());
443
444        let mut stream = MockStream::from_response(wire);
445        let mut output = Vec::new();
446        let n = pull_sync(&mut stream, "/test", &mut output).await.unwrap();
447
448        assert_eq!(n, 6);
449        assert_eq!(&output, b"aaabbb");
450    }
451
452    #[tokio::test]
453    async fn pull_sync_fail() {
454        let msg = b"file not found";
455        let mut wire = Vec::new();
456        wire.extend_from_slice(b"FAIL");
457        wire.extend_from_slice(&(msg.len() as u32).to_le_bytes());
458        wire.extend_from_slice(msg);
459
460        let mut stream = MockStream::from_response(wire);
461        let mut output = Vec::new();
462        let err = pull_sync(&mut stream, "/nope", &mut output).await.unwrap_err();
463        assert!(matches!(err, Error::Adb(m) if m == "file not found"));
464    }
465
466    #[tokio::test]
467    async fn push_sync_roundtrip() {
468        let response: Vec<u8> = b"OKAY"
469            .iter()
470            .chain(&0u32.to_le_bytes())
471            .copied()
472            .collect();
473        let mut stream = MockStream::from_response(response);
474
475        let data = b"file contents";
476        let mut reader = Cursor::new(data.as_slice());
477        push_sync(&mut stream, "/sdcard/out.txt", 0o644, 1000, &mut reader).await.unwrap();
478
479        assert_eq!(&stream.written[..4], b"SEND");
480        let w = &stream.written;
481        let tail = &w[w.len() - 8..];
482        assert_eq!(&tail[..4], b"DONE");
483        assert_eq!(u32::from_le_bytes(tail[4..8].try_into().unwrap()), 1000);
484    }
485
486    #[tokio::test]
487    async fn read_fill_partial() {
488        let data = b"hello";
489        let mut reader = Cursor::new(data.as_slice());
490        let mut buf = [0u8; 10];
491        let n = read_fill(&mut reader, &mut buf).await.unwrap();
492        assert_eq!(n, 5);
493        assert_eq!(&buf[..5], b"hello");
494    }
495
496    // -- stat v1 tests --------------------------------------------------------
497
498    #[tokio::test]
499    async fn stat_v1_existing_file() {
500        let mut wire = Vec::new();
501        wire.extend_from_slice(b"STAT");
502        wire.extend_from_slice(&0o100644u32.to_le_bytes());
503        wire.extend_from_slice(&1024u32.to_le_bytes());
504        wire.extend_from_slice(&1700000000u32.to_le_bytes());
505
506        let mut stream = MockStream::from_response(wire);
507        let st = stat_v1_sync(&mut stream, "/sdcard/test.txt").await.unwrap();
508
509        assert!(st.exists());
510        assert!(st.is_file());
511        assert!(!st.is_dir());
512        assert_eq!(st.size, 1024);
513        assert_eq!(st.mtime, 1700000000);
514    }
515
516    #[tokio::test]
517    async fn stat_v1_directory() {
518        let mut wire = Vec::new();
519        wire.extend_from_slice(b"STAT");
520        wire.extend_from_slice(&0o040755u32.to_le_bytes());
521        wire.extend_from_slice(&4096u32.to_le_bytes());
522        wire.extend_from_slice(&1700000000u32.to_le_bytes());
523
524        let mut stream = MockStream::from_response(wire);
525        let st = stat_v1_sync(&mut stream, "/sdcard").await.unwrap();
526
527        assert!(st.exists());
528        assert!(!st.is_file());
529        assert!(st.is_dir());
530    }
531
532    #[tokio::test]
533    async fn stat_v1_nonexistent() {
534        let mut wire = Vec::new();
535        wire.extend_from_slice(b"STAT");
536        wire.extend_from_slice(&0u32.to_le_bytes());
537        wire.extend_from_slice(&0u32.to_le_bytes());
538        wire.extend_from_slice(&0u32.to_le_bytes());
539
540        let mut stream = MockStream::from_response(wire);
541        let st = stat_v1_sync(&mut stream, "/nonexistent").await.unwrap();
542
543        assert!(!st.exists());
544    }
545
546    // -- stat2 tests ----------------------------------------------------------
547
548    #[tokio::test]
549    async fn stat2_existing_file() {
550        let mut wire = Vec::new();
551        wire.extend_from_slice(b"STA2");
552        wire.extend_from_slice(&0u32.to_le_bytes());         // error
553        wire.extend_from_slice(&1u64.to_le_bytes());          // dev
554        wire.extend_from_slice(&12345u64.to_le_bytes());      // ino
555        wire.extend_from_slice(&0o100644u32.to_le_bytes());   // mode
556        wire.extend_from_slice(&1u32.to_le_bytes());          // nlink
557        wire.extend_from_slice(&1000u32.to_le_bytes());       // uid
558        wire.extend_from_slice(&1000u32.to_le_bytes());       // gid
559        wire.extend_from_slice(&5_000_000_000u64.to_le_bytes()); // size (>4GB)
560        wire.extend_from_slice(&1700000000u64.to_le_bytes()); // atime
561        wire.extend_from_slice(&1700000001u64.to_le_bytes()); // mtime
562        wire.extend_from_slice(&1700000002u64.to_le_bytes()); // ctime
563
564        let mut stream = MockStream::from_response(wire);
565        let st = stat2_sync(&mut stream, "/sdcard/large.bin").await.unwrap();
566
567        assert!(st.exists());
568        assert!(st.is_file());
569        assert_eq!(st.size, 5_000_000_000);
570        assert_eq!(st.mtime, 1700000001);
571        assert_eq!(st.uid, 1000);
572        assert_eq!(st.ino, 12345);
573    }
574
575    #[tokio::test]
576    async fn stat2_error_returns_err() {
577        let mut wire = Vec::new();
578        wire.extend_from_slice(b"STA2");
579        wire.extend_from_slice(&1u32.to_le_bytes()); // error != 0
580        wire.extend_from_slice(&[0u8; 64]);           // remaining fields
581
582        let mut stream = MockStream::from_response(wire);
583        assert!(stat2_sync(&mut stream, "/nope").await.is_err());
584    }
585
586    // -- list tests -----------------------------------------------------------
587
588    fn make_dent(name: &str, mode: u32, size: u32, mtime: u32) -> Vec<u8> {
589        let mut buf = Vec::new();
590        buf.extend_from_slice(b"DENT");
591        buf.extend_from_slice(&mode.to_le_bytes());
592        buf.extend_from_slice(&size.to_le_bytes());
593        buf.extend_from_slice(&mtime.to_le_bytes());
594        buf.extend_from_slice(&(name.len() as u32).to_le_bytes());
595        buf.extend_from_slice(name.as_bytes());
596        buf
597    }
598
599    fn make_done() -> Vec<u8> {
600        let mut buf = Vec::new();
601        buf.extend_from_slice(b"DONE");
602        buf.extend_from_slice(&[0u8; 16]); // mode + size + mtime + namelen = 0
603        buf
604    }
605
606    #[tokio::test]
607    async fn list_sync_entries() {
608        let mut wire = Vec::new();
609        wire.extend(make_dent(".", 0o040755, 0, 0));
610        wire.extend(make_dent("..", 0o040755, 0, 0));
611        wire.extend(make_dent("hello.txt", 0o100644, 42, 1700000000));
612        wire.extend(make_dent("subdir", 0o040755, 4096, 1700000000));
613        wire.extend(make_done());
614
615        let mut stream = MockStream::from_response(wire);
616        let entries = list_sync(&mut stream, "/sdcard").await.unwrap();
617
618        assert_eq!(entries.len(), 2); // . and .. filtered
619        assert_eq!(entries[0].name, "hello.txt");
620        assert_eq!(entries[0].size, 42);
621        assert_eq!(entries[1].name, "subdir");
622    }
623
624    #[tokio::test]
625    async fn list_sync_empty_dir() {
626        let wire = make_done();
627        let mut stream = MockStream::from_response(wire);
628        let entries = list_sync(&mut stream, "/empty").await.unwrap();
629        assert!(entries.is_empty());
630    }
631}