// radicle_protocol/fetcher/state.rs

//! Logical state for Git fetches happening in the node.
//!
//! See [`FetcherState`] for more information.
//!
//! See the [`command`] module for inputs to [`FetcherState`], and the
//! [`event`] module for outputs from it.
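//!
//! A minimal usage sketch (marked `ignore`; the `fetch` value is a
//! hypothetical [`command::Fetch`] built by the caller):
//!
//! ```ignore
//! let mut state = FetcherState::default();
//! let event = state.handle(Command::Fetch(fetch));
//! ```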

pub mod command;
pub mod event;

pub use command::Command;
pub use event::Event;

use std::collections::{BTreeMap, VecDeque};
use std::num::NonZeroUsize;
use std::time;

use radicle::storage::refs::FeatureLevel;
use radicle_core::{NodeId, RepoId};

use crate::fetcher::RefsToFetch;
use crate::service::FETCH_TIMEOUT;

/// Default maximum number of items per fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default maximum number of concurrent fetches per node.
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;

/// Configuration options for tuning the fetch process.
///
/// Note that these options are not used directly by [`FetcherState`]; they are
/// kept within the state so that they can be tracked across queued fetches.
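///
/// # Examples
///
/// A minimal sketch of building a configuration (marked `ignore`; the timeout
/// value is an arbitrary illustration):
///
/// ```ignore
/// use std::time::Duration;
///
/// let config = FetchConfig::new()
///     .with_timeout(Duration::from_secs(30))
///     .with_fetch_config(radicle_fetch::Config::default());
/// ```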
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FetchConfig {
    timeout: time::Duration,
    protocol: radicle_fetch::Config,
}

impl FetchConfig {
    /// Construct the default [`FetchConfig`].
    pub fn new() -> Self {
        Self {
            timeout: FETCH_TIMEOUT,
            protocol: radicle_fetch::Config::default(),
        }
    }

    /// Set the [`FetchConfig::timeout`] to the given [`time::Duration`].
    pub fn with_timeout(mut self, timeout: time::Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Set the protocol configuration, returned by [`FetchConfig::fetch_config`],
    /// to the given [`radicle_fetch::Config`].
    pub fn with_fetch_config(mut self, config: radicle_fetch::Config) -> Self {
        self.protocol = config;
        self
    }

    /// Set the minimum feature level within the [`FetchConfig::fetch_config`]
    /// to the given [`FeatureLevel`].
    pub fn with_minimum_feature_level(mut self, feature_level: FeatureLevel) -> Self {
        self.protocol.level_min = feature_level;
        self
    }

    /// Return the timeout duration configured for this fetch.
    pub fn timeout(&self) -> time::Duration {
        self.timeout
    }

    /// Return the [`radicle_fetch::Config`] configured for this fetch.
    pub fn fetch_config(&self) -> radicle_fetch::Config {
        self.protocol
    }

    /// Merge another [`FetchConfig`] into the current one.
    ///
    /// For each field, the following semantics apply:
    ///
    /// - `timeout`: the maximum timeout is taken
    /// - `protocol.limit.refs`: the maximum limit is taken
    /// - `protocol.limit.special`: the maximum limit is taken
    /// - `protocol.level_min`: the minimum level is taken
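    ///
    /// A minimal sketch of the intended outcome (marked `ignore` since `merge`
    /// is private, and the durations are illustrative):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// let mut a = FetchConfig::new().with_timeout(Duration::from_secs(10));
    /// let b = FetchConfig::new().with_timeout(Duration::from_secs(30));
    /// a.merge(b);
    /// // The maximum of the two timeouts wins.
    /// assert_eq!(a.timeout(), Duration::from_secs(30));
    /// ```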
    fn merge(&mut self, other: FetchConfig) {
        self.timeout = self.timeout.max(other.timeout);
        self.protocol.limit.refs = self.protocol.limit.refs.max(other.protocol.limit.refs);
        self.protocol.limit.special = self
            .protocol
            .limit
            .special
            .max(other.protocol.limit.special);
        self.protocol.level_min = self.protocol.level_min.min(other.protocol.level_min);
    }
}

impl Default for FetchConfig {
    fn default() -> Self {
        Self::new()
    }
}

/// Logical state for Git fetches happening in the node.
///
/// A fetch can be either:
///
///   - [`ActiveFetch`]: it is currently being fetched from another node on the network.
///   - [`QueuedFetch`]: it is expected to be fetched from a given node, but the
///     repository is already being fetched, or the node is at capacity.
///
/// For any given repository, identified by its [`RepoId`], only one fetch can
/// be in progress at a time. This prevents concurrent fetches from clobbering
/// overlapping references.
///
/// If a repository is already being actively fetched, any new fetch request
/// for it is queued for a later attempt.
///
/// For any given node, a configurable capacity limits how many fetches can
/// happen with it concurrently. This does not guarantee that the remote node
/// will actually allow the fetch, since it maintains its own limits on
/// connections and load.
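///
/// # Examples
///
/// A minimal sketch of the fetch lifecycle (marked `ignore`; `from`, `rid`,
/// `refs`, and `config` are hypothetical values built by the caller):
///
/// ```ignore
/// let mut state = FetcherState::default();
///
/// // Request a fetch; it either starts immediately or is queued.
/// let started = state.fetch(command::Fetch { from, rid, refs, config });
///
/// // Once the fetch completes (success or failure), report it back.
/// let done = state.fetched(command::Fetched { from, rid });
///
/// // Drain a queued fetch for this node, now that capacity has freed up.
/// if let Some(next) = state.dequeue(&from) {
///     // Transition it to active via `state.fetch(..)`...
/// }
/// ```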
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetcherState {
    /// The active fetches, ensuring only one fetch per repository.
    active: BTreeMap<RepoId, ActiveFetch>,
    /// The queued fetches waiting to happen; each node maintains its own queue.
    queues: BTreeMap<NodeId, Queue>,
    /// Configuration for maintaining the fetch state.
    config: Config,
}

impl Default for FetcherState {
    fn default() -> Self {
        Self::new(Config::default())
    }
}

impl FetcherState {
    /// Initialize the [`FetcherState`] with the given [`Config`].
    pub fn new(config: Config) -> Self {
        Self {
            active: BTreeMap::new(),
            queues: BTreeMap::new(),
            config,
        }
    }
}

impl FetcherState {
    /// Process a [`Command`], delegating to its corresponding method, and
    /// returning the corresponding [`Event`].
    ///
    /// This method is useful if the [`FetcherState`] is used in batch
    /// processing and does not need to be explicit about the underlying method.
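    ///
    /// A minimal sketch of batch processing (marked `ignore`; `commands` is a
    /// hypothetical iterator of [`Command`]s):
    ///
    /// ```ignore
    /// let mut state = FetcherState::default();
    /// for command in commands {
    ///     let event = state.handle(command);
    ///     // Dispatch the event to interested subscribers...
    /// }
    /// ```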
    pub fn handle(&mut self, command: Command) -> Event {
        match command {
            Command::Fetch(fetch) => self.fetch(fetch).into(),
            Command::Fetched(fetched) => self.fetched(fetched).into(),
            Command::Cancel(cancel) => self.cancel(cancel).into(),
        }
    }

    /// Process a [`Fetch`] command, which transitions the given fetch to
    /// active, if possible.
    ///
    /// The fetch will only transition to being active if:
    ///
    ///   - a fetch is not already happening for that repository, and
    ///   - the node to be fetched from is not already at capacity.
    ///
    /// Otherwise, the fetch is queued. If an identical fetch, i.e. same node
    /// and same refs, is already active, the caller is told that it is already
    /// fetching.
    ///
    /// [`Fetch`]: command::Fetch
    pub fn fetch(
        &mut self,
        command::Fetch {
            from,
            rid,
            refs,
            config,
        }: command::Fetch,
    ) -> event::Fetch {
        if let Some(active) = self.active.get(&rid) {
            if active.refs == refs && active.from == from {
                return event::Fetch::AlreadyFetching { rid, from };
            } else {
                return self.enqueue(rid, from, refs, config);
            }
        }

        if self.is_at_node_capacity(&from) {
            self.enqueue(rid, from, refs, config)
        } else {
            self.active.insert(
                rid,
                ActiveFetch {
                    from,
                    refs: refs.clone(),
                },
            );
            event::Fetch::Started {
                rid,
                from,
                refs,
                config,
            }
        }
    }

    /// Process a [`Fetched`] command, which removes the given fetch from the
    /// set of active fetches. Note that this is agnostic of whether the fetch
    /// succeeded or failed.
    ///
    /// The caller will be notified if the completed fetch did not exist in the active set.
    ///
    /// [`Fetched`]: command::Fetched
    pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
        match self.active.remove(&rid) {
            None => event::Fetched::NotFound { from, rid },
            Some(ActiveFetch { from, refs }) => event::Fetched::Completed { from, rid, refs },
        }
    }

    /// Attempt to dequeue a [`QueuedFetch`] for the given node.
    ///
    /// This will only dequeue a fetch if its repository is not already being
    /// actively fetched, and the given node is not at capacity.
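    ///
    /// A minimal sketch of following up a completed fetch (marked `ignore`;
    /// `from` is a hypothetical [`NodeId`]):
    ///
    /// ```ignore
    /// if let Some(QueuedFetch { rid, refs, config }) = state.dequeue(&from) {
    ///     // Transition the dequeued fetch to active via `state.fetch(..)`.
    /// }
    /// ```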
    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
        let is_at_capacity = self.is_at_node_capacity(from);
        let queue = self.queues.get_mut(from)?;
        let active = &self.active;
        queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
    }

    /// Process a [`Cancel`] command, which cancels any active and/or queued
    /// fetches for the given node.
    ///
    /// [`Cancel`]: command::Cancel
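    ///
    /// A minimal sketch of cancelling on disconnect (marked `ignore`; `from`
    /// is a hypothetical [`NodeId`] of a disconnected peer):
    ///
    /// ```ignore
    /// match state.cancel(command::Cancel { from }) {
    ///     event::Cancel::Canceled { active, queued, .. } => {
    ///         // Abort the active fetches and drop the queued ones...
    ///     }
    ///     event::Cancel::Unexpected { .. } => { /* nothing to cancel */ }
    /// }
    /// ```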
    pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
        let cancelled: Vec<_> = self
            .active
            .iter()
            .filter_map(|(rid, f)| (f.from == from).then_some(*rid))
            .collect();
        let ongoing: BTreeMap<_, _> = cancelled
            .iter()
            .filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
            .collect();
        let ongoing = (!ongoing.is_empty()).then_some(ongoing);
        let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());

        match (ongoing, queued) {
            (None, None) => event::Cancel::Unexpected { from },
            (ongoing, queued) => event::Cancel::Canceled {
                from,
                active: ongoing.unwrap_or_default(),
                queued: queued.map(|q| q.queue).unwrap_or_default(),
            },
        }
    }

    fn enqueue(
        &mut self,
        rid: RepoId,
        from: NodeId,
        refs: RefsToFetch,
        config: FetchConfig,
    ) -> event::Fetch {
        let queue = self
            .queues
            .entry(from)
            .or_insert_with(|| Queue::new(self.config.maximum_queue_size));
        match queue.enqueue(QueuedFetch { rid, refs, config }) {
            Enqueue::CapacityReached(QueuedFetch { rid, refs, config }) => {
                event::Fetch::QueueAtCapacity {
                    rid,
                    from,
                    refs,
                    config,
                    capacity: queue.len(),
                }
            }
            Enqueue::Queued | Enqueue::Merged => event::Fetch::Queued { rid, from },
        }
    }
}

impl FetcherState {
    /// Get the set of queued fetches.
    pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
        &self.queues
    }

    /// Get the set of active fetches.
    pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
        &self.active
    }

    /// Get the [`ActiveFetch`] for the provided [`RepoId`], returning `None` if
    /// it does not exist.
    pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
        self.active.get(rid)
    }

    /// Check whether the number of active fetches for the given [`NodeId`] has
    /// reached the maximum number of concurrent fetches.
    ///
    /// Returns `true` if the fetcher is already fetching the maximum number of
    /// repositories from that node.
    fn is_at_node_capacity(&self, node: &NodeId) -> bool {
        let count = self.active.values().filter(|f| &f.from == node).count();
        count >= self.config.maximum_concurrency.into()
    }
}

/// Configuration for the [`FetcherState`].
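///
/// # Examples
///
/// A minimal sketch of a tuned configuration (marked `ignore`; the sizes are
/// arbitrary illustrations):
///
/// ```ignore
/// use std::num::NonZeroUsize;
///
/// let config = Config::new()
///     .with_max_concurrency(NonZeroUsize::new(4).unwrap())
///     .with_max_capacity(MaxQueueSize::new(NonZeroUsize::new(64).unwrap()));
/// ```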
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Config {
    /// Maximum number of concurrent fetches per peer connection.
    maximum_concurrency: NonZeroUsize,
    /// Maximum fetching queue size for a single node.
    maximum_queue_size: MaxQueueSize,
}

impl Config {
    /// Construct the default [`Config`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the maximum fetching queue size for a single node.
    pub fn with_max_capacity(mut self, capacity: MaxQueueSize) -> Self {
        self.maximum_queue_size = capacity;
        self
    }

    /// Set the maximum number of concurrent fetches per peer connection.
    pub fn with_max_concurrency(mut self, concurrency: NonZeroUsize) -> Self {
        self.maximum_concurrency = concurrency;
        self
    }
}

impl Default for Config {
    fn default() -> Self {
        Self {
            maximum_concurrency: MAX_CONCURRENCY,
            maximum_queue_size: MaxQueueSize::default(),
        }
    }
}

/// An active fetch represents a repository being fetched from a particular node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
    /// The node from which the repository is being fetched.
    pub from: NodeId,
    /// The set of references that the fetch is being performed for.
    pub refs: RefsToFetch,
}

impl ActiveFetch {
    /// The node from which the repository is being fetched.
    pub fn from(&self) -> &NodeId {
        &self.from
    }

    /// The set of references that the fetch is being performed for.
    pub fn refs(&self) -> &RefsToFetch {
        &self.refs
    }
}

/// A fetch waiting in the queue to be processed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueuedFetch {
    /// The repository that will be fetched.
    pub rid: RepoId,
    /// The references that the fetch is being performed for.
    pub refs: RefsToFetch,
    /// The configuration options to pass to the fetch process.
    pub config: FetchConfig,
}

/// A queue for keeping track of fetches.
///
/// It ensures that the queue contains at most one entry per repository,
/// merging duplicates, and does not exceed the provided maximum capacity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Queue {
    queue: VecDeque<QueuedFetch>,
    max_queue_size: MaxQueueSize,
}

/// The maximum number of fetches that can be queued for a single node.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaxQueueSize(usize);

impl MaxQueueSize {
    /// The minimum queue size, which is `1`.
    pub const MIN: Self = MaxQueueSize(1);

    /// Create a queue size; the size must be larger than `0`.
    pub fn new(size: NonZeroUsize) -> Self {
        Self(size.into())
    }

    /// Return the maximum queue size as a `usize`.
    pub fn as_usize(&self) -> usize {
        self.0
    }

    /// Checks whether a queue of length `n` has reached or exceeded the
    /// maximum queue size.
    fn is_exceeded_by(&self, n: usize) -> bool {
        n >= self.0
    }
}

impl Default for MaxQueueSize {
    fn default() -> Self {
        Self(MAX_FETCH_QUEUE_SIZE)
    }
}

/// The result of [`Queue::enqueue`].
#[must_use]
#[derive(Debug, PartialEq, Eq)]
pub(super) enum Enqueue {
    /// The capacity of the queue has been exceeded, and the [`QueuedFetch`] is
    /// returned.
    CapacityReached(QueuedFetch),
    /// The [`QueuedFetch`] was successfully queued.
    Queued,
    /// The [`QueuedFetch`] was merged into an existing entry for the same
    /// repository.
    Merged,
}

impl Queue {
    /// Create the [`Queue`] with the given [`MaxQueueSize`].
    pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
        Self {
            queue: VecDeque::with_capacity(max_queue_size.0),
            max_queue_size,
        }
    }

    /// The current number of items in the queue.
    pub(super) fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the [`Queue`] is empty.
    pub(super) fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Enqueues a fetch onto the back of the queue.
    ///
    /// If a fetch for the same repository is already queued, the new fetch is
    /// merged into the existing entry instead. Otherwise, the fetch is
    /// appended, provided the queue has not reached capacity.
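    ///
    /// A minimal sketch of the merging behaviour (marked `ignore`; `fetch_a`
    /// and `fetch_b` are hypothetical [`QueuedFetch`] values sharing a `rid`):
    ///
    /// ```ignore
    /// let mut queue = Queue::new(MaxQueueSize::default());
    /// assert_eq!(queue.enqueue(fetch_a), Enqueue::Queued);
    /// // A second fetch for the same repository merges rather than duplicates.
    /// assert_eq!(queue.enqueue(fetch_b), Enqueue::Merged);
    /// assert_eq!(queue.len(), 1);
    /// ```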
    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
            existing.refs = existing.refs.clone().merge(fetch.refs);
            existing.config.merge(fetch.config);
            return Enqueue::Merged;
        }

        if self.max_queue_size.is_exceeded_by(self.queue.len()) {
            Enqueue::CapacityReached(fetch)
        } else {
            self.queue.push_back(fetch);
            Enqueue::Queued
        }
    }

    /// Try to dequeue the next [`QueuedFetch`], but only if the `predicate`
    /// holds; otherwise, it will be pushed back to the front of the queue.
    pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
    where
        P: FnOnce(&QueuedFetch) -> bool,
    {
        let fetch = self.dequeue()?;
        if predicate(&fetch) {
            Some(fetch)
        } else {
            self.queue.push_front(fetch);
            None
        }
    }

    /// Dequeues a fetch from the front of the queue.
    pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
        self.queue.pop_front()
    }

    /// Return an iterator over the queued fetches.
    pub fn iter(&self) -> QueueIter<'_> {
        QueueIter {
            inner: self.queue.iter(),
        }
    }
}

/// An iterator over the [`QueuedFetch`] items in a [`Queue`].
pub struct QueueIter<'a> {
    inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
}

impl<'a> Iterator for QueueIter<'a> {
    type Item = &'a QueuedFetch;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

impl<'a> IntoIterator for &'a Queue {
    type Item = &'a QueuedFetch;
    type IntoIter = QueueIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}