gix_protocol/fetch/
function.rs

1use std::{
2    path::Path,
3    sync::atomic::{AtomicBool, Ordering},
4};
5
6use gix_features::progress::DynNestedProgress;
7
8#[cfg(feature = "async-client")]
9use crate::transport::client::async_io::{ExtendedBufRead, HandleProgress, Transport};
10#[cfg(feature = "blocking-client")]
11use crate::transport::client::blocking_io::{ExtendedBufRead, HandleProgress, Transport};
12use crate::{
13    fetch::{
14        negotiate, Arguments, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
15    },
16    transport::packetline::read::ProgressAction,
17};
18
19/// Perform one fetch operation, relying on a `transport`.
20/// `negotiate` is used to run the negotiation of objects that should be contained in the pack, *if* one is to be received.
21/// `progress` and `should_interrupt` is passed to all potentially long-running parts of the operation.
22///
23/// `consume_pack(pack_read, progress, interrupt) -> bool` is always called to consume all bytes that are sent by the server, returning `true` if we should assure the pack is read to the end,
24/// or `false` to do nothing. Dropping the reader without reading to EOF (i.e. returning `false`) is an offense to the server, and
25/// `transport` won't be in the correct state to perform additional operations, or indicate the end of operation.
26/// Note that the passed reader blocking as the pack-writing is blocking as well.
27///
28/// The `Context` and `Options` further define parts of this `fetch` operation.
29///
30/// As opposed to a full `git fetch`, this operation does *not*…
31///
32/// * …update local refs
33/// * …end the interaction after the fetch
34///
35/// **Note that the interaction will never be ended**, even on error or failure, leaving it up to the caller to do that, maybe
36/// with the help of [`SendFlushOnDrop`](crate::SendFlushOnDrop) which can wrap `transport`.
37/// Generally, the `transport` is left in a state that allows for more commands to be run.
38///
39/// Return `Ok(None)` if there was nothing to do because all remote refs are at the same state as they are locally,
40/// or there was nothing wanted, or `Ok(Some(outcome))` to inform about all the changes that were made.
41#[maybe_async::maybe_async]
42pub async fn fetch<P, T, E>(
43    negotiate: &mut impl Negotiate,
44    consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
45    mut progress: P,
46    should_interrupt: &AtomicBool,
47    Context {
48        handshake,
49        transport,
50        user_agent,
51        trace_packetlines,
52    }: Context<'_, T>,
53    Options {
54        shallow_file,
55        shallow,
56        tags,
57        reject_shallow_remote,
58    }: Options<'_>,
59) -> Result<Option<Outcome>, Error>
60where
61    P: gix_features::progress::NestedProgress,
62    P::SubProgress: 'static,
63    T: Transport,
64    E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
65{
66    let _span = gix_trace::coarse!("gix_protocol::fetch()");
67    let v1_shallow_updates = handshake.v1_shallow_updates.take();
68    let protocol_version = handshake.server_protocol_version;
69
70    let fetch = crate::Command::Fetch;
71    let fetch_features = {
72        let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
73        f.push(user_agent);
74        f
75    };
76
77    crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
78    let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
79    let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
80    if matches!(tags, Tags::Included) {
81        if !arguments.can_use_include_tag() {
82            return Err(Error::MissingServerFeature {
83                    feature: "include-tag",
84                    description:
85                    // NOTE: if this is an issue, we could probably do what's proposed here.
86                    "To make this work we would have to implement another pass to fetch attached tags separately",
87                });
88        }
89        arguments.use_include_tag();
90    }
91    let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
92
93    let negotiate_span = gix_trace::detail!(
94        "negotiate",
95        protocol_version = handshake.server_protocol_version as usize
96    );
97    let action = negotiate.mark_complete_and_common_ref()?;
98    let mut previous_response = None::<crate::fetch::Response>;
99    match &action {
100        negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
101        negotiate::Action::MustNegotiate {
102            remote_ref_target_known,
103        } => {
104            if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
105                return Ok(None);
106            }
107            let mut rounds = Vec::new();
108            let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
109            let mut state = negotiate::one_round::State::new(is_stateless);
110            let mut reader = 'negotiation: loop {
111                let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
112                progress.step();
113                progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
114                if should_interrupt.load(Ordering::Relaxed) {
115                    return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
116                        rounds: rounds.len(),
117                    }));
118                }
119
120                let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
121                    Ok((round, is_done)) => {
122                        rounds.push(round);
123                        is_done
124                    }
125                    Err(err) => {
126                        return Err(err.into());
127                    }
128                };
129                let mut reader = arguments.send(transport, is_done).await?;
130                if sideband_all {
131                    setup_remote_progress(&mut progress, &mut reader, should_interrupt);
132                }
133                let response =
134                    crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
135                let has_pack = response.has_pack();
136                previous_response = Some(response);
137                if has_pack {
138                    progress.step();
139                    progress.set_name("receiving pack".into());
140                    if !sideband_all {
141                        setup_remote_progress(&mut progress, &mut reader, should_interrupt);
142                    }
143                    break 'negotiation reader;
144                }
145            };
146            drop(negotiate_span);
147
148            let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
149            previous_response.append_v1_shallow_updates(v1_shallow_updates);
150            if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
151                if reject_shallow_remote {
152                    return Err(Error::RejectShallowRemote);
153                }
154                shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
155            }
156
157            #[cfg(feature = "async-client")]
158            let mut rd = crate::futures_lite::io::BlockOn::new(reader);
159            #[cfg(not(feature = "async-client"))]
160            let mut rd = reader;
161            let may_read_to_end =
162                consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
163            #[cfg(feature = "async-client")]
164            {
165                reader = rd.into_inner();
166            }
167            #[cfg(not(feature = "async-client"))]
168            {
169                reader = rd;
170            }
171
172            if may_read_to_end {
173                // Assure the final flush packet is consumed.
174                let has_read_to_end = reader.stopped_at().is_some();
175                #[cfg(feature = "async-client")]
176                {
177                    if !has_read_to_end {
178                        futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
179                            .await
180                            .map_err(Error::ReadRemainingBytes)?;
181                    }
182                }
183                #[cfg(not(feature = "async-client"))]
184                {
185                    if !has_read_to_end {
186                        std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
187                    }
188                }
189            }
190            drop(reader);
191
192            if let Some(shallow_lock) = shallow_lock {
193                if !previous_response.shallow_updates().is_empty() {
194                    gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
195                }
196            }
197            Ok(Some(Outcome {
198                last_response: previous_response,
199                negotiate: NegotiateOutcome { action, rounds },
200            }))
201        }
202    }
203}
204
205fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
206    gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
207        .map_err(Into::into)
208}
209
210fn add_shallow_args(
211    args: &mut Arguments,
212    shallow: &Shallow,
213    shallow_file: &std::path::Path,
214) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
215    let expect_change = *shallow != Shallow::NoChange;
216    let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
217
218    let shallow_commits = gix_shallow::read(shallow_file)?;
219    if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
220        // NOTE: if this is an issue, we can always unshallow the repo ourselves.
221        return Err(Error::MissingServerFeature {
222            feature: "shallow",
223            description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
224        });
225    }
226    if let Some(shallow_commits) = &shallow_commits {
227        for commit in shallow_commits.iter() {
228            args.shallow(commit);
229        }
230    }
231    match shallow {
232        Shallow::NoChange => {}
233        Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
234        Shallow::Deepen(commits) => {
235            args.deepen(*commits as usize);
236            args.deepen_relative();
237        }
238        Shallow::Since { cutoff } => {
239            args.deepen_since(cutoff.seconds);
240        }
241        Shallow::Exclude {
242            remote_refs,
243            since_cutoff,
244        } => {
245            if let Some(cutoff) = since_cutoff {
246                args.deepen_since(cutoff.seconds);
247            }
248            for ref_ in remote_refs {
249                args.deepen_not(ref_.as_ref().as_bstr());
250            }
251        }
252    }
253    Ok((shallow_commits, shallow_lock))
254}
255
256fn setup_remote_progress<'a>(
257    progress: &mut dyn gix_features::progress::DynNestedProgress,
258    reader: &mut Box<dyn ExtendedBufRead<'a> + Unpin + 'a>,
259    should_interrupt: &'a AtomicBool,
260) {
261    reader.set_progress_handler(Some(Box::new({
262        let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
263        move |is_err: bool, data: &[u8]| {
264            crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
265            if should_interrupt.load(Ordering::Relaxed) {
266                ProgressAction::Interrupt
267            } else {
268                ProgressAction::Continue
269            }
270        }
271    }) as HandleProgress<'a>));
272}