radicle_node/worker/
fetch.rs

1use radicle::identity::doc::CanonicalRefsError;
2use radicle::identity::CanonicalRefs;
3pub(crate) use radicle_protocol::worker::fetch::error;
4
5use std::collections::BTreeSet;
6use std::str::FromStr;
7
8use localtime::LocalTime;
9
10use radicle::cob::TypedId;
11use radicle::crypto::PublicKey;
12use radicle::identity::crefs::GetCanonicalRefs as _;
13use radicle::prelude::NodeId;
14use radicle::prelude::RepoId;
15use radicle::storage::git::Repository;
16use radicle::storage::refs::RefsAt;
17use radicle::storage::{
18    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
19    WriteRepository as _,
20};
21use radicle::{cob, git, node, Storage};
22use radicle_fetch::git::refs::Applied;
23use radicle_fetch::{Allowed, BlockList, FetchLimit};
24pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};
25
26use super::channels::ChannelsFlush;
27
/// A fetch handle, specialised by whether the repository already exists in
/// local storage at the time the handle is created (see `Handle::new`).
pub enum Handle {
    /// The repository is not yet in storage: the fetch is performed into a
    /// temporary location that is moved into storage once the clone is done.
    Clone {
        /// Underlying fetch handle, operating on the locked repository.
        handle: radicle_fetch::Handle<ChannelsFlush>,
        /// Temporary directory returned alongside the locked repository.
        tmp: tempfile::TempDir,
    },
    /// The repository already exists in storage and is fetched in place.
    Pull {
        /// Underlying fetch handle, operating on the stored repository.
        handle: radicle_fetch::Handle<ChannelsFlush>,
        /// Store used to post notifications about updated references.
        notifications: node::notifications::StoreWriter,
    },
}
38
39impl Handle {
40    pub fn new(
41        rid: RepoId,
42        local: PublicKey,
43        storage: &Storage,
44        follow: Allowed,
45        blocked: BlockList,
46        channels: ChannelsFlush,
47        notifications: node::notifications::StoreWriter,
48    ) -> Result<Self, error::Handle> {
49        let exists = storage.contains(&rid)?;
50        if exists {
51            let repo = storage.repository(rid)?;
52            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
53            Ok(Handle::Pull {
54                handle,
55                notifications,
56            })
57        } else {
58            let (repo, tmp) = storage.lock_repository(rid)?;
59            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
60            Ok(Handle::Clone { handle, tmp })
61        }
62    }
63
64    pub fn fetch<D: node::refs::Store>(
65        self,
66        rid: RepoId,
67        storage: &Storage,
68        cache: &mut cob::cache::StoreWriter,
69        refsdb: &mut D,
70        limit: FetchLimit,
71        remote: PublicKey,
72        refs_at: Option<Vec<RefsAt>>,
73    ) -> Result<FetchResult, error::Fetch> {
74        let (result, clone, notifs) = match self {
75            Self::Clone { mut handle, tmp } => {
76                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
77                let result = radicle_fetch::clone(&mut handle, limit, remote)?;
78                mv(tmp, storage, &rid)?;
79                (result, true, None)
80            }
81            Self::Pull {
82                mut handle,
83                notifications,
84            } => {
85                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
86                let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
87                (result, false, Some(notifications))
88            }
89        };
90
91        for rejected in result.rejected() {
92            log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
93        }
94
95        match result {
96            radicle_fetch::FetchResult::Failed {
97                threshold,
98                delegates,
99                validations,
100            } => {
101                for fail in validations.iter() {
102                    log::error!(target: "worker", "Validation error: {fail}");
103                }
104                Err(error::Fetch::Validation {
105                    threshold,
106                    delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
107                })
108            }
109            radicle_fetch::FetchResult::Success {
110                applied,
111                remotes,
112                validations,
113            } => {
114                for warn in validations {
115                    log::warn!(target: "worker", "Validation error: {warn}");
116                }
117
118                // N.b. We do not go through handle for this since the cloning handle
119                // points to a repository that is temporary and gets moved by [`mv`].
120                let repo = storage.repository(rid)?;
121                repo.set_identity_head()?;
122                match repo.set_head() {
123                    Ok(head) => {
124                        if head.is_updated() {
125                            log::trace!(target: "worker", "Set HEAD to {}", head.new);
126                        }
127                    }
128                    Err(RepositoryError::Quorum(e)) => {
129                        log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
130                    }
131                    Err(e) => return Err(e.into()),
132                }
133
134                let canonical = match set_canonical_refs(&repo, &applied) {
135                    Ok(updates) => updates.unwrap_or_default(),
136                    Err(e) => {
137                        log::warn!(target: "worker", "Failed to set canonical references: {e}");
138                        UpdatedCanonicalRefs::default()
139                    }
140                };
141
142                // Notifications are only posted for pulls, not clones.
143                if let Some(mut store) = notifs {
144                    // Only create notifications for repos that we have
145                    // contributed to in some way, otherwise our inbox will
146                    // be flooded by all the repos we are seeding.
147                    if repo.remote(&storage.info().key).is_ok() {
148                        notify(&rid, &applied, &mut store)?;
149                    }
150                }
151
152                cache_cobs(&rid, &applied.updated, &repo, cache)?;
153                cache_refs(&rid, &applied.updated, refsdb)?;
154
155                Ok(FetchResult {
156                    updated: applied.updated,
157                    canonical,
158                    namespaces: remotes.into_iter().collect(),
159                    doc: repo.identity_doc()?,
160                    clone,
161                })
162            }
163        }
164    }
165}
166
167/// In the case of cloning, we have performed the fetch into a
168/// temporary directory -- ensuring that no concurrent operations
169/// see an empty repository.
170///
171/// At the end of the clone, we perform a rename of the temporary
172/// directory to the storage repository.
173///
174/// # Errors
175///   - Will fail if `storage` contains `rid` already.
176fn mv(tmp: tempfile::TempDir, storage: &Storage, rid: &RepoId) -> Result<(), error::Fetch> {
177    use std::io::{Error, ErrorKind};
178
179    let from = tmp.path();
180    let to = storage.path_of(rid);
181
182    if !to.exists() {
183        std::fs::rename(from, to)?;
184    } else {
185        log::warn!(target: "worker", "Refusing to move cloned repository {rid} already exists");
186        return Err(Error::new(
187            ErrorKind::AlreadyExists,
188            format!("repository already exists {to:?}"),
189        )
190        .into());
191    }
192
193    Ok(())
194}
195
196// Post notifications for the given refs.
197fn notify(
198    rid: &RepoId,
199    refs: &radicle_fetch::git::refs::Applied<'static>,
200    store: &mut node::notifications::StoreWriter,
201) -> Result<(), error::Fetch> {
202    let now = LocalTime::now();
203
204    for update in refs.updated.iter() {
205        if let Some(r) = update.name().to_namespaced() {
206            let r = r.strip_namespace();
207            if r == *git::refs::storage::SIGREFS_BRANCH {
208                // Don't notify about signed refs.
209                continue;
210            }
211            if r == *git::refs::storage::IDENTITY_BRANCH {
212                // Don't notify about the peers's identity branch pointer, since there will
213                // be a separate notification on the identity COB itself.
214                continue;
215            }
216            if r == *git::refs::storage::IDENTITY_ROOT {
217                // Don't notify about the peers's identity root pointer. This is only used
218                // for sigref verification.
219                continue;
220            }
221            if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
222                if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
223                    // Don't notify about patch branches, since we already get
224                    // notifications about patch updates.
225                    continue;
226                }
227            }
228        }
229        if let RefUpdate::Skipped { .. } = update {
230            // Don't notify about skipped refs.
231        } else if let Err(e) = store.insert(rid, update, now) {
232            log::error!(
233                target: "worker",
234                "Failed to update notification store for {rid}: {e}"
235            );
236        }
237    }
238    Ok(())
239}
240
241/// Cache certain ref updates in our database.
242fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
243where
244    D: node::refs::Store,
245{
246    let time = LocalTime::now();
247
248    for r in refs {
249        let name = r.name();
250        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
251            Err(e) => {
252                log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
253                log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
254                break;
255            }
256            Ok((n, q)) => (n, q),
257        };
258        if qualified != *git::refs::storage::SIGREFS_BRANCH {
259            // Only cache `rad/sigrefs`.
260            continue;
261        }
262        log::trace!(target: "node", "Updating cache for {name} in {repo}");
263
264        let result = match r {
265            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
266            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
267            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
268            RefUpdate::Skipped { .. } => continue,
269        };
270
271        if let Err(e) = result {
272            log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
273            log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
274            break;
275        }
276    }
277    Ok(())
278}
279
280/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
281/// COB to the COB cache.
282fn cache_cobs<S, C>(
283    rid: &RepoId,
284    refs: &[RefUpdate],
285    storage: &S,
286    cache: &mut C,
287) -> Result<(), error::Cache>
288where
289    S: ReadRepository + cob::Store<Namespace = NodeId>,
290    C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
291    C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
292{
293    let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
294    let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
295
296    for update in refs {
297        match update {
298            RefUpdate::Updated { name, .. }
299            | RefUpdate::Created { name, .. }
300            | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
301                Some(name) => {
302                    let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
303                        continue;
304                    };
305                    if identifier.is_issue() {
306                        update_or_remove(&mut issues, cache, rid, identifier)?;
307                    } else if identifier.is_patch() {
308                        update_or_remove(&mut patches, cache, rid, identifier)?;
309                    } else {
310                        // Unknown COB, don't cache.
311                        continue;
312                    }
313                }
314                None => continue,
315            },
316            RefUpdate::Skipped { .. } => { /* Do nothing */ }
317        }
318    }
319
320    Ok(())
321}
322
323/// Update or remove a cache entry.
324fn update_or_remove<R, C, T>(
325    store: &mut cob::store::Store<T, R>,
326    cache: &mut C,
327    rid: &RepoId,
328    tid: TypedId,
329) -> Result<(), error::Cache>
330where
331    R: cob::Store + ReadRepository,
332    T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
333    C: cob::cache::Update<T> + cob::cache::Remove<T>,
334{
335    match store.get(&tid.id) {
336        Ok(Some(obj)) => {
337            // Object loaded correctly, update cache.
338            return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
339                error::Cache::Update {
340                    id: tid.id,
341                    type_name: tid.type_name,
342                    err: e.into(),
343                }
344            });
345        }
346        Ok(None) => {
347            // Object was not found. Fall-through.
348        }
349        Err(e) => {
350            // Object was found, but failed to load. Fall-through.
351            log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
352        }
353    }
354    // The object has either been removed entirely from the repository,
355    // or it failed to load. So we also remove it from the cache.
356    cob::cache::Remove::<T>::remove(cache, &tid.id)
357        .map(|_| ())
358        .map_err(|e| error::Cache::Remove {
359            id: tid.id,
360            type_name: tid.type_name,
361            err: Box::new(e),
362        })?;
363
364    Ok(())
365}
366
367fn set_canonical_refs(
368    repo: &Repository,
369    applied: &Applied,
370) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
371    let identity = repo.identity()?;
372    // TODO(finto): it's unfortunate that we may end up computing the default
373    // branch again after `set_head` is called after the fetch. This is due to
374    // the storage capabilities being leaked to this part of the code base.
375    let rules = identity
376        .canonical_refs_or_default(|| {
377            let rule = identity.doc().default_branch_rule()?;
378            Ok::<_, CanonicalRefsError>(CanonicalRefs::from_iter([rule]))
379        })?
380        .rules()
381        .clone();
382
383    let mut updated_refs = UpdatedCanonicalRefs::default();
384    let refnames = applied
385        .updated
386        .iter()
387        .filter_map(|update| match update {
388            RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
389                let name = name.clone().into_qualified()?;
390                let name = name.to_namespaced()?;
391                Some(name.strip_namespace())
392            }
393            _ => None,
394        })
395        .collect::<BTreeSet<_>>();
396
397    for name in refnames {
398        let canonical = match rules.canonical(name.clone(), repo) {
399            Some(canonical) => canonical,
400            None => continue,
401        };
402
403        let canonical = match canonical.find_objects() {
404            Err(err) => {
405                log::warn!(target: "worker", "Failed to find objects for canonical computation: {err}");
406                continue;
407            }
408            Ok(canonical) => canonical,
409        };
410
411        match canonical.quorum() {
412            Err(err) => {
413                log::warn!(
414                    target: "worker",
415                    "Failed to calculate canonical reference: {err}",
416                );
417                continue;
418            }
419            Ok(git::canonical::Quorum {
420                refname, object, ..
421            }) => {
422                let oid = object.id();
423                if let Err(e) = repo.backend.reference(
424                    refname.clone().as_str(),
425                    *oid,
426                    true,
427                    "set-canonical-reference from fetch (radicle)",
428                ) {
429                    log::warn!(
430                        target: "worker",
431                        "Failed to set canonical reference {refname}->{oid}: {e}"
432                    );
433                } else {
434                    updated_refs.updated(refname, oid);
435                }
436            }
437        }
438    }
439    Ok(Some(updated_refs))
440}