gix_worktree_stream/lib.rs
1//! The implementation of creating an archive from a git tree, similar to `git archive`, but using an internal format.
2//!
3//! This crate can effectively be used to manipulate worktrees as streams of bytes, which can be decoded using the [`Stream`] type.
4#![deny(rust_2018_idioms, missing_docs, unsafe_code)]
5
6use std::{path::Path, sync::Arc};
7
8use gix_object::bstr::BString;
9
/// A stream of entries that originate from a git tree and optionally from additional entries.
///
/// Note that a git tree is mandatory, but the empty tree can be used to effectively disable it.
pub struct Stream {
    /// The source of encoded bytes, which is either our own pipe reader or an arbitrary boxed reader.
    read: utils::Read,
    // NOTE(review): presumably written by the producing side to report failures to the consumer - confirm in `from_tree`.
    /// A slot shared by reference-counting that can hold a single [`entry::Error`].
    err: SharedErrorSlot,
    /// The sending half of the channel through which [additional entries][AdditionalEntry] are queued.
    ///
    /// It is `None` for instances created by [`Stream::from_read()`] and after [`Stream::as_read_mut()`]
    /// was called, and [`Stream::add_entry()`] panics in that state.
    extra_entries: Option<std::sync::mpsc::Sender<AdditionalEntry>>,
    /// `None` if currently held by an entry.
    path_buf: Option<BString>,
    /// Another buffer to partially act like a buf-reader.
    buf: Vec<u8>,
    /// The offset into `buf` for entries being able to act like a buf reader.
    pos: usize,
    /// The amount of bytes usable from `buf` (even though it always has a fixed size)
    filled: usize,
}
27
// `entry` provides, among others, the `Error` and `Source` types referenced in this file.
// The empty doc comment keeps `missing_docs` satisfied.
// NOTE(review): presumably the module documents itself with inner docs - confirm.
///
pub mod entry;
// Crate-internal only, hence not part of the public API.
pub(crate) mod protocol;

// Private module whose `from_tree()` function is re-exported at the crate root.
mod from_tree;
pub use from_tree::from_tree;
34
/// A reference-counted, mutex-protected slot that holds at most one [`entry::Error`].
// NOTE(review): looks like the means to hand an error from the producer to the stream's consumer - confirm with callers.
pub(crate) type SharedErrorSlot = Arc<parking_lot::Mutex<Option<entry::Error>>>;
36
/// An entry in a stream. Note that they must be consumed fully, by reading from them till exhaustion.
///
/// ### Drop behaviour
///
/// If the entry is dropped without reading it till exhaustion, the stream is tainted and
/// [`next_entry()`][Stream::next_entry()] will panic next time it is called.
pub struct Entry<'a> {
    /// The kind of entry at [`relative_path`][Self::relative_path()].
    pub mode: gix_object::tree::EntryMode,
    /// The hash of the object, uniquely identifying it.
    pub id: gix_hash::ObjectId,
    /// The stream this entry belongs to, borrowed mutably for as long as the entry is alive.
    parent: &'a mut Stream,
    /// The path relative to the repository at which data should be written.
    // NOTE(review): presumably this is the parent's `path_buf`, which is documented as `None` while an entry holds it - confirm.
    path_buf: Option<BString>,
    /// The amount of bytes left to read if the size of bytes to read is known.
    /// It's also our marker to say that we are depleted, which is important to signal to the
    /// parent stream that we can proceed reading the next entry.
    remaining: Option<usize>,
}
57
/// An entry that is [added to the stream][Stream::add_entry()] by the user, verbatim, without additional worktree conversions.
///
/// It may overwrite previously written paths, which may or may not work for the consumer of the stream.
///
/// Use [`Stream::add_entry_from_path()`] to have all fields derived from an item on disk.
pub struct AdditionalEntry {
    /// The hash of the object, uniquely identifying it.
    /// Note that it can be [`null()`][gix_hash::ObjectId::null()] as the hash is typically ignored by consumers of the stream.
    pub id: gix_hash::ObjectId,
    /// The kind of entry to create.
    pub mode: gix_object::tree::EntryMode,
    /// The path relative to the repository at which content should be located.
    pub relative_path: BString,
    /// Where to get the content of the entry from, for example from memory or from a path on disk.
    pub source: entry::Source,
}
72
73/// Lifecycle
74impl Stream {
75 /// Turn ourselves into the underlying byte stream which is a representation of the underlying git tree.
76 ///
77 /// Note that the format is unspecified, and its sole use is for transport, not for persistence.
78 /// Can be used with [`Self::from_read()`] to decode the contained entries.
79 pub fn into_read(self) -> impl std::io::Read {
80 self.read
81 }
82
83 /// Return our internal byte stream from which entries would be generated.
84 ///
85 /// Note that the stream must then be consumed in its entirety.
86 pub fn as_read_mut(&mut self) -> &mut impl std::io::Read {
87 self.extra_entries.take();
88 &mut self.read
89 }
90
91 /// Create a new instance from a stream of bytes in our format.
92 ///
93 /// It must have been created from [`Self::into_read()`] to be compatible, and must
94 /// not have been persisted.
95 pub fn from_read(read: impl std::io::Read + 'static) -> Self {
96 Self {
97 read: utils::Read::Unknown(Box::new(read)),
98 extra_entries: None,
99 path_buf: Some(Vec::with_capacity(1024).into()),
100 err: Default::default(),
101 buf: std::iter::repeat_n(0, u16::MAX as usize).collect(),
102 pos: 0,
103 filled: 0,
104 }
105 }
106}
107
108/// Entries
109impl Stream {
110 /// Add `entry` to the list of entries to be returned in calls to [`Self::next_entry()`].
111 ///
112 /// The entry will be returned after the one contained in the tree, in order of addition.
113 /// # Panics
114 /// If called after the first call to [`Self::next_entry()`].
115 pub fn add_entry(&mut self, entry: AdditionalEntry) -> &mut Self {
116 self.extra_entries
117 .as_ref()
118 .expect("BUG: must not add entries after the start of entries traversal")
119 .send(entry)
120 .expect("Failure is impossible as thread blocks on the receiving end");
121 self
122 }
123
124 /// Add the item at `path` as entry to this stream, which is expected to be under `root`.
125 ///
126 /// Note that the created entries will always have a null SHA1, and that we access this path
127 /// to determine its type, and will access it again when it is requested.
128 pub fn add_entry_from_path(&mut self, root: &Path, path: &Path) -> std::io::Result<&mut Self> {
129 let rela_path = path.strip_prefix(root).map_err(std::io::Error::other)?;
130 let meta = path.symlink_metadata()?;
131 let relative_path = gix_path::to_unix_separators_on_windows(gix_path::into_bstr(rela_path)).into_owned();
132 let id = gix_hash::ObjectId::null(gix_hash::Kind::Sha1);
133
134 let entry = if meta.is_symlink() {
135 let content = std::fs::read_link(path)?;
136 let content = gix_path::into_bstr(content).into_owned();
137 AdditionalEntry {
138 id,
139 mode: gix_object::tree::EntryKind::Link.into(),
140 relative_path,
141 source: entry::Source::Memory(content.into()),
142 }
143 } else if meta.is_dir() {
144 AdditionalEntry {
145 id,
146 mode: gix_object::tree::EntryKind::Tree.into(),
147 relative_path,
148 source: entry::Source::Null,
149 }
150 } else {
151 let mode = if gix_fs::is_executable(&meta) {
152 gix_object::tree::EntryKind::BlobExecutable
153 } else {
154 gix_object::tree::EntryKind::Blob
155 }
156 .into();
157 AdditionalEntry {
158 id,
159 mode,
160 relative_path,
161 source: entry::Source::Path(path.to_owned()),
162 }
163 };
164 Ok(self.add_entry(entry))
165 }
166}
167
168impl Stream {
169 pub(crate) fn new() -> (
170 Stream,
171 gix_features::io::pipe::Writer,
172 std::sync::mpsc::Receiver<AdditionalEntry>,
173 ) {
174 // 1 write for entry header and 1 for hash, 1 for entry path, + 1 for a buffer, then 32 of these.
175 // Giving some buffer, at the expense of memory, is important to allow consumers to take off bytes more quickly,
176 // otherwise, both threads effectively run in lock-step and nullify the benefit.
177 let in_flight_writes = (2 + 1) * 32;
178 let (write, read) = gix_features::io::pipe::unidirectional(in_flight_writes);
179 let (tx_entries, rx_entries) = std::sync::mpsc::channel();
180 (
181 Stream {
182 read: utils::Read::Known(read),
183 extra_entries: Some(tx_entries),
184 path_buf: Some(Vec::with_capacity(1024).into()),
185 err: Default::default(),
186 buf: std::iter::repeat_n(0, u16::MAX as usize).collect(),
187 pos: 0,
188 filled: 0,
189 },
190 write,
191 rx_entries,
192 )
193 }
194}
195
196pub(crate) mod utils {
197 pub enum Read {
198 Known(gix_features::io::pipe::Reader),
199 Unknown(Box<dyn std::io::Read>),
200 }
201
202 impl std::io::Read for Read {
203 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
204 match self {
205 Read::Known(r) => r.read(buf),
206 Read::Unknown(r) => r.read(buf),
207 }
208 }
209 }
210}