1use radicle::identity::doc::CanonicalRefsError;
2use radicle::identity::CanonicalRefs;
3pub(crate) use radicle_protocol::worker::fetch::error;
4
5use std::collections::BTreeSet;
6use std::str::FromStr;
7
8use localtime::LocalTime;
9
10use radicle::cob::TypedId;
11use radicle::crypto::PublicKey;
12use radicle::identity::crefs::GetCanonicalRefs as _;
13use radicle::prelude::NodeId;
14use radicle::prelude::RepoId;
15use radicle::storage::git::Repository;
16use radicle::storage::refs::RefsAt;
17use radicle::storage::{
18 ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
19 WriteRepository as _,
20};
21use radicle::{cob, git, node, Storage};
22use radicle_fetch::git::refs::Applied;
23use radicle_fetch::{Allowed, BlockList, FetchLimit};
24pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};
25
26use super::channels::ChannelsFlush;
27
28pub enum Handle {
29 Clone {
30 handle: radicle_fetch::Handle<ChannelsFlush>,
31 tmp: tempfile::TempDir,
32 },
33 Pull {
34 handle: radicle_fetch::Handle<ChannelsFlush>,
35 notifications: node::notifications::StoreWriter,
36 },
37}
38
39impl Handle {
40 pub fn new(
41 rid: RepoId,
42 local: PublicKey,
43 storage: &Storage,
44 follow: Allowed,
45 blocked: BlockList,
46 channels: ChannelsFlush,
47 notifications: node::notifications::StoreWriter,
48 ) -> Result<Self, error::Handle> {
49 let exists = storage.contains(&rid)?;
50 if exists {
51 let repo = storage.repository(rid)?;
52 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
53 Ok(Handle::Pull {
54 handle,
55 notifications,
56 })
57 } else {
58 let (repo, tmp) = storage.lock_repository(rid)?;
59 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
60 Ok(Handle::Clone { handle, tmp })
61 }
62 }
63
64 pub fn fetch<D: node::refs::Store>(
65 self,
66 rid: RepoId,
67 storage: &Storage,
68 cache: &mut cob::cache::StoreWriter,
69 refsdb: &mut D,
70 limit: FetchLimit,
71 remote: PublicKey,
72 refs_at: Option<Vec<RefsAt>>,
73 ) -> Result<FetchResult, error::Fetch> {
74 let (result, clone, notifs) = match self {
75 Self::Clone { mut handle, tmp } => {
76 log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
77 let result = radicle_fetch::clone(&mut handle, limit, remote)?;
78 mv(tmp, storage, &rid)?;
79 (result, true, None)
80 }
81 Self::Pull {
82 mut handle,
83 notifications,
84 } => {
85 log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
86 let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
87 (result, false, Some(notifications))
88 }
89 };
90
91 for rejected in result.rejected() {
92 log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
93 }
94
95 match result {
96 radicle_fetch::FetchResult::Failed {
97 threshold,
98 delegates,
99 validations,
100 } => {
101 for fail in validations.iter() {
102 log::error!(target: "worker", "Validation error: {fail}");
103 }
104 Err(error::Fetch::Validation {
105 threshold,
106 delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
107 })
108 }
109 radicle_fetch::FetchResult::Success {
110 applied,
111 remotes,
112 validations,
113 } => {
114 for warn in validations {
115 log::warn!(target: "worker", "Validation error: {warn}");
116 }
117
118 let repo = storage.repository(rid)?;
121 repo.set_identity_head()?;
122 match repo.set_head() {
123 Ok(head) => {
124 if head.is_updated() {
125 log::trace!(target: "worker", "Set HEAD to {}", head.new);
126 }
127 }
128 Err(RepositoryError::Quorum(e)) => {
129 log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
130 }
131 Err(e) => return Err(e.into()),
132 }
133
134 let canonical = match set_canonical_refs(&repo, &applied) {
135 Ok(updates) => updates.unwrap_or_default(),
136 Err(e) => {
137 log::warn!(target: "worker", "Failed to set canonical references: {e}");
138 UpdatedCanonicalRefs::default()
139 }
140 };
141
142 if let Some(mut store) = notifs {
144 if repo.remote(&storage.info().key).is_ok() {
148 notify(&rid, &applied, &mut store)?;
149 }
150 }
151
152 cache_cobs(&rid, &applied.updated, &repo, cache)?;
153 cache_refs(&rid, &applied.updated, refsdb)?;
154
155 Ok(FetchResult {
156 updated: applied.updated,
157 canonical,
158 namespaces: remotes.into_iter().collect(),
159 doc: repo.identity_doc()?,
160 clone,
161 })
162 }
163 }
164 }
165}
166
167fn mv(tmp: tempfile::TempDir, storage: &Storage, rid: &RepoId) -> Result<(), error::Fetch> {
177 use std::io::{Error, ErrorKind};
178
179 let from = tmp.path();
180 let to = storage.path_of(rid);
181
182 if !to.exists() {
183 std::fs::rename(from, to)?;
184 } else {
185 log::warn!(target: "worker", "Refusing to move cloned repository {rid} already exists");
186 return Err(Error::new(
187 ErrorKind::AlreadyExists,
188 format!("repository already exists {to:?}"),
189 )
190 .into());
191 }
192
193 Ok(())
194}
195
196fn notify(
198 rid: &RepoId,
199 refs: &radicle_fetch::git::refs::Applied<'static>,
200 store: &mut node::notifications::StoreWriter,
201) -> Result<(), error::Fetch> {
202 let now = LocalTime::now();
203
204 for update in refs.updated.iter() {
205 if let Some(r) = update.name().to_namespaced() {
206 let r = r.strip_namespace();
207 if r == *git::refs::storage::SIGREFS_BRANCH {
208 continue;
210 }
211 if r == *git::refs::storage::IDENTITY_BRANCH {
212 continue;
215 }
216 if r == *git::refs::storage::IDENTITY_ROOT {
217 continue;
220 }
221 if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
222 if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
223 continue;
226 }
227 }
228 }
229 if let RefUpdate::Skipped { .. } = update {
230 } else if let Err(e) = store.insert(rid, update, now) {
232 log::error!(
233 target: "worker",
234 "Failed to update notification store for {rid}: {e}"
235 );
236 }
237 }
238 Ok(())
239}
240
241fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
243where
244 D: node::refs::Store,
245{
246 let time = LocalTime::now();
247
248 for r in refs {
249 let name = r.name();
250 let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
251 Err(e) => {
252 log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
253 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
254 break;
255 }
256 Ok((n, q)) => (n, q),
257 };
258 if qualified != *git::refs::storage::SIGREFS_BRANCH {
259 continue;
261 }
262 log::trace!(target: "node", "Updating cache for {name} in {repo}");
263
264 let result = match r {
265 RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
266 RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
267 RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
268 RefUpdate::Skipped { .. } => continue,
269 };
270
271 if let Err(e) = result {
272 log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
273 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
274 break;
275 }
276 }
277 Ok(())
278}
279
280fn cache_cobs<S, C>(
283 rid: &RepoId,
284 refs: &[RefUpdate],
285 storage: &S,
286 cache: &mut C,
287) -> Result<(), error::Cache>
288where
289 S: ReadRepository + cob::Store<Namespace = NodeId>,
290 C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
291 C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
292{
293 let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
294 let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
295
296 for update in refs {
297 match update {
298 RefUpdate::Updated { name, .. }
299 | RefUpdate::Created { name, .. }
300 | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
301 Some(name) => {
302 let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
303 continue;
304 };
305 if identifier.is_issue() {
306 update_or_remove(&mut issues, cache, rid, identifier)?;
307 } else if identifier.is_patch() {
308 update_or_remove(&mut patches, cache, rid, identifier)?;
309 } else {
310 continue;
312 }
313 }
314 None => continue,
315 },
316 RefUpdate::Skipped { .. } => { }
317 }
318 }
319
320 Ok(())
321}
322
323fn update_or_remove<R, C, T>(
325 store: &mut cob::store::Store<T, R>,
326 cache: &mut C,
327 rid: &RepoId,
328 tid: TypedId,
329) -> Result<(), error::Cache>
330where
331 R: cob::Store + ReadRepository,
332 T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
333 C: cob::cache::Update<T> + cob::cache::Remove<T>,
334{
335 match store.get(&tid.id) {
336 Ok(Some(obj)) => {
337 return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
339 error::Cache::Update {
340 id: tid.id,
341 type_name: tid.type_name,
342 err: e.into(),
343 }
344 });
345 }
346 Ok(None) => {
347 }
349 Err(e) => {
350 log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
352 }
353 }
354 cob::cache::Remove::<T>::remove(cache, &tid.id)
357 .map(|_| ())
358 .map_err(|e| error::Cache::Remove {
359 id: tid.id,
360 type_name: tid.type_name,
361 err: Box::new(e),
362 })?;
363
364 Ok(())
365}
366
367fn set_canonical_refs(
368 repo: &Repository,
369 applied: &Applied,
370) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
371 let identity = repo.identity()?;
372 let rules = identity
376 .canonical_refs_or_default(|| {
377 let rule = identity.doc().default_branch_rule()?;
378 Ok::<_, CanonicalRefsError>(CanonicalRefs::from_iter([rule]))
379 })?
380 .rules()
381 .clone();
382
383 let mut updated_refs = UpdatedCanonicalRefs::default();
384 let refnames = applied
385 .updated
386 .iter()
387 .filter_map(|update| match update {
388 RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
389 let name = name.clone().into_qualified()?;
390 let name = name.to_namespaced()?;
391 Some(name.strip_namespace())
392 }
393 _ => None,
394 })
395 .collect::<BTreeSet<_>>();
396
397 for name in refnames {
398 let canonical = match rules.canonical(name.clone(), repo) {
399 Some(canonical) => canonical,
400 None => continue,
401 };
402
403 let canonical = match canonical.find_objects() {
404 Err(err) => {
405 log::warn!(target: "worker", "Failed to find objects for canonical computation: {err}");
406 continue;
407 }
408 Ok(canonical) => canonical,
409 };
410
411 match canonical.quorum() {
412 Err(err) => {
413 log::warn!(
414 target: "worker",
415 "Failed to calculate canonical reference: {err}",
416 );
417 continue;
418 }
419 Ok(git::canonical::Quorum {
420 refname, object, ..
421 }) => {
422 let oid = object.id();
423 if let Err(e) = repo.backend.reference(
424 refname.clone().as_str(),
425 *oid,
426 true,
427 "set-canonical-reference from fetch (radicle)",
428 ) {
429 log::warn!(
430 target: "worker",
431 "Failed to set canonical reference {refname}->{oid}: {e}"
432 );
433 } else {
434 updated_refs.updated(refname, oid);
435 }
436 }
437 }
438 }
439 Ok(Some(updated_refs))
440}