1use std::collections::{BTreeMap, BTreeSet};
2use std::time::Instant;
3
4use gix_protocol::Handshake;
5use radicle::crypto::PublicKey;
6use radicle::git::{fmt::Qualified, Oid};
7use radicle::identity::{Did, Doc, DocError};
8
9use radicle::prelude::Verified;
10use radicle::storage;
11use radicle::storage::git::Repository;
12use radicle::storage::refs::RefsAt;
13use radicle::storage::{
14 git::Validation, Remote, RemoteId, RemoteRepository, Remotes, ValidateRepository, Validations,
15};
16
17use crate::git;
18use crate::git::packfile::Keepfile;
19use crate::git::refs::{Applied, Update};
20use crate::git::repository;
21use crate::sigrefs::SignedRefsAt;
22use crate::stage;
23use crate::stage::ProtocolStage;
24use crate::{refs, sigrefs, transport, Handle};
25
/// Default limit for fetching the special references (`rad/id`, `rad/sigrefs`):
/// 5 MiB (presumably a byte count — confirm against the transport).
pub const DEFAULT_FETCH_SPECIAL_REFS_LIMIT: u64 = 5 * 1024 * 1024;
/// Default limit for fetching data references: 5 GiB (presumably a byte count —
/// confirm against the transport).
pub const DEFAULT_FETCH_DATA_REFS_LIMIT: u64 = 5 * 1024 * 1024 * 1024;
32
/// Errors raised while running the fetch protocol.
pub mod error {
    use radicle::git::Oid;
    use radicle::prelude::PublicKey;
    use thiserror::Error;

    use crate::{git, git::repository, handle, sigrefs, stage, transport};

    /// Failure while running a single protocol stage (`FetchState::run_stage`).
    #[derive(Debug, Error)]
    pub enum Step {
        /// Stage layout error — presumably raised by the stage's `pre_validate`
        /// check on the received refs (TODO confirm).
        #[error(transparent)]
        Layout(#[from] stage::error::Layout),
        /// The stage failed to prepare its ref updates.
        #[error(transparent)]
        Prepare(#[from] stage::error::Prepare),
        /// The stage failed to compute the wants/haves for the packfile request.
        #[error(transparent)]
        WantsHaves(#[from] stage::error::WantsHaves),
        /// Listing refs or fetching the packfile over the transport failed.
        #[error(transparent)]
        Transport(#[from] transport::Error),
    }

    /// Failure while running the whole fetch (`FetchState::run`).
    #[derive(Debug, Error)]
    pub enum Protocol {
        /// Computing the ancestry between a stored and a received `rad/sigrefs` failed.
        #[error(transparent)]
        Ancestry(#[from] repository::error::Ancestry),
        /// Determining the canonical identity document failed.
        #[error(transparent)]
        Canonical(#[from] Canonical),
        /// A delegate's received `rad/sigrefs` diverged from the one stored locally.
        #[error("delegate '{remote}' has diverged 'rad/sigrefs': {current} -> {received}")]
        Diverged {
            /// The delegate whose `rad/sigrefs` diverged.
            remote: PublicKey,
            /// The locally stored `rad/sigrefs` tip.
            current: Oid,
            /// The `rad/sigrefs` tip received from the remote.
            received: Oid,
        },
        /// No canonical `refs/rad/id` exists, neither locally nor in what was fetched.
        #[error("canonical 'refs/rad/id' is missing")]
        MissingRadId,
        /// Writing the accumulated ref updates to the repository failed.
        #[error(transparent)]
        RefdbUpdate(#[from] repository::error::Update),
        /// Resolving a reference failed.
        #[error(transparent)]
        Resolve(#[from] repository::error::Resolve),
        /// A storage-level refs error.
        #[error(transparent)]
        Refs(#[from] radicle::storage::refs::Error),
        /// Loading the signed refs of the fetched remotes failed.
        #[error(transparent)]
        RemoteRefs(#[from] sigrefs::error::RemoteRefs),
        /// Listing the remote namespaces present in the local repository failed.
        #[error("failed to get remote namespaces: {0}")]
        RemoteIds(#[source] radicle::git::raw::Error),
        /// A protocol stage failed.
        #[error(transparent)]
        Step(#[from] Step),
        /// A tracking/follow-policy error from the handle.
        #[error(transparent)]
        Tracking(#[from] handle::error::Tracking),
        /// A storage error — presumably raised while validating a remote's signed
        /// refs (TODO confirm).
        #[error(transparent)]
        Validation(#[from] radicle::storage::Error),
    }

    /// Failure while determining the canonical identity document.
    #[derive(Debug, Error)]
    pub enum Canonical {
        /// Resolving `refs/rad/id` failed.
        #[error(transparent)]
        Resolve(#[from] git::repository::error::Resolve),
        /// Loading or verifying the identity document at the chosen tip failed.
        #[error(transparent)]
        Verified(#[from] radicle::identity::DocError),
        /// Comparing the stored and received `refs/rad/id` histories failed.
        #[error("failed to verify `refs/rad/id`: {0}")]
        Graph(#[source] radicle::git::raw::Error),
    }
}
94
/// Received `rad/id` tips, keyed by the remote namespace they were fetched for.
type IdentityTips = BTreeMap<PublicKey, Oid>;
/// Received `rad/sigrefs` tips, keyed by the remote namespace they were fetched for.
type SigrefTips = BTreeMap<PublicKey, Oid>;
97
98#[derive(Clone, Copy, Debug)]
99pub struct FetchLimit {
100 pub special: u64,
101 pub refs: u64,
102}
103
104impl Default for FetchLimit {
105 fn default() -> Self {
106 Self {
107 special: DEFAULT_FETCH_SPECIAL_REFS_LIMIT,
108 refs: DEFAULT_FETCH_DATA_REFS_LIMIT,
109 }
110 }
111}
112
113#[derive(Debug)]
114pub enum FetchResult {
115 Success {
116 applied: Applied<'static>,
118 remotes: BTreeSet<PublicKey>,
120 validations: sigrefs::Validations,
122 },
123 Failed {
124 threshold: usize,
126 delegates: BTreeSet<PublicKey>,
128 validations: sigrefs::Validations,
130 },
131}
132
133impl FetchResult {
134 pub fn rejected(&self) -> impl Iterator<Item = &Update<'static>> {
135 match self {
136 Self::Success { applied, .. } => either::Either::Left(applied.rejected.iter()),
137 Self::Failed { .. } => either::Either::Right(std::iter::empty()),
138 }
139 }
140
141 pub fn is_success(&self) -> bool {
142 std::matches!(self, Self::Success { .. })
143 }
144}
145
/// In-memory state accumulated across the stages of a fetch. Ref updates are
/// collected here and only written to the repository at the end of
/// `FetchState::run`, if enough delegates validate.
#[derive(Default)]
pub struct FetchState {
    /// In-memory refdb that each stage's updates are applied to.
    refs: git::mem::Refdb,
    /// Tip of the canonical `refs/rad/id` received from the remote, if any.
    canonical_rad_id: Option<Oid>,
    /// Received `rad/id` tips per remote namespace.
    ids: IdentityTips,
    /// Received `rad/sigrefs` tips per remote namespace.
    sigrefs: SigrefTips,
    /// Ref updates recorded per remote; these are what gets written to the
    /// repository when the fetch succeeds.
    tips: BTreeMap<PublicKey, Vec<Update<'static>>>,
    /// Keepfiles returned by the transport for each packfile fetched.
    keepfiles: Vec<Keepfile>,
}
165
166impl FetchState {
167 pub fn prune(&mut self, remote: &PublicKey) {
170 self.ids.remove(remote);
171 self.sigrefs.remove(remote);
172 self.tips.remove(remote);
173 }
174
175 pub fn canonical_rad_id(&self) -> Option<&Oid> {
176 self.canonical_rad_id.as_ref()
177 }
178
179 pub fn update_all<'a, I>(&mut self, other: I) -> Applied<'a>
182 where
183 I: IntoIterator<Item = (PublicKey, Vec<Update<'a>>)>,
184 {
185 let mut ap = Applied::default();
186 for (remote, ups) in other {
187 for up in &ups {
188 ap.append(&mut self.refs.update(Some(up.clone())));
189 }
190 let mut ups = ups
191 .into_iter()
192 .map(|up| up.into_owned())
193 .collect::<Vec<_>>();
194 self.tips
195 .entry(remote)
196 .and_modify(|tips| tips.append(&mut ups))
197 .or_insert(ups);
198 }
199 ap
200 }
201
202 pub(crate) fn as_cached<'a, R, S>(
203 &'a mut self,
204 handle: &'a mut Handle<R, S>,
205 ) -> Cached<'a, R, S> {
206 Cached {
207 handle,
208 state: self,
209 }
210 }
211}
212
213impl FetchState {
214 pub(super) fn run_stage<R, S, F>(
217 &mut self,
218 handle: &mut Handle<R, S>,
219 handshake: &Handshake,
220 step: &F,
221 ) -> Result<BTreeSet<PublicKey>, error::Step>
222 where
223 R: AsRef<Repository>,
224 S: transport::ConnectionStream,
225 F: ProtocolStage,
226 {
227 let refs = match step.ls_refs() {
228 Some(refs) => handle
229 .transport
230 .ls_refs(refs, handshake)?
231 .into_iter()
232 .filter_map(|r| step.ref_filter(r))
233 .collect::<Vec<_>>(),
234 None => vec![],
235 };
236 log::trace!("Received refs {refs:#?}");
237 step.pre_validate(&refs)?;
238
239 let wants_haves = step.wants_haves(handle.repository(), &refs)?;
240 if !wants_haves.wants.is_empty() {
241 let keepfile =
242 handle
243 .transport
244 .fetch(wants_haves, handle.interrupt.clone(), handshake)?;
245 self.keepfiles.extend(keepfile);
246 } else {
247 log::trace!("Nothing to fetch")
248 };
249
250 let mut fetched = BTreeSet::new();
251 for r in &refs {
252 match &r.name {
253 refs::ReceivedRefname::Namespaced { remote, suffix } => {
254 fetched.insert(*remote);
255 if let Some(rad) = suffix.as_ref().left() {
256 match rad {
257 refs::Special::Id => {
258 self.ids.insert(*remote, r.tip);
259 }
260
261 refs::Special::SignedRefs => {
262 self.sigrefs.insert(*remote, r.tip);
263 }
264 }
265 }
266 }
267 refs::ReceivedRefname::RadId => self.canonical_rad_id = Some(r.tip),
268 }
269 }
270
271 let up = step.prepare_updates(self, handle.repository(), &refs)?;
272 self.update_all(up.tips);
273
274 Ok(fetched)
275 }
276
277 #[allow(clippy::too_many_arguments)]
290 fn run_special_refs<R, S>(
291 &mut self,
292 handle: &mut Handle<R, S>,
293 handshake: &Handshake,
294 delegates: BTreeSet<PublicKey>,
295 threshold: usize,
296 limit: &FetchLimit,
297 remote: PublicKey,
298 refs_at: Option<Vec<RefsAt>>,
299 ) -> Result<sigrefs::RemoteRefs, error::Protocol>
300 where
301 R: AsRef<Repository>,
302 S: transport::ConnectionStream,
303 {
304 match refs_at {
305 Some(refs_at) => {
306 let sigrefs_at = stage::SigrefsAt {
307 remote,
308 delegates: delegates.clone(),
309 refs_at: refs_at.clone(),
310 blocked: handle.blocked.clone(),
311 limit: limit.special,
312 };
313 log::trace!("{sigrefs_at:?}");
314 self.run_stage(handle, handshake, &sigrefs_at)?;
315 let remotes = refs_at.iter().map(|r| &r.remote);
316
317 let signed_refs = sigrefs::RemoteRefs::load(&self.as_cached(handle), remotes)?;
318 Ok(signed_refs)
319 }
320 None => {
321 let followed = handle.allowed();
322 log::trace!("Followed nodes {followed:?}");
323 let special_refs = stage::SpecialRefs {
324 blocked: handle.blocked.clone(),
325 remote,
326 delegates: delegates.clone(),
327 followed,
328 threshold,
329 limit: limit.special,
330 };
331 log::trace!("{special_refs:?}");
332 let fetched = self.run_stage(handle, handshake, &special_refs)?;
333
334 let signed_refs = sigrefs::RemoteRefs::load(
335 &self.as_cached(handle),
336 fetched.iter().chain(delegates.iter()),
337 )?;
338 Ok(signed_refs)
339 }
340 }
341 }
342
343 pub(super) fn run<R, S>(
360 mut self,
361 handle: &mut Handle<R, S>,
362 handshake: &Handshake,
363 limit: FetchLimit,
364 remote: PublicKey,
365 refs_at: Option<Vec<RefsAt>>,
366 ) -> Result<FetchResult, error::Protocol>
367 where
368 R: AsRef<Repository>,
369 S: transport::ConnectionStream,
370 {
371 let start = Instant::now();
372 self.run_stage(
376 handle,
377 handshake,
378 &stage::CanonicalId {
379 remote,
380 limit: limit.special,
381 },
382 )?;
383 log::debug!("Fetched rad/id ({}ms)", start.elapsed().as_millis());
384
385 let anchor = self
390 .as_cached(handle)
391 .canonical()?
392 .ok_or(error::Protocol::MissingRadId)?;
393
394 let is_delegate = anchor.is_delegate(&Did::from(handle.local()));
395 let delegates = anchor
398 .delegates()
399 .iter()
400 .filter(|id| !handle.is_blocked(id))
401 .map(|did| PublicKey::from(*did))
402 .collect::<BTreeSet<_>>();
403
404 log::trace!("Identity delegates {delegates:?}");
405
406 let threshold = if is_delegate {
409 anchor.threshold() - 1
410 } else {
411 anchor.threshold()
412 };
413 let signed_refs = self.run_special_refs(
414 handle,
415 handshake,
416 delegates.clone(),
417 threshold,
418 &limit,
419 remote,
420 refs_at,
421 )?;
422 log::debug!(
423 "Fetched data for {} remote(s) ({}ms)",
424 signed_refs.len(),
425 start.elapsed().as_millis()
426 );
427
428 let data_refs = stage::DataRefs {
429 remote,
430 remotes: signed_refs,
431 limit: limit.refs,
432 };
433 self.run_stage(handle, handshake, &data_refs)?;
434 log::debug!(
435 "Fetched data refs for {} remotes ({}ms)",
436 data_refs.remotes.len(),
437 start.elapsed().as_millis()
438 );
439
440 match handle.transport.done() {
444 Ok(()) => log::debug!("Sent done signal to remote {remote}"),
445 Err(err) => {
446 log::debug!("Failed to signal EOF to {remote}: {err}")
447 }
448 }
449
450 let mut failures = sigrefs::Validations::default();
454 let signed_refs = data_refs.remotes;
455
456 let mut remotes = BTreeSet::new();
459
460 let mut valid_delegates = handle
463 .repository()
464 .remote_ids()
465 .map_err(error::Protocol::RemoteIds)?
466 .filter_map(|id| id.ok())
467 .filter(|id| delegates.contains(id))
468 .collect::<BTreeSet<_>>();
469 let mut failed_delegates = BTreeSet::new();
470
471 for remote in signed_refs.keys() {
474 if handle.is_blocked(remote) {
475 log::trace!("Skipping blocked remote {remote}");
476 continue;
477 }
478
479 let remote = sigrefs::DelegateStatus::empty(*remote, &delegates)
480 .load(&self.as_cached(handle))?;
481 match remote {
482 sigrefs::DelegateStatus::NonDelegate { remote, data: None } => {
483 log::debug!("Pruning non-delegate {remote} tips, missing 'rad/sigrefs'");
484 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
485 self.prune(&remote);
486 }
487 sigrefs::DelegateStatus::Delegate { remote, data: None } => {
488 log::debug!("Pruning delegate {remote} tips, missing 'rad/sigrefs'");
489 failures.push(sigrefs::Validation::MissingRadSigRefs(remote));
490 self.prune(&remote);
491 valid_delegates.remove(&remote);
497 failed_delegates.insert(remote);
498 }
499 sigrefs::DelegateStatus::NonDelegate {
500 remote,
501 data: Some(sigrefs),
502 } => {
503 if let Some(SignedRefsAt { at, .. }) =
504 SignedRefsAt::load(remote, handle.repository())?
505 {
506 if matches!(
510 repository::ancestry(handle.repository(), at, sigrefs.at)?,
511 repository::Ancestry::Behind | repository::Ancestry::Diverged
512 ) {
513 self.prune(&remote);
514 continue;
515 }
516 }
517
518 let cache = self.as_cached(handle);
519 if let Some(warns) = sigrefs::validate(&cache, sigrefs)?.as_mut() {
520 log::debug!(
521 "Pruning non-delegate {remote} tips, due to validation failures"
522 );
523 self.prune(&remote);
524 failures.append(warns);
525 } else {
526 remotes.insert(remote);
527 }
528 }
529 sigrefs::DelegateStatus::Delegate {
530 remote,
531 data: Some(sigrefs),
532 } => {
533 if let Some(SignedRefsAt { at, .. }) =
534 SignedRefsAt::load(remote, handle.repository())?
535 {
536 let ancestry = repository::ancestry(handle.repository(), at, sigrefs.at)?;
537 if matches!(ancestry, repository::Ancestry::Behind) {
538 log::trace!(
539 "Advertised `rad/sigrefs` {} is behind {at} for {remote}",
540 sigrefs.at
541 );
542 self.prune(&remote);
543 continue;
544 } else if matches!(ancestry, repository::Ancestry::Diverged) {
545 return Err(error::Protocol::Diverged {
546 remote,
547 current: at,
548 received: sigrefs.at,
549 });
550 }
551 }
552
553 let cache = self.as_cached(handle);
554 let mut fails =
555 sigrefs::validate(&cache, sigrefs)?.unwrap_or(Validations::default());
556 if !fails.is_empty() {
557 log::debug!("Pruning delegate {remote} tips, due to validation failures");
558 self.prune(&remote);
559 valid_delegates.remove(&remote);
560 failed_delegates.insert(remote);
561 failures.append(&mut fails)
562 } else {
563 valid_delegates.insert(remote);
564 remotes.insert(remote);
565 }
566 }
567 }
568 }
569 log::debug!(
570 "Validated {} remote(s) ({}ms)",
571 remotes.len(),
572 start.elapsed().as_millis()
573 );
574
575 if valid_delegates.len() >= threshold {
578 let applied = repository::update(
579 handle.repository(),
580 self.tips
581 .clone()
582 .into_values()
583 .flat_map(|ups| ups.into_iter()),
584 )?;
585 log::debug!("Applied updates ({}ms)", start.elapsed().as_millis());
586 Ok(FetchResult::Success {
587 applied,
588 remotes,
589 validations: failures,
590 })
591 } else {
592 log::debug!(
593 "Fetch failed: {} failure(s) ({}ms)",
594 failures.len(),
595 start.elapsed().as_millis()
596 );
597 Ok(FetchResult::Failed {
598 threshold,
599 delegates: failed_delegates,
600 validations: failures,
601 })
602 }
603 }
604}
605
/// A view pairing a [`Handle`] with the in-progress [`FetchState`]. Lookups such
/// as `refname_to_id` and `load` consult the fetch state first and fall back to
/// the handle's repository.
pub(crate) struct Cached<'a, R, S> {
    handle: &'a mut Handle<R, S>,
    state: &'a mut FetchState,
}
612
613impl<R, S> Cached<'_, R, S>
614where
615 R: AsRef<Repository>,
616{
617 pub fn refname_to_id<'b, N>(
620 &self,
621 refname: N,
622 ) -> Result<Option<Oid>, repository::error::Resolve>
623 where
624 N: Into<Qualified<'b>>,
625 {
626 let refname = refname.into();
627 match self.state.refs.refname_to_id(refname.clone()) {
628 None => repository::refname_to_id(self.handle.repository(), refname),
629 Some(oid) => Ok(Some(oid)),
630 }
631 }
632
633 pub fn canonical_rad_id(&self) -> Option<Oid> {
635 self.state.canonical_rad_id().copied()
636 }
637
638 pub fn verified(&self, head: Oid) -> Result<Doc, DocError> {
639 self.handle.verified(head)
640 }
641
642 pub fn canonical(&self) -> Result<Option<Doc>, error::Canonical> {
657 let tip = self.refname_to_id(refs::REFS_RAD_ID.clone())?;
658 let cached_tip = self.canonical_rad_id();
659
660 let oid = match (tip, cached_tip) {
661 (None, None) => {
662 return Ok(None);
663 }
664 (Some(oid), None) | (None, Some(oid)) => oid,
665 (Some(repository), Some(cached)) => {
666 let repo = self.handle.repository();
667 match repo
668 .backend
669 .graph_ahead_behind(repository.into(), cached.into())
670 {
671 Ok((ahead, behind)) => match (ahead, behind) {
672 (0, _) => cached,
673 _ => repository,
674 },
675 Err(err) if err.code() == radicle::git::raw::ErrorCode::NotFound => repository,
676 Err(err) => {
677 return Err(error::Canonical::Graph(err));
678 }
679 }
680 }
681 };
682
683 self.verified(oid).map(Some).map_err(error::Canonical::from)
684 }
685
686 pub fn load(&self, remote: &PublicKey) -> Result<Option<SignedRefsAt>, sigrefs::error::Load> {
687 match self.state.sigrefs.get(remote) {
688 None => SignedRefsAt::load(*remote, self.handle.repository()),
689 Some(tip) => SignedRefsAt::load_at(*tip, *remote, self.handle.repository()).map(Some),
690 }
691 }
692
693 #[allow(dead_code)]
694 pub(crate) fn inspect(&self) {
695 self.state.refs.inspect()
696 }
697}
698
699impl<R, S> RemoteRepository for Cached<'_, R, S>
700where
701 R: AsRef<Repository>,
702{
703 fn remote(&self, remote: &RemoteId) -> Result<Remote, storage::refs::Error> {
704 self.handle.repository().remote(remote)
707 }
708
709 fn remotes(&self) -> Result<Remotes<Verified>, storage::refs::Error> {
710 self.state
711 .sigrefs
712 .keys()
713 .map(|id| self.remote(id).map(|remote| (*id, remote)))
714 .collect::<Result<_, _>>()
715 }
716
717 fn remote_refs_at(&self) -> Result<Vec<RefsAt>, storage::refs::Error> {
718 self.handle.repository().remote_refs_at()
719 }
720}
721
722impl<R, S> ValidateRepository for Cached<'_, R, S>
723where
724 R: AsRef<Repository>,
725{
726 fn validate_remote(&self, remote: &Remote) -> Result<Validations, storage::Error> {
730 let mut signed = BTreeMap::from((*remote.refs).clone());
732 let mut validations = Validations::default();
733 let mut has_sigrefs = false;
734
735 for (refname, oid) in self.state.refs.references_of(&remote.id) {
737 if refname == storage::refs::SIGREFS_BRANCH.to_ref_string() {
739 has_sigrefs = true;
740 continue;
741 }
742 if let Some(signed_oid) = signed.remove(&refname) {
743 if oid != signed_oid {
744 validations.push(Validation::MismatchedRef {
745 refname,
746 expected: signed_oid,
747 actual: oid,
748 });
749 }
750 } else {
751 validations.push(Validation::UnsignedRef(refname));
752 }
753 }
754
755 if !has_sigrefs {
756 validations.push(Validation::MissingRadSigRefs(remote.id));
757 }
758
759 for (name, _) in signed.into_iter() {
762 validations.push(Validation::MissingRef {
763 refname: name,
764 remote: remote.id,
765 });
766 }
767
768 Ok(validations)
769 }
770}