pub mod command;
9pub mod event;
10
11pub use command::Command;
12pub use event::Event;
13use radicle::storage::refs::FeatureLevel;
14
15use std::collections::{BTreeMap, VecDeque};
16use std::num::NonZeroUsize;
17use std::time;
18
19use radicle_core::{NodeId, RepoId};
20
21use crate::fetcher::RefsToFetch;
22use crate::service::FETCH_TIMEOUT;
23
/// Default upper bound on the number of fetches that may wait in a single
/// per-node queue (see [`MaxQueueSize`]).
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default per-node concurrency limit. `NonZeroUsize::MIN` is `1`, i.e. at
/// most one in-flight fetch per remote node unless overridden via
/// [`Config::with_max_concurrency`].
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;
28
/// Per-fetch tuning carried alongside a fetch request: the wall-clock budget
/// for the transfer plus the limits handed to the underlying `radicle_fetch`
/// protocol.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FetchConfig {
    // Wall-clock budget for the fetch; defaults to `FETCH_TIMEOUT`.
    timeout: time::Duration,
    // Protocol-level configuration (ref limits, minimum feature level).
    protocol: radicle_fetch::Config,
}
39
40impl FetchConfig {
41 pub fn new() -> Self {
43 Self {
44 timeout: FETCH_TIMEOUT,
45 protocol: radicle_fetch::Config::default(),
46 }
47 }
48
49 pub fn with_timeout(mut self, timeout: time::Duration) -> Self {
51 self.timeout = timeout;
52 self
53 }
54
55 pub fn with_fetch_config(mut self, config: radicle_fetch::Config) -> Self {
57 self.protocol = config;
58 self
59 }
60
61 pub fn with_minimum_feature_level(mut self, feature_level: FeatureLevel) -> Self {
64 self.protocol.level_min = feature_level;
65 self
66 }
67
68 pub fn timeout(&self) -> time::Duration {
70 self.timeout
71 }
72
73 pub fn fetch_config(&self) -> radicle_fetch::Config {
75 self.protocol
76 }
77
78 fn merge(&mut self, other: FetchConfig) {
85 self.timeout = self.timeout.max(other.timeout);
86 self.protocol.limit.refs = self.protocol.limit.refs.max(other.protocol.limit.refs);
87 self.protocol.limit.special = self
88 .protocol
89 .limit
90 .special
91 .max(other.protocol.limit.special);
92 self.protocol.level_min = self.protocol.level_min.min(other.protocol.level_min);
93 }
94}
95
96impl Default for FetchConfig {
97 fn default() -> Self {
98 Self::new()
99 }
100}
101
/// Pure state machine tracking which repositories are currently being
/// fetched and which fetches are queued per remote node. Driven by
/// [`Command`]s and producing [`Event`]s via [`FetcherState::handle`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetcherState {
    // Fetches currently in flight, keyed by repository.
    active: BTreeMap<RepoId, ActiveFetch>,
    // Pending fetches, one FIFO queue per remote node.
    queues: BTreeMap<NodeId, Queue>,
    // Concurrency and queue-capacity limits.
    config: Config,
}
129
130impl Default for FetcherState {
131 fn default() -> Self {
132 Self::new(Config::default())
133 }
134}
135
136impl FetcherState {
137 pub fn new(config: Config) -> Self {
139 Self {
140 active: BTreeMap::new(),
141 queues: BTreeMap::new(),
142 config,
143 }
144 }
145}
146
147impl FetcherState {
148 pub fn handle(&mut self, command: Command) -> Event {
154 match command {
155 Command::Fetch(fetch) => self.fetch(fetch).into(),
156 Command::Fetched(fetched) => self.fetched(fetched).into(),
157 Command::Cancel(cancel) => self.cancel(cancel).into(),
158 }
159 }
160
161 pub fn fetch(
171 &mut self,
172 command::Fetch {
173 from,
174 rid,
175 refs,
176 config,
177 }: command::Fetch,
178 ) -> event::Fetch {
179 if let Some(active) = self.active.get(&rid) {
180 if active.refs == refs && active.from == from {
181 return event::Fetch::AlreadyFetching { rid, from };
182 } else {
183 return self.enqueue(rid, from, refs, config);
184 }
185 }
186
187 if self.is_at_node_capacity(&from) {
188 self.enqueue(rid, from, refs, config)
189 } else {
190 self.active.insert(
191 rid,
192 ActiveFetch {
193 from,
194 refs: refs.clone(),
195 },
196 );
197 event::Fetch::Started {
198 rid,
199 from,
200 refs,
201 config,
202 }
203 }
204 }
205
206 pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
213 match self.active.remove(&rid) {
214 None => event::Fetched::NotFound { from, rid },
215 Some(ActiveFetch { from, refs }) => event::Fetched::Completed { from, rid, refs },
216 }
217 }
218
219 pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
224 let is_at_capacity = self.is_at_node_capacity(from);
225 let queue = self.queues.get_mut(from)?;
226 let active = &self.active;
227 queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
228 }
229
230 pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
235 let cancelled: Vec<_> = self
236 .active
237 .iter()
238 .filter_map(|(rid, f)| (f.from == from).then_some(*rid))
239 .collect();
240 let ongoing: BTreeMap<_, _> = cancelled
241 .iter()
242 .filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
243 .collect();
244 let ongoing = (!ongoing.is_empty()).then_some(ongoing);
245 let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());
246
247 match (ongoing, queued) {
248 (None, None) => event::Cancel::Unexpected { from },
249 (ongoing, queued) => event::Cancel::Canceled {
250 from,
251 active: ongoing.unwrap_or_default(),
252 queued: queued.map(|q| q.queue).unwrap_or_default(),
253 },
254 }
255 }
256
257 fn enqueue(
258 &mut self,
259 rid: RepoId,
260 from: NodeId,
261 refs: RefsToFetch,
262 config: FetchConfig,
263 ) -> event::Fetch {
264 let queue = self
265 .queues
266 .entry(from)
267 .or_insert(Queue::new(self.config.maximum_queue_size));
268 match queue.enqueue(QueuedFetch { rid, refs, config }) {
269 Enqueue::CapacityReached(QueuedFetch { rid, refs, config }) => {
270 event::Fetch::QueueAtCapacity {
271 rid,
272 from,
273 refs,
274 config,
275 capacity: queue.len(),
276 }
277 }
278 Enqueue::Queued => event::Fetch::Queued { rid, from },
279 Enqueue::Merged => event::Fetch::Queued { rid, from },
280 }
281 }
282}
283
284impl FetcherState {
285 pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
287 &self.queues
288 }
289
290 pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
292 &self.active
293 }
294
295 pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
298 self.active.get(rid)
299 }
300
301 fn is_at_node_capacity(&self, node: &NodeId) -> bool {
307 let count = self.active.values().filter(|f| &f.from == node).count();
308 count >= self.config.maximum_concurrency.into()
309 }
310}
311
/// Limits governing the fetcher: how many fetches a single node may run
/// concurrently and how many may wait in each node's queue.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Config {
    // Maximum number of simultaneous fetches per remote node.
    maximum_concurrency: NonZeroUsize,
    // Capacity of each per-node queue of pending fetches.
    maximum_queue_size: MaxQueueSize,
}
320
321impl Config {
322 pub fn new() -> Self {
323 Self::default()
324 }
325
326 pub fn with_max_capacity(mut self, capacity: MaxQueueSize) -> Self {
328 self.maximum_queue_size = capacity;
329 self
330 }
331
332 pub fn with_max_concurrency(mut self, concurrency: NonZeroUsize) -> Self {
334 self.maximum_concurrency = concurrency;
335 self
336 }
337}
338
339impl Default for Config {
340 fn default() -> Self {
341 Self {
342 maximum_concurrency: MAX_CONCURRENCY,
343 maximum_queue_size: MaxQueueSize::default(),
344 }
345 }
346}
347
/// A fetch that is currently in flight.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
    // The node the refs are being fetched from.
    pub from: NodeId,
    // The refs requested from that node.
    pub refs: RefsToFetch,
}
354
impl ActiveFetch {
    /// The node this fetch is pulling from.
    pub fn from(&self) -> &NodeId {
        &self.from
    }

    /// The refs being fetched.
    pub fn refs(&self) -> &RefsToFetch {
        &self.refs
    }
}
366
/// A fetch waiting in a per-node [`Queue`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueuedFetch {
    // Repository to fetch.
    pub rid: RepoId,
    // Refs to fetch from the node that owns the queue.
    pub refs: RefsToFetch,
    // Per-fetch configuration; merged when the same `rid` is queued again
    // (see `Queue::enqueue`).
    pub config: FetchConfig,
}
377
/// A bounded FIFO queue of pending fetches for a single node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Queue {
    // Pending fetches, in FIFO order.
    queue: VecDeque<QueuedFetch>,
    // Upper bound on `queue`'s length (not enforced for merges into an
    // existing entry).
    max_queue_size: MaxQueueSize,
}
387
/// The capacity of a [`Queue`]; always at least one.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaxQueueSize(usize);

impl MaxQueueSize {
    /// The smallest permitted capacity: a single queued fetch.
    pub const MIN: Self = MaxQueueSize(1);

    /// Build a capacity from a non-zero size.
    pub fn new(size: NonZeroUsize) -> Self {
        MaxQueueSize(size.get())
    }

    /// The capacity as a plain `usize`.
    pub fn as_usize(&self) -> usize {
        self.0
    }

    /// `true` when a queue already holding `n` items has no room for
    /// another.
    fn is_exceeded_by(&self, n: usize) -> bool {
        self.0 <= n
    }
}
410
411impl Default for MaxQueueSize {
412 fn default() -> Self {
413 Self(MAX_FETCH_QUEUE_SIZE)
414 }
415}
416
/// Outcome of [`Queue::enqueue`]; marked `#[must_use]` so callers cannot
/// silently drop a rejected fetch.
#[must_use]
#[derive(Debug, PartialEq, Eq)]
pub(super) enum Enqueue {
    /// The queue was full; the rejected fetch is handed back to the caller.
    CapacityReached(QueuedFetch),
    /// The fetch was appended to the queue.
    Queued,
    /// An entry for the same repository already existed; refs and config
    /// were merged into it instead of appending.
    Merged,
}
428
429impl Queue {
430 pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
432 Self {
433 queue: VecDeque::with_capacity(max_queue_size.0),
434 max_queue_size,
435 }
436 }
437
438 pub(super) fn len(&self) -> usize {
440 self.queue.len()
441 }
442
443 pub(super) fn is_empty(&self) -> bool {
445 self.queue.is_empty()
446 }
447
448 pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
451 if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
452 existing.refs = existing.refs.clone().merge(fetch.refs);
453 existing.config.merge(fetch.config);
454 return Enqueue::Merged;
455 }
456
457 if self.max_queue_size.is_exceeded_by(self.queue.len()) {
458 Enqueue::CapacityReached(fetch)
459 } else {
460 self.queue.push_back(fetch);
461 Enqueue::Queued
462 }
463 }
464
465 pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
468 where
469 P: FnOnce(&QueuedFetch) -> bool,
470 {
471 let fetch = self.dequeue()?;
472 if predicate(&fetch) {
473 Some(fetch)
474 } else {
475 self.queue.push_front(fetch);
476 None
477 }
478 }
479
480 pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
482 self.queue.pop_front()
483 }
484
485 pub fn iter<'a>(&'a self) -> QueueIter<'a> {
487 QueueIter {
488 inner: self.queue.iter(),
489 }
490 }
491}
492
/// Borrowing iterator over a [`Queue`]'s pending fetches, front to back.
pub struct QueueIter<'a> {
    // Underlying iterator over the queue's `VecDeque`.
    inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
}
497
impl<'a> Iterator for QueueIter<'a> {
    type Item = &'a QueuedFetch;

    // Delegates directly to the underlying `VecDeque` iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}
505
506impl<'a> IntoIterator for &'a Queue {
507 type Item = &'a QueuedFetch;
508 type IntoIter = QueueIter<'a>;
509
510 fn into_iter(self) -> Self::IntoIter {
511 self.iter()
512 }
513}