1use std::collections::{BTreeMap, BTreeSet};
2use std::time::Instant;
3
4use gix_protocol::handshake;
5use radicle::crypto::PublicKey;
6use radicle::git::{Oid, Qualified};
7use radicle::identity::{Did, Doc, DocError};
8
9use radicle::prelude::Verified;
10use radicle::storage;
11use radicle::storage::refs::RefsAt;
12use radicle::storage::{
13 git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
14};
15
16use crate::git;
17use crate::git::packfile::Keepfile;
18use crate::git::refs::{Applied, Update};
19use crate::git::repository;
20use crate::sigrefs::SignedRefsAt;
21use crate::stage;
22use crate::stage::ProtocolStage;
23use crate::{refs, sigrefs, transport, Handle};
24
/// Default value of [`FetchLimit::special`]: the limit handed to the stages
/// that fetch special references (`5 * 1024 * 1024`).
///
/// NOTE(review): presumably a size in bytes (5 MiB) -- confirm against the
/// transport that enforces it.
pub const DEFAULT_FETCH_SPECIAL_REFS_LIMIT: u64 = 5 * 1024 * 1024;
/// Default value of [`FetchLimit::refs`]: the limit handed to the stage that
/// fetches data references (`5 * 1024 * 1024 * 1024`).
///
/// NOTE(review): presumably a size in bytes (5 GiB) -- confirm against the
/// transport that enforces it.
pub const DEFAULT_FETCH_DATA_REFS_LIMIT: u64 = 5 * 1024 * 1024 * 1024;
31
32pub mod error {
33 use std::io;
34
35 use radicle::git::Oid;
36 use radicle::prelude::PublicKey;
37 use thiserror::Error;
38
39 use crate::{git, git::repository, handle, sigrefs, stage};
40
41 #[derive(Debug, Error)]
42 pub enum Step {
43 #[error(transparent)]
44 Io(#[from] io::Error),
45 #[error(transparent)]
46 Layout(#[from] stage::error::Layout),
47 #[error(transparent)]
48 Prepare(#[from] stage::error::Prepare),
49 #[error(transparent)]
50 WantsHaves(#[from] stage::error::WantsHaves),
51 }
52
53 #[derive(Debug, Error)]
54 pub enum Protocol {
55 #[error(transparent)]
56 Ancestry(#[from] repository::error::Ancestry),
57 #[error(transparent)]
58 Canonical(#[from] Canonical),
59 #[error("delegate '{remote}' has diverged 'rad/sigrefs': {current} -> {received}")]
60 Diverged {
61 remote: PublicKey,
62 current: Oid,
63 received: Oid,
64 },
65 #[error(transparent)]
66 Io(#[from] io::Error),
67 #[error("canonical 'refs/rad/id' is missing")]
68 MissingRadId,
69 #[error(transparent)]
70 RefdbUpdate(#[from] repository::error::Update),
71 #[error(transparent)]
72 Resolve(#[from] repository::error::Resolve),
73 #[error(transparent)]
74 Refs(#[from] radicle::storage::refs::Error),
75 #[error(transparent)]
76 RemoteRefs(#[from] sigrefs::error::RemoteRefs),
77 #[error("failed to get remote namespaces: {0}")]
78 RemoteIds(#[source] radicle::git::raw::Error),
79 #[error(transparent)]
80 Step(#[from] Step),
81 #[error(transparent)]
82 Tracking(#[from] handle::error::Tracking),
83 #[error(transparent)]
84 Validation(#[from] radicle::storage::Error),
85 }
86
87 #[derive(Debug, Error)]
88 pub enum Canonical {
89 #[error(transparent)]
90 Resolve(#[from] git::repository::error::Resolve),
91 #[error(transparent)]
92 Verified(#[from] radicle::identity::DocError),
93 }
94}
95
/// Per-remote tip of the `Special::Id` reference, as advertised during a
/// protocol stage.
type IdentityTips = BTreeMap<PublicKey, Oid>;
/// Per-remote tip of the `Special::SignedRefs` (`rad/sigrefs`) reference, as
/// advertised during a protocol stage.
type SigrefTips = BTreeMap<PublicKey, Oid>;
98
99#[derive(Clone, Copy, Debug)]
100pub struct FetchLimit {
101 pub special: u64,
102 pub refs: u64,
103}
104
105impl Default for FetchLimit {
106 fn default() -> Self {
107 Self {
108 special: DEFAULT_FETCH_SPECIAL_REFS_LIMIT,
109 refs: DEFAULT_FETCH_DATA_REFS_LIMIT,
110 }
111 }
112}
113
114#[derive(Debug)]
115pub enum FetchResult {
116 Success {
117 applied: Applied<'static>,
119 remotes: BTreeSet<PublicKey>,
121 validations: sigrefs::Validations,
123 },
124 Failed {
125 threshold: usize,
127 delegates: BTreeSet<PublicKey>,
129 validations: sigrefs::Validations,
131 },
132}
133
134impl FetchResult {
135 pub fn rejected(&self) -> impl Iterator<Item = &Update<'static>> {
136 match self {
137 Self::Success { applied, .. } => either::Either::Left(applied.rejected.iter()),
138 Self::Failed { .. } => either::Either::Right(std::iter::empty()),
139 }
140 }
141
142 pub fn is_success(&self) -> bool {
143 match self {
144 Self::Success { .. } => true,
145 Self::Failed { .. } => false,
146 }
147 }
148}
149
150#[derive(Default)]
151pub struct FetchState {
152 refs: git::mem::Refdb,
156 canonical_rad_id: Option<Oid>,
158 ids: IdentityTips,
160 sigrefs: SigrefTips,
162 tips: BTreeMap<PublicKey, Vec<Update<'static>>>,
164 keepfiles: Vec<Keepfile>,
168}
169
170impl FetchState {
171 pub fn prune(&mut self, remote: &PublicKey) {
174 self.ids.remove(remote);
175 self.sigrefs.remove(remote);
176 self.tips.remove(remote);
177 }
178
179 pub fn canonical_rad_id(&self) -> Option<&Oid> {
180 self.canonical_rad_id.as_ref()
181 }
182
183 pub fn update_all<'a, I>(&mut self, other: I) -> Applied<'a>
186 where
187 I: IntoIterator<Item = (PublicKey, Vec<Update<'a>>)>,
188 {
189 let mut ap = Applied::default();
190 for (remote, ups) in other {
191 for up in &ups {
192 ap.append(&mut self.refs.update(Some(up.clone())));
193 }
194 let mut ups = ups
195 .into_iter()
196 .map(|up| up.into_owned())
197 .collect::<Vec<_>>();
198 self.tips
199 .entry(remote)
200 .and_modify(|tips| tips.append(&mut ups))
201 .or_insert(ups);
202 }
203 ap
204 }
205
206 pub(crate) fn as_cached<'a, S>(&'a mut self, handle: &'a mut Handle<S>) -> Cached<'a, S> {
207 Cached {
208 handle,
209 state: self,
210 }
211 }
212}
213
214impl FetchState {
215 pub(super) fn run_stage<S, F>(
218 &mut self,
219 handle: &mut Handle<S>,
220 handshake: &handshake::Outcome,
221 step: &F,
222 ) -> Result<BTreeSet<PublicKey>, error::Step>
223 where
224 S: transport::ConnectionStream,
225 F: ProtocolStage,
226 {
227 let refs = match step.ls_refs() {
228 Some(refs) => handle
229 .transport
230 .ls_refs(refs.into(), handshake)?
231 .into_iter()
232 .filter_map(|r| step.ref_filter(r))
233 .collect::<Vec<_>>(),
234 None => vec![],
235 };
236 log::trace!(target: "fetch", "Received refs {refs:?}");
237 step.pre_validate(&refs)?;
238
239 let wants_haves = step.wants_haves(&handle.repo, &refs)?;
240 if !wants_haves.wants.is_empty() {
241 let keepfile =
242 handle
243 .transport
244 .fetch(wants_haves, handle.interrupt.clone(), handshake)?;
245 self.keepfiles.extend(keepfile);
246 } else {
247 log::trace!(target: "fetch", "Nothing to fetch")
248 };
249
250 let mut fetched = BTreeSet::new();
251 for r in &refs {
252 match &r.name {
253 refs::ReceivedRefname::Namespaced { remote, suffix } => {
254 fetched.insert(*remote);
255 if let Some(rad) = suffix.as_ref().left() {
256 match rad {
257 refs::Special::Id => {
258 self.ids.insert(*remote, r.tip);
259 }
260
261 refs::Special::SignedRefs => {
262 self.sigrefs.insert(*remote, r.tip);
263 }
264 }
265 }
266 }
267 refs::ReceivedRefname::RadId => self.canonical_rad_id = Some(r.tip),
268 }
269 }
270
271 let up = step.prepare_updates(self, &handle.repo, &refs)?;
272 self.update_all(up.tips);
273
274 Ok(fetched)
275 }
276
277 #[allow(clippy::too_many_arguments)]
290 fn run_special_refs<S>(
291 &mut self,
292 handle: &mut Handle<S>,
293 handshake: &handshake::Outcome,
294 delegates: BTreeSet<PublicKey>,
295 threshold: usize,
296 limit: &FetchLimit,
297 remote: PublicKey,
298 refs_at: Option<Vec<RefsAt>>,
299 ) -> Result<sigrefs::RemoteRefs, error::Protocol>
300 where
301 S: transport::ConnectionStream,
302 {
303 match refs_at {
304 Some(refs_at) => {
305 let sigrefs_at = stage::SigrefsAt {
306 remote,
307 delegates: delegates.clone(),
308 refs_at: refs_at.clone(),
309 blocked: handle.blocked.clone(),
310 limit: limit.special,
311 };
312 log::trace!(target: "fetch", "{sigrefs_at:?}");
313 self.run_stage(handle, handshake, &sigrefs_at)?;
314 let remotes = refs_at.iter().map(|r| &r.remote);
315
316 let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes)?;
317 Ok(signed_refs)
318 }
319 None => {
320 let followed = handle.allowed();
321 log::trace!(target: "fetch", "Followed nodes {followed:?}");
322 let special_refs = stage::SpecialRefs {
323 blocked: handle.blocked.clone(),
324 remote,
325 delegates: delegates.clone(),
326 followed,
327 threshold,
328 limit: limit.special,
329 };
330 log::trace!(target: "fetch", "{special_refs:?}");
331 let fetched = self.run_stage(handle, handshake, &special_refs)?;
332
333 let signed_refs = sigrefs::RemoteRefs::load(
334 &self.as_cached(handle),
335 fetched.iter().chain(delegates.iter()),
336 )?;
337 Ok(signed_refs)
338 }
339 }
340 }
341
342 pub(super) fn run<S>(
359 mut self,
360 handle: &mut Handle<S>,
361 handshake: &handshake::Outcome,
362 limit: FetchLimit,
363 remote: PublicKey,
364 refs_at: Option<Vec<RefsAt>>,
365 ) -> Result<FetchResult, error::Protocol>
366 where
367 S: transport::ConnectionStream,
368 {
369 let start = Instant::now();
370 self.run_stage(
374 handle,
375 handshake,
376 &stage::CanonicalId {
377 remote,
378 limit: limit.special,
379 },
380 )?;
381 log::debug!(target: "fetch", "Fetched rad/id ({}ms)", start.elapsed().as_millis());
382
383 let anchor = self
388 .as_cached(handle)
389 .canonical()?
390 .ok_or(error::Protocol::MissingRadId)?;
391
392 let is_delegate = anchor.is_delegate(&Did::from(handle.local()));
393 let delegates = anchor
396 .delegates()
397 .iter()
398 .filter(|id| !handle.is_blocked(id))
399 .map(|did| PublicKey::from(*did))
400 .collect::<BTreeSet<_>>();
401
402 log::trace!(target: "fetch", "Identity delegates {delegates:?}");
403
404 let threshold = if is_delegate {
407 anchor.threshold() - 1
408 } else {
409 anchor.threshold()
410 };
411 let signed_refs = self.run_special_refs(
412 handle,
413 handshake,
414 delegates.clone(),
415 threshold,
416 &limit,
417 remote,
418 refs_at,
419 )?;
420 log::debug!(
421 target: "fetch",
422 "Fetched data for {} remote(s) ({}ms)",
423 signed_refs.len(),
424 start.elapsed().as_millis()
425 );
426
427 let data_refs = stage::DataRefs {
428 remote,
429 remotes: signed_refs,
430 limit: limit.refs,
431 };
432 self.run_stage(handle, handshake, &data_refs)?;
433 log::debug!(
434 target: "fetch",
435 "Fetched data refs for {} remotes ({}ms)",
436 data_refs.remotes.len(),
437 start.elapsed().as_millis()
438 );
439
440 match handle.transport.done() {
444 Ok(()) => log::debug!(target: "fetch", "Sent done signal to remote {remote}"),
445 Err(err) => {
446 log::warn!(target: "fetch", "Attempted to send done to remote {remote}: {err}")
447 }
448 }
449
450 let mut failures = sigrefs::Validations::default();
454 let signed_refs = data_refs.remotes;
455
456 let mut remotes = BTreeSet::new();
459
460 let mut valid_delegates = handle
463 .repository()
464 .remote_ids()
465 .map_err(error::Protocol::RemoteIds)?
466 .filter_map(|id| id.ok())
467 .filter(|id| delegates.contains(id))
468 .collect::<BTreeSet<_>>();
469 let mut failed_delegates = BTreeSet::new();
470
471 for remote in signed_refs.keys() {
474 if handle.is_blocked(remote) {
475 log::trace!(target: "fetch", "Skipping blocked remote {remote}");
476 continue;
477 }
478
479 let remote = sigrefs::DelegateStatus::empty(*remote, &delegates)
480 .load(&self.as_cached(handle))?;
481 match remote {
482 sigrefs::DelegateStatus::NonDelegate { remote, data: None } => {
483 log::debug!(target: "fetch", "Pruning non-delegate {remote} tips, missing 'rad/sigrefs'");
484 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
485 self.prune(&remote);
486 }
487 sigrefs::DelegateStatus::Delegate { remote, data: None } => {
488 log::warn!(target: "fetch", "Pruning delegate {remote} tips, missing 'rad/sigrefs'");
489 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
490 self.prune(&remote);
491 valid_delegates.remove(&remote);
497 failed_delegates.insert(remote);
498 }
499 sigrefs::DelegateStatus::NonDelegate {
500 remote,
501 data: Some(sigrefs),
502 } => {
503 if let Some(SignedRefsAt { at, .. }) = SignedRefsAt::load(remote, &handle.repo)?
504 {
505 if matches!(
509 repository::ancestry(&handle.repo, at, sigrefs.at)?,
510 repository::Ancestry::Behind | repository::Ancestry::Diverged
511 ) {
512 self.prune(&remote);
513 continue;
514 }
515 }
516
517 let cache = self.as_cached(handle);
518 if let Some(warns) = sigrefs::validate(&cache, sigrefs)?.as_mut() {
519 log::debug!(
520 target: "fetch",
521 "Pruning non-delegate {remote} tips, due to validation failures"
522 );
523 self.prune(&remote);
524 failures.append(warns);
525 } else {
526 remotes.insert(remote);
527 }
528 }
529 sigrefs::DelegateStatus::Delegate {
530 remote,
531 data: Some(sigrefs),
532 } => {
533 if let Some(SignedRefsAt { at, .. }) = SignedRefsAt::load(remote, &handle.repo)?
534 {
535 let ancestry = repository::ancestry(&handle.repo, at, sigrefs.at)?;
536 if matches!(ancestry, repository::Ancestry::Behind) {
537 log::trace!(target: "fetch", "Advertised `rad/sigrefs` {} is behind {at} for {remote}", sigrefs.at);
538 self.prune(&remote);
539 continue;
540 } else if matches!(ancestry, repository::Ancestry::Diverged) {
541 return Err(error::Protocol::Diverged {
542 remote,
543 current: at,
544 received: sigrefs.at,
545 });
546 }
547 }
548
549 let cache = self.as_cached(handle);
550 let mut fails =
551 sigrefs::validate(&cache, sigrefs)?.unwrap_or(Validations::default());
552 if !fails.is_empty() {
553 log::warn!(target: "fetch", "Pruning delegate {remote} tips, due to validation failures");
554 self.prune(&remote);
555 valid_delegates.remove(&remote);
556 failed_delegates.insert(remote);
557 failures.append(&mut fails)
558 } else {
559 valid_delegates.insert(remote);
560 remotes.insert(remote);
561 }
562 }
563 }
564 }
565 log::debug!(
566 target: "fetch",
567 "Validated {} remote(s) ({}ms)",
568 remotes.len(),
569 start.elapsed().as_millis()
570 );
571
572 if valid_delegates.len() >= threshold {
575 let applied = repository::update(
576 &handle.repo,
577 self.tips
578 .clone()
579 .into_values()
580 .flat_map(|ups| ups.into_iter()),
581 )?;
582 log::debug!(target: "fetch", "Applied updates ({}ms)", start.elapsed().as_millis());
583 Ok(FetchResult::Success {
584 applied,
585 remotes,
586 validations: failures,
587 })
588 } else {
589 log::debug!(
590 target: "fetch",
591 "Fetch failed: {} failure(s) ({}ms)",
592 failures.len(),
593 start.elapsed().as_millis()
594 );
595 Ok(FetchResult::Failed {
596 threshold,
597 delegates: failed_delegates,
598 validations: failures,
599 })
600 }
601 }
602}
603
604pub(crate) struct Cached<'a, S> {
607 handle: &'a mut Handle<S>,
608 state: &'a mut FetchState,
609}
610
611impl<S> Cached<'_, S> {
612 pub fn refname_to_id<'b, N>(
615 &self,
616 refname: N,
617 ) -> Result<Option<Oid>, repository::error::Resolve>
618 where
619 N: Into<Qualified<'b>>,
620 {
621 let refname = refname.into();
622 match self.state.refs.refname_to_id(refname.clone()) {
623 None => repository::refname_to_id(&self.handle.repo, refname),
624 Some(oid) => Ok(Some(oid)),
625 }
626 }
627
628 pub fn canonical_rad_id(&self) -> Option<Oid> {
630 self.state.canonical_rad_id().copied()
631 }
632
633 pub fn verified(&self, head: Oid) -> Result<Doc, DocError> {
634 self.handle.verified(head)
635 }
636
637 pub fn canonical(&self) -> Result<Option<Doc>, error::Canonical> {
638 let tip = self.refname_to_id(refs::REFS_RAD_ID.clone())?;
639 let cached_tip = self.canonical_rad_id();
640
641 tip.or(cached_tip)
642 .map(|tip| self.verified(tip).map_err(error::Canonical::from))
643 .transpose()
644 }
645
646 pub fn load(&self, remote: &PublicKey) -> Result<Option<SignedRefsAt>, sigrefs::error::Load> {
647 match self.state.sigrefs.get(remote) {
648 None => SignedRefsAt::load(*remote, &self.handle.repo),
649 Some(tip) => SignedRefsAt::load_at(*tip, *remote, &self.handle.repo).map(Some),
650 }
651 }
652
653 #[allow(dead_code)]
654 pub(crate) fn inspect(&self) {
655 self.state.refs.inspect()
656 }
657}
658
659impl<S> RemoteRepository for Cached<'_, S> {
660 fn remote(&self, remote: &RemoteId) -> Result<Remote, storage::refs::Error> {
661 self.handle.repo.remote(remote)
664 }
665
666 fn remotes(&self) -> Result<Remotes<Verified>, storage::refs::Error> {
667 self.state
668 .sigrefs
669 .keys()
670 .map(|id| self.remote(id).map(|remote| (*id, remote)))
671 .collect::<Result<_, _>>()
672 }
673
674 fn remote_refs_at(&self) -> Result<Vec<RefsAt>, storage::refs::Error> {
675 self.handle.repo.remote_refs_at()
676 }
677}
678
679impl<S> ValidateRepository for Cached<'_, S> {
680 fn validate_remote(&self, remote: &Remote) -> Result<Validations, storage::Error> {
684 let mut signed = BTreeMap::from((*remote.refs).clone());
686 let mut validations = Validations::default();
687 let mut has_sigrefs = false;
688
689 for (refname, oid) in self.state.refs.references_of(&remote.id) {
691 if refname == storage::refs::SIGREFS_BRANCH.to_ref_string() {
693 has_sigrefs = true;
694 continue;
695 }
696 if let Some(signed_oid) = signed.remove(&refname) {
697 if oid != signed_oid {
698 validations.push(Validation::MismatchedRef {
699 refname,
700 expected: signed_oid,
701 actual: oid,
702 });
703 }
704 } else {
705 validations.push(Validation::UnsignedRef(refname));
706 }
707 }
708
709 if !has_sigrefs {
710 validations.push(Validation::MissingRadSigRefs(remote.id));
711 }
712
713 for (name, _) in signed.into_iter() {
716 validations.push(Validation::MissingRef {
717 refname: name,
718 remote: remote.id,
719 });
720 }
721
722 Ok(validations)
723 }
724}