1use std::{
2 path::Path,
3 sync::atomic::{AtomicBool, Ordering},
4};
5
6use gix_features::progress::DynNestedProgress;
7
8use crate::{
9 fetch::{
10 negotiate, Arguments, Context, Error, Negotiate, NegotiateOutcome, Options, Outcome, ProgressId, Shallow, Tags,
11 },
12 transport::packetline::read::ProgressAction,
13};
14
15#[maybe_async::maybe_async]
38pub async fn fetch<P, T, E>(
39 negotiate: &mut impl Negotiate,
40 consume_pack: impl FnOnce(&mut dyn std::io::BufRead, &mut dyn DynNestedProgress, &AtomicBool) -> Result<bool, E>,
41 mut progress: P,
42 should_interrupt: &AtomicBool,
43 Context {
44 handshake,
45 transport,
46 user_agent,
47 trace_packetlines,
48 }: Context<'_, T>,
49 Options {
50 shallow_file,
51 shallow,
52 tags,
53 reject_shallow_remote,
54 }: Options<'_>,
55) -> Result<Option<Outcome>, Error>
56where
57 P: gix_features::progress::NestedProgress,
58 P::SubProgress: 'static,
59 T: gix_transport::client::Transport,
60 E: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
61{
62 let _span = gix_trace::coarse!("gix_protocol::fetch()");
63 let v1_shallow_updates = handshake.v1_shallow_updates.take();
64 let protocol_version = handshake.server_protocol_version;
65
66 let fetch = crate::Command::Fetch;
67 let fetch_features = {
68 let mut f = fetch.default_features(protocol_version, &handshake.capabilities);
69 f.push(user_agent);
70 f
71 };
72
73 crate::fetch::Response::check_required_features(protocol_version, &fetch_features)?;
74 let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
75 let mut arguments = Arguments::new(protocol_version, fetch_features, trace_packetlines);
76 if matches!(tags, Tags::Included) {
77 if !arguments.can_use_include_tag() {
78 return Err(Error::MissingServerFeature {
79 feature: "include-tag",
80 description:
81 "To make this work we would have to implement another pass to fetch attached tags separately",
83 });
84 }
85 arguments.use_include_tag();
86 }
87 let (shallow_commits, mut shallow_lock) = add_shallow_args(&mut arguments, shallow, &shallow_file)?;
88
89 let negotiate_span = gix_trace::detail!(
90 "negotiate",
91 protocol_version = handshake.server_protocol_version as usize
92 );
93 let action = negotiate.mark_complete_and_common_ref()?;
94 let mut previous_response = None::<crate::fetch::Response>;
95 match &action {
96 negotiate::Action::NoChange | negotiate::Action::SkipToRefUpdate => Ok(None),
97 negotiate::Action::MustNegotiate {
98 remote_ref_target_known,
99 } => {
100 if !negotiate.add_wants(&mut arguments, remote_ref_target_known) {
101 return Ok(None);
102 }
103 let mut rounds = Vec::new();
104 let is_stateless = arguments.is_stateless(!transport.connection_persists_across_multiple_requests());
105 let mut state = negotiate::one_round::State::new(is_stateless);
106 let mut reader = 'negotiation: loop {
107 let _round = gix_trace::detail!("negotiate round", round = rounds.len() + 1);
108 progress.step();
109 progress.set_name(format!("negotiate (round {})", rounds.len() + 1));
110 if should_interrupt.load(Ordering::Relaxed) {
111 return Err(Error::Negotiate(negotiate::Error::NegotiationFailed {
112 rounds: rounds.len(),
113 }));
114 }
115
116 let is_done = match negotiate.one_round(&mut state, &mut arguments, previous_response.as_ref()) {
117 Ok((round, is_done)) => {
118 rounds.push(round);
119 is_done
120 }
121 Err(err) => {
122 return Err(err.into());
123 }
124 };
125 let mut reader = arguments.send(transport, is_done).await?;
126 if sideband_all {
127 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
128 }
129 let response =
130 crate::fetch::Response::from_line_reader(protocol_version, &mut reader, is_done, !is_done).await?;
131 let has_pack = response.has_pack();
132 previous_response = Some(response);
133 if has_pack {
134 progress.step();
135 progress.set_name("receiving pack".into());
136 if !sideband_all {
137 setup_remote_progress(&mut progress, &mut reader, should_interrupt);
138 }
139 break 'negotiation reader;
140 }
141 };
142 drop(negotiate_span);
143
144 let mut previous_response = previous_response.expect("knowledge of a pack means a response was received");
145 previous_response.append_v1_shallow_updates(v1_shallow_updates);
146 if !previous_response.shallow_updates().is_empty() && shallow_lock.is_none() {
147 if reject_shallow_remote {
148 return Err(Error::RejectShallowRemote);
149 }
150 shallow_lock = acquire_shallow_lock(&shallow_file).map(Some)?;
151 }
152
153 #[cfg(feature = "async-client")]
154 let mut rd = crate::futures_lite::io::BlockOn::new(reader);
155 #[cfg(not(feature = "async-client"))]
156 let mut rd = reader;
157 let may_read_to_end =
158 consume_pack(&mut rd, &mut progress, should_interrupt).map_err(|err| Error::ConsumePack(err.into()))?;
159 #[cfg(feature = "async-client")]
160 {
161 reader = rd.into_inner();
162 }
163 #[cfg(not(feature = "async-client"))]
164 {
165 reader = rd;
166 }
167
168 if may_read_to_end {
169 let has_read_to_end = reader.stopped_at().is_some();
171 #[cfg(feature = "async-client")]
172 {
173 if !has_read_to_end {
174 futures_lite::io::copy(&mut reader, &mut futures_lite::io::sink())
175 .await
176 .map_err(Error::ReadRemainingBytes)?;
177 }
178 }
179 #[cfg(not(feature = "async-client"))]
180 {
181 if !has_read_to_end {
182 std::io::copy(&mut reader, &mut std::io::sink()).map_err(Error::ReadRemainingBytes)?;
183 }
184 }
185 }
186 drop(reader);
187
188 if let Some(shallow_lock) = shallow_lock {
189 if !previous_response.shallow_updates().is_empty() {
190 gix_shallow::write(shallow_lock, shallow_commits, previous_response.shallow_updates())?;
191 }
192 }
193 Ok(Some(Outcome {
194 last_response: previous_response,
195 negotiate: NegotiateOutcome { action, rounds },
196 }))
197 }
198 }
199}
200
201fn acquire_shallow_lock(shallow_file: &Path) -> Result<gix_lock::File, Error> {
202 gix_lock::File::acquire_to_update_resource(shallow_file, gix_lock::acquire::Fail::Immediately, None)
203 .map_err(Into::into)
204}
205
206fn add_shallow_args(
207 args: &mut Arguments,
208 shallow: &Shallow,
209 shallow_file: &std::path::Path,
210) -> Result<(Option<Vec<gix_hash::ObjectId>>, Option<gix_lock::File>), Error> {
211 let expect_change = *shallow != Shallow::NoChange;
212 let shallow_lock = expect_change.then(|| acquire_shallow_lock(shallow_file)).transpose()?;
213
214 let shallow_commits = gix_shallow::read(shallow_file)?;
215 if (shallow_commits.is_some() || expect_change) && !args.can_use_shallow() {
216 return Err(Error::MissingServerFeature {
218 feature: "shallow",
219 description: "shallow clones need server support to remain shallow, otherwise bigger than expected packs are sent effectively unshallowing the repository",
220 });
221 }
222 if let Some(shallow_commits) = &shallow_commits {
223 for commit in shallow_commits.iter() {
224 args.shallow(commit);
225 }
226 }
227 match shallow {
228 Shallow::NoChange => {}
229 Shallow::DepthAtRemote(commits) => args.deepen(commits.get() as usize),
230 Shallow::Deepen(commits) => {
231 args.deepen(*commits as usize);
232 args.deepen_relative();
233 }
234 Shallow::Since { cutoff } => {
235 args.deepen_since(cutoff.seconds);
236 }
237 Shallow::Exclude {
238 remote_refs,
239 since_cutoff,
240 } => {
241 if let Some(cutoff) = since_cutoff {
242 args.deepen_since(cutoff.seconds);
243 }
244 for ref_ in remote_refs {
245 args.deepen_not(ref_.as_ref().as_bstr());
246 }
247 }
248 }
249 Ok((shallow_commits, shallow_lock))
250}
251
252fn setup_remote_progress<'a>(
253 progress: &mut dyn gix_features::progress::DynNestedProgress,
254 reader: &mut Box<dyn crate::transport::client::ExtendedBufRead<'a> + Unpin + 'a>,
255 should_interrupt: &'a AtomicBool,
256) {
257 use crate::transport::client::ExtendedBufRead;
258 reader.set_progress_handler(Some(Box::new({
259 let mut remote_progress = progress.add_child_with_id("remote".to_string(), ProgressId::RemoteProgress.into());
260 move |is_err: bool, data: &[u8]| {
261 crate::RemoteProgress::translate_to_progress(is_err, data, &mut remote_progress);
262 if should_interrupt.load(Ordering::Relaxed) {
263 ProgressAction::Interrupt
264 } else {
265 ProgressAction::Continue
266 }
267 }
268 }) as crate::transport::client::HandleProgress<'a>));
269}