1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
use std::io;

use git_features::{progress, progress::Progress};
use git_transport::{
    client,
    client::{SetServiceResponse, TransportV2Ext},
    Service,
};
use maybe_async::maybe_async;

use crate::{
    credentials,
    fetch::{refs, Action, Arguments, Command, Delegate, Error, LsRefsAction, Response},
};

/// A way to indicate how to treat the connection underlying the transport, potentially allowing to reuse it.
///
/// This is a field-less selector, hence it can be copied, compared and printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchConnection {
    /// Use this variant if the server should be informed that the operation is completed and no further commands
    /// will be issued at the end of the fetch operation, or after deciding that no fetch operation should happen
    /// after references were listed.
    ///
    /// When indicating the end-of-fetch, this flag is only relevant in protocol V2.
    /// Generally it only applies when using persistent transports.
    ///
    /// In most explicit client side failure modes the 'end-of-operation' notification will be sent to the server
    /// automatically.
    TerminateOnSuccessfulCompletion,

    /// Indicate that persistent transport connections can be reused by not sending an 'end-of-operation'
    /// notification to the server.
    /// This is useful if multiple `fetch(…)` calls are used in succession.
    ///
    /// Note that this has no effect in case of non-persistent connections, like the ones over HTTP.
    ///
    /// As an optimization, callers can use `AllowReuse` here as the server will also know the client is done
    /// if the connection is closed.
    AllowReuse,
}

impl Default for FetchConnection {
    fn default() -> Self {
        FetchConnection::TerminateOnSuccessfulCompletion
    }
}

/// Perform a 'fetch' operation with the server using `transport`, with `delegate` handling all server interactions.
/// **Note** that `delegate` has blocking operations and thus this entire call should be on an executor which can handle
/// that. This could be the current thread blocking, or another thread.
///
/// * `authenticate(operation_to_perform)` is used to obtain credentials if the handshake fails with
///   'permission denied'. It is then called again to approve or reject these credentials depending on the outcome
///   of the retried handshake. Note that not all transports support authentication or authorization.
/// * `progress` is used to emit progress messages.
/// * `fetch_mode` controls whether the server is told that the interaction is over, see [`FetchConnection`].
///
/// _Note_ that depending on the `delegate`, the actual action performed can be `ls-refs`, `clone` or `fetch`.
///
/// # Stages
///
/// 1. Handshake for `Service::UploadPack`, retried once with credentials on 'permission denied'.
/// 2. Obtain the remote references: either from the handshake itself, or by invoking `ls-refs` unless the
///    delegate chooses to skip it.
/// 3. Let the delegate prepare the fetch, which may cancel the operation.
/// 4. Negotiate in rounds until a pack is received or the delegate cancels.
///
/// # Errors
///
/// Besides errors propagated from the transport, the delegate and `authenticate`, this returns
/// `Error::TransportProtocolPolicyViolation` if the transport reports a non-empty list of supported protocol
/// versions which does not contain the version the server actually uses.
///
/// # Panics
///
/// * If the handshake contains references but the protocol isn't V1, or if it contains none and the protocol
///   isn't V2.
/// * If `authenticate` yields no outcome in response to `Action::Fill`.
/// * Presumably if the delegate configures arguments or features the server doesn't support, as suggested by
///   the name of `validate_argument_prefixes_or_panic()` - TODO confirm.
#[maybe_async]
pub async fn fetch<F, D, T>(
    mut transport: T,
    mut delegate: D,
    mut authenticate: F,
    mut progress: impl Progress,
    fetch_mode: FetchConnection,
) -> Result<(), Error>
where
    F: FnMut(credentials::Action<'_>) -> credentials::Result,
    D: Delegate,
    T: client::Transport,
{
    let (protocol_version, parsed_refs, capabilities) = {
        progress.init(None, progress::steps());
        progress.set_name("handshake");
        progress.step();

        // Re-borrow the delegate's key/value pairs as string slices, the form passed to `handshake()` below.
        let extra_parameters = delegate.handshake_extra_parameters();
        let extra_parameters: Vec<_> = extra_parameters
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_ref().map(|s| s.as_str())))
            .collect();
        // Queried up-front and compared to the protocol version the handshake reports further down.
        let supported_versions: Vec<_> = transport.supported_protocol_versions().into();

        let result = transport.handshake(Service::UploadPack, &extra_parameters).await;
        let SetServiceResponse {
            actual_protocol,
            capabilities,
            refs,
        } = match result {
            Ok(v) => Ok(v),
            // Permission was denied: ask for credentials and try the handshake exactly one more time.
            Err(client::Error::Io { ref err }) if err.kind() == io::ErrorKind::PermissionDenied => {
                drop(result); // needed to workaround this: https://github.com/rust-lang/rust/issues/76149
                let url = transport.to_url();
                progress.set_name("authentication");
                let credentials::Outcome { identity, next } =
                    authenticate(credentials::Action::Fill(&url))?.expect("FILL provides an identity");
                transport.set_identity(identity)?;
                progress.step();
                progress.set_name("handshake (authenticated)");
                match transport.handshake(Service::UploadPack, &extra_parameters).await {
                    // The credentials worked, report them as approved.
                    Ok(v) => {
                        authenticate(next.approve())?;
                        Ok(v)
                    }
                    // Still no permission? Reject the credentials.
                    Err(client::Error::Io { err }) if err.kind() == io::ErrorKind::PermissionDenied => {
                        authenticate(next.reject())?;
                        Err(client::Error::Io { err })
                    }
                    // Otherwise, do nothing, as we don't know if it actually got to try the credentials.
                    // If they were previously stored, they remain. In the worst case, the user has to enter them again
                    // next time they try.
                    Err(err) => Err(err),
                }
            }
            Err(err) => Err(err),
        }?;

        // An empty list places no restriction on the version, otherwise the server's version must be listed.
        if !supported_versions.is_empty() && !supported_versions.contains(&actual_protocol) {
            return Err(Error::TransportProtocolPolicyViolation {
                actual_version: actual_protocol,
            });
        }

        // References may be part of the handshake response, in which case they are parsed right away.
        let parsed_refs = match refs {
            Some(mut refs) => {
                assert_eq!(
                    actual_protocol,
                    git_transport::Protocol::V1,
                    "Only V1 auto-responds with refs"
                );
                Some(
                    refs::from_v1_refs_received_as_part_of_handshake_and_capabilities(&mut refs, capabilities.iter())
                        .await?,
                )
            }
            None => None,
        };
        (actual_protocol, parsed_refs, capabilities)
    }; // this scope is needed, see https://github.com/rust-lang/rust/issues/76149

    let parsed_refs = match parsed_refs {
        Some(refs) => refs,
        // The handshake didn't contain references, so they have to be requested with `ls-refs`.
        None => {
            assert_eq!(
                protocol_version,
                git_transport::Protocol::V2,
                "Only V2 needs a separate request to get specific refs"
            );

            let ls_refs = Command::LsRefs;
            let mut ls_features = ls_refs.default_features(protocol_version, &capabilities);
            let mut ls_args = ls_refs.initial_arguments(&ls_features);
            // The delegate may adjust arguments and features, or decide not to list references at all.
            match delegate.prepare_ls_refs(&capabilities, &mut ls_args, &mut ls_features) {
                // Skipping `ls-refs` means proceeding with an empty list of references.
                Ok(LsRefsAction::Skip) => Vec::new(),
                Ok(LsRefsAction::Continue) => {
                    ls_refs.validate_argument_prefixes_or_panic(
                        protocol_version,
                        &capabilities,
                        &ls_args,
                        &ls_features,
                    );

                    progress.step();
                    progress.set_name("list refs");
                    let mut remote_refs = transport
                        .invoke(
                            ls_refs.as_str(),
                            ls_features.into_iter(),
                            // Without arguments, pass `None` instead of an empty iterator.
                            if ls_args.is_empty() {
                                None
                            } else {
                                Some(ls_args.into_iter())
                            },
                        )
                        .await?;
                    refs::from_v2_refs(&mut remote_refs).await?
                }
                // Try to end the interaction before returning the delegate's error.
                // Note that a failure to do so is returned instead of the delegate's error due to `?`.
                Err(err) => {
                    indicate_end_of_interaction(transport).await?;
                    return Err(err.into());
                }
            }
        }
    };

    let fetch = Command::Fetch;
    let mut fetch_features = fetch.default_features(protocol_version, &capabilities);
    match delegate.prepare_fetch(protocol_version, &capabilities, &mut fetch_features, &parsed_refs) {
        // The delegate doesn't want to fetch. In V1 the end of the interaction is always indicated,
        // in V2 only if the caller asked for the connection to be terminated.
        Ok(Action::Cancel) => {
            return if matches!(protocol_version, git_transport::Protocol::V1)
                || matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
            {
                indicate_end_of_interaction(transport).await
            } else {
                Ok(())
            };
        }
        Ok(Action::Continue) => {
            // There are no arguments yet, hence only the features are subject to validation here.
            fetch.validate_argument_prefixes_or_panic(protocol_version, &capabilities, &[], &fetch_features);
        }
        // Same as above: a failure to end the interaction takes precedence over the delegate's error.
        Err(err) => {
            indicate_end_of_interaction(transport).await?;
            return Err(err.into());
        }
    }

    Response::check_required_features(protocol_version, &fetch_features)?;
    // Decides when the remote progress handler is installed: with `sideband-all` it is set up before the
    // response is parsed, otherwise only right before the pack is received.
    let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
    let mut arguments = Arguments::new(protocol_version, fetch_features);
    // The response of the previous round, if there was one, is handed to the delegate in the next round.
    let mut previous_response = None::<Response>;
    let mut round = 1;
    'negotiation: loop {
        progress.step();
        progress.set_name(format!("negotiate (round {})", round));
        round += 1;
        let action = delegate.negotiate(&parsed_refs, &mut arguments, previous_response.as_ref())?;
        // The arguments are sent along with the information whether the delegate wants to cancel.
        let mut reader = arguments.send(&mut transport, action == Action::Cancel).await?;
        if sideband_all {
            setup_remote_progress(&mut progress, &mut reader);
        }
        let response = Response::from_line_reader(protocol_version, &mut reader).await?;
        previous_response = if response.has_pack() {
            progress.step();
            progress.set_name("receiving pack");
            if !sideband_all {
                setup_remote_progress(&mut progress, &mut reader);
            }
            // `progress` is moved into the delegate here, which is why the loop has to end afterwards.
            delegate.receive_pack(reader, progress, &parsed_refs, &response).await?;
            break 'negotiation;
        } else {
            // No pack yet: stop if the delegate cancelled, otherwise go for another round.
            match action {
                Action::Cancel => break 'negotiation,
                Action::Continue => Some(response),
            }
        }
    }
    // After negotiation the end of the interaction is only indicated in V2, and only if the caller wants the
    // connection to be terminated.
    if matches!(protocol_version, git_transport::Protocol::V2)
        && matches!(fetch_mode, FetchConnection::TerminateOnSuccessfulCompletion)
    {
        indicate_end_of_interaction(transport).await?;
    }
    Ok(())
}

/// Tell the server behind `transport` that no further requests will follow by sending a lone flush message.
///
/// Nothing is sent if the transport reports that its connection doesn't persist across multiple requests.
#[maybe_async]
async fn indicate_end_of_interaction(mut transport: impl client::Transport) -> Result<(), Error> {
    // Only stateful transports need to be told about the (early) end of the interaction.
    if !transport.connection_persists_across_multiple_requests() {
        return Ok(());
    }

    // An empty request, i.e. nothing but a flush message, marks the end.
    transport
        .request(client::WriteMode::Binary, client::MessageKind::Flush)?
        .into_read()
        .await?;
    Ok(())
}

/// Install a progress handler on `reader` which forwards the progress information it receives to a new child
/// of `progress` named "remote".
fn setup_remote_progress(
    progress: &mut impl Progress,
    reader: &mut Box<dyn git_transport::client::ExtendedBufRead + Unpin + '_>,
) {
    // The child is owned by the handler, which translates each message it is called with into progress updates.
    let mut child = progress.add_child("remote");
    let handler: git_transport::client::HandleProgress = Box::new(move |is_err: bool, data: &[u8]| {
        crate::RemoteProgress::translate_to_progress(is_err, data, &mut child)
    });
    reader.set_progress_handler(Some(handler));
}