1#![allow(clippy::too_many_arguments)]
2mod channels;
3mod upload_pack;
4
5pub mod fetch;
6pub mod garbage;
7
8use std::io;
9use std::path::PathBuf;
10
11use crossbeam_channel as chan;
12
13use radicle::identity::RepoId;
14use radicle::node::{notifications, Event};
15use radicle::prelude::NodeId;
16use radicle::storage::refs::RefsAt;
17use radicle::storage::{ReadRepository, ReadStorage};
18use radicle::{cob, crypto, Storage};
19use radicle_fetch::FetchLimit;
20
21use crate::runtime::{thread, Emitter, Handle};
22use crate::service::policy;
23use crate::service::policy::SeedingPolicy;
24use crate::wire::StreamId;
25
26pub use channels::{ChannelEvent, Channels, ChannelsConfig};
27
28pub struct Config {
30 pub capacity: usize,
32 pub storage: Storage,
34 pub fetch: FetchConfig,
36 pub policy: SeedingPolicy,
38 pub policies_db: PathBuf,
40}
41
42#[derive(thiserror::Error, Debug)]
44pub enum FetchError {
45 #[error("the 'git fetch' command failed with exit code '{code}'")]
46 CommandFailed { code: i32 },
47 #[error(transparent)]
48 Io(#[from] io::Error),
49 #[error(transparent)]
50 Fetch(#[from] fetch::error::Fetch),
51 #[error(transparent)]
52 Handle(#[from] fetch::error::Handle),
53 #[error(transparent)]
54 Storage(#[from] radicle::storage::Error),
55 #[error(transparent)]
56 PolicyStore(#[from] radicle::node::policy::store::Error),
57 #[error(transparent)]
58 Policy(#[from] radicle_fetch::policy::error::Policy),
59 #[error(transparent)]
60 Blocked(#[from] radicle_fetch::policy::error::Blocked),
61}
62
63impl FetchError {
64 pub fn is_timeout(&self) -> bool {
66 matches!(self, FetchError::Io(e) if e.kind() == io::ErrorKind::TimedOut)
67 }
68}
69
70#[derive(thiserror::Error, Debug)]
72pub enum UploadError {
73 #[error("error parsing git command packet-line: {0}")]
74 PacketLine(io::Error),
75 #[error(transparent)]
76 Io(#[from] io::Error),
77 #[error("{0} is not authorized to fetch {1}")]
78 Unauthorized(NodeId, RepoId),
79 #[error(transparent)]
80 Storage(#[from] radicle::storage::Error),
81 #[error(transparent)]
82 Identity(#[from] radicle::identity::DocError),
83 #[error(transparent)]
84 Repository(#[from] radicle::storage::RepositoryError),
85 #[error(transparent)]
86 PolicyStore(#[from] radicle::node::policy::store::Error),
87}
88
89impl UploadError {
90 pub fn is_eof(&self) -> bool {
92 matches!(self, UploadError::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof)
93 }
94}
95
96#[derive(Debug, Clone)]
98pub enum FetchRequest {
99 Initiator {
102 rid: RepoId,
104 remote: NodeId,
106 refs_at: Option<Vec<RefsAt>>,
108 },
109 Responder {
112 remote: NodeId,
114 emitter: Emitter<Event>,
116 },
117}
118
119impl FetchRequest {
120 pub fn remote(&self) -> NodeId {
121 match self {
122 Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
123 }
124 }
125}
126
127#[derive(Debug)]
129pub enum FetchResult {
130 Initiator {
131 rid: RepoId,
133 result: Result<fetch::FetchResult, FetchError>,
135 },
136 Responder {
137 rid: Option<RepoId>,
139 result: Result<(), UploadError>,
141 },
142}
143
144pub struct Task {
147 pub fetch: FetchRequest,
148 pub stream: StreamId,
149 pub channels: Channels,
150}
151
152#[derive(Debug)]
154pub struct TaskResult {
155 pub remote: NodeId,
156 pub result: FetchResult,
157 pub stream: StreamId,
158}
159
160#[derive(Debug, Clone)]
161pub struct FetchConfig {
162 pub limit: FetchLimit,
164 pub local: crypto::PublicKey,
166 pub expiry: garbage::Expiry,
169}
170
171struct Worker {
173 nid: NodeId,
174 storage: Storage,
175 fetch_config: FetchConfig,
176 tasks: chan::Receiver<Task>,
177 handle: Handle,
178 policies: policy::Config<policy::store::Read>,
179 notifications: notifications::StoreWriter,
180 cache: cob::cache::StoreWriter,
181 db: radicle::node::Database,
182}
183
184impl Worker {
185 fn run(mut self) -> Result<(), chan::RecvError> {
188 loop {
189 let task = self.tasks.recv()?;
190 self.process(task);
191 }
192 }
193
194 fn process(&mut self, task: Task) {
195 let Task {
196 fetch,
197 channels,
198 stream,
199 } = task;
200 let remote = fetch.remote();
201 let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
202 let result = self._process(fetch, stream, channels, self.notifications.clone());
203
204 log::trace!(target: "worker", "Sending response back to service..");
205
206 if self
207 .handle
208 .worker_result(TaskResult {
209 remote,
210 stream,
211 result,
212 })
213 .is_err()
214 {
215 log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
216 }
217 }
218
219 fn _process(
220 &mut self,
221 fetch: FetchRequest,
222 stream: StreamId,
223 mut channels: channels::ChannelsFlush,
224 notifs: notifications::StoreWriter,
225 ) -> FetchResult {
226 match fetch {
227 FetchRequest::Initiator {
228 rid,
229 remote,
230 refs_at,
231 } => {
232 log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
233 let result = self.fetch(rid, remote, refs_at, channels, notifs);
234 FetchResult::Initiator { rid, result }
235 }
236 FetchRequest::Responder { remote, emitter } => {
237 log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");
238
239 let timeout = channels.timeout();
240 let (mut stream_r, stream_w) = channels.split();
241 let header = match upload_pack::pktline::git_request(&mut stream_r) {
242 Ok(header) => header,
243 Err(e) => {
244 return FetchResult::Responder {
245 rid: None,
246 result: Err(e.into()),
247 }
248 }
249 };
250 log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);
251
252 if let Err(e) = self.is_authorized(remote, header.repo) {
253 return FetchResult::Responder {
254 rid: Some(header.repo),
255 result: Err(e),
256 };
257 }
258
259 let result = upload_pack::upload_pack(
260 &self.nid,
261 remote,
262 &self.storage,
263 &emitter,
264 &header,
265 stream_r,
266 stream_w,
267 timeout,
268 )
269 .map(|_| ())
270 .map_err(|e| e.into());
271 log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");
272
273 FetchResult::Responder {
274 rid: Some(header.repo),
275 result,
276 }
277 }
278 }
279 }
280
281 fn is_authorized(&self, remote: NodeId, rid: RepoId) -> Result<(), UploadError> {
282 let policy = self.policies.seed_policy(&rid)?.policy;
283 if policy.is_block() {
286 return Err(UploadError::Unauthorized(remote, rid));
287 }
288 let repo = self.storage.repository(rid)?;
289 let doc = repo.identity_doc()?;
290
291 if !doc.is_visible_to(&remote.into()) {
292 Err(UploadError::Unauthorized(remote, rid))
293 } else {
294 Ok(())
295 }
296 }
297
298 fn fetch(
299 &mut self,
300 rid: RepoId,
301 remote: NodeId,
302 refs_at: Option<Vec<RefsAt>>,
303 channels: channels::ChannelsFlush,
304 notifs: notifications::StoreWriter,
305 ) -> Result<fetch::FetchResult, FetchError> {
306 let FetchConfig {
307 limit,
308 local,
309 expiry,
310 } = &self.fetch_config;
311 let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
314 let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;
315
316 let mut cache = self.cache.clone();
317 let handle = fetch::Handle::new(
318 rid,
319 *local,
320 &self.storage,
321 allowed,
322 blocked,
323 channels,
324 notifs,
325 )?;
326 let result = handle.fetch(
327 rid,
328 &self.storage,
329 &mut cache,
330 &mut self.db,
331 *limit,
332 remote,
333 refs_at,
334 )?;
335
336 if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
337 debug_assert!(false, "`git gc` failed: {e}");
339
340 log::warn!(target: "worker", "Failed to run `git gc`: {e}");
341 }
342 Ok(result)
343 }
344}
345
346pub struct Pool {
348 pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
349}
350
351impl Pool {
352 pub fn with(
354 tasks: chan::Receiver<Task>,
355 nid: NodeId,
356 handle: Handle,
357 notifications: notifications::StoreWriter,
358 cache: cob::cache::StoreWriter,
359 db: radicle::node::Database,
360 config: Config,
361 ) -> Result<Self, policy::Error> {
362 let mut pool = Vec::with_capacity(config.capacity);
363 for i in 0..config.capacity {
364 let policies =
365 policy::Config::new(config.policy, policy::Store::reader(&config.policies_db)?);
366 let worker = Worker {
367 nid,
368 tasks: tasks.clone(),
369 handle: handle.clone(),
370 storage: config.storage.clone(),
371 fetch_config: config.fetch.clone(),
372 policies,
373 notifications: notifications.clone(),
374 cache: cache.clone(),
375 db: db.clone(),
376 };
377 let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());
378
379 pool.push(thread);
380 }
381 Ok(Self { pool })
382 }
383
384 pub fn run(self) -> thread::Result<()> {
388 for (i, worker) in self.pool.into_iter().enumerate() {
389 if let Err(err) = worker.join()? {
390 log::trace!(target: "pool", "Worker {i} exited: {err}");
391 }
392 }
393 log::debug!(target: "pool", "Worker pool shutting down..");
394
395 Ok(())
396 }
397}