// radicle_node/worker/fetch.rs

1pub mod error;
2
3use std::collections::HashSet;
4use std::str::FromStr;
5
6use localtime::LocalTime;
7
8use radicle::cob::TypedId;
9use radicle::crypto::PublicKey;
10use radicle::identity::DocAt;
11use radicle::prelude::NodeId;
12use radicle::prelude::RepoId;
13use radicle::storage::refs::RefsAt;
14use radicle::storage::{
15    ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
16    WriteRepository as _,
17};
18use radicle::{cob, git, node, Storage};
19use radicle_fetch::{Allowed, BlockList, FetchLimit};
20
21use super::channels::ChannelsFlush;
22
/// The outcome of a successful fetch (clone or pull) from a remote peer.
///
/// Constructed by [`Handle::fetch`] once the fetched refs have been
/// validated and applied.
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The set of updated references.
    pub updated: Vec<RefUpdate>,
    /// The set of remote namespaces that were updated.
    pub namespaces: HashSet<PublicKey>,
    /// Whether the fetch was a full clone (`true`) rather than a pull
    /// into an existing repository (`false`).
    pub clone: bool,
    /// Identity doc of the fetched repo.
    pub doc: DocAt,
}
34
35impl FetchResult {
36    pub fn new(doc: DocAt) -> Self {
37        Self {
38            updated: vec![],
39            namespaces: HashSet::new(),
40            clone: false,
41            doc,
42        }
43    }
44}
45
/// A fetch handle, distinguishing the initial clone of a repository
/// from a pull into one that already exists in storage.
pub enum Handle {
    /// The repository is not yet in storage: fetch into a temporary
    /// directory, moved into place on success (see [`mv`]).
    Clone {
        handle: radicle_fetch::Handle<ChannelsFlush>,
        // Temporary directory holding the in-progress clone.
        tmp: tempfile::TempDir,
    },
    /// The repository exists in storage: fetch into it directly.
    Pull {
        handle: radicle_fetch::Handle<ChannelsFlush>,
        // Writer for inbox notifications; notifications are posted for
        // pulls only, never for clones.
        notifications: node::notifications::StoreWriter,
    },
}
56
impl Handle {
    /// Create a fetch handle for `rid`.
    ///
    /// If `storage` already contains `rid`, a [`Handle::Pull`] over the
    /// existing repository is returned. Otherwise the repository is
    /// locked under a temporary directory and a [`Handle::Clone`] is
    /// returned; the directory is moved into storage by [`mv`] once the
    /// clone succeeds.
    ///
    /// # Errors
    /// Fails if storage cannot be queried, opened, or locked, or if the
    /// underlying fetch handle cannot be created.
    pub fn new(
        rid: RepoId,
        local: PublicKey,
        storage: &Storage,
        follow: Allowed,
        blocked: BlockList,
        channels: ChannelsFlush,
        notifications: node::notifications::StoreWriter,
    ) -> Result<Self, error::Handle> {
        let exists = storage.contains(&rid)?;
        if exists {
            let repo = storage.repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Pull {
                handle,
                notifications,
            })
        } else {
            let (repo, tmp) = storage.lock_repository(rid)?;
            let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
            Ok(Handle::Clone { handle, tmp })
        }
    }

    /// Perform the fetch from `remote`, cloning or pulling depending on
    /// how this handle was constructed.
    ///
    /// On a successful fetch this also:
    ///   - sets the repository's identity head and `HEAD`;
    ///   - posts notifications for updated refs (pulls only, and only
    ///     when we have our own remote in the repo);
    ///   - updates the COB cache (`cache`) and the refs cache (`refsdb`).
    ///
    /// # Errors
    /// Returns [`error::Fetch::Validation`] when the fetched refs fail
    /// validation against the delegate threshold; other errors come
    /// from the fetch itself or from post-fetch repository operations.
    pub fn fetch<D: node::refs::Store>(
        self,
        rid: RepoId,
        storage: &Storage,
        cache: &mut cob::cache::StoreWriter,
        refsdb: &mut D,
        limit: FetchLimit,
        remote: PublicKey,
        refs_at: Option<Vec<RefsAt>>,
    ) -> Result<FetchResult, error::Fetch> {
        use git::canonical::QuorumError::{Diverging, NoCandidates};

        let (result, clone, notifs) = match self {
            Self::Clone { mut handle, tmp } => {
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
                let result = radicle_fetch::clone(&mut handle, limit, remote)?;
                // The clone landed in `tmp`; move it into storage now
                // that the fetch succeeded.
                mv(tmp, storage, &rid)?;
                (result, true, None)
            }
            Self::Pull {
                mut handle,
                notifications,
            } => {
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
                let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
                (result, false, Some(notifications))
            }
        };

        for rejected in result.rejected() {
            log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
        }

        match result {
            radicle_fetch::FetchResult::Failed {
                threshold,
                delegates,
                validations,
            } => {
                for fail in validations.iter() {
                    log::error!(target: "worker", "Validation error: {}", fail);
                }
                Err(error::Fetch::Validation {
                    threshold,
                    delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
                })
            }
            radicle_fetch::FetchResult::Success {
                applied,
                remotes,
                validations,
            } => {
                // Validation issues on an otherwise successful fetch are
                // logged as warnings, not treated as fatal.
                for warn in validations {
                    log::warn!(target: "worker", "Validation error: {}", warn);
                }

                // N.b. We do not go through handle for this since the cloning handle
                // points to a repository that is temporary and gets moved by [`mv`].
                let repo = storage.repository(rid)?;
                repo.set_identity_head()?;
                match repo.set_head() {
                    Ok(head) => {
                        if head.is_updated() {
                            log::trace!(target: "worker", "Set HEAD to {}", head.new);
                        }
                    }
                    // Quorum failures are non-fatal here: the fetch itself
                    // succeeded, we just can't pick a canonical HEAD yet.
                    Err(RepositoryError::Quorum(Diverging(e))) => {
                        log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
                    }
                    Err(RepositoryError::Quorum(NoCandidates(e))) => {
                        log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
                    }
                    Err(e) => return Err(e.into()),
                }

                // Notifications are only posted for pulls, not clones.
                if let Some(mut store) = notifs {
                    // Only create notifications for repos that we have
                    // contributed to in some way, otherwise our inbox will
                    // be flooded by all the repos we are seeding.
                    if repo.remote(&storage.info().key).is_ok() {
                        notify(&rid, &applied, &mut store)?;
                    }
                }

                cache_cobs(&rid, &applied.updated, &repo, cache)?;
                cache_refs(&rid, &applied.updated, refsdb)?;

                Ok(FetchResult {
                    updated: applied.updated,
                    namespaces: remotes.into_iter().collect(),
                    doc: repo.identity_doc()?,
                    clone,
                })
            }
        }
    }
}
180
181/// In the case of cloning, we have performed the fetch into a
182/// temporary directory -- ensuring that no concurrent operations
183/// see an empty repository.
184///
185/// At the end of the clone, we perform a rename of the temporary
186/// directory to the storage repository.
187///
188/// # Errors
189///   - Will fail if `storage` contains `rid` already.
190fn mv(tmp: tempfile::TempDir, storage: &Storage, rid: &RepoId) -> Result<(), error::Fetch> {
191    use std::io::{Error, ErrorKind};
192
193    let from = tmp.path();
194    let to = storage.path_of(rid);
195
196    if !to.exists() {
197        std::fs::rename(from, to)?;
198    } else {
199        log::warn!(target: "worker", "Refusing to move cloned repository {rid} already exists");
200        return Err(Error::new(
201            ErrorKind::AlreadyExists,
202            format!("repository already exists {:?}", to),
203        )
204        .into());
205    }
206
207    Ok(())
208}
209
210// Post notifications for the given refs.
211fn notify(
212    rid: &RepoId,
213    refs: &radicle_fetch::git::refs::Applied<'static>,
214    store: &mut node::notifications::StoreWriter,
215) -> Result<(), error::Fetch> {
216    let now = LocalTime::now();
217
218    for update in refs.updated.iter() {
219        if let Some(r) = update.name().to_namespaced() {
220            let r = r.strip_namespace();
221            if r == *git::refs::storage::SIGREFS_BRANCH {
222                // Don't notify about signed refs.
223                continue;
224            }
225            if r == *git::refs::storage::IDENTITY_BRANCH {
226                // Don't notify about the peers's identity branch pointer, since there will
227                // be a separate notification on the identity COB itself.
228                continue;
229            }
230            if r == *git::refs::storage::IDENTITY_ROOT {
231                // Don't notify about the peers's identity root pointer. This is only used
232                // for sigref verification.
233                continue;
234            }
235            if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
236                if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
237                    // Don't notify about patch branches, since we already get
238                    // notifications about patch updates.
239                    continue;
240                }
241            }
242        }
243        if let RefUpdate::Skipped { .. } = update {
244            // Don't notify about skipped refs.
245        } else if let Err(e) = store.insert(rid, update, now) {
246            log::error!(
247                target: "worker",
248                "Failed to update notification store for {rid}: {e}"
249            );
250        }
251    }
252    Ok(())
253}
254
255/// Cache certain ref updates in our database.
256fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
257where
258    D: node::refs::Store,
259{
260    let time = LocalTime::now();
261
262    for r in refs {
263        let name = r.name();
264        let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
265            Err(e) => {
266                log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
267                log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
268                break;
269            }
270            Ok((n, q)) => (n, q),
271        };
272        if qualified != *git::refs::storage::SIGREFS_BRANCH {
273            // Only cache `rad/sigrefs`.
274            continue;
275        }
276        log::trace!(target: "node", "Updating cache for {name} in {repo}");
277
278        let result = match r {
279            RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
280            RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
281            RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
282            RefUpdate::Skipped { .. } => continue,
283        };
284
285        if let Err(e) = result {
286            log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
287            log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
288            break;
289        }
290    }
291    Ok(())
292}
293
294/// Write new `RefUpdate`s that are related a `Patch` or an `Issue`
295/// COB to the COB cache.
296fn cache_cobs<S, C>(
297    rid: &RepoId,
298    refs: &[RefUpdate],
299    storage: &S,
300    cache: &mut C,
301) -> Result<(), error::Cache>
302where
303    S: ReadRepository + cob::Store<Namespace = NodeId>,
304    C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
305    C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
306{
307    let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
308    let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
309
310    for update in refs {
311        match update {
312            RefUpdate::Updated { name, .. }
313            | RefUpdate::Created { name, .. }
314            | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
315                Some(name) => {
316                    let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
317                        continue;
318                    };
319                    if identifier.is_issue() {
320                        update_or_remove(&mut issues, cache, rid, identifier)?;
321                    } else if identifier.is_patch() {
322                        update_or_remove(&mut patches, cache, rid, identifier)?;
323                    } else {
324                        // Unknown COB, don't cache.
325                        continue;
326                    }
327                }
328                None => continue,
329            },
330            RefUpdate::Skipped { .. } => { /* Do nothing */ }
331        }
332    }
333
334    Ok(())
335}
336
337/// Update or remove a cache entry.
338fn update_or_remove<R, C, T>(
339    store: &mut cob::store::Store<T, R>,
340    cache: &mut C,
341    rid: &RepoId,
342    tid: TypedId,
343) -> Result<(), error::Cache>
344where
345    R: cob::Store + ReadRepository,
346    T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
347    C: cob::cache::Update<T> + cob::cache::Remove<T>,
348{
349    match store.get(&tid.id) {
350        Ok(Some(obj)) => {
351            // Object loaded correctly, update cache.
352            return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
353                error::Cache::Update {
354                    id: tid.id,
355                    type_name: tid.type_name,
356                    err: e.into(),
357                }
358            });
359        }
360        Ok(None) => {
361            // Object was not found. Fall-through.
362        }
363        Err(e) => {
364            // Object was found, but failed to load. Fall-through.
365            log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
366        }
367    }
368    // The object has either been removed entirely from the repository,
369    // or it failed to load. So we also remove it from the cache.
370    cob::cache::Remove::<T>::remove(cache, &tid.id)
371        .map(|_| ())
372        .map_err(|e| error::Cache::Remove {
373            id: tid.id,
374            type_name: tid.type_name,
375            err: Box::new(e),
376        })?;
377
378    Ok(())
379}