Skip to main content

radicle_fetch/
transport.rs

1pub(crate) mod fetch;
2pub(crate) mod ls_refs;
3
4use std::collections::BTreeSet;
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
10use bstr::BString;
11use gix_features::progress::prodash::progress;
12use gix_protocol::handshake;
13use gix_protocol::Handshake;
14use gix_transport::client;
15use gix_transport::Protocol;
16use gix_transport::Service;
17use radicle::git::fmt::Qualified;
18use radicle::git::Oid;
19use radicle::storage::git::Repository;
20use thiserror::Error;
21
22use crate::git::packfile::Keepfile;
23use crate::git::repository;
24use crate::stage::RefPrefix;
25
26/// Open a reader and writer stream to pass to the ls-refs and fetch
27/// processes for communicating during their respective protocols.
28pub trait ConnectionStream {
29    type Read: io::Read;
30    type Write: io::Write + SignalEof;
31
32    fn open(&mut self) -> (&mut Self::Read, &mut Self::Write);
33}
34
/// Lets the client tell the server side that it is finished, so that
/// the server can stop serving this fetch request.
pub trait SignalEof {
    /// Send the out-of-band "eof" control message.
    ///
    /// The git protocol is tunneled over an already existing
    /// connection, so the usual way of ending the protocol, closing
    /// the connection, is not available. Git itself has no message
    /// that marks the end of the protocol either.
    ///
    /// The only remaining option is a special message sent outside of
    /// the git protocol, which the remote worker processes in order
    /// to end the protocol. That message is the "eof" control
    /// message.
    fn eof(&mut self) -> io::Result<()>;
}
51
52/// Configuration for running a Git `handshake`, `ls-refs`, or
53/// `fetch`.
54pub struct Transport<S> {
55    git_dir: PathBuf,
56    repo: BString,
57    stream: S,
58}
59
60#[derive(Error, Debug)]
61pub enum Error {
62    #[error("gix ls-refs error: {0}")]
63    LsRefs(#[from] gix_protocol::ls_refs::Error),
64    #[error("gix fetch error: {0}")]
65    Fetch(#[from] gix_protocol::fetch::Error),
66    #[error("empty or no packfile received")]
67    Empty,
68    #[error("wanted object not found: {0}")]
69    NotFound(Oid),
70    #[error("gix pack index error: {0}")]
71    PackIndex(#[from] gix_pack::index::init::Error),
72}
73
74impl<S> Transport<S>
75where
76    S: ConnectionStream,
77{
78    pub fn new(git_dir: PathBuf, mut repo: BString, stream: S) -> Self {
79        let repo = if repo.starts_with(b"/") {
80            repo
81        } else {
82            let mut path = BString::new(b"/".to_vec());
83            path.append(&mut repo);
84            path
85        };
86        Self {
87            git_dir,
88            repo,
89            stream,
90        }
91    }
92
93    /// Perform the handshake with the server side.
94    #[allow(clippy::result_large_err)]
95    pub(crate) fn handshake(&mut self) -> Result<Handshake, handshake::Error> {
96        log::trace!("Performing handshake for {}", self.repo);
97        let (read, write) = self.stream.open();
98        gix_protocol::handshake(
99            &mut Connection::new(read, write, self.repo.clone()),
100            Service::UploadPack,
101            |_| Ok(None),
102            vec![],
103            &mut progress::Discard,
104        )
105    }
106
107    /// Perform ls-refs with the server side.
108    pub(crate) fn ls_refs(
109        &mut self,
110        prefixes: impl IntoIterator<Item = RefPrefix>,
111        handshake: &Handshake,
112    ) -> Result<Vec<handshake::Ref>, Error> {
113        let prefixes = prefixes.into_iter().collect::<BTreeSet<_>>();
114        let (read, write) = self.stream.open();
115        Ok(ls_refs::run(
116            ls_refs::Config {
117                prefixes,
118                repo: self.repo.clone(),
119            },
120            handshake,
121            Connection::new(read, write, self.repo.clone()),
122            &mut progress::Discard,
123        )?)
124    }
125
126    /// Perform the fetch with the server side.
127    pub(crate) fn fetch(
128        &mut self,
129        wants_haves: WantsHaves,
130        interrupt: Arc<AtomicBool>,
131        handshake: &Handshake,
132    ) -> Result<Option<Keepfile>, Error> {
133        log::trace!(
134            "Running fetch wants={:?}, haves={:?}",
135            wants_haves.wants,
136            wants_haves.haves
137        );
138        let out = {
139            let (read, write) = self.stream.open();
140            fetch::run(
141                wants_haves.clone(),
142                fetch::PackWriter {
143                    git_dir: self.git_dir.clone(),
144                    interrupt,
145                },
146                handshake,
147                Connection::new(read, write, self.repo.clone()),
148                &mut progress::Discard,
149            )?
150        };
151        let pack_path = out
152            .pack
153            .ok_or(Error::Empty)?
154            .index_path
155            .expect("written packfile must have a path");
156
157        // Validate we got all requested tips in the pack
158        //
159        // N.b. the lookup is a binary search so is efficient for
160        // searching any given oid.
161        {
162            use gix_pack::index::File;
163
164            let idx = File::at(pack_path, gix_hash::Kind::Sha1)?;
165            for oid in wants_haves.wants {
166                if idx.lookup(oid).is_none() {
167                    return Err(Error::NotFound(oid));
168                }
169            }
170        }
171
172        Ok(out.keepfile)
173    }
174
175    /// Signal to the server side that we are done sending ls-refs and
176    /// fetch commands.
177    pub(crate) fn done(&mut self) -> io::Result<()> {
178        let (_, w) = self.stream.open();
179        w.eof()
180    }
181}
182
183pub(crate) struct Connection<R, W> {
184    inner: client::git::blocking_io::Connection<R, W>,
185}
186
187impl<R, W> Connection<R, W>
188where
189    R: io::Read,
190    W: io::Write,
191{
192    pub fn new(read: R, write: W, repo: BString) -> Self {
193        Self {
194            inner: client::git::blocking_io::Connection::new(
195                read,
196                write,
197                Protocol::V2,
198                repo,
199                None::<(String, Option<u16>)>,
200                client::git::ConnectMode::Daemon,
201                false,
202            ),
203        }
204    }
205}
206
207impl<R, W> client::blocking_io::Transport for Connection<R, W>
208where
209    R: std::io::Read,
210    W: std::io::Write,
211{
212    fn request(
213        &mut self,
214        write_mode: client::WriteMode,
215        on_into_read: client::MessageKind,
216        trace: bool,
217    ) -> Result<client::blocking_io::RequestWriter<'_>, client::Error> {
218        self.inner.request(write_mode, on_into_read, trace)
219    }
220
221    fn handshake<'b>(
222        &mut self,
223        service: Service,
224        extra_parameters: &'b [(&'b str, Option<&'b str>)],
225    ) -> Result<client::blocking_io::SetServiceResponse<'_>, client::Error> {
226        self.inner.handshake(service, extra_parameters)
227    }
228}
229
230impl<R, W> client::TransportWithoutIO for Connection<R, W>
231where
232    R: std::io::Read,
233    W: std::io::Write,
234{
235    fn to_url(&self) -> std::borrow::Cow<'_, bstr::BStr> {
236        self.inner.to_url()
237    }
238
239    fn connection_persists_across_multiple_requests(&self) -> bool {
240        false
241    }
242
243    fn configure(
244        &mut self,
245        config: &dyn std::any::Any,
246    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
247        self.inner.configure(config)
248    }
249
250    fn supported_protocol_versions(&self) -> &[Protocol] {
251        &[Protocol::V2]
252    }
253}
254
255#[derive(Debug, Error)]
256pub enum WantsHavesError {
257    #[error(transparent)]
258    Ancestry(#[from] repository::error::Ancestry),
259    #[error(transparent)]
260    Contains(#[from] repository::error::Contains),
261    #[error(transparent)]
262    Resolve(#[from] repository::error::Resolve),
263}
264
265#[derive(Clone, Default)]
266pub(crate) struct WantsHaves {
267    pub wants: BTreeSet<Oid>,
268    pub haves: BTreeSet<Oid>,
269}
270
271impl WantsHaves {
272    pub fn want(&mut self, oid: Oid) {
273        // N.b. if we have it, then we don't want it.
274        if !self.haves.contains(&oid) {
275            self.wants.insert(oid);
276        }
277    }
278
279    pub fn have(&mut self, oid: Oid) {
280        // N.b. ensure that oid is not in wants
281        self.wants.remove(&oid);
282        self.haves.insert(oid);
283    }
284
285    /// Add a set of references to the `wants` and `haves`.
286    ///
287    /// For each reference we want to build the range between its
288    /// current `Oid` and the advertised `Oid`. This allows the server
289    /// to send all objects between that range.
290    ///
291    /// If the reference exists, the range is given by marking the
292    /// existing `Oid` as a `have` and the tip as the `want`. If the
293    /// `tip`, however, is the same as the existing `Oid` or is in the
294    /// Odb, then there is no need to mark it as a `want`.
295    ///
296    /// If the reference does not exist, the range is simply marking
297    /// the tip as a `want`, iff it does not already exist in the Odb.
298    pub fn add<'a, N>(
299        &mut self,
300        repo: &Repository,
301        refs: impl IntoIterator<Item = (N, Oid)>,
302    ) -> Result<&mut Self, WantsHavesError>
303    where
304        N: Into<Qualified<'a>>,
305    {
306        refs.into_iter().try_fold(self, |acc, (refname, tip)| {
307            match repository::refname_to_id(repo, refname)? {
308                Some(oid) => {
309                    let want = oid != tip && !repository::contains(repo, tip)?;
310                    acc.have(oid);
311
312                    if want {
313                        acc.want(tip)
314                    }
315                }
316                None => {
317                    if !repository::contains(repo, tip)? {
318                        acc.want(tip);
319                    }
320                }
321            };
322            Ok(acc)
323        })
324    }
325}
326
327fn agent_name() -> String {
328    let version = match radicle::git::version() {
329        Ok(version) => version,
330        Err(err) => {
331            use radicle::git::VERSION_REQUIRED;
332            log::debug!("The git version could not be determined: {err}");
333            log::debug!("Pretending that we are on git version {VERSION_REQUIRED}.");
334            VERSION_REQUIRED
335        }
336    };
337    format!("git/{version}")
338}