Skip to main content

radicle_protocol/fetcher/
state.rs

//! Logical state for Git fetches happening in the node.
//!
//! See [`FetcherState`] for more information.
//!
//! See [`command`] for the inputs accepted by [`FetcherState`].
//! See [`event`] for the outputs produced by [`FetcherState`].
7
8pub mod command;
9pub mod event;
10
11pub use command::Command;
12pub use event::Event;
13use serde::Serialize;
14
15use std::collections::{BTreeMap, VecDeque};
16use std::num::NonZeroUsize;
17use std::time;
18
19use radicle_core::{NodeId, RepoId};
20
21use crate::fetcher::RefsToFetch;
22
/// Default upper bound on the number of fetches that may wait in a single
/// node's fetch queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default number of fetches that may run concurrently against a single node,
/// namely one ([`NonZeroUsize::MIN`]).
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;
27
28/// Logical state for Git fetches happening in the node.
29///
30/// A fetch can either be:
31///   - [`ActiveFetch`]: meaning it is currently being fetched from another node on the network
32///   - [`QueuedFetch`]: meaning it is expected to be fetched from a given node, but the
33///     repository is already being fetched, or the node is at capacity.
34///
35/// For any given repository, identified by its [`RepoId`], there can only be
36/// one fetch occurring for it at a given time. This prevents any concurrent
37/// fetches from clobbering overlapping references.
38///
39/// If the repository is actively being fetched, then that fetch will be queued
40/// for a later attempt.
41///
42/// For any given node, there is a configurable capacity so that only `N` number
43/// of fetches can happen with it concurrently. This does not guarantee that the
44/// node will actually allow this node to fetch from it – since it will maintain
45/// its own capacity for connections and load.
46#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
47pub struct FetcherState {
48    /// The active fetches that are occurring, ensuring only one fetch per repository.
49    active: BTreeMap<RepoId, ActiveFetch>,
50    /// The queued fetches, waiting to happen, where each node maintains its own queue.
51    queues: BTreeMap<NodeId, Queue>,
52    /// Configuration for maintaining the fetch state.
53    config: Config,
54}
55
56impl Default for FetcherState {
57    fn default() -> Self {
58        Self::new(Config::default())
59    }
60}
61
62impl FetcherState {
63    /// Initialize the [`FetcherState`] with the given [`Config`].
64    pub fn new(config: Config) -> Self {
65        Self {
66            active: BTreeMap::new(),
67            queues: BTreeMap::new(),
68            config,
69        }
70    }
71}
72
73impl FetcherState {
74    /// Process the handling of a [`Command`], delegating to its corresponding
75    /// method, and returning the corresponding [`Event`].
76    ///
77    /// This method is useful if the [`FetcherState`] is used in batch
78    /// processing and does need to be explicit about the underlying method.
79    pub fn handle(&mut self, command: Command) -> Event {
80        match command {
81            Command::Fetch(fetch) => self.fetch(fetch).into(),
82            Command::Fetched(fetched) => self.fetched(fetched).into(),
83            Command::Cancel(cancel) => self.cancel(cancel).into(),
84        }
85    }
86
87    /// Process a [`Fetch`] command, which transitions the given fetch to
88    /// active, if possible.
89    ///
90    /// The fetch will only transition to being active if:
91    ///
92    ///   - A fetch is not already happening for that repository, in which case it gets queued.
93    ///   - The node to be fetched from is not already at capacity, again it will be queued.
94    ///
95    /// [`Fetch`]: command::Fetch
96    pub fn fetch(
97        &mut self,
98        command::Fetch {
99            from,
100            rid,
101            refs,
102            timeout,
103        }: command::Fetch,
104    ) -> event::Fetch {
105        if let Some(active) = self.active.get(&rid) {
106            if active.refs == refs && active.from == from {
107                return event::Fetch::AlreadyFetching { rid, from };
108            } else {
109                return self.enqueue(rid, from, refs, timeout);
110            }
111        }
112
113        if self.is_at_node_capacity(&from) {
114            self.enqueue(rid, from, refs, timeout)
115        } else {
116            self.active.insert(
117                rid,
118                ActiveFetch {
119                    from,
120                    refs: refs.clone(),
121                },
122            );
123            event::Fetch::Started {
124                rid,
125                from,
126                refs,
127                timeout,
128            }
129        }
130    }
131
132    /// Process a [`Fetched`] command, which removes the given fetch from the set of active fetches.
133    /// Note that this is agnostic of whether the fetch succeeded or failed.
134    ///
135    /// The caller will be notified if the completed fetch did not exist in the active set.
136    ///
137    /// [`Fetched`]: command::Fetched
138    pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
139        match self.active.remove(&rid) {
140            None => event::Fetched::NotFound { from, rid },
141            Some(ActiveFetch { from, refs }) => event::Fetched::Completed { from, rid, refs },
142        }
143    }
144
145    /// Attempt to dequeue a [`QueuedFetch`] for the given node.
146    ///
147    /// This will only dequeue the fetch if it is not active, and the given node
148    /// is not at capacity.
149    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
150        let is_at_capacity = self.is_at_node_capacity(from);
151        let queue = self.queues.get_mut(from)?;
152        let active = &self.active;
153        queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
154    }
155
156    /// Process a [`Cancel`] command, which cancels any active and/or queued
157    /// fetches for that given node.
158    ///
159    /// [`Cancel`]: command::Cancel
160    pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
161        let cancelled: Vec<_> = self
162            .active
163            .iter()
164            .filter_map(|(rid, f)| (f.from == from).then_some(*rid))
165            .collect();
166        let ongoing: BTreeMap<_, _> = cancelled
167            .iter()
168            .filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
169            .collect();
170        let ongoing = (!ongoing.is_empty()).then_some(ongoing);
171        let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());
172
173        match (ongoing, queued) {
174            (None, None) => event::Cancel::Unexpected { from },
175            (ongoing, queued) => event::Cancel::Canceled {
176                from,
177                active: ongoing.unwrap_or_default(),
178                queued: queued.map(|q| q.queue).unwrap_or_default(),
179            },
180        }
181    }
182
183    fn enqueue(
184        &mut self,
185        rid: RepoId,
186        from: NodeId,
187        refs: RefsToFetch,
188        timeout: time::Duration,
189    ) -> event::Fetch {
190        let queue = self
191            .queues
192            .entry(from)
193            .or_insert(Queue::new(self.config.maximum_queue_size));
194        match queue.enqueue(QueuedFetch { rid, refs, timeout }) {
195            Enqueue::CapacityReached(QueuedFetch { rid, refs, timeout }) => {
196                event::Fetch::QueueAtCapacity {
197                    rid,
198                    from,
199                    refs,
200                    timeout,
201                    capacity: queue.len(),
202                }
203            }
204            Enqueue::Queued => event::Fetch::Queued { rid, from },
205            Enqueue::Merged => event::Fetch::Queued { rid, from },
206        }
207    }
208}
209
210impl FetcherState {
211    /// Get the set of queued fetches.
212    pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
213        &self.queues
214    }
215
216    /// Get the set of active fetches.
217    pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
218        &self.active
219    }
220
221    /// Get the [`ActiveFetch`] for the provided [`RepoId`], returning `None` if
222    /// it does not exist.
223    pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
224        self.active.get(rid)
225    }
226
227    /// Check if the number of fetches exceeds the maximum number of concurrent
228    /// fetches for a given [`NodeId`].
229    ///
230    /// Returns `true` if the fetcher is fetching the maximum number of
231    /// repositories, for that node.
232    fn is_at_node_capacity(&self, node: &NodeId) -> bool {
233        let count = self.active.values().filter(|f| &f.from == node).count();
234        count >= self.config.maximum_concurrency.into()
235    }
236}
237
238/// Configuration for the [`FetcherState`].
239#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)]
240pub struct Config {
241    /// Maximum number of concurrent fetches per peer connection.
242    maximum_concurrency: NonZeroUsize,
243    /// Maximum fetching queue size for a single node.
244    maximum_queue_size: MaxQueueSize,
245}
246
247impl Config {
248    pub fn new() -> Self {
249        Self::default()
250    }
251
252    /// Maximum fetching queue size for a single node.
253    pub fn with_max_capacity(mut self, capacity: MaxQueueSize) -> Self {
254        self.maximum_queue_size = capacity;
255        self
256    }
257
258    /// Maximum number of concurrent fetches per peer connection.
259    pub fn with_max_concurrency(mut self, concurrency: NonZeroUsize) -> Self {
260        self.maximum_concurrency = concurrency;
261        self
262    }
263}
264
265impl Default for Config {
266    fn default() -> Self {
267        Self {
268            maximum_concurrency: MAX_CONCURRENCY,
269            maximum_queue_size: MaxQueueSize::default(),
270        }
271    }
272}
273
274/// An active fetch represents a repository being fetched by a particular node.
275#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
276pub struct ActiveFetch {
277    pub from: NodeId,
278    pub refs: RefsToFetch,
279}
280
281impl ActiveFetch {
282    /// The node from which the repository is being fetched.
283    pub fn from(&self) -> &NodeId {
284        &self.from
285    }
286
287    /// The set of references that fetch is being performed for.
288    pub fn refs(&self) -> &RefsToFetch {
289        &self.refs
290    }
291}
292
293/// A fetch that is waiting to be processed, in the fetch queue.
294#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
295pub struct QueuedFetch {
296    /// The repository that will be fetched.
297    pub rid: RepoId,
298    /// The references that the fetch is being performed for.
299    pub refs: RefsToFetch,
300    /// The timeout given for the fetch request.
301    pub timeout: time::Duration,
302}
303
304/// A queue for keeping track of fetches.
305///
306/// It ensures that the queue contains unique items for fetching, and does not
307/// exceed the provided maximum capacity.
308#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
309pub struct Queue {
310    queue: VecDeque<QueuedFetch>,
311    max_queue_size: MaxQueueSize,
312}
313
314/// The maximum number of fetches that can be queued for a single node.
315#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
316#[serde(transparent)]
317pub struct MaxQueueSize(usize);
318
319impl MaxQueueSize {
320    /// Minimum queue size is `1`.
321    pub const MIN: Self = MaxQueueSize(1);
322
323    /// Create a queue size, that must be larger than `0`.
324    pub fn new(size: NonZeroUsize) -> Self {
325        Self(size.into())
326    }
327
328    pub fn as_usize(&self) -> usize {
329        self.0
330    }
331
332    /// Checks if the `n` provided exceeds the maximum queue size.
333    fn is_exceeded_by(&self, n: usize) -> bool {
334        n >= self.0
335    }
336}
337
338impl Default for MaxQueueSize {
339    fn default() -> Self {
340        Self(MAX_FETCH_QUEUE_SIZE)
341    }
342}
343
344/// The result of [`Queue::enqueue`].
345#[must_use]
346#[derive(Debug, PartialEq, Eq)]
347pub(super) enum Enqueue {
348    /// The capacity of the queue has been exceeded, and the [`QueuedFetch`] is
349    /// returned.
350    CapacityReached(QueuedFetch),
351    /// The [`QueuedFetch`] was successfully queued.
352    Queued,
353    Merged,
354}
355
356impl Queue {
357    /// Create the [`Queue`] with the given [`MaxQueueSize`].
358    pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
359        Self {
360            queue: VecDeque::with_capacity(max_queue_size.0),
361            max_queue_size,
362        }
363    }
364
365    /// The current number of items in the queue.
366    pub(super) fn len(&self) -> usize {
367        self.queue.len()
368    }
369
370    /// Returns `true` if the [`Queue`] is empty.
371    pub(super) fn is_empty(&self) -> bool {
372        self.queue.is_empty()
373    }
374
375    /// Enqueues a fetch onto the back of the queue, and will only succeed if
376    /// the queue has not reached capacity and if the item is unique.
377    pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
378        if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
379            existing.refs = existing.refs.clone().merge(fetch.refs);
380            // Take the longer timeout (more generous)
381            existing.timeout = existing.timeout.max(fetch.timeout);
382            return Enqueue::Merged;
383        }
384
385        if self.max_queue_size.is_exceeded_by(self.queue.len()) {
386            Enqueue::CapacityReached(fetch)
387        } else {
388            self.queue.push_back(fetch);
389            Enqueue::Queued
390        }
391    }
392
393    /// Try to dequeue the next [`QueuedFetch`], but only if the `predicate`
394    /// holds, otherwise it will be pushed back to the front of the queue.
395    pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
396    where
397        P: FnOnce(&QueuedFetch) -> bool,
398    {
399        let fetch = self.dequeue()?;
400        if predicate(&fetch) {
401            Some(fetch)
402        } else {
403            self.queue.push_front(fetch);
404            None
405        }
406    }
407
408    /// Dequeues a fetch from the front of the queue.
409    pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
410        self.queue.pop_front()
411    }
412
413    /// Return an iterator over the queued fetches.
414    pub fn iter<'a>(&'a self) -> QueueIter<'a> {
415        QueueIter {
416            inner: self.queue.iter(),
417        }
418    }
419}
420
421/// Iterator of the [`QueuedFetch`]'s
422pub struct QueueIter<'a> {
423    inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
424}
425
426impl<'a> Iterator for QueueIter<'a> {
427    type Item = &'a QueuedFetch;
428
429    fn next(&mut self) -> Option<Self::Item> {
430        self.inner.next()
431    }
432}
433
434impl<'a> IntoIterator for &'a Queue {
435    type Item = &'a QueuedFetch;
436    type IntoIter = QueueIter<'a>;
437
438    fn into_iter(self) -> Self::IntoIter {
439        self.iter()
440    }
441}