1use std::{
2 path::Path,
3 sync::atomic::{AtomicBool, Ordering},
4};
5
6use gix_features::progress::DynNestedProgress;
7
8use crate::fetch::{
9 negotiate, Arguments, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
10};
11#[cfg(feature = "async-client")]
12use crate::transport::client::async_io::{ExtendedBufRead, HandleProgress, Transport};
13#[cfg(feature = "blocking-client")]
14use crate::transport::client::blocking_io::{ExtendedBufRead, HandleProgress, Transport};
15
16#[maybe_async::maybe_async]
39pub async fn fetch<P, T, E>(
40 negotiate: &mut impl Negotiate,
41 consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
42 mut progress: P,
43 should_interrupt: &AtomicBool,
44 Context {
45 handshake,
46 transport,
47 user_agent,
48 trace_packetlines,
49 }: Context<'_, T>,
50 Options {
51 shallow_file,
52 shallow,
53 tags,
54 reject_shallow_remote,
55 }: Options<'_>,
56) -> Result<Option<Outcome>, Error>
57where
58 P: gix_features::progress::NestedProgress,
59 P::SubProgress: 'static,
60 T: Transport,
61 E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
62{
63 let _span = gix_trace::coarse!("gix_protocol::fetch()");
64 let v1_shallow_updates = handshake.v1_shallow_updates.take();
65 let protocol_version = handshake.server_protocol_version;
66
67 let fetch = crate::Command::Fetch;
68 let fetch_features = {
69 let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
70 f.push(user_agent);
71 f
72 };
73
74 crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
75 let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
76 let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
77 if matches!(tags, Tags::Included) {
78 if !arguments.can_use_include_tag() {
79 return Err(Error::MissingServerFeature {
80 feature: "include-tag",
81 description:
82 "To make this work we would have to implement another pass to fetch attached tags separately",
84 });
85 }
86 arguments.use_include_tag();
87 }
88 let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
89
90 let negotiate_span = gix_trace::detail!(
91 "negotiate",
92 protocol_version = handshake.server_protocol_version as usize
93 );
94 let action = negotiate.mark_complete_and_common_ref()?;
95 let mut previous_response = None::<crate::fetch::Response>;
96 match &action {
97 negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
98 negotiate::Action::MustNegotiate {
99 remote_ref_target_known,
100 } => {
101 if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
102 return Ok(None);
103 }
104 let mut rounds = Vec::new();
105 let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
106 let mut state = negotiate::one_round::State::new(is_stateless);
107 let mut reader = 'negotiation: loop {
108 let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
109 progress.step();
110 progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
111 if should_interrupt.load(Ordering::Relaxed) {
112 return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
113 rounds: rounds.len(),
114 }));
115 }
116
117 let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
118 Ok((round, is_done)) => {
119 rounds.push(round);
120 is_done
121 }
122 Err(err) => {
123 return Err(err.into());
124 }
125 };
126 let mut reader = arguments.send(transport, is_done).await?;
127 if sideband_all {
128 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
129 }
130 let response =
131 crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
132 let has_pack = response.has_pack();
133 previous_response = Some(response);
134 if has_pack {
135 progress.step();
136 progress.set_name("receiving pack".into());
137 if !sideband_all {
138 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
139 }
140 break 'negotiation reader;
141 }
142 };
143 #[allow(clippy::drop_non_drop)]
145 drop(negotiate_span);
146
147 let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
148 previous_response.append_v1_shallow_updates(v1_shallow_updates);
149 if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
150 if reject_shallow_remote {
151 return Err(Error::RejectShallowRemote);
152 }
153 shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
154 }
155
156 #[cfg(feature = "async-client")]
157 let mut rd = crate::futures_lite::io::BlockOn::new(reader);
158 #[cfg(not(feature = "async-client"))]
159 let mut rd = reader;
160 let may_read_to_end =
161 consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
162 #[cfg(feature = "async-client")]
163 {
164 reader = rd.into_inner();
165 }
166 #[cfg(not(feature = "async-client"))]
167 {
168 reader = rd;
169 }
170
171 if may_read_to_end {
172 let has_read_to_end = reader.stopped_at().is_some();
174 #[cfg(feature = "async-client")]
175 {
176 if !has_read_to_end {
177 futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
178 .await
179 .map_err(Error::ReadRemainingBytes)?;
180 }
181 }
182 #[cfg(not(feature = "async-client"))]
183 {
184 if !has_read_to_end {
185 std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
186 }
187 }
188 }
189 drop(reader);
190
191 if let Some(shallow_lock) = shallow_lock {
192 if !previous_response.shallow_updates().is_empty() {
193 gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
194 }
195 }
196 Ok(Some(Outcome {
197 last_response: previous_response,
198 negotiate: NegotiateOutcome { action, rounds },
199 }))
200 }
201 }
202}
203
204fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
205 gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
206 .map_err(Into::into)
207}
208
209fn add_shallow_args(
210 args: &mut Arguments,
211 shallow: &Shallow,
212 shallow_file: &std::path::Path,
213) -> Result<(Option<nonempty::NonEmpty<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
214 let expect_change = *shallow != Shallow::NoChange;
215 let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
216
217 let shallow_commits = gix_shallow::read(shallow_file)?;
218 if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
219 return Err(Error::MissingServerFeature {
221 feature: "shallow",
222 description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
223 });
224 }
225 if let Some(shallow_commits) = &shallow_commits {
226 for commit in shallow_commits.iter() {
227 args.shallow(commit);
228 }
229 }
230 match shallow {
231 Shallow::NoChange => {}
232 Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
233 Shallow::Deepen(commits) => {
234 args.deepen(*commits as usize);
235 args.deepen_relative();
236 }
237 Shallow::Since { cutoff } => {
238 args.deepen_since(cutoff.seconds);
239 }
240 Shallow::Exclude {
241 remote_refs,
242 since_cutoff,
243 } => {
244 if let Some(cutoff) = since_cutoff {
245 args.deepen_since(cutoff.seconds);
246 }
247 for ref_ in remote_refs {
248 args.deepen_not(ref_.as_ref().as_bstr());
249 }
250 }
251 }
252 Ok((shallow_commits, shallow_lock))
253}
254
255fn setup_remote_progress<'a>(
256 progress: &mut dyn gix_features::progress::DynNestedProgress,
257 reader: &mut Box<dyn ExtendedBufRead<'a> + Unpin + 'a>,
258 should_interrupt: &'a AtomicBool,
259) {
260 reader.set_progress_handler(Some(Box::new({
261 let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
262 move |is_err: bool, data: &[u8]| {
263 crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
264 if should_interrupt.load(Ordering::Relaxed) {
265 std::ops::ControlFlow::Break(())
266 } else {
267 std::ops::ControlFlow::Continue(())
268 }
269 }
270 }) as HandleProgress<'a>));
271}