Skip to main content

gix_worktree_stream/from_tree/
mod.rs

1use std::io::Write;
2
3use gix_object::{bstr::BStr, FindExt};
4
5use gix_error::{message, ResultExt};
6
7use crate::{entry, entry::Error, protocol, AdditionalEntry, SharedErrorSlot, Stream};
8
9/// Use `objects` to traverse `tree` and fetch the contained blobs to return as [`Stream`], which makes them queryable
10/// on demand with support for streaming each entry.
11///
12/// `pipeline` is used to convert blobs to their worktree representation, and `attributes` is used to read
13/// the `export-ignore` attribute. If set on a directory or blob, it won't be added to the archive.
14///
15/// ### Types of entries in stream
16///
17/// We only return blobs (with or without executable), which may be symlinks in which case their content will
18/// be target of the symlink.
19/// Directories are never returned, but maybe added by the caller via [Stream::add_entry()].
20///
21/// ### Progress and interruptions
22///
23/// For per-file progress, integrate progress handling into the calls of [`Stream::next_entry()`] as that
24/// correlates blobs.
25/// Additional interrupt handling can be wrapped around the `Read` implementation of each [`Entry`][crate::Entry].
26/// For progress on bytes-written, integrate progress reporting when consuming the stream.
27/// Further it's possible to drop the returned [`Stream`] to halt all operation.
28///
29/// ### Threaded Operation
30///
31/// This function spawns a thread that will access the tree data in the background, synchronized through
32/// `Stream` so that it will not be faster than the consumer, with at most one file in flight at any time.
33///
34/// ### Limitations
35///
36/// * `export-subst` is not support, as it requires the entire formatting engine of `git log`.
37pub fn from_tree<Find, E>(
38    tree: gix_hash::ObjectId,
39    objects: Find,
40    pipeline: gix_filter::Pipeline,
41    attributes: impl FnMut(&BStr, gix_object::tree::EntryMode, &mut gix_attributes::search::Outcome) -> Result<(), E>
42        + Send
43        + 'static,
44) -> Stream
45where
46    Find: gix_object::Find + Clone + Send + 'static,
47    E: std::error::Error + Send + Sync + 'static,
48{
49    let (stream, mut write, additional_entries) = Stream::new();
50    std::thread::spawn({
51        let slot = stream.err.clone();
52        move || {
53            if let Err(err) = run(
54                tree,
55                objects,
56                pipeline,
57                attributes,
58                &mut write,
59                slot.clone(),
60                additional_entries,
61            ) {
62                {
63                    let mut slot = slot.lock();
64                    if slot.is_none() {
65                        *slot = Some(err);
66                    } else {
67                        drop(slot);
68                        write.channel.send(Err(std::io::Error::other(err.into_error()))).ok();
69                    }
70                }
71            }
72        }
73    });
74    stream
75}
76
77fn run<Find, E>(
78    tree: gix_hash::ObjectId,
79    objects: Find,
80    mut pipeline: gix_filter::Pipeline,
81    mut attributes: impl FnMut(&BStr, gix_object::tree::EntryMode, &mut gix_attributes::search::Outcome) -> Result<(), E>
82        + Send
83        + 'static,
84    out: &mut gix_features::io::pipe::Writer,
85    err: SharedErrorSlot,
86    additional_entries: std::sync::mpsc::Receiver<AdditionalEntry>,
87) -> Result<(), Error>
88where
89    Find: gix_object::Find + Clone,
90    E: std::error::Error + Send + Sync + 'static,
91{
92    let mut buf = Vec::new();
93    let tree_iter = objects
94        .find_tree_iter(tree.as_ref(), &mut buf)
95        .or_raise(|| message("Could not find a tree to traverse"))?;
96    if pipeline.driver_context_mut().treeish.is_none() {
97        pipeline.driver_context_mut().treeish = Some(tree);
98    }
99
100    let mut attrs = gix_attributes::search::Outcome::default();
101    attrs.initialize_with_selection(&Default::default(), Some("export-ignore"));
102    let mut dlg = traverse::Delegate {
103        out,
104        err,
105        pipeline,
106        attrs,
107        objects: objects.clone(),
108        fetch_attributes: move |a: &BStr, b: gix_object::tree::EntryMode, c: &mut gix_attributes::search::Outcome| {
109            attributes(a, b, c).or_raise(|| gix_error::message!("Could not query attributes for path \"{a}\""))
110        },
111        path_deque: Default::default(),
112        path: Default::default(),
113        buf: Vec::with_capacity(1024),
114    };
115    gix_traverse::tree::breadthfirst(
116        tree_iter,
117        gix_traverse::tree::breadthfirst::State::default(),
118        &objects,
119        &mut dlg,
120    )
121    .or_raise(|| message("Could not traverse tree"))?;
122
123    for entry in additional_entries {
124        protocol::write_entry_header_and_path(
125            entry.relative_path.as_ref(),
126            &entry.id,
127            entry.mode,
128            entry.source.len(),
129            out,
130        )
131        .or_raise(|| message("Could not write entry header"))?;
132        // pipe writer always writes all in one go.
133        #[allow(clippy::unused_io_amount)]
134        match entry.source {
135            entry::Source::Memory(buf) => out.write(&buf).map(|_| ()),
136            entry::Source::Null => out.write(&[]).map(|_| ()),
137            entry::Source::Path(path) => {
138                let file = std::fs::File::open(path).or_raise(|| message("Could not open file for streaming"))?;
139                protocol::write_stream(&mut buf, file, out)
140            }
141        }
142        .or_raise(|| message("Could not write entry data"))?;
143    }
144    Ok(())
145}
146
147mod traverse;