1pub(crate) mod fetch;
2pub(crate) mod ls_refs;
3
4use std::collections::BTreeSet;
5use std::io;
6use std::path::PathBuf;
7use std::sync::atomic::AtomicBool;
8use std::sync::Arc;
9
10use bstr::BString;
11use gix_features::progress::prodash::progress;
12use gix_protocol::handshake;
13use gix_protocol::Handshake;
14use gix_transport::client;
15use gix_transport::Protocol;
16use gix_transport::Service;
17use radicle::git::fmt::Qualified;
18use radicle::git::Oid;
19use radicle::storage::git::Repository;
20use thiserror::Error;
21
22use crate::git::packfile::Keepfile;
23use crate::git::repository;
24use crate::stage::RefPrefix;
25
26pub trait ConnectionStream {
29 type Read: io::Read;
30 type Write: io::Write + SignalEof;
31
32 fn open(&mut self) -> (&mut Self::Read, &mut Self::Write);
33}
34
35pub trait SignalEof {
38 fn eof(&mut self) -> io::Result<()>;
50}
51
52pub struct Transport<S> {
55 git_dir: PathBuf,
56 repo: BString,
57 stream: S,
58}
59
60#[derive(Error, Debug)]
61pub enum Error {
62 #[error("gix ls-refs error: {0}")]
63 LsRefs(#[from] gix_protocol::ls_refs::Error),
64 #[error("gix fetch error: {0}")]
65 Fetch(#[from] gix_protocol::fetch::Error),
66 #[error("empty or no packfile received")]
67 Empty,
68 #[error("wanted object not found: {0}")]
69 NotFound(Oid),
70 #[error("gix pack index error: {0}")]
71 PackIndex(#[from] gix_pack::index::init::Error),
72}
73
74impl<S> Transport<S>
75where
76 S: ConnectionStream,
77{
78 pub fn new(git_dir: PathBuf, mut repo: BString, stream: S) -> Self {
79 let repo = if repo.starts_with(b"/") {
80 repo
81 } else {
82 let mut path = BString::new(b"/".to_vec());
83 path.append(&mut repo);
84 path
85 };
86 Self {
87 git_dir,
88 repo,
89 stream,
90 }
91 }
92
93 #[allow(clippy::result_large_err)]
95 pub(crate) fn handshake(&mut self) -> Result<Handshake, handshake::Error> {
96 log::trace!("Performing handshake for {}", self.repo);
97 let (read, write) = self.stream.open();
98 gix_protocol::handshake(
99 &mut Connection::new(read, write, self.repo.clone()),
100 Service::UploadPack,
101 |_| Ok(None),
102 vec![],
103 &mut progress::Discard,
104 )
105 }
106
107 pub(crate) fn ls_refs(
109 &mut self,
110 prefixes: impl IntoIterator<Item = RefPrefix>,
111 handshake: &Handshake,
112 ) -> Result<Vec<handshake::Ref>, Error> {
113 let prefixes = prefixes.into_iter().collect::<BTreeSet<_>>();
114 let (read, write) = self.stream.open();
115 Ok(ls_refs::run(
116 ls_refs::Config {
117 prefixes,
118 repo: self.repo.clone(),
119 },
120 handshake,
121 Connection::new(read, write, self.repo.clone()),
122 &mut progress::Discard,
123 )?)
124 }
125
126 pub(crate) fn fetch(
128 &mut self,
129 wants_haves: WantsHaves,
130 interrupt: Arc<AtomicBool>,
131 handshake: &Handshake,
132 ) -> Result<Option<Keepfile>, Error> {
133 log::trace!(
134 "Running fetch wants={:?}, haves={:?}",
135 wants_haves.wants,
136 wants_haves.haves
137 );
138 let out = {
139 let (read, write) = self.stream.open();
140 fetch::run(
141 wants_haves.clone(),
142 fetch::PackWriter {
143 git_dir: self.git_dir.clone(),
144 interrupt,
145 },
146 handshake,
147 Connection::new(read, write, self.repo.clone()),
148 &mut progress::Discard,
149 )?
150 };
151 let pack_path = out
152 .pack
153 .ok_or(Error::Empty)?
154 .index_path
155 .expect("written packfile must have a path");
156
157 {
162 use gix_pack::index::File;
163
164 let idx = File::at(pack_path, gix_hash::Kind::Sha1)?;
165 for oid in wants_haves.wants {
166 if idx.lookup(oid).is_none() {
167 return Err(Error::NotFound(oid));
168 }
169 }
170 }
171
172 Ok(out.keepfile)
173 }
174
175 pub(crate) fn done(&mut self) -> io::Result<()> {
178 let (_, w) = self.stream.open();
179 w.eof()
180 }
181}
182
183pub(crate) struct Connection<R, W> {
184 inner: client::git::blocking_io::Connection<R, W>,
185}
186
187impl<R, W> Connection<R, W>
188where
189 R: io::Read,
190 W: io::Write,
191{
192 pub fn new(read: R, write: W, repo: BString) -> Self {
193 Self {
194 inner: client::git::blocking_io::Connection::new(
195 read,
196 write,
197 Protocol::V2,
198 repo,
199 None::<(String, Option<u16>)>,
200 client::git::ConnectMode::Daemon,
201 false,
202 ),
203 }
204 }
205}
206
207impl<R, W> client::blocking_io::Transport for Connection<R, W>
208where
209 R: std::io::Read,
210 W: std::io::Write,
211{
212 fn request(
213 &mut self,
214 write_mode: client::WriteMode,
215 on_into_read: client::MessageKind,
216 trace: bool,
217 ) -> Result<client::blocking_io::RequestWriter<'_>, client::Error> {
218 self.inner.request(write_mode, on_into_read, trace)
219 }
220
221 fn handshake<'b>(
222 &mut self,
223 service: Service,
224 extra_parameters: &'b [(&'b str, Option<&'b str>)],
225 ) -> Result<client::blocking_io::SetServiceResponse<'_>, client::Error> {
226 self.inner.handshake(service, extra_parameters)
227 }
228}
229
230impl<R, W> client::TransportWithoutIO for Connection<R, W>
231where
232 R: std::io::Read,
233 W: std::io::Write,
234{
235 fn to_url(&self) -> std::borrow::Cow<'_, bstr::BStr> {
236 self.inner.to_url()
237 }
238
239 fn connection_persists_across_multiple_requests(&self) -> bool {
240 false
241 }
242
243 fn configure(
244 &mut self,
245 config: &dyn std::any::Any,
246 ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
247 self.inner.configure(config)
248 }
249
250 fn supported_protocol_versions(&self) -> &[Protocol] {
251 &[Protocol::V2]
252 }
253}
254
255#[derive(Debug, Error)]
256pub enum WantsHavesError {
257 #[error(transparent)]
258 Ancestry(#[from] repository::error::Ancestry),
259 #[error(transparent)]
260 Contains(#[from] repository::error::Contains),
261 #[error(transparent)]
262 Resolve(#[from] repository::error::Resolve),
263}
264
265#[derive(Clone, Default)]
266pub(crate) struct WantsHaves {
267 pub wants: BTreeSet<Oid>,
268 pub haves: BTreeSet<Oid>,
269}
270
271impl WantsHaves {
272 pub fn want(&mut self, oid: Oid) {
273 if !self.haves.contains(&oid) {
275 self.wants.insert(oid);
276 }
277 }
278
279 pub fn have(&mut self, oid: Oid) {
280 self.wants.remove(&oid);
282 self.haves.insert(oid);
283 }
284
285 pub fn add<'a, N>(
299 &mut self,
300 repo: &Repository,
301 refs: impl IntoIterator<Item = (N, Oid)>,
302 ) -> Result<&mut Self, WantsHavesError>
303 where
304 N: Into<Qualified<'a>>,
305 {
306 refs.into_iter().try_fold(self, |acc, (refname, tip)| {
307 match repository::refname_to_id(repo, refname)? {
308 Some(oid) => {
309 let want = oid != tip && !repository::contains(repo, tip)?;
310 acc.have(oid);
311
312 if want {
313 acc.want(tip)
314 }
315 }
316 None => {
317 if !repository::contains(repo, tip)? {
318 acc.want(tip);
319 }
320 }
321 };
322 Ok(acc)
323 })
324 }
325}
326
327fn agent_name() -> String {
328 let version = match radicle::git::version() {
329 Ok(version) => version,
330 Err(err) => {
331 use radicle::git::VERSION_REQUIRED;
332 log::debug!("The git version could not be determined: {err}");
333 log::debug!("Pretending that we are on git version {VERSION_REQUIRED}.");
334 VERSION_REQUIRED
335 }
336 };
337 format!("git/{version}")
338}