1#![allow(clippy::too_many_arguments)]
2mod channels;
3mod upload_pack;
4
5pub mod fetch;
6pub mod garbage;
7
8use std::path::PathBuf;
9
10use crossbeam_channel as chan;
11
12use radicle::identity::RepoId;
13use radicle::node::notifications;
14use radicle::node::policy::config as policy;
15use radicle::node::policy::config::SeedingPolicy;
16use radicle::prelude::NodeId;
17use radicle::storage::refs::RefsAt;
18use radicle::storage::{ReadRepository, ReadStorage};
19use radicle::{cob, crypto, Storage};
20use radicle_fetch::FetchLimit;
21
22pub use radicle_protocol::worker::{
23 AuthorizationError, FetchError, FetchRequest, FetchResult, UploadError,
24};
25
26use crate::runtime::{thread, Handle};
27use crate::wire::StreamId;
28
29pub use channels::{ChannelEvent, Channels, ChannelsConfig};
30
31pub struct Config {
33 pub capacity: usize,
35 pub storage: Storage,
37 pub fetch: FetchConfig,
39 pub policy: SeedingPolicy,
41 pub policies_db: PathBuf,
43}
44
45pub struct Task {
48 pub fetch: FetchRequest,
49 pub stream: StreamId,
50 pub channels: Channels,
51}
52
53#[derive(Debug)]
55pub struct TaskResult {
56 pub remote: NodeId,
57 pub result: FetchResult,
58 pub stream: StreamId,
59}
60
61#[derive(Debug, Clone)]
62pub struct FetchConfig {
63 pub limit: FetchLimit,
65 pub local: crypto::PublicKey,
67 pub expiry: garbage::Expiry,
70}
71
72struct Worker {
74 nid: NodeId,
75 storage: Storage,
76 fetch_config: FetchConfig,
77 tasks: chan::Receiver<Task>,
78 handle: Handle,
79 policies: policy::Config<policy::store::Read>,
80 notifications: notifications::StoreWriter,
81 cache: cob::cache::StoreWriter,
82 db: radicle::node::Database,
83}
84
85impl Worker {
86 fn run(mut self) -> Result<(), chan::RecvError> {
89 loop {
90 let task = self.tasks.recv()?;
91 self.process(task);
92 }
93 }
94
95 fn process(
96 &mut self,
97 Task {
98 fetch,
99 channels,
100 stream,
101 }: Task,
102 ) {
103 let remote = fetch.remote();
104 let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
105 let result = self._process(fetch, stream, channels, self.notifications.clone());
106
107 log::trace!(target: "worker", "Sending response back to service..");
108
109 if self
110 .handle
111 .worker_result(TaskResult {
112 remote,
113 stream,
114 result,
115 })
116 .is_err()
117 {
118 log::error!(target: "worker", "Unable to report fetch result: worker channel disconnected");
119 }
120 }
121
122 fn _process(
123 &mut self,
124 fetch: FetchRequest,
125 stream: StreamId,
126 mut channels: channels::ChannelsFlush,
127 notifs: notifications::StoreWriter,
128 ) -> FetchResult {
129 match fetch {
130 FetchRequest::Initiator {
131 rid,
132 remote,
133 refs_at,
134 } => {
135 log::debug!(target: "worker", "Worker processing outgoing fetch for {rid}");
136 let result = self.fetch(rid, remote, refs_at, channels, notifs);
137 FetchResult::Initiator { rid, result }
138 }
139 FetchRequest::Responder { remote, emitter } => {
140 log::debug!(target: "worker", "Worker processing incoming fetch for {remote} on stream {stream}..");
141
142 let timeout = channels.timeout();
143 let (mut stream_r, stream_w) = channels.split();
144 let header = match upload_pack::pktline::git_request(&mut stream_r) {
145 Ok(header) => header,
146 Err(e) => {
147 return FetchResult::Responder {
148 rid: None,
149 result: Err(UploadError::PacketLine(e)),
150 }
151 }
152 };
153 log::debug!(target: "worker", "Spawning upload-pack process for {} on stream {stream}..", header.repo);
154
155 if let Err(e) = self.is_authorized(remote, header.repo) {
156 return FetchResult::Responder {
157 rid: Some(header.repo),
158 result: Err(e.into()),
159 };
160 }
161
162 let result = upload_pack::upload_pack(
163 &self.nid,
164 remote,
165 &self.storage,
166 &emitter,
167 &header,
168 stream_r,
169 stream_w,
170 timeout,
171 )
172 .map(drop)
173 .map_err(UploadError::UploadPack);
174 log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");
175
176 FetchResult::Responder {
177 rid: Some(header.repo),
178 result,
179 }
180 }
181 }
182 }
183
184 fn is_authorized(&self, remote: NodeId, rid: RepoId) -> Result<(), AuthorizationError> {
185 let policy = self.policies.seed_policy(&rid)?.policy;
186 if policy.is_block() {
189 return Err(AuthorizationError::Unauthorized(remote, rid));
190 }
191 let repo = self.storage.repository(rid)?;
192 let doc = repo.identity_doc()?;
193
194 if !doc.is_visible_to(&remote.into()) {
195 Err(AuthorizationError::Unauthorized(remote, rid))
196 } else {
197 Ok(())
198 }
199 }
200
201 fn fetch(
202 &mut self,
203 rid: RepoId,
204 remote: NodeId,
205 refs_at: Option<Vec<RefsAt>>,
206 channels: channels::ChannelsFlush,
207 notifs: notifications::StoreWriter,
208 ) -> Result<fetch::FetchResult, FetchError> {
209 let FetchConfig {
210 limit,
211 local,
212 expiry,
213 } = &self.fetch_config;
214 let allowed = radicle_fetch::Allowed::from_config(rid, &self.policies)?;
217 let blocked = radicle_fetch::BlockList::from_config(&self.policies)?;
218
219 let mut cache = self.cache.clone();
220 let handle = fetch::Handle::new(
221 rid,
222 *local,
223 &self.storage,
224 allowed,
225 blocked,
226 channels,
227 notifs,
228 )?;
229 let result = handle.fetch(
230 rid,
231 &self.storage,
232 &mut cache,
233 &mut self.db,
234 *limit,
235 remote,
236 refs_at,
237 )?;
238
239 if let Err(e) = garbage::collect(&self.storage, rid, *expiry) {
240 debug_assert!(false, "`git gc` failed: {e}");
242
243 log::warn!(target: "worker", "Failed to run `git gc`: {e}");
244 }
245 Ok(result)
246 }
247}
248
249pub struct Pool {
251 pool: Vec<thread::JoinHandle<Result<(), chan::RecvError>>>,
252}
253
254impl Pool {
255 pub fn with(
257 tasks: chan::Receiver<Task>,
258 nid: NodeId,
259 handle: Handle,
260 notifications: notifications::StoreWriter,
261 cache: cob::cache::StoreWriter,
262 db: radicle::node::Database,
263 config: Config,
264 ) -> Result<Self, policy::Error> {
265 let mut pool = Vec::with_capacity(config.capacity);
266 for i in 0..config.capacity {
267 let policies =
268 policy::Config::new(config.policy, policy::Store::reader(&config.policies_db)?);
269 let worker = Worker {
270 nid,
271 tasks: tasks.clone(),
272 handle: handle.clone(),
273 storage: config.storage.clone(),
274 fetch_config: config.fetch.clone(),
275 policies,
276 notifications: notifications.clone(),
277 cache: cache.clone(),
278 db: db.clone(),
279 };
280 let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());
281
282 pool.push(thread);
283 }
284 Ok(Self { pool })
285 }
286
287 pub fn run(self) -> thread::Result<()> {
291 for (i, worker) in self.pool.into_iter().enumerate() {
292 if let Err(err) = worker.join()? {
293 log::trace!(target: "pool", "Worker {i} exited: {err}");
294 }
295 }
296 log::debug!(target: "pool", "Worker pool shutting down..");
297
298 Ok(())
299 }
300}