gix_worktree_stream/from_tree/
mod.rs

1use std::io::Write;
2
3use gix_object::{bstr::BStr, FindExt};
4
5use crate::{entry, entry::Error, protocol, AdditionalEntry, SharedErrorSlot, Stream};
6
7/// Use `objects` to traverse `tree` and fetch the contained blobs to return as [`Stream`], which makes them queryable
8/// on demand with support for streaming each entry.
9///
10/// `pipeline` is used to convert blobs to their worktree representation, and `attributes` is used to read
11/// the `export-ignore` attribute. If set on a directory or blob, it won't be added to the archive.
12///
13/// ### Types of entries in stream
14///
15/// We only return blobs (with or without executable), which may be symlinks in which case their content will
16/// be target of the symlink.
17/// Directories are never returned, but maybe added by the caller via [Stream::add_entry()].
18///
19/// ### Progress and interruptions
20///
21/// For per-file progress, integrate progress handling into the calls of [`Stream::next_entry()`] as that
22/// correlates blobs.
23/// Additional interrupt handling can be wrapped around the `Read` implementation of each [`Entry`][crate::Entry].
24/// For progress on bytes-written, integrate progress reporting when consuming the stream.
25/// Further it's possible to drop the returned [`Stream`] to halt all operation.
26///
27/// ### Threaded Operation
28///
29/// This function spawns a thread that will access the tree data in the background, synchronized through
30/// `Stream` so that it will not be faster than the consumer, with at most one file in flight at any time.
31///
32/// ### Limitations
33///
34/// * `export-subst` is not support, as it requires the entire formatting engine of `git log`.
35pub fn from_tree<Find, E>(
36    tree: gix_hash::ObjectId,
37    objects: Find,
38    pipeline: gix_filter::Pipeline,
39    attributes: impl FnMut(&BStr, gix_object::tree::EntryMode, &mut gix_attributes::search::Outcome) -> Result<(), E>
40        + Send
41        + 'static,
42) -> Stream
43where
44    Find: gix_object::Find + Clone + Send + 'static,
45    E: std::error::Error + Send + Sync + 'static,
46{
47    let (stream, mut write, additional_entries) = Stream::new();
48    std::thread::spawn({
49        let slot = stream.err.clone();
50        move || {
51            if let Err(err) = run(
52                tree,
53                objects,
54                pipeline,
55                attributes,
56                &mut write,
57                slot.clone(),
58                additional_entries,
59            ) {
60                {
61                    let mut slot = slot.lock();
62                    if slot.is_none() {
63                        *slot = Some(err);
64                    } else {
65                        drop(slot);
66                        write
67                            .channel
68                            .send(Err(std::io::Error::new(std::io::ErrorKind::Other, err)))
69                            .ok();
70                    }
71                }
72            }
73        }
74    });
75    stream
76}
77
78fn run<Find, E>(
79    tree: gix_hash::ObjectId,
80    objects: Find,
81    mut pipeline: gix_filter::Pipeline,
82    mut attributes: impl FnMut(&BStr, gix_object::tree::EntryMode, &mut gix_attributes::search::Outcome) -> Result<(), E>
83        + Send
84        + 'static,
85    out: &mut gix_features::io::pipe::Writer,
86    err: SharedErrorSlot,
87    additional_entries: std::sync::mpsc::Receiver<AdditionalEntry>,
88) -> Result<(), Error>
89where
90    Find: gix_object::Find + Clone,
91    E: std::error::Error + Send + Sync + 'static,
92{
93    let mut buf = Vec::new();
94    let tree_iter = objects.find_tree_iter(tree.as_ref(), &mut buf)?;
95    if pipeline.driver_context_mut().treeish.is_none() {
96        pipeline.driver_context_mut().treeish = Some(tree);
97    }
98
99    let mut attrs = gix_attributes::search::Outcome::default();
100    attrs.initialize_with_selection(&Default::default(), Some("export-ignore"));
101    let mut dlg = traverse::Delegate {
102        out,
103        err,
104        pipeline,
105        attrs,
106        objects: objects.clone(),
107        fetch_attributes: move |a: &BStr, b: gix_object::tree::EntryMode, c: &mut gix_attributes::search::Outcome| {
108            attributes(a, b, c).map_err(|err| Error::Attributes {
109                source: Box::new(err),
110                path: a.to_owned(),
111            })
112        },
113        path_deque: Default::default(),
114        path: Default::default(),
115        buf: Vec::with_capacity(1024),
116    };
117    gix_traverse::tree::breadthfirst(
118        tree_iter,
119        gix_traverse::tree::breadthfirst::State::default(),
120        &objects,
121        &mut dlg,
122    )?;
123
124    for entry in additional_entries {
125        protocol::write_entry_header_and_path(
126            entry.relative_path.as_ref(),
127            &entry.id,
128            entry.mode,
129            entry.source.len(),
130            out,
131        )?;
132        // pipe writer always writes all in one go.
133        #[allow(clippy::unused_io_amount)]
134        match entry.source {
135            entry::Source::Memory(buf) => out.write(&buf).map(|_| ()),
136            entry::Source::Null => out.write(&[]).map(|_| ()),
137            entry::Source::Path(path) => {
138                let file = std::fs::File::open(path)?;
139                protocol::write_stream(&mut buf, file, out)
140            }
141        }?;
142    }
143    Ok(())
144}
145
146mod traverse;