radicle_protocol/fetcher/service.rs

1use std::collections::HashMap;
2
3use radicle_core::{NodeId, RepoId};
4
5use crate::fetcher::{
6    RefsToFetch,
7    state::{
8        Config, FetcherState, QueuedFetch,
9        command::{self},
10        event,
11    },
12};
13
/// Service layer that wraps [`FetcherState`] and manages subscriber coalescing.
///
/// When multiple callers request the same fetch, their subscribers are collected
/// and all notified when the fetch completes.
///
/// # Type Parameter
/// - `S`: The subscriber type (e.g., `chan::Sender<FetchResult>`).
#[derive(Debug)]
pub struct FetcherService<S> {
    // The underlying fetch state machine (in-flight and queued fetches).
    state: FetcherState,
    // Waiters keyed by `(rid, node, refs)`; every subscriber under a key is
    // handed back when the matching fetch completes (see `fetched`).
    subscribers: HashMap<FetchKey, Vec<S>>,
}
26
27impl<S> FetcherService<S> {
28    /// Initialize the [`FetcherService`] with the give [`Config`].
29    pub fn new(config: Config) -> Self {
30        Self {
31            state: FetcherState::new(config),
32            subscribers: HashMap::new(),
33        }
34    }
35
36    /// Provide a reference handle to the [`FetcherState`].
37    pub fn state(&self) -> &FetcherState {
38        &self.state
39    }
40}
41
/// Key for pending subscribers.
///
/// The requested `refs` are part of the key, so fetches for the same
/// `(rid, node)` with *different* refs are tracked — and notified —
/// independently rather than coalesced.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FetchKey {
    // Repository being fetched.
    rid: RepoId,
    // Remote node the fetch is directed at.
    node: NodeId,
    // Refs requested by this fetch.
    refs: RefsToFetch,
}
49
50impl FetchKey {
51    fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
52        Self { rid, node, refs }
53    }
54}
55
/// The result of calling [`FetcherService::fetch`].
#[must_use]
#[derive(Debug)]
pub struct FetchInitiated<S> {
    /// The underlying result from calling [`FetcherState::fetch`].
    pub event: event::Fetch,
    /// Subscriber returned if the fetch was rejected (queue at capacity).
    /// The caller is responsible for notifying it of the rejection.
    pub rejected: Option<S>,
}
65
/// The result of calling [`FetcherService::fetched`].
#[must_use]
#[derive(Debug)]
pub struct FetchCompleted<S> {
    /// The underlying result from calling [`FetcherState::fetched`].
    pub event: event::Fetched,
    /// All the subscribers that were interested in this given fetch.
    /// Empty if the fetch was not found or nobody subscribed.
    pub subscribers: Vec<S>,
}
75
/// The result of calling [`FetcherService::cancel`].
#[must_use]
#[derive(Debug)]
pub struct FetchesCancelled<S> {
    /// The underlying result from calling [`FetcherState::cancel`].
    pub event: event::Cancel,
    /// Orphaned subscribers paired with their [`RepoId`]; presumably the
    /// caller notifies these of the cancellation — confirm against callers.
    pub orphaned: Vec<(RepoId, S)>,
}
85
86impl<S> FetcherService<S> {
87    /// Initiate a fetch, optionally registering a subscriber.
88    ///
89    /// Subscribers are coalesced: if the same `(rid, node)` is already being
90    /// fetched or queued, the subscriber joins the existing waiters.
91    ///
92    /// If the fetch could not be initiated, and also could not be queued, then
93    /// subscriber is returned to notify of the rejection.
94    ///
95    /// See [`FetcherState::fetch`].
96    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
97        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
98        let event = self.state.fetch(cmd);
99
100        let rejected = match &event {
101            event::Fetch::QueueAtCapacity { .. } => subscriber,
102            _ => {
103                if let Some(r) = subscriber {
104                    self.subscribers.entry(key).or_default().push(r);
105                }
106                None
107            }
108        };
109
110        FetchInitiated { event, rejected }
111    }
112
113    /// Mark a fetch as completed and retrieve waiting subscribers.
114    ///
115    /// See [`FetcherState::fetched`].
116    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
117        let event = self.state.fetched(cmd);
118        match event {
119            // TODO(finto): drop subscribers with this partial key?
120            e @ event::Fetched::NotFound { .. } => FetchCompleted {
121                event: e,
122                subscribers: vec![],
123            },
124            ref e @ event::Fetched::Completed { ref refs, .. } => {
125                let key = FetchKey::new(cmd.rid, cmd.from, refs.clone());
126                let subscribers = self.subscribers.remove(&key).unwrap_or_default();
127                FetchCompleted {
128                    event: e.clone(),
129                    subscribers,
130                }
131            }
132        }
133    }
134
135    /// Cancel all fetches for a disconnected peer, returning any orphaned
136    /// subscribers.
137    ///
138    /// See [`FetcherState::cancel`].
139    pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
140        let from = cmd.from;
141        let event = self.state.cancel(cmd);
142
143        let mut orphaned = Vec::new();
144        self.subscribers.retain(|key, subscribers| {
145            if key.node == from {
146                orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
147                false
148            } else {
149                true
150            }
151        });
152
153        FetchesCancelled { event, orphaned }
154    }
155
156    /// Dequeue the next fetch for a node.
157    ///
158    /// See [`FetcherState::dequeue`].
159    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
160        self.state.dequeue(from)
161    }
162}
163
#[cfg(test)]
mod tests {
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;
    use std::num::NonZeroUsize;

    use super::*;

    use crate::fetcher::{FetchConfig, MaxQueueSize};

    /// Two fetches for the same `(rid, node)` but *different* refs must not
    /// be coalesced: each keeps its own subscriber list, and completing one
    /// only notifies its own waiters.
    #[test]
    fn test_fetch_coalescing_different_refs() {
        // Concurrency of 1 forces the second request to queue, not start.
        let config = Config::new()
            .with_max_concurrency(NonZeroUsize::new(1).unwrap())
            .with_max_capacity(MaxQueueSize::new(NonZeroUsize::new(10).unwrap()));
        let mut svc = FetcherService::<usize>::new(config);
        let peer = arbitrary::r#gen(1);
        let rid = arbitrary::r#gen(1);
        let specific: Vec<RefsAt> = arbitrary::vec(2);
        let all: Vec<RefsAt> = vec![];
        let fetch_config = FetchConfig::default();

        // Subscriber 1 asks for a specific set of refs; this fetch starts.
        let first = svc.fetch(
            command::Fetch {
                from: peer,
                rid,
                refs: specific.clone().into(),
                config: fetch_config,
            },
            Some(1),
        );
        assert!(matches!(first.event, event::Fetch::Started { .. }));

        // Subscriber 2 asks for all refs; the refs differ, so it is queued
        // behind the first fetch rather than coalesced with it.
        let second = svc.fetch(
            command::Fetch {
                from: peer,
                rid,
                refs: all.clone().into(),
                config: fetch_config,
            },
            Some(2),
        );
        assert!(matches!(second.event, event::Fetch::Queued { .. }));

        // Completing the in-flight fetch reports the specific refs...
        let done = svc.fetched(command::Fetched { from: peer, rid });
        match done.event {
            event::Fetched::Completed { ref refs, .. } => {
                assert_eq!(refs, &specific.into());
            }
            _ => panic!("Expected Completed event"),
        }

        // ...and only subscriber 1 is notified.
        assert_eq!(done.subscribers, vec![1]);

        // Subscriber 2 is still waiting under its own key.
        let key = FetchKey::new(rid, peer, all.into());
        assert!(svc.subscribers.contains_key(&key));
        let waiting = &svc.subscribers[&key];
        assert_eq!(waiting.len(), 1);
        assert_eq!(waiting[0], 2);
    }
}
242}