1#![allow(clippy::too_many_arguments)]
2mod channels;
3mod upload_pack;
4
5pub mod fetch;
6pub mod garbage;
7
8use std::io;
9use std::path::PathBuf;
10
11use crossbeam_channel as chan;
12
13use radicle::identity::RepoId;
14use radicle::node::{notifications, Event};
15use radicle::prelude::NodeId;
16use radicle::storage::refs::RefsAt;
17use radicle::storage::{ReadRepository, ReadStorage};
18use radicle::{cob, crypto, Storage};
19use radicle_fetch::FetchLimit;
20
21use crate::runtime::{thread, Emitter, Handle};
22use crate::service::policy;
23use crate::service::policy::SeedingPolicy;
24use crate::wire::StreamId;
25
26pub use channels::{ChannelEvent, Channels};
27
28pub struct Config {
30    pub capacity: usize,
32    pub storage: Storage,
34    pub fetch: FetchConfig,
36    pub policy: SeedingPolicy,
38    pub policies_db: PathBuf,
40}
41
42#[derive(thiserror::Error, Debug)]
44pub enum FetchError {
45    #[error("the 'git fetch' command failed with exit code '{code}'")]
46    CommandFailed { code: i32 },
47    #[error(transparent)]
48    Io(#[from] io::Error),
49    #[error(transparent)]
50    Fetch(#[from] fetch::error::Fetch),
51    #[error(transparent)]
52    Handle(#[from] fetch::error::Handle),
53    #[error(transparent)]
54    Storage(#[from] radicle::storage::Error),
55    #[error(transparent)]
56    PolicyStore(#[from] radicle::node::policy::store::Error),
57    #[error(transparent)]
58    Policy(#[from] radicle_fetch::policy::error::Policy),
59    #[error(transparent)]
60    Blocked(#[from] radicle_fetch::policy::error::Blocked),
61}
62
63impl FetchError {
64    pub fn is_timeout(&self) -> bool {
66        matches!(self, FetchError::Io(e) if e.kind() == io::ErrorKind::TimedOut)
67    }
68}
69
70#[derive(thiserror::Error, Debug)]
72pub enum UploadError {
73    #[error("error parsing git command packet-line: {0}")]
74    PacketLine(io::Error),
75    #[error(transparent)]
76    Io(#[from] io::Error),
77    #[error("{0} is not authorized to fetch {1}")]
78    Unauthorized(NodeId, RepoId),
79    #[error(transparent)]
80    Storage(#[from] radicle::storage::Error),
81    #[error(transparent)]
82    Identity(#[from] radicle::identity::DocError),
83    #[error(transparent)]
84    Repository(#[from] radicle::storage::RepositoryError),
85    #[error(transparent)]
86    PolicyStore(#[from] radicle::node::policy::store::Error),
87}
88
89impl UploadError {
90    pub fn is_eof(&self) -> bool {
92        matches!(self, UploadError::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof)
93    }
94}
95
96#[derive(Debug, Clone)]
98pub enum FetchRequest {
99    Initiator {
102        rid: RepoId,
104        remote: NodeId,
106        refs_at: Option<Vec<RefsAt>>,
108    },
109    Responder {
112        remote: NodeId,
114        emitter: Emitter<Event>,
116    },
117}
118
119impl FetchRequest {
120    pub fn remote(&self) -> NodeId {
121        match self {
122            Self::Initiator { remote, .. } | Self::Responder { remote, .. } => *remote,
123        }
124    }
125}
126
127#[derive(Debug)]
129pub enum FetchResult {
130    Initiator {
131        rid: RepoId,
133        result: Result<fetch::FetchResult, FetchError>,
135    },
136    Responder {
137        rid: Option<RepoId>,
139        result: Result<(), UploadError>,
141    },
142}
143
144pub struct Task {
147    pub fetch: FetchRequest,
148    pub stream: StreamId,
149    pub channels: Channels,
150}
151
152#[derive(Debug)]
154pub struct TaskResult {
155    pub remote: NodeId,
156    pub result: FetchResult,
157    pub stream: StreamId,
158}
159
160#[derive(Debug, Clone)]
161pub struct FetchConfig {
162    pub limit: FetchLimit,
164    pub local: crypto::PublicKey,
166    pub expiry: garbage::Expiry,
169}
170
171struct Worker {
173    nid: NodeId,
174    storage: Storage,
175    fetch_config: FetchConfig,
176    tasks: chan::Receiver<Task>,
177    handle: Handle,
178    policies: policy::Config<policy::store::Read>,
179    notifications: notifications::StoreWriter,
180    cache: cob::cache::StoreWriter,
181    db: radicle::node::Database,
182}
183
184impl Worker {
185    fn run(mut self) -> Result<(), chan::RecvError> {
188        loop {
189            let task = self.tasks.recv()?;
190            self.process(task);
191        }
192    }
193
194    fn process(&mut self, task: Task) {
195        let Task {
196            fetch,
197            channels,
198            stream,
199        } = task;
200        let remote = fetch.remote();
201        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
202        let result = self._process(fetch, stream, channels, self.notifications.clone());
203
204        log::trace!(target: "worker", "Sending response back to service..");
205
206        if self
207            .handle
208            .worker_result(TaskResult {
209                remote,
210                stream,
211                result,
212            })
213            .is_err()
214        {
215            log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
216        }
217    }
218
219    fn _process(
220        &mut self,
221        fetch: FetchRequest,
222        stream: StreamId,
223        mut channels: channels::ChannelsFlush,
224        notifs: notifications::StoreWriter,
225    ) -> FetchResult {
226        match fetch {
227            FetchRequest::Initiator {
228                rid,
229                remote,
230                refs_at,
231            } => {
232                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
233                let result = self.fetch(rid, remote, refs_at, channels, notifs);
234                FetchResult::Initiator { rid, result }
235            }
236            FetchRequest::Responder { remote, emitter } => {
237                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");
238
239                let timeout = channels.timeout();
240                let (mut stream_r, stream_w) = channels.split();
241                let header = match upload_pack::pktline::git_request(&mut stream_r) {
242                    Ok(header) => header,
243                    Err(e) => {
244                        return FetchResult::Responder {
245                            rid: None,
246                            result: Err(e.into()),
247                        }
248                    }
249                };
250                log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);
251
252                if let Err(e) = self.is_authorized(remote, header.repo) {
253                    return FetchResult::Responder {
254                        rid: Some(header.repo),
255                        result: Err(e),
256                    };
257                }
258
259                let result = upload_pack::upload_pack(
260                    &self.nid,
261                    remote,
262                    &self.storage,
263                    &emitter,
264                    &header,
265                    stream_r,
266                    stream_w,
267                    timeout,
268                )
269                .map(|_| ())
270                .map_err(|e| e.into());
271                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");
272
273                FetchResult::Responder {
274                    rid: Some(header.repo),
275                    result,
276                }
277            }
278        }
279    }
280
281    fn is_authorized(&self, remote: NodeId, rid: RepoId) -> Result<(), UploadError> {
282        let policy = self.policies.seed_policy(&rid)?.policy;
283        let repo = self.storage.repository(rid)?;
284        let doc = repo.identity_doc()?;
285        if !doc.is_visible_to(&remote) || policy.is_block() {
286            Err(UploadError::Unauthorized(remote, rid))
287        } else {
288            Ok(())
289        }
290    }
291
292    fn fetch(
293        &mut self,
294        rid: RepoId,
295        remote: NodeId,
296        refs_at: Option<Vec<RefsAt>>,
297        channels: channels::ChannelsFlush,
298        notifs: notifications::StoreWriter,
299    ) -> Result<fetch::FetchResult, FetchError> {
300        let FetchConfig {
301            limit,
302            local,
303            expiry,
304        } = &self.fetch_config;
305        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
308        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;
309
310        let mut cache = self.cache.clone();
311        let handle = fetch::Handle::new(
312            rid,
313            *local,
314            &self.storage,
315            allowed,
316            blocked,
317            channels,
318            notifs,
319        )?;
320        let result = handle.fetch(
321            rid,
322            &self.storage,
323            &mut cache,
324            &mut self.db,
325            *limit,
326            remote,
327            refs_at,
328        )?;
329
330        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
331            debug_assert!(false, "`git gc` failed: {e}");
333
334            log::warn!(target: "worker", "Failed to run `git gc`: {e}");
335        }
336        Ok(result)
337    }
338}
339
340pub struct Pool {
342    pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
343}
344
345impl Pool {
346    pub fn with(
348        tasks: chan::Receiver<Task>,
349        nid: NodeId,
350        handle: Handle,
351        notifications: notifications::StoreWriter,
352        cache: cob::cache::StoreWriter,
353        db: radicle::node::Database,
354        config: Config,
355    ) -> Result<Self, policy::Error> {
356        let mut pool = Vec::with_capacity(config.capacity);
357        for i in 0..config.capacity {
358            let policies =
359                policy::Config::new(config.policy, policy::Store::reader(&config.policies_db)?);
360            let worker = Worker {
361                nid,
362                tasks: tasks.clone(),
363                handle: handle.clone(),
364                storage: config.storage.clone(),
365                fetch_config: config.fetch.clone(),
366                policies,
367                notifications: notifications.clone(),
368                cache: cache.clone(),
369                db: db.clone(),
370            };
371            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());
372
373            pool.push(thread);
374        }
375        Ok(Self { pool })
376    }
377
378    pub fn run(self) -> thread::Result<()> {
382        for (i, worker) in self.pool.into_iter().enumerate() {
383            if let Err(err) = worker.join()? {
384                log::trace!(target: "pool", "Worker {i} exited: {err}");
385            }
386        }
387        log::debug!(target: "pool", "Worker pool shutting down..");
388
389        Ok(())
390    }
391}