1use radicle::identity::doc::CanonicalRefsError;
2use radicle::identity::CanonicalRefs;
3use radicle::storage::git::TempRepository;
4pub(crate) use radicle_protocol::worker::fetch::error;
5
6use std::collections::BTreeSet;
7use std::str::FromStr;
8
9use localtime::LocalTime;
10
11use radicle::cob::TypedId;
12use radicle::crypto::PublicKey;
13use radicle::identity::crefs::GetCanonicalRefs as _;
14use radicle::prelude::NodeId;
15use radicle::prelude::RepoId;
16use radicle::storage::git::Repository;
17use radicle::storage::refs::RefsAt;
18use radicle::storage::{
19 ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
20 WriteRepository as _,
21};
22use radicle::{cob, git, node, Storage};
23use radicle_fetch::git::refs::Applied;
24use radicle_fetch::{Allowed, BlockList, FetchLimit};
25pub use radicle_protocol::worker::fetch::{FetchResult, UpdatedCanonicalRefs};
26
27use super::channels::ChannelsFlush;
28
29pub enum Handle {
30 Clone {
31 handle: radicle_fetch::Handle<TempRepository, ChannelsFlush>,
32 },
33 Pull {
34 handle: radicle_fetch::Handle<Repository, ChannelsFlush>,
35 notifications: node::notifications::StoreWriter,
36 },
37}
38
39impl Handle {
40 pub fn new(
41 rid: RepoId,
42 local: PublicKey,
43 storage: &Storage,
44 follow: Allowed,
45 blocked: BlockList,
46 channels: ChannelsFlush,
47 notifications: node::notifications::StoreWriter,
48 ) -> Result<Self, error::Handle> {
49 let exists = storage.contains(&rid)?;
50 if exists {
51 let repo = storage.repository(rid)?;
52 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
53 Ok(Handle::Pull {
54 handle,
55 notifications,
56 })
57 } else {
58 let repo = storage.temporary_repository(rid)?;
59 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
60 Ok(Handle::Clone { handle })
61 }
62 }
63
64 pub fn fetch<D: node::refs::Store>(
65 self,
66 rid: RepoId,
67 storage: &Storage,
68 cache: &mut cob::cache::StoreWriter,
69 refsdb: &mut D,
70 limit: FetchLimit,
71 remote: PublicKey,
72 refs_at: Option<Vec<RefsAt>>,
73 ) -> Result<FetchResult, error::Fetch> {
74 let (result, clone, notifs) = match self {
75 Self::Clone { mut handle } => {
76 log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
77 match radicle_fetch::clone(&mut handle, limit, remote) {
78 Err(err) => {
79 handle.into_inner().cleanup();
80 return Err(err.into());
81 }
82 Ok(result) => {
83 handle.into_inner().mv(storage.path_of(&rid))?;
84 (result, true, None)
85 }
86 }
87 }
88 Self::Pull {
89 mut handle,
90 notifications,
91 } => {
92 log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
93 let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
94 (result, false, Some(notifications))
95 }
96 };
97
98 for rejected in result.rejected() {
99 log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
100 }
101
102 match result {
103 radicle_fetch::FetchResult::Failed {
104 threshold,
105 delegates,
106 validations,
107 } => {
108 for fail in validations.iter() {
109 log::error!(target: "worker", "Validation error: {fail}");
110 }
111 Err(error::Fetch::Validation {
112 threshold,
113 delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
114 })
115 }
116 radicle_fetch::FetchResult::Success {
117 applied,
118 remotes,
119 validations,
120 } => {
121 for warn in validations {
122 log::warn!(target: "worker", "Validation error: {warn}");
123 }
124
125 let repo = storage.repository(rid)?;
128 repo.set_identity_head()?;
129 match repo.set_head() {
130 Ok(head) => {
131 if head.is_updated() {
132 log::trace!(target: "worker", "Set HEAD to {}", head.new);
133 }
134 }
135 Err(RepositoryError::Quorum(e)) => {
136 log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
137 }
138 Err(e) => return Err(e.into()),
139 }
140
141 let canonical = match set_canonical_refs(&repo, &applied) {
142 Ok(updates) => updates.unwrap_or_default(),
143 Err(e) => {
144 log::warn!(target: "worker", "Failed to set canonical references: {e}");
145 UpdatedCanonicalRefs::default()
146 }
147 };
148
149 if let Some(mut store) = notifs {
151 if repo.remote(&storage.info().key).is_ok() {
155 notify(&rid, &applied, &mut store)?;
156 }
157 }
158
159 cache_cobs(&rid, &applied.updated, &repo, cache)?;
160 cache_refs(&rid, &applied.updated, refsdb)?;
161
162 Ok(FetchResult {
163 updated: applied.updated,
164 canonical,
165 namespaces: remotes.into_iter().collect(),
166 doc: repo.identity_doc()?,
167 clone,
168 })
169 }
170 }
171 }
172}
173
174fn notify(
176 rid: &RepoId,
177 refs: &radicle_fetch::git::refs::Applied<'static>,
178 store: &mut node::notifications::StoreWriter,
179) -> Result<(), error::Fetch> {
180 let now = LocalTime::now();
181
182 for update in refs.updated.iter() {
183 if let Some(r) = update.name().to_namespaced() {
184 let r = r.strip_namespace();
185 if r == *git::refs::storage::SIGREFS_BRANCH {
186 continue;
188 }
189 if r == *git::refs::storage::IDENTITY_BRANCH {
190 continue;
193 }
194 if r == *git::refs::storage::IDENTITY_ROOT {
195 continue;
198 }
199 if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
200 if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
201 continue;
204 }
205 }
206 }
207 if let RefUpdate::Skipped { .. } = update {
208 } else if let Err(e) = store.insert(rid, update, now) {
210 log::error!(
211 target: "worker",
212 "Failed to update notification store for {rid}: {e}"
213 );
214 }
215 }
216 Ok(())
217}
218
219fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
221where
222 D: node::refs::Store,
223{
224 let time = LocalTime::now();
225
226 for r in refs {
227 let name = r.name();
228 let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
229 Err(e) => {
230 log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
231 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
232 break;
233 }
234 Ok((n, q)) => (n, q),
235 };
236 if qualified != *git::refs::storage::SIGREFS_BRANCH {
237 continue;
239 }
240 log::trace!(target: "node", "Updating cache for {name} in {repo}");
241
242 let result = match r {
243 RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
244 RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
245 RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
246 RefUpdate::Skipped { .. } => continue,
247 };
248
249 if let Err(e) = result {
250 log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
251 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
252 break;
253 }
254 }
255 Ok(())
256}
257
258fn cache_cobs<S, C>(
261 rid: &RepoId,
262 refs: &[RefUpdate],
263 storage: &S,
264 cache: &mut C,
265) -> Result<(), error::Cache>
266where
267 S: ReadRepository + cob::Store<Namespace = NodeId>,
268 C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
269 C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
270{
271 let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
272 let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
273
274 for update in refs {
275 match update {
276 RefUpdate::Updated { name, .. }
277 | RefUpdate::Created { name, .. }
278 | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
279 Some(name) => {
280 let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
281 continue;
282 };
283 if identifier.is_issue() {
284 update_or_remove(&mut issues, cache, rid, identifier)?;
285 } else if identifier.is_patch() {
286 update_or_remove(&mut patches, cache, rid, identifier)?;
287 } else {
288 continue;
290 }
291 }
292 None => continue,
293 },
294 RefUpdate::Skipped { .. } => { }
295 }
296 }
297
298 Ok(())
299}
300
301fn update_or_remove<R, C, T>(
303 store: &mut cob::store::Store<T, R>,
304 cache: &mut C,
305 rid: &RepoId,
306 tid: TypedId,
307) -> Result<(), error::Cache>
308where
309 R: cob::Store + ReadRepository,
310 T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
311 C: cob::cache::Update<T> + cob::cache::Remove<T>,
312{
313 match store.get(&tid.id) {
314 Ok(Some(obj)) => {
315 return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
317 error::Cache::Update {
318 id: tid.id,
319 type_name: tid.type_name,
320 err: e.into(),
321 }
322 });
323 }
324 Ok(None) => {
325 }
327 Err(e) => {
328 log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
330 }
331 }
332 cob::cache::Remove::<T>::remove(cache, &tid.id)
335 .map(|_| ())
336 .map_err(|e| error::Cache::Remove {
337 id: tid.id,
338 type_name: tid.type_name,
339 err: Box::new(e),
340 })?;
341
342 Ok(())
343}
344
345fn set_canonical_refs(
346 repo: &Repository,
347 applied: &Applied,
348) -> Result<Option<UpdatedCanonicalRefs>, error::Canonical> {
349 let identity = repo.identity()?;
350 let rules = identity
354 .canonical_refs_or_default(|| {
355 let rule = identity.doc().default_branch_rule()?;
356 Ok::<_, CanonicalRefsError>(CanonicalRefs::from_iter([rule]))
357 })?
358 .rules()
359 .clone();
360
361 let mut updated_refs = UpdatedCanonicalRefs::default();
362 let refnames = applied
363 .updated
364 .iter()
365 .filter_map(|update| match update {
366 RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
367 let name = name.clone().into_qualified()?;
368 let name = name.to_namespaced()?;
369 Some(name.strip_namespace())
370 }
371 _ => None,
372 })
373 .collect::<BTreeSet<_>>();
374
375 for name in refnames {
376 let canonical = match rules.canonical(name.clone(), repo) {
377 Some(canonical) => canonical,
378 None => continue,
379 };
380
381 let canonical = match canonical.find_objects() {
382 Err(err) => {
383 log::warn!(target: "worker", "Failed to find objects for canonical computation: {err}");
384 continue;
385 }
386 Ok(canonical) => canonical,
387 };
388
389 match canonical.quorum() {
390 Err(err) => {
391 log::warn!(
392 target: "worker",
393 "Failed to calculate canonical reference: {err}",
394 );
395 continue;
396 }
397 Ok(git::canonical::Quorum {
398 refname, object, ..
399 }) => {
400 let oid = object.id();
401 if let Err(e) = repo.backend.reference(
402 refname.clone().as_str(),
403 *oid,
404 true,
405 "set-canonical-reference from fetch (radicle)",
406 ) {
407 log::warn!(
408 target: "worker",
409 "Failed to set canonical reference {refname}->{oid}: {e}"
410 );
411 } else {
412 updated_refs.updated(refname, oid);
413 }
414 }
415 }
416 }
417 Ok(Some(updated_refs))
418}