1pub mod error;
2
3use std::collections::HashSet;
4use std::str::FromStr;
5
6use localtime::LocalTime;
7
8use radicle::cob::TypedId;
9use radicle::crypto::PublicKey;
10use radicle::identity::DocAt;
11use radicle::prelude::NodeId;
12use radicle::prelude::RepoId;
13use radicle::storage::refs::RefsAt;
14use radicle::storage::{
15 ReadRepository, ReadStorage as _, RefUpdate, RemoteRepository, RepositoryError,
16 WriteRepository as _,
17};
18use radicle::{cob, git, node, Storage};
19use radicle_fetch::{Allowed, BlockList, FetchLimit};
20
21use super::channels::ChannelsFlush;
22
/// Outcome of a successful fetch, as produced by [`Handle::fetch`].
#[derive(Debug, Clone)]
pub struct FetchResult {
    /// The reference updates that were applied by the fetch.
    pub updated: Vec<RefUpdate>,
    /// The remote namespaces reported by the fetch.
    pub namespaces: HashSet<PublicKey>,
    /// `true` when the fetch was performed as a clone, `false` for a pull.
    pub clone: bool,
    /// The identity document of the repository, read after the fetch.
    pub doc: DocAt,
}
34
35impl FetchResult {
36 pub fn new(doc: DocAt) -> Self {
37 Self {
38 updated: vec![],
39 namespaces: HashSet::new(),
40 clone: false,
41 doc,
42 }
43 }
44}
45
/// A fetch handle, distinguishing between cloning a repository that is not
/// yet in storage and pulling into one that already is.
pub enum Handle {
    /// The repository was not found in storage when the handle was created.
    Clone {
        /// The underlying fetch handle.
        handle: radicle_fetch::Handle<ChannelsFlush>,
        /// Temporary directory returned by `Storage::lock_repository`; it is
        /// renamed to the repository's storage path once the clone is done.
        tmp: tempfile::TempDir,
    },
    /// The repository was already present in storage when the handle was
    /// created.
    Pull {
        /// The underlying fetch handle.
        handle: radicle_fetch::Handle<ChannelsFlush>,
        /// Notification store that receives the reference updates applied by
        /// the pull.
        notifications: node::notifications::StoreWriter,
    },
}
56
57impl Handle {
58 pub fn new(
59 rid: RepoId,
60 local: PublicKey,
61 storage: &Storage,
62 follow: Allowed,
63 blocked: BlockList,
64 channels: ChannelsFlush,
65 notifications: node::notifications::StoreWriter,
66 ) -> Result<Self, error::Handle> {
67 let exists = storage.contains(&rid)?;
68 if exists {
69 let repo = storage.repository(rid)?;
70 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
71 Ok(Handle::Pull {
72 handle,
73 notifications,
74 })
75 } else {
76 let (repo, tmp) = storage.lock_repository(rid)?;
77 let handle = radicle_fetch::Handle::new(local, repo, follow, blocked, channels)?;
78 Ok(Handle::Clone { handle, tmp })
79 }
80 }
81
82 pub fn fetch<D: node::refs::Store>(
83 self,
84 rid: RepoId,
85 storage: &Storage,
86 cache: &mut cob::cache::StoreWriter,
87 refsdb: &mut D,
88 limit: FetchLimit,
89 remote: PublicKey,
90 refs_at: Option<Vec<RefsAt>>,
91 ) -> Result<FetchResult, error::Fetch> {
92 use git::canonical::QuorumError::{Diverging, NoCandidates};
93
94 let (result, clone, notifs) = match self {
95 Self::Clone { mut handle, tmp } => {
96 log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
97 let result = radicle_fetch::clone(&mut handle, limit, remote)?;
98 mv(tmp, storage, &rid)?;
99 (result, true, None)
100 }
101 Self::Pull {
102 mut handle,
103 notifications,
104 } => {
105 log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
106 let result = radicle_fetch::pull(&mut handle, limit, remote, refs_at)?;
107 (result, false, Some(notifications))
108 }
109 };
110
111 for rejected in result.rejected() {
112 log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
113 }
114
115 match result {
116 radicle_fetch::FetchResult::Failed {
117 threshold,
118 delegates,
119 validations,
120 } => {
121 for fail in validations.iter() {
122 log::error!(target: "worker", "Validation error: {}", fail);
123 }
124 Err(error::Fetch::Validation {
125 threshold,
126 delegates: delegates.into_iter().map(|key| key.to_string()).collect(),
127 })
128 }
129 radicle_fetch::FetchResult::Success {
130 applied,
131 remotes,
132 validations,
133 } => {
134 for warn in validations {
135 log::warn!(target: "worker", "Validation error: {}", warn);
136 }
137
138 let repo = storage.repository(rid)?;
141 repo.set_identity_head()?;
142 match repo.set_head() {
143 Ok(head) => {
144 if head.is_updated() {
145 log::trace!(target: "worker", "Set HEAD to {}", head.new);
146 }
147 }
148 Err(RepositoryError::Quorum(Diverging(e))) => {
149 log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
150 }
151 Err(RepositoryError::Quorum(NoCandidates(e))) => {
152 log::warn!(target: "worker", "Fetch could not set HEAD: {e}")
153 }
154 Err(e) => return Err(e.into()),
155 }
156
157 if let Some(mut store) = notifs {
159 if repo.remote(&storage.info().key).is_ok() {
163 notify(&rid, &applied, &mut store)?;
164 }
165 }
166
167 cache_cobs(&rid, &applied.updated, &repo, cache)?;
168 cache_refs(&rid, &applied.updated, refsdb)?;
169
170 Ok(FetchResult {
171 updated: applied.updated,
172 namespaces: remotes.into_iter().collect(),
173 doc: repo.identity_doc()?,
174 clone,
175 })
176 }
177 }
178 }
179}
180
181fn mv(tmp: tempfile::TempDir, storage: &Storage, rid: &RepoId) -> Result<(), error::Fetch> {
191 use std::io::{Error, ErrorKind};
192
193 let from = tmp.path();
194 let to = storage.path_of(rid);
195
196 if !to.exists() {
197 std::fs::rename(from, to)?;
198 } else {
199 log::warn!(target: "worker", "Refusing to move cloned repository {rid} already exists");
200 return Err(Error::new(
201 ErrorKind::AlreadyExists,
202 format!("repository already exists {:?}", to),
203 )
204 .into());
205 }
206
207 Ok(())
208}
209
210fn notify(
212 rid: &RepoId,
213 refs: &radicle_fetch::git::refs::Applied<'static>,
214 store: &mut node::notifications::StoreWriter,
215) -> Result<(), error::Fetch> {
216 let now = LocalTime::now();
217
218 for update in refs.updated.iter() {
219 if let Some(r) = update.name().to_namespaced() {
220 let r = r.strip_namespace();
221 if r == *git::refs::storage::SIGREFS_BRANCH {
222 continue;
224 }
225 if r == *git::refs::storage::IDENTITY_BRANCH {
226 continue;
229 }
230 if r == *git::refs::storage::IDENTITY_ROOT {
231 continue;
234 }
235 if let Some(rest) = r.strip_prefix(git::refname!("refs/heads/patches")) {
236 if radicle::cob::ObjectId::from_str(rest.as_str()).is_ok() {
237 continue;
240 }
241 }
242 }
243 if let RefUpdate::Skipped { .. } = update {
244 } else if let Err(e) = store.insert(rid, update, now) {
246 log::error!(
247 target: "worker",
248 "Failed to update notification store for {rid}: {e}"
249 );
250 }
251 }
252 Ok(())
253}
254
255fn cache_refs<D>(repo: &RepoId, refs: &[RefUpdate], db: &mut D) -> Result<(), node::refs::Error>
257where
258 D: node::refs::Store,
259{
260 let time = LocalTime::now();
261
262 for r in refs {
263 let name = r.name();
264 let (namespace, qualified) = match radicle::git::parse_ref_namespaced(name) {
265 Err(e) => {
266 log::error!(target: "worker", "Git reference is invalid: {name:?}: {e}");
267 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
268 break;
269 }
270 Ok((n, q)) => (n, q),
271 };
272 if qualified != *git::refs::storage::SIGREFS_BRANCH {
273 continue;
275 }
276 log::trace!(target: "node", "Updating cache for {name} in {repo}");
277
278 let result = match r {
279 RefUpdate::Updated { new, .. } => db.set(repo, &namespace, &qualified, *new, time),
280 RefUpdate::Created { oid, .. } => db.set(repo, &namespace, &qualified, *oid, time),
281 RefUpdate::Deleted { .. } => db.delete(repo, &namespace, &qualified),
282 RefUpdate::Skipped { .. } => continue,
283 };
284
285 if let Err(e) = result {
286 log::error!(target: "worker", "Error updating git refs cache for {name:?}: {e}");
287 log::warn!(target: "worker", "Skipping refs caching for fetch of {repo}");
288 break;
289 }
290 }
291 Ok(())
292}
293
294fn cache_cobs<S, C>(
297 rid: &RepoId,
298 refs: &[RefUpdate],
299 storage: &S,
300 cache: &mut C,
301) -> Result<(), error::Cache>
302where
303 S: ReadRepository + cob::Store<Namespace = NodeId>,
304 C: cob::cache::Update<cob::issue::Issue> + cob::cache::Update<cob::patch::Patch>,
305 C: cob::cache::Remove<cob::issue::Issue> + cob::cache::Remove<cob::patch::Patch>,
306{
307 let mut issues = cob::store::Store::<cob::issue::Issue, _>::open(storage)?;
308 let mut patches = cob::store::Store::<cob::patch::Patch, _>::open(storage)?;
309
310 for update in refs {
311 match update {
312 RefUpdate::Updated { name, .. }
313 | RefUpdate::Created { name, .. }
314 | RefUpdate::Deleted { name, .. } => match name.to_namespaced() {
315 Some(name) => {
316 let Some(identifier) = cob::TypedId::from_namespaced(&name)? else {
317 continue;
318 };
319 if identifier.is_issue() {
320 update_or_remove(&mut issues, cache, rid, identifier)?;
321 } else if identifier.is_patch() {
322 update_or_remove(&mut patches, cache, rid, identifier)?;
323 } else {
324 continue;
326 }
327 }
328 None => continue,
329 },
330 RefUpdate::Skipped { .. } => { }
331 }
332 }
333
334 Ok(())
335}
336
337fn update_or_remove<R, C, T>(
339 store: &mut cob::store::Store<T, R>,
340 cache: &mut C,
341 rid: &RepoId,
342 tid: TypedId,
343) -> Result<(), error::Cache>
344where
345 R: cob::Store + ReadRepository,
346 T: cob::Evaluate<R> + cob::store::Cob + cob::store::CobWithType,
347 C: cob::cache::Update<T> + cob::cache::Remove<T>,
348{
349 match store.get(&tid.id) {
350 Ok(Some(obj)) => {
351 return cache.update(rid, &tid.id, &obj).map(|_| ()).map_err(|e| {
353 error::Cache::Update {
354 id: tid.id,
355 type_name: tid.type_name,
356 err: e.into(),
357 }
358 });
359 }
360 Ok(None) => {
361 }
363 Err(e) => {
364 log::error!(target: "fetch", "Error loading COB {tid} from storage: {e}");
366 }
367 }
368 cob::cache::Remove::<T>::remove(cache, &tid.id)
371 .map(|_| ())
372 .map_err(|e| error::Cache::Remove {
373 id: tid.id,
374 type_name: tid.type_name,
375 err: Box::new(e),
376 })?;
377
378 Ok(())
379}