gix_worktree_stream/lib.rs
1//! The implementation of creating an archive from a git tree, similar to `git archive`, but using an internal format.
2//!
3//! This crate can effectively be used to manipulate worktrees as streams of bytes, which can be decoded using the [`Stream`] type.
4#![deny(rust_2018_idioms, missing_docs, unsafe_code)]
5
6use std::{path::Path, sync::Arc};
7
8use gix_object::bstr::BString;
9
10/// A stream of entries that originate from a git tree and optionally from additional entries.
11///
12/// Note that a git tree is mandatory, but the empty tree can be used to effectively disable it.
13pub struct Stream {
14 read: utils::Read,
15 err: SharedErrorSlot,
16 extra_entries: Option<std::sync::mpsc::Sender<AdditionalEntry>>,
17 // additional_entries: Vec,
18 /// `None` if currently held by an entry.
19 path_buf: Option<BString>,
20 /// Another buffer to partially act like a buf-reader.
21 buf: Vec<u8>,
22 /// The offset into `buf` for entries being able to act like a buf reader.
23 pos: usize,
24 /// The amount of bytes usable from `buf` (even though it always has a fixed size)
25 filled: usize,
26}
27
28///
29pub mod entry;
30pub(crate) mod protocol;
31
32mod from_tree;
33pub use from_tree::from_tree;
34
35pub(crate) type SharedErrorSlot = Arc<parking_lot::Mutex<Option<entry::Error>>>;
36
37/// An entry in a stream. Note that they must be consumed fully, by reading from them till exhaustion.
38///
39/// ### Drop behaviour
40///
41/// If the entry is dropped without reading it till exhaustion, the stream is tainted and
42/// [`next_entry()`][Stream::next_entry()] will panic next time it is called.
43pub struct Entry<'a> {
44 /// The kind of entry at [`relative_path`][Self::relative_path()].
45 pub mode: gix_object::tree::EntryMode,
46 /// The hash of the object, uniquely identifying it.
47 pub id: gix_hash::ObjectId,
48 /// Access to our parent
49 parent: &'a mut Stream,
50 /// The path relative to the repository at which data should be written.
51 path_buf: Option<BString>,
52 /// The amount of bytes left to read if the size of bytes to read is known.
53 /// It's also our marker to say that we are depleted, which is important to signal to the
54 /// parent stream that we can proceed reading the next entry.
55 remaining: Option<usize>,
56}
57
58/// An entry that is [added to the stream][Stream::add_entry()] by the user, verbatim, without additional worktree conversions.
59///
60/// It may overwrite previously written paths, which may or may not work for the consumer of the stream.
61pub struct AdditionalEntry {
62 /// The hash of the object, uniquely identifying it.
63 /// Note that it can be [`null()`][gix_hash::ObjectId::null()] as the hash is typically ignored by consumers of the stream.
64 pub id: gix_hash::ObjectId,
65 /// The kind of entry to create.
66 pub mode: gix_object::tree::EntryMode,
67 /// The path relative to the repository at which content should be located.
68 pub relative_path: BString,
69 /// Where to get the content of the entry from.
70 pub source: entry::Source,
71}
72
73/// Lifecycle
74impl Stream {
75 /// Turn ourselves into the underlying byte stream which is a representation of the underlying git tree.
76 ///
77 /// Note that the format is unspecified, and its sole use is for transport, not for persistence.
78 /// Can be used with [`Self::from_read()`] to decode the contained entries.
79 pub fn into_read(self) -> impl std::io::Read {
80 self.read
81 }
82
83 /// Return our internal byte stream from which entries would be generated.
84 ///
85 /// Note that the stream must then be consumed in its entirety.
86 pub fn as_read_mut(&mut self) -> &mut impl std::io::Read {
87 self.extra_entries.take();
88 &mut self.read
89 }
90
91 /// Create a new instance from a stream of bytes in our format.
92 ///
93 /// It must have been created from [`Self::into_read()`] to be compatible, and must
94 /// not have been persisted.
95 pub fn from_read(read: impl std::io::Read + 'static) -> Self {
96 Self {
97 read: utils::Read::Unknown(Box::new(read)),
98 extra_entries: None,
99 path_buf: Some(Vec::with_capacity(1024).into()),
100 err: Default::default(),
101 buf: std::iter::repeat(0).take(u16::MAX as usize).collect(),
102 pos: 0,
103 filled: 0,
104 }
105 }
106}
107
108/// Entries
109impl Stream {
110 /// Add `entry` to the list of entries to be returned in calls to [`Self::next_entry()`].
111 ///
112 /// The entry will be returned after the one contained in the tree, in order of addition.
113 /// # Panics
114 /// If called after the first call to [`Self::next_entry()`].
115 pub fn add_entry(&mut self, entry: AdditionalEntry) -> &mut Self {
116 self.extra_entries
117 .as_ref()
118 .expect("BUG: must not add entries after the start of entries traversal")
119 .send(entry)
120 .expect("Failure is impossible as thread blocks on the receiving end");
121 self
122 }
123
124 /// Add the item at `path` as entry to this stream, which is expected to be under `root`.
125 ///
126 /// Note that the created entries will always have a null SHA1, and that we access this path
127 /// to determine its type, and will access it again when it is requested.
128 pub fn add_entry_from_path(&mut self, root: &Path, path: &Path) -> std::io::Result<&mut Self> {
129 let rela_path = path
130 .strip_prefix(root)
131 .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
132 let meta = path.symlink_metadata()?;
133 let relative_path = gix_path::to_unix_separators_on_windows(gix_path::into_bstr(rela_path)).into_owned();
134 let id = gix_hash::ObjectId::null(gix_hash::Kind::Sha1);
135
136 let entry = if meta.is_symlink() {
137 let content = std::fs::read_link(path)?;
138 let content = gix_path::into_bstr(content).into_owned();
139 AdditionalEntry {
140 id,
141 mode: gix_object::tree::EntryKind::Link.into(),
142 relative_path,
143 source: entry::Source::Memory(content.into()),
144 }
145 } else if meta.is_dir() {
146 AdditionalEntry {
147 id,
148 mode: gix_object::tree::EntryKind::Tree.into(),
149 relative_path,
150 source: entry::Source::Null,
151 }
152 } else {
153 let mode = if gix_fs::is_executable(&meta) {
154 gix_object::tree::EntryKind::BlobExecutable
155 } else {
156 gix_object::tree::EntryKind::Blob
157 }
158 .into();
159 AdditionalEntry {
160 id,
161 mode,
162 relative_path,
163 source: entry::Source::Path(path.to_owned()),
164 }
165 };
166 Ok(self.add_entry(entry))
167 }
168}
169
170impl Stream {
171 pub(crate) fn new() -> (
172 Stream,
173 gix_features::io::pipe::Writer,
174 std::sync::mpsc::Receiver<AdditionalEntry>,
175 ) {
176 // 1 write for entry header and 1 for hash, 1 for entry path, + 1 for a buffer, then 32 of these.
177 // Giving some buffer, at the expense of memory, is important to allow consumers to take off bytes more quickly,
178 // otherwise, both threads effectively run in lock-step and nullify the benefit.
179 let in_flight_writes = (2 + 1) * 32;
180 let (write, read) = gix_features::io::pipe::unidirectional(in_flight_writes);
181 let (tx_entries, rx_entries) = std::sync::mpsc::channel();
182 (
183 Stream {
184 read: utils::Read::Known(read),
185 extra_entries: Some(tx_entries),
186 path_buf: Some(Vec::with_capacity(1024).into()),
187 err: Default::default(),
188 buf: std::iter::repeat(0).take(u16::MAX as usize).collect(),
189 pos: 0,
190 filled: 0,
191 },
192 write,
193 rx_entries,
194 )
195 }
196}
197
198pub(crate) mod utils {
199 pub enum Read {
200 Known(gix_features::io::pipe::Reader),
201 Unknown(Box<dyn std::io::Read>),
202 }
203
204 impl std::io::Read for Read {
205 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
206 match self {
207 Read::Known(r) => r.read(buf),
208 Read::Unknown(r) => r.read(buf),
209 }
210 }
211 }
212}