1use std::collections::{BTreeMap, BTreeSet};
2use std::time::Instant;
3
4use gix_protocol::handshake;
5use radicle::crypto::PublicKey;
6use radicle::git::{Oid, Qualified};
7use radicle::identity::{Did, Doc, DocError};
8
9use radicle::prelude::Verified;
10use radicle::storage;
11use radicle::storage::git::Repository;
12use radicle::storage::refs::RefsAt;
13use radicle::storage::{
14 git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
15};
16
17use crate::git;
18use crate::git::packfile::Keepfile;
19use crate::git::refs::{Applied, Update};
20use crate::git::repository;
21use crate::sigrefs::SignedRefsAt;
22use crate::stage;
23use crate::stage::ProtocolStage;
24use crate::{refs, sigrefs, transport, Handle};
25
/// Default limit used for the stages that fetch special references
/// (`rad/id` and `rad/sigrefs`); see [`FetchLimit::special`].
///
/// The value is `5 * 1024 * 1024`, i.e. 5 MiB if the unit is bytes
/// (NOTE(review): the unit is not visible in this file — confirm).
pub const DEFAULT_FETCH_SPECIAL_REFS_LIMIT: u64 = 5 * 1024 * 1024;
/// Default limit used for the stage that fetches data references; see
/// [`FetchLimit::refs`].
///
/// The value is `5 * 1024 * 1024 * 1024`, i.e. 5 GiB if the unit is bytes
/// (NOTE(review): the unit is not visible in this file — confirm).
pub const DEFAULT_FETCH_DATA_REFS_LIMIT: u64 = 5 * 1024 * 1024 * 1024;
32
/// Error types returned by the fetch state machine defined in this file.
pub mod error {
    use radicle::git::Oid;
    use radicle::prelude::PublicKey;
    use thiserror::Error;

    use crate::{git, git::repository, handle, sigrefs, stage, transport};

    /// Errors that can occur while running a single protocol stage.
    ///
    /// Every variant is `transparent`: display and source are forwarded
    /// from the wrapped error.
    #[derive(Debug, Error)]
    pub enum Step {
        /// Wraps a [`stage::error::Layout`].
        #[error(transparent)]
        Layout(#[from] stage::error::Layout),
        /// Wraps a [`stage::error::Prepare`].
        #[error(transparent)]
        Prepare(#[from] stage::error::Prepare),
        /// Wraps a [`stage::error::WantsHaves`].
        #[error(transparent)]
        WantsHaves(#[from] stage::error::WantsHaves),
        /// Wraps a [`transport::Error`] raised while talking to the remote.
        #[error(transparent)]
        Transport(#[from] transport::Error),
    }

    /// Errors that can occur while running the whole fetch protocol.
    #[derive(Debug, Error)]
    pub enum Protocol {
        /// Computing the ancestry relation between two commits failed.
        #[error(transparent)]
        Ancestry(#[from] repository::error::Ancestry),
        /// Loading or verifying the canonical identity document failed.
        #[error(transparent)]
        Canonical(#[from] Canonical),
        /// A delegate advertised a `rad/sigrefs` tip that has diverged from
        /// the one already stored locally.
        #[error("delegate '{remote}' has diverged 'rad/sigrefs': {current} -> {received}")]
        Diverged {
            /// The delegate whose history diverged.
            remote: PublicKey,
            /// The `rad/sigrefs` tip currently stored locally.
            current: Oid,
            /// The `rad/sigrefs` tip received from the remote.
            received: Oid,
        },
        /// No canonical `refs/rad/id` could be resolved after fetching it.
        #[error("canonical 'refs/rad/id' is missing")]
        MissingRadId,
        /// Wraps a [`repository::error::Update`].
        #[error(transparent)]
        RefdbUpdate(#[from] repository::error::Update),
        /// Wraps a [`repository::error::Resolve`].
        #[error(transparent)]
        Resolve(#[from] repository::error::Resolve),
        /// Wraps a [`radicle::storage::refs::Error`].
        #[error(transparent)]
        Refs(#[from] radicle::storage::refs::Error),
        /// Wraps a [`sigrefs::error::RemoteRefs`].
        #[error(transparent)]
        RemoteRefs(#[from] sigrefs::error::RemoteRefs),
        /// Listing the remote namespaces of the local repository failed.
        #[error("failed to get remote namespaces: {0}")]
        RemoteIds(#[source] radicle::git::raw::Error),
        /// A protocol stage failed; see [`Step`].
        #[error(transparent)]
        Step(#[from] Step),
        /// Wraps a [`handle::error::Tracking`].
        #[error(transparent)]
        Tracking(#[from] handle::error::Tracking),
        /// Wraps a [`radicle::storage::Error`].
        #[error(transparent)]
        Validation(#[from] radicle::storage::Error),
    }

    /// Errors that can occur while loading the canonical identity document.
    #[derive(Debug, Error)]
    pub enum Canonical {
        /// Resolving the canonical identity reference failed.
        #[error(transparent)]
        Resolve(#[from] git::repository::error::Resolve),
        /// The identity document at the resolved tip failed verification.
        #[error(transparent)]
        Verified(#[from] radicle::identity::DocError),
    }
}
92
/// Maps a remote to the tip received for its identity (`rad/id`) reference.
type IdentityTips = BTreeMap<PublicKey, Oid>;
/// Maps a remote to the tip received for its `rad/sigrefs` reference.
type SigrefTips = BTreeMap<PublicKey, Oid>;
95
/// Limits handed to the individual protocol stages of a fetch.
#[derive(Clone, Copy, Debug)]
pub struct FetchLimit {
    /// Limit passed to the stages that fetch special references
    /// (canonical `rad/id`, `rad/sigrefs`).
    pub special: u64,
    /// Limit passed to the stage that fetches data references.
    pub refs: u64,
}
101
102impl Default for FetchLimit {
103 fn default() -> Self {
104 Self {
105 special: DEFAULT_FETCH_SPECIAL_REFS_LIMIT,
106 refs: DEFAULT_FETCH_DATA_REFS_LIMIT,
107 }
108 }
109}
110
/// Outcome of a completed fetch.
#[derive(Debug)]
pub enum FetchResult {
    /// Enough delegates validated, so the reference updates were applied to
    /// the repository.
    Success {
        /// The result of applying the collected reference updates.
        applied: Applied<'static>,
        /// The remotes whose signed references passed validation.
        remotes: BTreeSet<PublicKey>,
        /// Validation failures collected along the way (for remotes that
        /// were pruned).
        validations: sigrefs::Validations,
    },
    /// Fewer delegates than `threshold` validated; nothing was applied.
    Failed {
        /// The number of valid delegates that was required.
        threshold: usize,
        /// The delegates that failed validation.
        delegates: BTreeSet<PublicKey>,
        /// The validation failures that were collected.
        validations: sigrefs::Validations,
    },
}
130
131impl FetchResult {
132 pub fn rejected(&self) -> impl Iterator<Item = &Update<'static>> {
133 match self {
134 Self::Success { applied, .. } => either::Either::Left(applied.rejected.iter()),
135 Self::Failed { .. } => either::Either::Right(std::iter::empty()),
136 }
137 }
138
139 pub fn is_success(&self) -> bool {
140 std::matches!(self, Self::Success { .. })
141 }
142}
143
/// Mutable state accumulated over the stages of a single fetch.
#[derive(Default)]
pub struct FetchState {
    /// In-memory reference database holding the updates staged so far; it
    /// is consulted before the on-disk repository (see [`Cached`]).
    refs: git::mem::Refdb,
    /// Tip received for the canonical `rad/id` reference, if any.
    canonical_rad_id: Option<Oid>,
    /// Tips received for the identity reference of each remote.
    ids: IdentityTips,
    /// Tips received for the `rad/sigrefs` reference of each remote.
    sigrefs: SigrefTips,
    /// Reference updates collected per remote; these are what eventually
    /// gets applied to the repository.
    tips: BTreeMap<PublicKey, Vec<Update<'static>>>,
    /// Keepfiles returned by the transport for the packs fetched so far.
    keepfiles: Vec<Keepfile>,
}
163
164impl FetchState {
165 pub fn prune(&mut self, remote: &PublicKey) {
168 self.ids.remove(remote);
169 self.sigrefs.remove(remote);
170 self.tips.remove(remote);
171 }
172
173 pub fn canonical_rad_id(&self) -> Option<&Oid> {
174 self.canonical_rad_id.as_ref()
175 }
176
177 pub fn update_all<'a, I>(&mut self, other: I) -> Applied<'a>
180 where
181 I: IntoIterator<Item = (PublicKey, Vec<Update<'a>>)>,
182 {
183 let mut ap = Applied::default();
184 for (remote, ups) in other {
185 for up in &ups {
186 ap.append(&mut self.refs.update(Some(up.clone())));
187 }
188 let mut ups = ups
189 .into_iter()
190 .map(|up| up.into_owned())
191 .collect::<Vec<_>>();
192 self.tips
193 .entry(remote)
194 .and_modify(|tips| tips.append(&mut ups))
195 .or_insert(ups);
196 }
197 ap
198 }
199
200 pub(crate) fn as_cached<'a, R, S>(
201 &'a mut self,
202 handle: &'a mut Handle<R, S>,
203 ) -> Cached<'a, R, S> {
204 Cached {
205 handle,
206 state: self,
207 }
208 }
209}
210
211impl FetchState {
212 pub(super) fn run_stage<R, S, F>(
215 &mut self,
216 handle: &mut Handle<R, S>,
217 handshake: &handshake::Outcome,
218 step: &F,
219 ) -> Result<BTreeSet<PublicKey>, error::Step>
220 where
221 R: AsRef<Repository>,
222 S: transport::ConnectionStream,
223 F: ProtocolStage,
224 {
225 let refs = match step.ls_refs() {
226 Some(refs) => handle
227 .transport
228 .ls_refs(refs.into(), handshake)?
229 .into_iter()
230 .filter_map(|r| step.ref_filter(r))
231 .collect::<Vec<_>>(),
232 None => vec![],
233 };
234 log::trace!(target: "fetch", "Received refs {refs:?}");
235 step.pre_validate(&refs)?;
236
237 let wants_haves = step.wants_haves(handle.repository(), &refs)?;
238 if !wants_haves.wants.is_empty() {
239 let keepfile =
240 handle
241 .transport
242 .fetch(wants_haves, handle.interrupt.clone(), handshake)?;
243 self.keepfiles.extend(keepfile);
244 } else {
245 log::trace!(target: "fetch", "Nothing to fetch")
246 };
247
248 let mut fetched = BTreeSet::new();
249 for r in &refs {
250 match &r.name {
251 refs::ReceivedRefname::Namespaced { remote, suffix } => {
252 fetched.insert(*remote);
253 if let Some(rad) = suffix.as_ref().left() {
254 match rad {
255 refs::Special::Id => {
256 self.ids.insert(*remote, r.tip);
257 }
258
259 refs::Special::SignedRefs => {
260 self.sigrefs.insert(*remote, r.tip);
261 }
262 }
263 }
264 }
265 refs::ReceivedRefname::RadId => self.canonical_rad_id = Some(r.tip),
266 }
267 }
268
269 let up = step.prepare_updates(self, handle.repository(), &refs)?;
270 self.update_all(up.tips);
271
272 Ok(fetched)
273 }
274
275 #[allow(clippy::too_many_arguments)]
288 fn run_special_refs<R, S>(
289 &mut self,
290 handle: &mut Handle<R, S>,
291 handshake: &handshake::Outcome,
292 delegates: BTreeSet<PublicKey>,
293 threshold: usize,
294 limit: &FetchLimit,
295 remote: PublicKey,
296 refs_at: Option<Vec<RefsAt>>,
297 ) -> Result<sigrefs::RemoteRefs, error::Protocol>
298 where
299 R: AsRef<Repository>,
300 S: transport::ConnectionStream,
301 {
302 match refs_at {
303 Some(refs_at) => {
304 let sigrefs_at = stage::SigrefsAt {
305 remote,
306 delegates: delegates.clone(),
307 refs_at: refs_at.clone(),
308 blocked: handle.blocked.clone(),
309 limit: limit.special,
310 };
311 log::trace!(target: "fetch", "{sigrefs_at:?}");
312 self.run_stage(handle, handshake, &sigrefs_at)?;
313 let remotes = refs_at.iter().map(|r| &r.remote);
314
315 let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes)?;
316 Ok(signed_refs)
317 }
318 None => {
319 let followed = handle.allowed();
320 log::trace!(target: "fetch", "Followed nodes {followed:?}");
321 let special_refs = stage::SpecialRefs {
322 blocked: handle.blocked.clone(),
323 remote,
324 delegates: delegates.clone(),
325 followed,
326 threshold,
327 limit: limit.special,
328 };
329 log::trace!(target: "fetch", "{special_refs:?}");
330 let fetched = self.run_stage(handle, handshake, &special_refs)?;
331
332 let signed_refs = sigrefs::RemoteRefs::load(
333 &self.as_cached(handle),
334 fetched.iter().chain(delegates.iter()),
335 )?;
336 Ok(signed_refs)
337 }
338 }
339 }
340
341 pub(super) fn run<R, S>(
358 mut self,
359 handle: &mut Handle<R, S>,
360 handshake: &handshake::Outcome,
361 limit: FetchLimit,
362 remote: PublicKey,
363 refs_at: Option<Vec<RefsAt>>,
364 ) -> Result<FetchResult, error::Protocol>
365 where
366 R: AsRef<Repository>,
367 S: transport::ConnectionStream,
368 {
369 let start = Instant::now();
370 self.run_stage(
374 handle,
375 handshake,
376 &stage::CanonicalId {
377 remote,
378 limit: limit.special,
379 },
380 )?;
381 log::debug!(target: "fetch", "Fetched rad/id ({}ms)", start.elapsed().as_millis());
382
383 let anchor = self
388 .as_cached(handle)
389 .canonical()?
390 .ok_or(error::Protocol::MissingRadId)?;
391
392 let is_delegate = anchor.is_delegate(&Did::from(handle.local()));
393 let delegates = anchor
396 .delegates()
397 .iter()
398 .filter(|id| !handle.is_blocked(id))
399 .map(|did| PublicKey::from(*did))
400 .collect::<BTreeSet<_>>();
401
402 log::trace!(target: "fetch", "Identity delegates {delegates:?}");
403
404 let threshold = if is_delegate {
407 anchor.threshold() - 1
408 } else {
409 anchor.threshold()
410 };
411 let signed_refs = self.run_special_refs(
412 handle,
413 handshake,
414 delegates.clone(),
415 threshold,
416 &limit,
417 remote,
418 refs_at,
419 )?;
420 log::debug!(
421 target: "fetch",
422 "Fetched data for {} remote(s) ({}ms)",
423 signed_refs.len(),
424 start.elapsed().as_millis()
425 );
426
427 let data_refs = stage::DataRefs {
428 remote,
429 remotes: signed_refs,
430 limit: limit.refs,
431 };
432 self.run_stage(handle, handshake, &data_refs)?;
433 log::debug!(
434 target: "fetch",
435 "Fetched data refs for {} remotes ({}ms)",
436 data_refs.remotes.len(),
437 start.elapsed().as_millis()
438 );
439
440 match handle.transport.done() {
444 Ok(()) => log::debug!(target: "fetch", "Sent done signal to remote {remote}"),
445 Err(err) => {
446 log::warn!(target: "fetch", "Attempted to send done to remote {remote}: {err}")
447 }
448 }
449
450 let mut failures = sigrefs::Validations::default();
454 let signed_refs = data_refs.remotes;
455
456 let mut remotes = BTreeSet::new();
459
460 let mut valid_delegates = handle
463 .repository()
464 .remote_ids()
465 .map_err(error::Protocol::RemoteIds)?
466 .filter_map(|id| id.ok())
467 .filter(|id| delegates.contains(id))
468 .collect::<BTreeSet<_>>();
469 let mut failed_delegates = BTreeSet::new();
470
471 for remote in signed_refs.keys() {
474 if handle.is_blocked(remote) {
475 log::trace!(target: "fetch", "Skipping blocked remote {remote}");
476 continue;
477 }
478
479 let remote = sigrefs::DelegateStatus::empty(*remote, &delegates)
480 .load(&self.as_cached(handle))?;
481 match remote {
482 sigrefs::DelegateStatus::NonDelegate { remote, data: None } => {
483 log::debug!(target: "fetch", "Pruning non-delegate {remote} tips, missing 'rad/sigrefs'");
484 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
485 self.prune(&remote);
486 }
487 sigrefs::DelegateStatus::Delegate { remote, data: None } => {
488 log::warn!(target: "fetch", "Pruning delegate {remote} tips, missing 'rad/sigrefs'");
489 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
490 self.prune(&remote);
491 valid_delegates.remove(&remote);
497 failed_delegates.insert(remote);
498 }
499 sigrefs::DelegateStatus::NonDelegate {
500 remote,
501 data: Some(sigrefs),
502 } => {
503 if let Some(SignedRefsAt { at, .. }) =
504 SignedRefsAt::load(remote, handle.repository())?
505 {
506 if matches!(
510 repository::ancestry(handle.repository(), at, sigrefs.at)?,
511 repository::Ancestry::Behind | repository::Ancestry::Diverged
512 ) {
513 self.prune(&remote);
514 continue;
515 }
516 }
517
518 let cache = self.as_cached(handle);
519 if let Some(warns) = sigrefs::validate(&cache, sigrefs)?.as_mut() {
520 log::debug!(
521 target: "fetch",
522 "Pruning non-delegate {remote} tips, due to validation failures"
523 );
524 self.prune(&remote);
525 failures.append(warns);
526 } else {
527 remotes.insert(remote);
528 }
529 }
530 sigrefs::DelegateStatus::Delegate {
531 remote,
532 data: Some(sigrefs),
533 } => {
534 if let Some(SignedRefsAt { at, .. }) =
535 SignedRefsAt::load(remote, handle.repository())?
536 {
537 let ancestry = repository::ancestry(handle.repository(), at, sigrefs.at)?;
538 if matches!(ancestry, repository::Ancestry::Behind) {
539 log::trace!(target: "fetch", "Advertised `rad/sigrefs` {} is behind {at} for {remote}", sigrefs.at);
540 self.prune(&remote);
541 continue;
542 } else if matches!(ancestry, repository::Ancestry::Diverged) {
543 return Err(error::Protocol::Diverged {
544 remote,
545 current: at,
546 received: sigrefs.at,
547 });
548 }
549 }
550
551 let cache = self.as_cached(handle);
552 let mut fails =
553 sigrefs::validate(&cache, sigrefs)?.unwrap_or(Validations::default());
554 if !fails.is_empty() {
555 log::warn!(target: "fetch", "Pruning delegate {remote} tips, due to validation failures");
556 self.prune(&remote);
557 valid_delegates.remove(&remote);
558 failed_delegates.insert(remote);
559 failures.append(&mut fails)
560 } else {
561 valid_delegates.insert(remote);
562 remotes.insert(remote);
563 }
564 }
565 }
566 }
567 log::debug!(
568 target: "fetch",
569 "Validated {} remote(s) ({}ms)",
570 remotes.len(),
571 start.elapsed().as_millis()
572 );
573
574 if valid_delegates.len() >= threshold {
577 let applied = repository::update(
578 handle.repository(),
579 self.tips
580 .clone()
581 .into_values()
582 .flat_map(|ups| ups.into_iter()),
583 )?;
584 log::debug!(target: "fetch", "Applied updates ({}ms)", start.elapsed().as_millis());
585 Ok(FetchResult::Success {
586 applied,
587 remotes,
588 validations: failures,
589 })
590 } else {
591 log::debug!(
592 target: "fetch",
593 "Fetch failed: {} failure(s) ({}ms)",
594 failures.len(),
595 start.elapsed().as_millis()
596 );
597 Ok(FetchResult::Failed {
598 threshold,
599 delegates: failed_delegates,
600 validations: failures,
601 })
602 }
603 }
604}
605
/// A view over the repository that consults the in-memory [`FetchState`]
/// first and falls back to the repository behind the [`Handle`].
pub(crate) struct Cached<'a, R, S> {
    /// Handle giving access to the underlying repository.
    handle: &'a mut Handle<R, S>,
    /// In-memory fetch state that takes precedence over the repository.
    state: &'a mut FetchState,
}
612
613impl<R, S> Cached<'_, R, S>
614where
615 R: AsRef<Repository>,
616{
617 pub fn refname_to_id<'b, N>(
620 &self,
621 refname: N,
622 ) -> Result<Option<Oid>, repository::error::Resolve>
623 where
624 N: Into<Qualified<'b>>,
625 {
626 let refname = refname.into();
627 match self.state.refs.refname_to_id(refname.clone()) {
628 None => repository::refname_to_id(self.handle.repository(), refname),
629 Some(oid) => Ok(Some(oid)),
630 }
631 }
632
633 pub fn canonical_rad_id(&self) -> Option<Oid> {
635 self.state.canonical_rad_id().copied()
636 }
637
638 pub fn verified(&self, head: Oid) -> Result<Doc, DocError> {
639 self.handle.verified(head)
640 }
641
642 pub fn canonical(&self) -> Result<Option<Doc>, error::Canonical> {
643 let tip = self.refname_to_id(refs::REFS_RAD_ID.clone())?;
644 let cached_tip = self.canonical_rad_id();
645
646 tip.or(cached_tip)
647 .map(|tip| self.verified(tip).map_err(error::Canonical::from))
648 .transpose()
649 }
650
651 pub fn load(&self, remote: &PublicKey) -> Result<Option<SignedRefsAt>, sigrefs::error::Load> {
652 match self.state.sigrefs.get(remote) {
653 None => SignedRefsAt::load(*remote, self.handle.repository()),
654 Some(tip) => SignedRefsAt::load_at(*tip, *remote, self.handle.repository()).map(Some),
655 }
656 }
657
658 #[allow(dead_code)]
659 pub(crate) fn inspect(&self) {
660 self.state.refs.inspect()
661 }
662}
663
664impl<R, S> RemoteRepository for Cached<'_, R, S>
665where
666 R: AsRef<Repository>,
667{
668 fn remote(&self, remote: &RemoteId) -> Result<Remote, storage::refs::Error> {
669 self.handle.repository().remote(remote)
672 }
673
674 fn remotes(&self) -> Result<Remotes<Verified>, storage::refs::Error> {
675 self.state
676 .sigrefs
677 .keys()
678 .map(|id| self.remote(id).map(|remote| (*id, remote)))
679 .collect::<Result<_, _>>()
680 }
681
682 fn remote_refs_at(&self) -> Result<Vec<RefsAt>, storage::refs::Error> {
683 self.handle.repository().remote_refs_at()
684 }
685}
686
687impl<R, S> ValidateRepository for Cached<'_, R, S>
688where
689 R: AsRef<Repository>,
690{
691 fn validate_remote(&self, remote: &Remote) -> Result<Validations, storage::Error> {
695 let mut signed = BTreeMap::from((*remote.refs).clone());
697 let mut validations = Validations::default();
698 let mut has_sigrefs = false;
699
700 for (refname, oid) in self.state.refs.references_of(&remote.id) {
702 if refname == storage::refs::SIGREFS_BRANCH.to_ref_string() {
704 has_sigrefs = true;
705 continue;
706 }
707 if let Some(signed_oid) = signed.remove(&refname) {
708 if oid != signed_oid {
709 validations.push(Validation::MismatchedRef {
710 refname,
711 expected: signed_oid,
712 actual: oid,
713 });
714 }
715 } else {
716 validations.push(Validation::UnsignedRef(refname));
717 }
718 }
719
720 if !has_sigrefs {
721 validations.push(Validation::MissingRadSigRefs(remote.id));
722 }
723
724 for (name, _) in signed.into_iter() {
727 validations.push(Validation::MissingRef {
728 refname: name,
729 remote: remote.id,
730 });
731 }
732
733 Ok(validations)
734 }
735}