1use std::{
2 path::Path,
3 sync::atomic::{AtomicBool, Ordering},
4};
5
6use gix_features::progress::DynNestedProgress;
7
8#[cfg(feature = "async-client")]
9use crate::transport::client::async_io::{ExtendedBufRead, HandleProgress, Transport};
10#[cfg(feature = "blocking-client")]
11use crate::transport::client::blocking_io::{ExtendedBufRead, HandleProgress, Transport};
12use crate::{
13 fetch::{
14 negotiate, Arguments, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
15 },
16 transport::packetline::read::ProgressAction,
17};
18
19#[maybe_async::maybe_async]
42pub async fn fetch<P, T, E>(
43 negotiate: &mut impl Negotiate,
44 consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
45 mut progress: P,
46 should_interrupt: &AtomicBool,
47 Context {
48 handshake,
49 transport,
50 user_agent,
51 trace_packetlines,
52 }: Context<'_, T>,
53 Options {
54 shallow_file,
55 shallow,
56 tags,
57 reject_shallow_remote,
58 }: Options<'_>,
59) -> Result<Option<Outcome>, Error>
60where
61 P: gix_features::progress::NestedProgress,
62 P::SubProgress: 'static,
63 T: Transport,
64 E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
65{
66 let _span = gix_trace::coarse!("gix_protocol::fetch()");
67 let v1_shallow_updates = handshake.v1_shallow_updates.take();
68 let protocol_version = handshake.server_protocol_version;
69
70 let fetch = crate::Command::Fetch;
71 let fetch_features = {
72 let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
73 f.push(user_agent);
74 f
75 };
76
77 crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
78 let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
79 let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
80 if matches!(tags, Tags::Included) {
81 if !arguments.can_use_include_tag() {
82 return Err(Error::MissingServerFeature {
83 feature: "include-tag",
84 description:
85 "To make this work we would have to implement another pass to fetch attached tags separately",
87 });
88 }
89 arguments.use_include_tag();
90 }
91 let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
92
93 let negotiate_span = gix_trace::detail!(
94 "negotiate",
95 protocol_version = handshake.server_protocol_version as usize
96 );
97 let action = negotiate.mark_complete_and_common_ref()?;
98 let mut previous_response = None::<crate::fetch::Response>;
99 match &action {
100 negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
101 negotiate::Action::MustNegotiate {
102 remote_ref_target_known,
103 } => {
104 if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
105 return Ok(None);
106 }
107 let mut rounds = Vec::new();
108 let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
109 let mut state = negotiate::one_round::State::new(is_stateless);
110 let mut reader = 'negotiation: loop {
111 let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
112 progress.step();
113 progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
114 if should_interrupt.load(Ordering::Relaxed) {
115 return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
116 rounds: rounds.len(),
117 }));
118 }
119
120 let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
121 Ok((round, is_done)) => {
122 rounds.push(round);
123 is_done
124 }
125 Err(err) => {
126 return Err(err.into());
127 }
128 };
129 let mut reader = arguments.send(transport, is_done).await?;
130 if sideband_all {
131 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
132 }
133 let response =
134 crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
135 let has_pack = response.has_pack();
136 previous_response = Some(response);
137 if has_pack {
138 progress.step();
139 progress.set_name("receiving pack".into());
140 if !sideband_all {
141 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
142 }
143 break 'negotiation reader;
144 }
145 };
146 drop(negotiate_span);
147
148 let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
149 previous_response.append_v1_shallow_updates(v1_shallow_updates);
150 if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
151 if reject_shallow_remote {
152 return Err(Error::RejectShallowRemote);
153 }
154 shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
155 }
156
157 #[cfg(feature = "async-client")]
158 let mut rd = crate::futures_lite::io::BlockOn::new(reader);
159 #[cfg(not(feature = "async-client"))]
160 let mut rd = reader;
161 let may_read_to_end =
162 consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
163 #[cfg(feature = "async-client")]
164 {
165 reader = rd.into_inner();
166 }
167 #[cfg(not(feature = "async-client"))]
168 {
169 reader = rd;
170 }
171
172 if may_read_to_end {
173 let has_read_to_end = reader.stopped_at().is_some();
175 #[cfg(feature = "async-client")]
176 {
177 if !has_read_to_end {
178 futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
179 .await
180 .map_err(Error::ReadRemainingBytes)?;
181 }
182 }
183 #[cfg(not(feature = "async-client"))]
184 {
185 if !has_read_to_end {
186 std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
187 }
188 }
189 }
190 drop(reader);
191
192 if let Some(shallow_lock) = shallow_lock {
193 if !previous_response.shallow_updates().is_empty() {
194 gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
195 }
196 }
197 Ok(Some(Outcome {
198 last_response: previous_response,
199 negotiate: NegotiateOutcome { action, rounds },
200 }))
201 }
202 }
203}
204
205fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
206 gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
207 .map_err(Into::into)
208}
209
210fn add_shallow_args(
211 args: &mut Arguments,
212 shallow: &Shallow,
213 shallow_file: &std::path::Path,
214) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
215 let expect_change = *shallow != Shallow::NoChange;
216 let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
217
218 let shallow_commits = gix_shallow::read(shallow_file)?;
219 if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
220 return Err(Error::MissingServerFeature {
222 feature: "shallow",
223 description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
224 });
225 }
226 if let Some(shallow_commits) = &shallow_commits {
227 for commit in shallow_commits.iter() {
228 args.shallow(commit);
229 }
230 }
231 match shallow {
232 Shallow::NoChange => {}
233 Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
234 Shallow::Deepen(commits) => {
235 args.deepen(*commits as usize);
236 args.deepen_relative();
237 }
238 Shallow::Since { cutoff } => {
239 args.deepen_since(cutoff.seconds);
240 }
241 Shallow::Exclude {
242 remote_refs,
243 since_cutoff,
244 } => {
245 if let Some(cutoff) = since_cutoff {
246 args.deepen_since(cutoff.seconds);
247 }
248 for ref_ in remote_refs {
249 args.deepen_not(ref_.as_ref().as_bstr());
250 }
251 }
252 }
253 Ok((shallow_commits, shallow_lock))
254}
255
256fn setup_remote_progress<'a>(
257 progress: &mut dyn gix_features::progress::DynNestedProgress,
258 reader: &mut Box<dyn ExtendedBufRead<'a> + Unpin + 'a>,
259 should_interrupt: &'a AtomicBool,
260) {
261 reader.set_progress_handler(Some(Box::new({
262 let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
263 move |is_err: bool, data: &[u8]| {
264 crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
265 if should_interrupt.load(Ordering::Relaxed) {
266 ProgressAction::Interrupt
267 } else {
268 ProgressAction::Continue
269 }
270 }
271 }) as HandleProgress<'a>));
272}