// gix_protocol/fetch/function.rs

1use std::{
2    path::Path,
3    sync::atomic::{AtomicBool, Ordering},
4};
5
6use gix_features::progress::DynNestedProgress;
7
8use crate::fetch::{
9    negotiate, Arguments, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
10};
11#[cfg(feature = "async-client")]
12use crate::transport::client::async_io::{ExtendedBufRead, HandleProgress, Transport};
13#[cfg(feature = "blocking-client")]
14use crate::transport::client::blocking_io::{ExtendedBufRead, HandleProgress, Transport};
15
16/// Perform one fetch operation, relying on a `transport`.
17/// `negotiate` is used to run the negotiation of objects that should be contained in the pack, *if* one is to be received.
18/// `progress` and `should_interrupt` is passed to all potentially long-running parts of the operation.
19///
20/// `consume_pack(pack_read, progress, interrupt) -> bool` is always called to consume all bytes that are sent by the server, returning `true` if we should assure the pack is read to the end,
21/// or `false` to do nothing. Dropping the reader without reading to EOF (i.e. returning `false`) is an offense to the server, and
22/// `transport` won't be in the correct state to perform additional operations, or indicate the end of operation.
23/// Note that the passed reader blocking as the pack-writing is blocking as well.
24///
25/// The `Context` and `Options` further define parts of this `fetch` operation.
26///
27/// As opposed to a full `git fetch`, this operation does *not*…
28///
29/// * …update local refs
30/// * …end the interaction after the fetch
31///
32/// **Note that the interaction will never be ended**, even on error or failure, leaving it up to the caller to do that, maybe
33/// with the help of [`SendFlushOnDrop`](crate::SendFlushOnDrop) which can wrap `transport`.
34/// Generally, the `transport` is left in a state that allows for more commands to be run.
35///
36/// Return `Ok(None)` if there was nothing to do because all remote refs are at the same state as they are locally,
37/// or there was nothing wanted, or `Ok(Some(outcome))` to inform about all the changes that were made.
38#[maybe_async::maybe_async]
39pub async fn fetch<P, T, E>(
40    negotiate: &mut impl Negotiate,
41    consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
42    mut progress: P,
43    should_interrupt: &AtomicBool,
44    Context {
45        handshake,
46        transport,
47        user_agent,
48        trace_packetlines,
49    }: Context<'_, T>,
50    Options {
51        shallow_file,
52        shallow,
53        tags,
54        reject_shallow_remote,
55    }: Options<'_>,
56) -> Result<Option<Outcome>, Error>
57where
58    P: gix_features::progress::NestedProgress,
59    P::SubProgress: 'static,
60    T: Transport,
61    E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
62{
63    let _span = gix_trace::coarse!("gix_protocol::fetch()");
64    let v1_shallow_updates = handshake.v1_shallow_updates.take();
65    let protocol_version = handshake.server_protocol_version;
66
67    let fetch = crate::Command::Fetch;
68    let fetch_features = {
69        let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
70        f.push(user_agent);
71        f
72    };
73
74    crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
75    let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
76    let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
77    if matches!(tags, Tags::Included) {
78        if !arguments.can_use_include_tag() {
79            return Err(Error::MissingServerFeature {
80                    feature: "include-tag",
81                    description:
82                    // NOTE: if this is an issue, we could probably do what's proposed here.
83                    "To make this work we would have to implement another pass to fetch attached tags separately",
84                });
85        }
86        arguments.use_include_tag();
87    }
88    let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
89
90    let negotiate_span = gix_trace::detail!(
91        "negotiate",
92        protocol_version = handshake.server_protocol_version as usize
93    );
94    let action = negotiate.mark_complete_and_common_ref()?;
95    let mut previous_response = None::<crate::fetch::Response>;
96    match &action {
97        negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
98        negotiate::Action::MustNegotiate {
99            remote_ref_target_known,
100        } => {
101            if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
102                return Ok(None);
103            }
104            let mut rounds = Vec::new();
105            let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
106            let mut state = negotiate::one_round::State::new(is_stateless);
107            let mut reader = 'negotiation: loop {
108                let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
109                progress.step();
110                progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
111                if should_interrupt.load(Ordering::Relaxed) {
112                    return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
113                        rounds: rounds.len(),
114                    }));
115                }
116
117                let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
118                    Ok((round, is_done)) => {
119                        rounds.push(round);
120                        is_done
121                    }
122                    Err(err) => {
123                        return Err(err.into());
124                    }
125                };
126                let mut reader = arguments.send(transport, is_done).await?;
127                if sideband_all {
128                    setup_remote_progress(&mut progress, &mut reader, should_interrupt);
129                }
130                let response =
131                    crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
132                let has_pack = response.has_pack();
133                previous_response = Some(response);
134                if has_pack {
135                    progress.step();
136                    progress.set_name("receiving pack".into());
137                    if !sideband_all {
138                        setup_remote_progress(&mut progress, &mut reader, should_interrupt);
139                    }
140                    break 'negotiation reader;
141                }
142            };
143            drop(negotiate_span);
144
145            let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
146            previous_response.append_v1_shallow_updates(v1_shallow_updates);
147            if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
148                if reject_shallow_remote {
149                    return Err(Error::RejectShallowRemote);
150                }
151                shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
152            }
153
154            #[cfg(feature = "async-client")]
155            let mut rd = crate::futures_lite::io::BlockOn::new(reader);
156            #[cfg(not(feature = "async-client"))]
157            let mut rd = reader;
158            let may_read_to_end =
159                consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
160            #[cfg(feature = "async-client")]
161            {
162                reader = rd.into_inner();
163            }
164            #[cfg(not(feature = "async-client"))]
165            {
166                reader = rd;
167            }
168
169            if may_read_to_end {
170                // Assure the final flush packet is consumed.
171                let has_read_to_end = reader.stopped_at().is_some();
172                #[cfg(feature = "async-client")]
173                {
174                    if !has_read_to_end {
175                        futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
176                            .await
177                            .map_err(Error::ReadRemainingBytes)?;
178                    }
179                }
180                #[cfg(not(feature = "async-client"))]
181                {
182                    if !has_read_to_end {
183                        std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
184                    }
185                }
186            }
187            drop(reader);
188
189            if let Some(shallow_lock) = shallow_lock {
190                if !previous_response.shallow_updates().is_empty() {
191                    gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
192                }
193            }
194            Ok(Some(Outcome {
195                last_response: previous_response,
196                negotiate: NegotiateOutcome { action, rounds },
197            }))
198        }
199    }
200}
201
202fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
203    gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
204        .map_err(Into::into)
205}
206
207fn add_shallow_args(
208    args: &mut Arguments,
209    shallow: &Shallow,
210    shallow_file: &std::path::Path,
211) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
212    let expect_change = *shallow != Shallow::NoChange;
213    let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
214
215    let shallow_commits = gix_shallow::read(shallow_file)?;
216    if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
217        // NOTE: if this is an issue, we can always unshallow the repo ourselves.
218        return Err(Error::MissingServerFeature {
219            feature: "shallow",
220            description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
221        });
222    }
223    if let Some(shallow_commits) = &shallow_commits {
224        for commit in shallow_commits.iter() {
225            args.shallow(commit);
226        }
227    }
228    match shallow {
229        Shallow::NoChange => {}
230        Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
231        Shallow::Deepen(commits) => {
232            args.deepen(*commits as usize);
233            args.deepen_relative();
234        }
235        Shallow::Since { cutoff } => {
236            args.deepen_since(cutoff.seconds);
237        }
238        Shallow::Exclude {
239            remote_refs,
240            since_cutoff,
241        } => {
242            if let Some(cutoff) = since_cutoff {
243                args.deepen_since(cutoff.seconds);
244            }
245            for ref_ in remote_refs {
246                args.deepen_not(ref_.as_ref().as_bstr());
247            }
248        }
249    }
250    Ok((shallow_commits, shallow_lock))
251}
252
253fn setup_remote_progress<'a>(
254    progress: &mut dyn gix_features::progress::DynNestedProgress,
255    reader: &mut Box<dyn ExtendedBufRead<'a> + Unpin + 'a>,
256    should_interrupt: &'a AtomicBool,
257) {
258    reader.set_progress_handler(Some(Box::new({
259        let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
260        move |is_err: bool, data: &[u8]| {
261            crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
262            if should_interrupt.load(Ordering::Relaxed) {
263                std::ops::ControlFlow::Break(())
264            } else {
265                std::ops::ControlFlow::Continue(())
266            }
267        }
268    }) as HandleProgress<'a>));
269}