radicle_node/worker.rs

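//! Worker pool for fetch tasks.
//!
//! The service hands each [`Task`] to the pool over a channel; an idle
//! worker thread picks it up, performs the outgoing or incoming fetch,
//! and reports a [`TaskResult`] back to the service through its
//! [`Handle`].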
#![allow(clippy::too_many_arguments)]
mod channels;
mod upload_pack;

pub mod fetch;
pub mod garbage;

use std::path::PathBuf;

use crossbeam_channel as chan;

use radicle::identity::RepoId;
use radicle::node::notifications;
use radicle::node::policy::config as policy;
use radicle::node::policy::config::SeedingPolicy;
use radicle::prelude::NodeId;
use radicle::storage::refs::RefsAt;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{cob, crypto, Storage};
use radicle_fetch::FetchLimit;

pub use radicle_protocol::worker::{
    AuthorizationError, FetchError, FetchRequest, FetchResult, UploadError,
};

use crate::runtime::{thread, Handle};
use crate::wire::StreamId;

pub use channels::{ChannelEvent, Channels, ChannelsConfig};

/// Worker pool configuration.
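///
/// # Example
///
/// A rough sketch of how a configuration might be put together; the
/// `storage`, `fetch` and `policy` values are assumed to come from the
/// node's startup code, and the database path is only illustrative.
///
/// ```ignore
/// let config = Config {
///     capacity: 8,                               // number of worker threads
///     storage,                                   // opened git storage
///     fetch,                                     // `FetchConfig` for this node
///     policy,                                    // default `SeedingPolicy`
///     policies_db: PathBuf::from("policies.db"), // illustrative path only
/// };
/// ```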
pub struct Config {
    /// Number of worker threads.
    pub capacity: usize,
    /// Git storage.
    pub storage: Storage,
    /// Configuration for performing fetches.
    pub fetch: FetchConfig,
    /// Default seeding policy, used when no policy is found for a specific node or repository.
    pub policy: SeedingPolicy,
    /// Path to the policies database.
    pub policies_db: PathBuf,
}

/// A task to be carried out on a worker thread.
/// This is either an outgoing or an incoming fetch.
pub struct Task {
    /// The fetch request to carry out.
    pub fetch: FetchRequest,
    /// The stream this task is associated with.
    pub stream: StreamId,
    /// Channels for exchanging data with the remote peer.
    pub channels: Channels,
}

/// Worker response.
#[derive(Debug)]
pub struct TaskResult {
    /// The remote peer the task was performed with.
    pub remote: NodeId,
    /// The outcome of the fetch.
    pub result: FetchResult,
    /// The stream this task was associated with.
    pub stream: StreamId,
}

/// Configuration for fetch operations.
#[derive(Debug, Clone)]
pub struct FetchConfig {
    /// Data limits when fetching from a remote.
    pub limit: FetchLimit,
    /// Public key of the local peer.
    pub local: crypto::PublicKey,
    /// Configuration for `git gc` garbage collection. Defaults to `1 hour ago`.
    pub expiry: garbage::Expiry,
}

/// A worker that replicates git objects.
struct Worker {
    /// The ID of the local node.
    nid: NodeId,
    /// Git storage.
    storage: Storage,
    /// Configuration used when fetching.
    fetch_config: FetchConfig,
    /// Channel from which tasks are received.
    tasks: chan::Receiver<Task>,
    /// Handle used to communicate with the service.
    handle: Handle,
    /// Read-only access to the seeding policies.
    policies: policy::Config<policy::store::Read>,
    /// Write handle to the notifications store.
    notifications: notifications::StoreWriter,
    /// Write handle to the COB cache.
    cache: cob::cache::StoreWriter,
    /// Handle to the node database.
    db: radicle::node::Database,
}

impl Worker {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error
    /// receiving the next task.
    fn run(mut self) -> Result<(), chan::RecvError> {
        loop {
            let task = self.tasks.recv()?;
            self.process(task);
        }
    }

    /// Processes a single task and reports the result back to the service.
    fn process(
        &mut self,
        Task {
            fetch,
            channels,
            stream,
        }: Task,
    ) {
        let remote = fetch.remote();
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
        let result = self._process(fetch, stream, channels, self.notifications.clone());

        log::trace!(target: "worker", "Sending response back to service..");

        if self
            .handle
            .worker_result(TaskResult {
                remote,
                stream,
                result,
            })
            .is_err()
        {
            log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
        }
    }

    /// Processes a fetch request, acting either as the initiator of an outgoing
    /// fetch or as the responder to an incoming one.
    fn _process(
        &mut self,
        fetch: FetchRequest,
        stream: StreamId,
        mut channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
                rid,
                remote,
                refs_at,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
                let result = self.fetch(rid, remote, refs_at, channels, notifs);
                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote, emitter } => {
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");

                let timeout = channels.timeout();
                let (mut stream_r, stream_w) = channels.split();
                // Read the pkt-line request header to find out which repository
                // the remote wants to fetch.
                let header = match upload_pack::pktline::git_request(&mut stream_r) {
                    Ok(header) => header,
                    Err(e) => {
                        return FetchResult::Responder {
                            rid: None,
                            result: Err(UploadError::PacketLine(e)),
                        }
                    }
                };
                log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);

                // Refuse to serve the fetch if the repository is blocked or not
                // visible to the remote.
                if let Err(e) = self.is_authorized(remote, header.repo) {
                    return FetchResult::Responder {
                        rid: Some(header.repo),
                        result: Err(e.into()),
                    };
                }

                let result = upload_pack::upload_pack(
                    &self.nid,
                    remote,
                    &self.storage,
                    &emitter,
                    &header,
                    stream_r,
                    stream_w,
                    timeout,
                )
                .map(drop)
                .map_err(UploadError::UploadPack);
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");

                FetchResult::Responder {
                    rid: Some(header.repo),
                    result,
                }
            }
        }
    }

    /// Checks whether `remote` is authorized to fetch `rid`.
    fn is_authorized(&self, remote: NodeId, rid: RepoId) -> Result<(), AuthorizationError> {
        let policy = self.policies.seed_policy(&rid)?.policy;
        // Check policy first, since if we're blocking then we likely don't have
        // the repository.
        if policy.is_block() {
            return Err(AuthorizationError::Unauthorized(remote, rid));
        }
        let repo = self.storage.repository(rid)?;
        let doc = repo.identity_doc()?;

        if !doc.is_visible_to(&remote.into()) {
            Err(AuthorizationError::Unauthorized(remote, rid))
        } else {
            Ok(())
        }
    }

    /// Performs an outgoing fetch of `rid` from `remote`, then runs `git gc`
    /// on the repository.
    fn fetch(
        &mut self,
        rid: RepoId,
        remote: NodeId,
        refs_at: Option<Vec<RefsAt>>,
        channels: channels::ChannelsFlush,
        notifs: notifications::StoreWriter,
    ) -> Result<fetch::FetchResult, FetchError> {
        let FetchConfig {
            limit,
            local,
            expiry,
        } = &self.fetch_config;
        // N.b. if the `rid` is blocked this will return an error, so
        // we won't continue with any further set up of the fetch.
        let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
        let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;

        let mut cache = self.cache.clone();
        let handle = fetch::Handle::new(
            rid,
            *local,
            &self.storage,
            allowed,
            blocked,
            channels,
            notifs,
        )?;
        let result = handle.fetch(
            rid,
            &self.storage,
            &mut cache,
            &mut self.db,
            *limit,
            remote,
            refs_at,
        )?;

        if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
            // N.b. ensure that `git gc` failures are caught in debug mode.
            debug_assert!(false, "`git gc` failed: {e}");

            log::warn!(target: "worker", "Failed to run `git gc`: {e}");
        }
        Ok(result)
    }
}

/// A pool of workers. One thread is allocated for each worker.
pub struct Pool {
    pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
}

impl Pool {
    /// Create a new worker pool with the given parameters.
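    ///
    /// # Example
    ///
    /// An illustrative sketch; the task receiver, runtime handle, stores,
    /// database and configuration are assumed to have been set up by the
    /// node runtime.
    ///
    /// ```ignore
    /// let pool = Pool::with(tasks, nid, handle, notifications, cache, db, config)?;
    /// // Blocks until every worker thread has exited.
    /// pool.run()?;
    /// ```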
    pub fn with(
        tasks: chan::Receiver<Task>,
        nid: NodeId,
        handle: Handle,
        notifications: notifications::StoreWriter,
        cache: cob::cache::StoreWriter,
        db: radicle::node::Database,
        config: Config,
    ) -> Result<Self, policy::Error> {
        let mut pool = Vec::with_capacity(config.capacity);
        for i in 0..config.capacity {
            let policies =
                policy::Config::new(config.policy, policy::Store::reader(&config.policies_db)?);
            let worker = Worker {
                nid,
                tasks: tasks.clone(),
                handle: handle.clone(),
                storage: config.storage.clone(),
                fetch_config: config.fetch.clone(),
                policies,
                notifications: notifications.clone(),
                cache: cache.clone(),
                db: db.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

            pool.push(thread);
        }
        Ok(Self { pool })
    }

    /// Run the worker pool.
    ///
    /// Blocks until all worker threads have exited.
    pub fn run(self) -> thread::Result<()> {
        for (i, worker) in self.pool.into_iter().enumerate() {
            if let Err(err) = worker.join()? {
                log::trace!(target: "pool", "Worker {i} exited: {err}");
            }
        }
        log::debug!(target: "pool", "Worker pool shutting down..");

        Ok(())
    }
}