1use std::collections::{BTreeMap, BTreeSet};
2use std::time::Instant;
3
4use gix_protocol::handshake;
5use radicle::crypto::PublicKey;
6use radicle::git::{Oid, Qualified};
7use radicle::identity::{Did, Doc, DocError};
8
9use radicle::prelude::Verified;
10use radicle::storage;
11use radicle::storage::refs::RefsAt;
12use radicle::storage::{
13 git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
14};
15
16use crate::git;
17use crate::git::packfile::Keepfile;
18use crate::git::refs::{Applied, Update};
19use crate::git::repository;
20use crate::sigrefs::SignedRefsAt;
21use crate::stage;
22use crate::stage::ProtocolStage;
23use crate::{refs, sigrefs, transport, Handle};
24
25pub const DEFAULT_FETCH_SPECIAL_REFS_LIMIT: u64 = 1024 * 1024 * 5;
28pub const DEFAULT_FETCH_DATA_REFS_LIMIT: u64 = 1024 * 1024 * 1024 * 5;
31
32pub mod error {
33 use radicle::git::Oid;
34 use radicle::prelude::PublicKey;
35 use thiserror::Error;
36
37 use crate::{git, git::repository, handle, sigrefs, stage, transport};
38
39 #[derive(Debug, Error)]
40 pub enum Step {
41 #[error(transparent)]
42 Layout(#[from] stage::error::Layout),
43 #[error(transparent)]
44 Prepare(#[from] stage::error::Prepare),
45 #[error(transparent)]
46 WantsHaves(#[from] stage::error::WantsHaves),
47 #[error(transparent)]
48 Transport(#[from] transport::Error),
49 }
50
51 #[derive(Debug, Error)]
52 pub enum Protocol {
53 #[error(transparent)]
54 Ancestry(#[from] repository::error::Ancestry),
55 #[error(transparent)]
56 Canonical(#[from] Canonical),
57 #[error("delegate '{remote}' has diverged 'rad/sigrefs': {current} -> {received}")]
58 Diverged {
59 remote: PublicKey,
60 current: Oid,
61 received: Oid,
62 },
63 #[error("canonical 'refs/rad/id' is missing")]
64 MissingRadId,
65 #[error(transparent)]
66 RefdbUpdate(#[from] repository::error::Update),
67 #[error(transparent)]
68 Resolve(#[from] repository::error::Resolve),
69 #[error(transparent)]
70 Refs(#[from] radicle::storage::refs::Error),
71 #[error(transparent)]
72 RemoteRefs(#[from] sigrefs::error::RemoteRefs),
73 #[error("failed to get remote namespaces: {0}")]
74 RemoteIds(#[source] radicle::git::raw::Error),
75 #[error(transparent)]
76 Step(#[from] Step),
77 #[error(transparent)]
78 Tracking(#[from] handle::error::Tracking),
79 #[error(transparent)]
80 Validation(#[from] radicle::storage::Error),
81 }
82
83 #[derive(Debug, Error)]
84 pub enum Canonical {
85 #[error(transparent)]
86 Resolve(#[from] git::repository::error::Resolve),
87 #[error(transparent)]
88 Verified(#[from] radicle::identity::DocError),
89 }
90}
91
92type IdentityTips = BTreeMap<PublicKey, Oid>;
93type SigrefTips = BTreeMap<PublicKey, Oid>;
94
95#[derive(Clone, Copy, Debug)]
96pub struct FetchLimit {
97 pub special: u64,
98 pub refs: u64,
99}
100
101impl Default for FetchLimit {
102 fn default() -> Self {
103 Self {
104 special: DEFAULT_FETCH_SPECIAL_REFS_LIMIT,
105 refs: DEFAULT_FETCH_DATA_REFS_LIMIT,
106 }
107 }
108}
109
110#[derive(Debug)]
111pub enum FetchResult {
112 Success {
113 applied: Applied<'static>,
115 remotes: BTreeSet<PublicKey>,
117 validations: sigrefs::Validations,
119 },
120 Failed {
121 threshold: usize,
123 delegates: BTreeSet<PublicKey>,
125 validations: sigrefs::Validations,
127 },
128}
129
130impl FetchResult {
131 pub fn rejected(&self) -> impl Iterator<Item = &Update<'static>> {
132 match self {
133 Self::Success { applied, .. } => either::Either::Left(applied.rejected.iter()),
134 Self::Failed { .. } => either::Either::Right(std::iter::empty()),
135 }
136 }
137
138 pub fn is_success(&self) -> bool {
139 match self {
140 Self::Success { .. } => true,
141 Self::Failed { .. } => false,
142 }
143 }
144}
145
146#[derive(Default)]
147pub struct FetchState {
148 refs: git::mem::Refdb,
152 canonical_rad_id: Option<Oid>,
154 ids: IdentityTips,
156 sigrefs: SigrefTips,
158 tips: BTreeMap<PublicKey, Vec<Update<'static>>>,
160 keepfiles: Vec<Keepfile>,
164}
165
166impl FetchState {
167 pub fn prune(&mut self, remote: &PublicKey) {
170 self.ids.remove(remote);
171 self.sigrefs.remove(remote);
172 self.tips.remove(remote);
173 }
174
175 pub fn canonical_rad_id(&self) -> Option<&Oid> {
176 self.canonical_rad_id.as_ref()
177 }
178
179 pub fn update_all<'a, I>(&mut self, other: I) -> Applied<'a>
182 where
183 I: IntoIterator<Item = (PublicKey, Vec<Update<'a>>)>,
184 {
185 let mut ap = Applied::default();
186 for (remote, ups) in other {
187 for up in &ups {
188 ap.append(&mut self.refs.update(Some(up.clone())));
189 }
190 let mut ups = ups
191 .into_iter()
192 .map(|up| up.into_owned())
193 .collect::<Vec<_>>();
194 self.tips
195 .entry(remote)
196 .and_modify(|tips| tips.append(&mut ups))
197 .or_insert(ups);
198 }
199 ap
200 }
201
202 pub(crate) fn as_cached<'a, S>(&'a mut self, handle: &'a mut Handle<S>) -> Cached<'a, S> {
203 Cached {
204 handle,
205 state: self,
206 }
207 }
208}
209
210impl FetchState {
211 pub(super) fn run_stage<S, F>(
214 &mut self,
215 handle: &mut Handle<S>,
216 handshake: &handshake::Outcome,
217 step: &F,
218 ) -> Result<BTreeSet<PublicKey>, error::Step>
219 where
220 S: transport::ConnectionStream,
221 F: ProtocolStage,
222 {
223 let refs = match step.ls_refs() {
224 Some(refs) => handle
225 .transport
226 .ls_refs(refs.into(), handshake)?
227 .into_iter()
228 .filter_map(|r| step.ref_filter(r))
229 .collect::<Vec<_>>(),
230 None => vec![],
231 };
232 log::trace!(target: "fetch", "Received refs {refs:?}");
233 step.pre_validate(&refs)?;
234
235 let wants_haves = step.wants_haves(&handle.repo, &refs)?;
236 if !wants_haves.wants.is_empty() {
237 let keepfile =
238 handle
239 .transport
240 .fetch(wants_haves, handle.interrupt.clone(), handshake)?;
241 self.keepfiles.extend(keepfile);
242 } else {
243 log::trace!(target: "fetch", "Nothing to fetch")
244 };
245
246 let mut fetched = BTreeSet::new();
247 for r in &refs {
248 match &r.name {
249 refs::ReceivedRefname::Namespaced { remote, suffix } => {
250 fetched.insert(*remote);
251 if let Some(rad) = suffix.as_ref().left() {
252 match rad {
253 refs::Special::Id => {
254 self.ids.insert(*remote, r.tip);
255 }
256
257 refs::Special::SignedRefs => {
258 self.sigrefs.insert(*remote, r.tip);
259 }
260 }
261 }
262 }
263 refs::ReceivedRefname::RadId => self.canonical_rad_id = Some(r.tip),
264 }
265 }
266
267 let up = step.prepare_updates(self, &handle.repo, &refs)?;
268 self.update_all(up.tips);
269
270 Ok(fetched)
271 }
272
273 #[allow(clippy::too_many_arguments)]
286 fn run_special_refs<S>(
287 &mut self,
288 handle: &mut Handle<S>,
289 handshake: &handshake::Outcome,
290 delegates: BTreeSet<PublicKey>,
291 threshold: usize,
292 limit: &FetchLimit,
293 remote: PublicKey,
294 refs_at: Option<Vec<RefsAt>>,
295 ) -> Result<sigrefs::RemoteRefs, error::Protocol>
296 where
297 S: transport::ConnectionStream,
298 {
299 match refs_at {
300 Some(refs_at) => {
301 let sigrefs_at = stage::SigrefsAt {
302 remote,
303 delegates: delegates.clone(),
304 refs_at: refs_at.clone(),
305 blocked: handle.blocked.clone(),
306 limit: limit.special,
307 };
308 log::trace!(target: "fetch", "{sigrefs_at:?}");
309 self.run_stage(handle, handshake, &sigrefs_at)?;
310 let remotes = refs_at.iter().map(|r| &r.remote);
311
312 let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes)?;
313 Ok(signed_refs)
314 }
315 None => {
316 let followed = handle.allowed();
317 log::trace!(target: "fetch", "Followed nodes {followed:?}");
318 let special_refs = stage::SpecialRefs {
319 blocked: handle.blocked.clone(),
320 remote,
321 delegates: delegates.clone(),
322 followed,
323 threshold,
324 limit: limit.special,
325 };
326 log::trace!(target: "fetch", "{special_refs:?}");
327 let fetched = self.run_stage(handle, handshake, &special_refs)?;
328
329 let signed_refs = sigrefs::RemoteRefs::load(
330 &self.as_cached(handle),
331 fetched.iter().chain(delegates.iter()),
332 )?;
333 Ok(signed_refs)
334 }
335 }
336 }
337
338 pub(super) fn run<S>(
355 mut self,
356 handle: &mut Handle<S>,
357 handshake: &handshake::Outcome,
358 limit: FetchLimit,
359 remote: PublicKey,
360 refs_at: Option<Vec<RefsAt>>,
361 ) -> Result<FetchResult, error::Protocol>
362 where
363 S: transport::ConnectionStream,
364 {
365 let start = Instant::now();
366 self.run_stage(
370 handle,
371 handshake,
372 &stage::CanonicalId {
373 remote,
374 limit: limit.special,
375 },
376 )?;
377 log::debug!(target: "fetch", "Fetched rad/id ({}ms)", start.elapsed().as_millis());
378
379 let anchor = self
384 .as_cached(handle)
385 .canonical()?
386 .ok_or(error::Protocol::MissingRadId)?;
387
388 let is_delegate = anchor.is_delegate(&Did::from(handle.local()));
389 let delegates = anchor
392 .delegates()
393 .iter()
394 .filter(|id| !handle.is_blocked(id))
395 .map(|did| PublicKey::from(*did))
396 .collect::<BTreeSet<_>>();
397
398 log::trace!(target: "fetch", "Identity delegates {delegates:?}");
399
400 let threshold = if is_delegate {
403 anchor.threshold() - 1
404 } else {
405 anchor.threshold()
406 };
407 let signed_refs = self.run_special_refs(
408 handle,
409 handshake,
410 delegates.clone(),
411 threshold,
412 &limit,
413 remote,
414 refs_at,
415 )?;
416 log::debug!(
417 target: "fetch",
418 "Fetched data for {} remote(s) ({}ms)",
419 signed_refs.len(),
420 start.elapsed().as_millis()
421 );
422
423 let data_refs = stage::DataRefs {
424 remote,
425 remotes: signed_refs,
426 limit: limit.refs,
427 };
428 self.run_stage(handle, handshake, &data_refs)?;
429 log::debug!(
430 target: "fetch",
431 "Fetched data refs for {} remotes ({}ms)",
432 data_refs.remotes.len(),
433 start.elapsed().as_millis()
434 );
435
436 match handle.transport.done() {
440 Ok(()) => log::debug!(target: "fetch", "Sent done signal to remote {remote}"),
441 Err(err) => {
442 log::warn!(target: "fetch", "Attempted to send done to remote {remote}: {err}")
443 }
444 }
445
446 let mut failures = sigrefs::Validations::default();
450 let signed_refs = data_refs.remotes;
451
452 let mut remotes = BTreeSet::new();
455
456 let mut valid_delegates = handle
459 .repository()
460 .remote_ids()
461 .map_err(error::Protocol::RemoteIds)?
462 .filter_map(|id| id.ok())
463 .filter(|id| delegates.contains(id))
464 .collect::<BTreeSet<_>>();
465 let mut failed_delegates = BTreeSet::new();
466
467 for remote in signed_refs.keys() {
470 if handle.is_blocked(remote) {
471 log::trace!(target: "fetch", "Skipping blocked remote {remote}");
472 continue;
473 }
474
475 let remote = sigrefs::DelegateStatus::empty(*remote, &delegates)
476 .load(&self.as_cached(handle))?;
477 match remote {
478 sigrefs::DelegateStatus::NonDelegate { remote, data: None } => {
479 log::debug!(target: "fetch", "Pruning non-delegate {remote} tips, missing 'rad/sigrefs'");
480 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
481 self.prune(&remote);
482 }
483 sigrefs::DelegateStatus::Delegate { remote, data: None } => {
484 log::warn!(target: "fetch", "Pruning delegate {remote} tips, missing 'rad/sigrefs'");
485 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
486 self.prune(&remote);
487 valid_delegates.remove(&remote);
493 failed_delegates.insert(remote);
494 }
495 sigrefs::DelegateStatus::NonDelegate {
496 remote,
497 data: Some(sigrefs),
498 } => {
499 if let Some(SignedRefsAt { at, .. }) = SignedRefsAt::load(remote, &handle.repo)?
500 {
501 if matches!(
505 repository::ancestry(&handle.repo, at, sigrefs.at)?,
506 repository::Ancestry::Behind | repository::Ancestry::Diverged
507 ) {
508 self.prune(&remote);
509 continue;
510 }
511 }
512
513 let cache = self.as_cached(handle);
514 if let Some(warns) = sigrefs::validate(&cache, sigrefs)?.as_mut() {
515 log::debug!(
516 target: "fetch",
517 "Pruning non-delegate {remote} tips, due to validation failures"
518 );
519 self.prune(&remote);
520 failures.append(warns);
521 } else {
522 remotes.insert(remote);
523 }
524 }
525 sigrefs::DelegateStatus::Delegate {
526 remote,
527 data: Some(sigrefs),
528 } => {
529 if let Some(SignedRefsAt { at, .. }) = SignedRefsAt::load(remote, &handle.repo)?
530 {
531 let ancestry = repository::ancestry(&handle.repo, at, sigrefs.at)?;
532 if matches!(ancestry, repository::Ancestry::Behind) {
533 log::trace!(target: "fetch", "Advertised `rad/sigrefs` {} is behind {at} for {remote}", sigrefs.at);
534 self.prune(&remote);
535 continue;
536 } else if matches!(ancestry, repository::Ancestry::Diverged) {
537 return Err(error::Protocol::Diverged {
538 remote,
539 current: at,
540 received: sigrefs.at,
541 });
542 }
543 }
544
545 let cache = self.as_cached(handle);
546 let mut fails =
547 sigrefs::validate(&cache, sigrefs)?.unwrap_or(Validations::default());
548 if !fails.is_empty() {
549 log::warn!(target: "fetch", "Pruning delegate {remote} tips, due to validation failures");
550 self.prune(&remote);
551 valid_delegates.remove(&remote);
552 failed_delegates.insert(remote);
553 failures.append(&mut fails)
554 } else {
555 valid_delegates.insert(remote);
556 remotes.insert(remote);
557 }
558 }
559 }
560 }
561 log::debug!(
562 target: "fetch",
563 "Validated {} remote(s) ({}ms)",
564 remotes.len(),
565 start.elapsed().as_millis()
566 );
567
568 if valid_delegates.len() >= threshold {
571 let applied = repository::update(
572 &handle.repo,
573 self.tips
574 .clone()
575 .into_values()
576 .flat_map(|ups| ups.into_iter()),
577 )?;
578 log::debug!(target: "fetch", "Applied updates ({}ms)", start.elapsed().as_millis());
579 Ok(FetchResult::Success {
580 applied,
581 remotes,
582 validations: failures,
583 })
584 } else {
585 log::debug!(
586 target: "fetch",
587 "Fetch failed: {} failure(s) ({}ms)",
588 failures.len(),
589 start.elapsed().as_millis()
590 );
591 Ok(FetchResult::Failed {
592 threshold,
593 delegates: failed_delegates,
594 validations: failures,
595 })
596 }
597 }
598}
599
600pub(crate) struct Cached<'a, S> {
603 handle: &'a mut Handle<S>,
604 state: &'a mut FetchState,
605}
606
607impl<S> Cached<'_, S> {
608 pub fn refname_to_id<'b, N>(
611 &self,
612 refname: N,
613 ) -> Result<Option<Oid>, repository::error::Resolve>
614 where
615 N: Into<Qualified<'b>>,
616 {
617 let refname = refname.into();
618 match self.state.refs.refname_to_id(refname.clone()) {
619 None => repository::refname_to_id(&self.handle.repo, refname),
620 Some(oid) => Ok(Some(oid)),
621 }
622 }
623
624 pub fn canonical_rad_id(&self) -> Option<Oid> {
626 self.state.canonical_rad_id().copied()
627 }
628
629 pub fn verified(&self, head: Oid) -> Result<Doc, DocError> {
630 self.handle.verified(head)
631 }
632
633 pub fn canonical(&self) -> Result<Option<Doc>, error::Canonical> {
634 let tip = self.refname_to_id(refs::REFS_RAD_ID.clone())?;
635 let cached_tip = self.canonical_rad_id();
636
637 tip.or(cached_tip)
638 .map(|tip| self.verified(tip).map_err(error::Canonical::from))
639 .transpose()
640 }
641
642 pub fn load(&self, remote: &PublicKey) -> Result<Option<SignedRefsAt>, sigrefs::error::Load> {
643 match self.state.sigrefs.get(remote) {
644 None => SignedRefsAt::load(*remote, &self.handle.repo),
645 Some(tip) => SignedRefsAt::load_at(*tip, *remote, &self.handle.repo).map(Some),
646 }
647 }
648
649 #[allow(dead_code)]
650 pub(crate) fn inspect(&self) {
651 self.state.refs.inspect()
652 }
653}
654
655impl<S> RemoteRepository for Cached<'_, S> {
656 fn remote(&self, remote: &RemoteId) -> Result<Remote, storage::refs::Error> {
657 self.handle.repo.remote(remote)
660 }
661
662 fn remotes(&self) -> Result<Remotes<Verified>, storage::refs::Error> {
663 self.state
664 .sigrefs
665 .keys()
666 .map(|id| self.remote(id).map(|remote| (*id, remote)))
667 .collect::<Result<_, _>>()
668 }
669
670 fn remote_refs_at(&self) -> Result<Vec<RefsAt>, storage::refs::Error> {
671 self.handle.repo.remote_refs_at()
672 }
673}
674
675impl<S> ValidateRepository for Cached<'_, S> {
676 fn validate_remote(&self, remote: &Remote) -> Result<Validations, storage::Error> {
680 let mut signed = BTreeMap::from((*remote.refs).clone());
682 let mut validations = Validations::default();
683 let mut has_sigrefs = false;
684
685 for (refname, oid) in self.state.refs.references_of(&remote.id) {
687 if refname == storage::refs::SIGREFS_BRANCH.to_ref_string() {
689 has_sigrefs = true;
690 continue;
691 }
692 if let Some(signed_oid) = signed.remove(&refname) {
693 if oid != signed_oid {
694 validations.push(Validation::MismatchedRef {
695 refname,
696 expected: signed_oid,
697 actual: oid,
698 });
699 }
700 } else {
701 validations.push(Validation::UnsignedRef(refname));
702 }
703 }
704
705 if !has_sigrefs {
706 validations.push(Validation::MissingRadSigRefs(remote.id));
707 }
708
709 for (name, _) in signed.into_iter() {
712 validations.push(Validation::MissingRef {
713 refname: name,
714 remote: remote.id,
715 });
716 }
717
718 Ok(validations)
719 }
720}