// radicle_node/worker/fetch.rs

1use radicle::identity::doc::CanonicalRefsError;
2use radicle::identity::CanonicalRefs;
3use radicle::storage::git::TempRepository;
4pub(crate) use radicle_protocol::worker::fetch::error;
5
6use std::collections::BTreeSet;
7use std::str::FromStr;
8
9use localtime::LocalTime;
10
11use radicle::cob::TypedId;
12use radicle::crypto::PublicKey;
13use radicle::identity::crefs::GetCanonicalRefs as _;
14use radicle::prelude::NodeId;
15use radicle::prelude::RepoId;
16use radicle::storage::git::Repository;
17use radicle::storage::refs::RefsAt;
18use radicle::storage::{
19    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
20    WriteRepository as _,
21};
22use radicle::{cob, git, node, Storage};
23use radicle_fetch::git::refs::Applied;
24use radicle_fetch::{Allowed, BlockList, FetchLimit};
25pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};
26
27use super::channels::ChannelsFlush;
28
/// A fetch handle, distinguishing an initial clone of a repository
/// from a pull into one that already exists in storage.
pub enum Handle {
    /// The repository is not yet in storage: fetch into a temporary
    /// repository which is moved into place on success.
    Clone {
        handle: radicle_fetch::Handle<TempRepository, ChannelsFlush>,
    },
    /// The repository exists locally: fetch into it directly, and post
    /// notifications for updated references afterwards.
    Pull {
        handle: radicle_fetch::Handle<Repository, ChannelsFlush>,
        notifications: node::notifications::StoreWriter,
    },
}
38
impl Handle {
    /// Create a fetch handle for `rid`.
    ///
    /// Returns [`Handle::Pull`] when the repository already exists in
    /// `storage`, otherwise [`Handle::Clone`], which fetches into a
    /// temporary repository so a failed clone leaves no partial state.
    pub fn new(
        rid: RepoId,
        local: PublicKey,
        storage: &Storage,
        follow: Allowed,
        blocked: BlockList,
        channels: ChannelsFlush,
        notifications: node::notifications::StoreWriter,
    ) -> Result<Self, error::Handle> {
        let exists = storage.contains(&rid)?;
        if exists {
            let repo = storage.repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Pull {
                handle,
                notifications,
            })
        } else {
            // Temporary repository: moved into storage only once the
            // clone succeeds (see `fetch`).
            let repo = storage.temporary_repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Clone { handle })
        }
    }

    /// Perform the fetch of `rid` from `remote`, consuming the handle.
    ///
    /// Runs a clone or a pull depending on the variant. On success,
    /// sets the repository's identity head and `HEAD`, recomputes
    /// canonical references, posts notifications (pulls only, and only
    /// for repos the local node has a remote in), and refreshes the COB
    /// and sigrefs caches.
    ///
    /// Returns [`error::Fetch::Validation`] when the fetched refs fail
    /// quorum validation.
    pub fn fetch<D: node::refs::Store>(
        self,
        rid: RepoId,
        storage: &Storage,
        cache: &mut cob::cache::StoreWriter,
        refsdb: &mut D,
        limit: FetchLimit,
        remote: PublicKey,
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        // `clone` records which path we took; `notifs` is `Some` only
        // for pulls, since clones never produce notifications.
        let (result, clone, notifs) = match self {
            Self::Clone { mut handle } => {
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
                match radicle_fetch::clone(&mut handle, limit, remote) {
                    Err(err) => {
                        // Discard the temporary repository on failure.
                        handle.into_inner().cleanup();
                        return Err(err.into());
                    }
                    Ok(result) => {
                        // Move the temporary repository into its final
                        // location in storage.
                        handle.into_inner().mv(storage.path_of(&rid))?;
                        (result, true, None)
                    }
                }
            }
            Self::Pull {
                mut handle,
                notifications,
            } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
                let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
                (result, false, Some(notifications))
            }
        };

        for rejected in result.rejected() {
            log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
        }

        match result {
            radicle_fetch::FetchResult::Failed {
                threshold,
                delegates,
                validations,
            } => {
                for fail in validations.iter() {
                    log::error!(target: "worker", "Validation error: {fail}");
                }
                Err(error::Fetch::Validation {
                    threshold,
                    delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
                })
            }
            radicle_fetch::FetchResult::Success {
                applied,
                remotes,
                validations,
            } => {
                // On success, validation failures are non-fatal warnings.
                for warn in validations {
                    log::warn!(target: "worker", "Validation error: {warn}");
                }

                // N.b. We do not go through handle for this since the cloning handle
                // points to a repository that is temporary and gets moved by [`mv`].
                let repo = storage.repository(rid)?;
                repo.set_identity_head()?;
                match repo.set_head() {
                    Ok(head) => {
                        if head.is_updated() {
                            log::trace!(target: "worker", "Set HEAD to {}", head.new);
                        }
                    }
                    // A missing quorum is expected when delegates
                    // diverge; log and carry on.
                    Err(RepositoryError::Quorum(e)) => {
                        log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
                    }
                    Err(e) => return Err(e.into()),
                }

                // Canonical ref computation is best-effort: failures
                // are logged, not propagated.
                let canonical = match set_canonical_refs(&repo, &applied) {
                    Ok(updates) => updates.unwrap_or_default(),
                    Err(e) => {
                        log::warn!(target: "worker", "Failed to set canonical references: {e}");
                        UpdatedCanonicalRefs::default()
                    }
                };

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
                    // Only create notifications for repos that we have
                    // contributed to in some way, otherwise our inbox will
                    // be flooded by all the repos we are seeding.
                    if repo.remote(&storage.info().key).is_ok() {
                        notify(&rid, &applied, &mut store)?;
                    }
                }

                cache_cobs(&rid, &applied.updated, &repo, cache)?;
                cache_refs(&rid, &applied.updated, refsdb)?;

                Ok(FetchResult {
                    updated: applied.updated,
                    canonical,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
                })
            }
        }
    }
}
173
174// Post notifications for the given refs.
175fn notify(
176    rid: &RepoId,
177    refs: &radicle_fetch::git::refs::Applied<'static>,
178    store: &mut node::notifications::StoreWriter,
179) -> Result<(), error::Fetch> {
180    let now = LocalTime::now();
181
182    for update in refs.updated.iter() {
183        if let Some(r) = update.name().to_namespaced() {
184            let r = r.strip_namespace();
185            if r == *git::refs::storage::SIGREFS_BRANCH {
186                // Don't notify about signed refs.
187                continue;
188            }
189            if r == *git::refs::storage::IDENTITY_BRANCH {
190                // Don't notify about the peers's identity branch pointer, since there will
191                // be a separate notification on the identity COB itself.
192                continue;
193            }
194            if r == *git::refs::storage::IDENTITY_ROOT {
195                // Don't notify about the peers's identity root pointer. This is only used
196                // for sigref verification.
197                continue;
198            }
199            if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
200                if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
201                    // Don't notify about patch branches, since we already get
202                    // notifications about patch updates.
203                    continue;
204                }
205            }
206        }
207        if let RefUpdate::Skipped { .. } = update {
208            // Don't notify about skipped refs.
209        } else if let Err(e) = store.insert(rid, update, now) {
210            log::error!(
211                target: "worker",
212                "Failed to update notification store for {rid}: {e}"
213            );
214        }
215    }
216    Ok(())
217}
218
219/// Cache certain ref updates in our database.
220fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
221where
222    D: node::refs::Store,
223{
224    let time = LocalTime::now();
225
226    for r in refs {
227        let name = r.name();
228        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
229            Err(e) => {
230                log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
231                log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
232                break;
233            }
234            Ok((n, q)) => (n, q),
235        };
236        if qualified != *git::refs::storage::SIGREFS_BRANCH {
237            // Only cache `rad/sigrefs`.
238            continue;
239        }
240        log::trace!(target: "node", "Updating cache for {name} in {repo}");
241
242        let result = match r {
243            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
244            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
245            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
246            RefUpdate::Skipped { .. } => continue,
247        };
248
249        if let Err(e) = result {
250            log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
251            log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
252            break;
253        }
254    }
255    Ok(())
256}
257
258/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
259/// COB to the COB cache.
260fn cache_cobs<S, C>(
261    rid: &RepoId,
262    refs: &[RefUpdate],
263    storage: &S,
264    cache: &mut C,
265) -> Result<(), error::Cache>
266where
267    S: ReadRepository + cob::Store<Namespace = NodeId>,
268    C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
269    C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
270{
271    let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
272    let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
273
274    for update in refs {
275        match update {
276            RefUpdate::Updated { name, .. }
277            | RefUpdate::Created { name, .. }
278            | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
279                Some(name) => {
280                    let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
281                        continue;
282                    };
283                    if identifier.is_issue() {
284                        update_or_remove(&mut issues, cache, rid, identifier)?;
285                    } else if identifier.is_patch() {
286                        update_or_remove(&mut patches, cache, rid, identifier)?;
287                    } else {
288                        // Unknown COB, don't cache.
289                        continue;
290                    }
291                }
292                None => continue,
293            },
294            RefUpdate::Skipped { .. } => { /* Do nothing */ }
295        }
296    }
297
298    Ok(())
299}
300
301/// Update or remove a cache entry.
302fn update_or_remove<R, C, T>(
303    store: &mut cob::store::Store<T, R>,
304    cache: &mut C,
305    rid: &RepoId,
306    tid: TypedId,
307) -> Result<(), error::Cache>
308where
309    R: cob::Store + ReadRepository,
310    T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
311    C: cob::cache::Update<T> + cob::cache::Remove<T>,
312{
313    match store.get(&tid.id) {
314        Ok(Some(obj)) => {
315            // Object loaded correctly, update cache.
316            return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
317                error::Cache::Update {
318                    id: tid.id,
319                    type_name: tid.type_name,
320                    err: e.into(),
321                }
322            });
323        }
324        Ok(None) => {
325            // Object was not found. Fall-through.
326        }
327        Err(e) => {
328            // Object was found, but failed to load. Fall-through.
329            log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
330        }
331    }
332    // The object has either been removed entirely from the repository,
333    // or it failed to load. So we also remove it from the cache.
334    cob::cache::Remove::<T>::remove(cache, &tid.id)
335        .map(|_| ())
336        .map_err(|e| error::Cache::Remove {
337            id: tid.id,
338            type_name: tid.type_name,
339            err: Box::new(e),
340        })?;
341
342    Ok(())
343}
344
/// Recompute canonical (namespace-less) references after a fetch.
///
/// Loads the identity's canonical-ref rules — falling back to a single
/// rule derived from the document's default branch — then, for every
/// branch name created or updated by `applied` that matches a rule,
/// computes the quorum object and force-updates the corresponding
/// canonical reference in the repository.
///
/// Per-reference failures (finding objects, computing quorum, writing
/// the ref) are logged and skipped; only identity/rule loading errors
/// are returned. The `Option` in the return type appears to always be
/// `Some` here — presumably kept for interface compatibility; confirm
/// against callers.
fn set_canonical_refs(
    repo: &Repository,
    applied: &Applied,
) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
    let identity = repo.identity()?;
    // TODO(finto): it's unfortunate that we may end up computing the default
    // branch again after `set_head` is called after the fetch. This is due to
    // the storage capabilities being leaked to this part of the code base.
    let rules = identity
        .canonical_refs_or_default(|| {
            let rule = identity.doc().default_branch_rule()?;
            Ok::<_, CanonicalRefsError>(CanonicalRefs::from_iter([rule]))
        })?
        .rules()
        .clone();

    let mut updated_refs = UpdatedCanonicalRefs::default();
    // Collect the distinct, namespace-stripped names of refs that were
    // created or updated; deletions and skips don't trigger
    // recomputation.
    let refnames = applied
        .updated
        .iter()
        .filter_map(|update| match update {
            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
                let name = name.clone().into_qualified()?;
                let name = name.to_namespaced()?;
                Some(name.strip_namespace())
            }
            _ => None,
        })
        .collect::<BTreeSet<_>>();

    for name in refnames {
        // Names with no matching canonical rule are ignored.
        let canonical = match rules.canonical(name.clone(), repo) {
            Some(canonical) => canonical,
            None => continue,
        };

        let canonical = match canonical.find_objects() {
            Err(err) => {
                log::warn!(target: "worker", "Failed to find objects for canonical computation: {err}");
                continue;
            }
            Ok(canonical) => canonical,
        };

        match canonical.quorum() {
            Err(err) => {
                log::warn!(
                    target: "worker",
                    "Failed to calculate canonical reference: {err}",
                );
                continue;
            }
            Ok(git::canonical::Quorum {
                refname, object, ..
            }) => {
                let oid = object.id();
                // Force-update (`true`) the canonical ref to the
                // quorum object; a write failure is logged but doesn't
                // abort the remaining refs.
                if let Err(e) = repo.backend.reference(
                    refname.clone().as_str(),
                    *oid,
                    true,
                    "set-canonical-reference from fetch (radicle)",
                ) {
                    log::warn!(
                        target: "worker",
                        "Failed to set canonical reference {refname}->{oid}: {e}"
                    );
                } else {
                    updated_refs.updated(refname, oid);
                }
            }
        }
    }
    Ok(Some(updated_refs))
}