gix_worktree_stream/
entry.rs

1use std::{
2    io::{ErrorKind, Read},
3    path::PathBuf,
4};
5
6use gix_object::bstr::{BStr, BString};
7
8use crate::{protocol, Entry, Stream};
9
10/// The error returned by [`next_entry()`][Stream::next_entry()].
11#[derive(Debug, thiserror::Error)]
12#[allow(missing_docs)]
13pub enum Error {
14    #[error(transparent)]
15    Io(#[from] std::io::Error),
16    #[error("Could not find a tree's leaf, typically a blob")]
17    Find(#[from] gix_object::find::existing::Error),
18    #[error("Could not find a tree to traverse")]
19    FindTree(#[from] gix_object::find::existing_iter::Error),
20    #[error("Could not query attributes for path \"{path}\"")]
21    Attributes {
22        path: BString,
23        source: Box<dyn std::error::Error + Send + Sync + 'static>,
24    },
25    #[error(transparent)]
26    Traverse(#[from] gix_traverse::tree::breadthfirst::Error),
27    #[error(transparent)]
28    ConvertToWorktree(#[from] gix_filter::pipeline::convert::to_worktree::Error),
29}
30
31impl Stream {
32    /// Access the next entry of the stream or `None` if there is nothing more to read.
33    pub fn next_entry(&mut self) -> Result<Option<Entry<'_>>, Error> {
34        assert!(
35            self.path_buf.is_some(),
36            "BUG: must consume and drop entry before getting the next one"
37        );
38        self.extra_entries.take();
39        let res = protocol::read_entry_info(
40            &mut self.read,
41            self.path_buf.as_mut().expect("set while producing an entry"),
42        );
43        match res {
44            Ok((remaining, mode, id)) => {
45                if let Some(err) = self.err.lock().take() {
46                    return Err(err);
47                }
48                Ok(Some(Entry {
49                    path_buf: self.path_buf.take(),
50                    parent: self,
51                    id,
52                    mode,
53                    remaining,
54                }))
55            }
56            Err(err) => {
57                if let Some(err) = self.err.lock().take() {
58                    return Err(err);
59                }
60                // unexpected EOF means the other side dropped. We handled potential errors already.
61                if err.kind() == ErrorKind::UnexpectedEof {
62                    return Ok(None);
63                }
64                Err(err.into())
65            }
66        }
67    }
68}
69
70/// The source of an additional entry
71pub enum Source {
72    /// There is no content, typically the case with directories which are always considered empty.
73    Null,
74    /// Read from the file at the given path.
75    Path(PathBuf),
76    /// Read from memory.
77    Memory(Vec<u8>),
78}
79
80impl Source {
81    pub(crate) fn len(&self) -> Option<usize> {
82        match self {
83            Source::Null => Some(0),
84            Source::Path(_) => None,
85            Source::Memory(buf) => Some(buf.len()),
86        }
87    }
88}
89
90impl std::fmt::Debug for Entry<'_> {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("Entry")
93            .field("path_buf", &self.relative_path())
94            .field("mode", &self.mode)
95            .field("id", &self.id)
96            .field("remaining", &self.remaining)
97            .finish()
98    }
99}
100
101impl Entry<'_> {
102    /// Return the path of this entry as slash-separated path relative to the repository.
103    pub fn relative_path(&self) -> &BStr {
104        self.path_buf.as_ref().expect("always set during our lifetime").as_ref()
105    }
106
107    /// The amount of bytes that remain to be read, or `None` if it's fully streamed.
108    ///
109    /// This equals the length of the entry in bytes right before reading it.
110    pub fn bytes_remaining(&self) -> Option<usize> {
111        self.remaining
112    }
113}
114
115impl Drop for Entry<'_> {
116    fn drop(&mut self) {
117        if self.remaining == Some(0) {
118            self.parent.path_buf = self.path_buf.take();
119        }
120    }
121}
122
123impl Entry<'_> {
124    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
125        if self.parent.pos >= self.parent.filled {
126            let mut u16_buf = [0; 2];
127            self.parent.read.read_exact(&mut u16_buf)?;
128            let nb = u16::from_le_bytes(u16_buf) as usize;
129
130            if nb != 0 {
131                self.parent.read.read_exact(&mut self.parent.buf[..nb])?;
132            }
133            self.parent.filled = nb;
134            self.parent.pos = 0;
135        }
136        Ok(&self.parent.buf[self.parent.pos..self.parent.filled])
137    }
138}
139
140impl std::io::Read for Entry<'_> {
141    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
142        let buf_len = buf.len();
143        if let Some(err) = self.parent.err.lock().take() {
144            return Err(std::io::Error::new(ErrorKind::Other, err));
145        }
146        let bytes_read = match self.remaining.as_mut() {
147            None => {
148                // We expect a zero-read to indicate the end of stream, which is the default way of streams to end.
149                // In our case though, it requires sending an extra zero-write, so we avoid that usually.
150                let input = self.fill_buf()?;
151                let nb = input.len().min(buf.len());
152                buf[..nb].copy_from_slice(&input[..nb]);
153                self.parent.pos += nb;
154                nb
155            }
156            Some(remaining) => {
157                let bytes_read = self.parent.read.read(&mut buf[..buf_len.min(*remaining)])?;
158                *remaining -= bytes_read;
159                bytes_read
160            }
161        };
162        if bytes_read == 0 {
163            self.remaining = Some(0);
164        }
165        Ok(bytes_read)
166    }
167}