Skip to main content

radicle_protocol/fetcher/
service.rs

1use std::collections::HashMap;
2
3use radicle_core::{NodeId, RepoId};
4
5use crate::fetcher::{
6    state::{
7        command::{self},
8        event, Config, FetcherState, QueuedFetch,
9    },
10    RefsToFetch,
11};
12
/// Service layer that wraps [`FetcherState`] and manages subscriber coalescing.
///
/// When multiple callers request the same fetch, their subscribers are collected
/// and all notified when the fetch completes.
///
/// # Type Parameter
/// - `S`: The subscriber type (e.g., `chan::Sender<FetchResult>`).
#[derive(Debug)]
pub struct FetcherService<S> {
    // The underlying fetch state machine; all fetch/queue decisions happen here.
    state: FetcherState,
    // Subscribers waiting on an in-flight or queued fetch, keyed by
    // `(rid, node, refs)`. All entries under one key are notified together
    // when that fetch completes.
    subscribers: HashMap<FetchKey, Vec<S>>,
}
25
26impl<S> FetcherService<S> {
27    /// Initialize the [`FetcherService`] with the give [`Config`].
28    pub fn new(config: Config) -> Self {
29        Self {
30            state: FetcherState::new(config),
31            subscribers: HashMap::new(),
32        }
33    }
34
35    /// Provide a reference handle to the [`FetcherState`].
36    pub fn state(&self) -> &FetcherState {
37        &self.state
38    }
39}
40
/// Key for pending subscribers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FetchKey {
    // Repository being fetched.
    rid: RepoId,
    // Remote node the fetch is from.
    node: NodeId,
    // Which refs were requested; part of the key, so requests for different
    // refs of the same `(rid, node)` are tracked separately.
    refs: RefsToFetch,
}
48
49impl FetchKey {
50    fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
51        Self { rid, node, refs }
52    }
53}
54
/// The result of calling [`FetcherService::fetch`].
#[must_use]
#[derive(Debug)]
pub struct FetchInitiated<S> {
    /// The underlying result from calling [`FetcherState::fetch`].
    pub event: event::Fetch,
    /// Subscriber returned if fetch was rejected (queue at capacity).
    ///
    /// This is `Some` only when `event` is [`event::Fetch::QueueAtCapacity`];
    /// in every other case the subscriber was registered to be notified on
    /// completion.
    pub rejected: Option<S>,
}
64
/// The result of calling [`FetcherService::fetched`].
#[must_use]
#[derive(Debug)]
pub struct FetchCompleted<S> {
    /// The underlying result from calling [`FetcherState::fetched`].
    pub event: event::Fetched,
    /// All the subscribers that were interested in this given fetch.
    ///
    /// Empty when the fetch was not found or no subscribers were registered
    /// for its `(rid, node, refs)` key.
    pub subscribers: Vec<S>,
}
74
/// The result of calling [`FetcherService::cancel`].
#[must_use]
#[derive(Debug)]
pub struct FetchesCancelled<S> {
    /// The underlying result from calling [`FetcherState::cancel`].
    pub event: event::Cancel,
    /// Orphaned subscribers paired with their [`RepoId`].
    ///
    /// These were waiting on fetches from the cancelled node and will never
    /// be completed; the caller is responsible for notifying them.
    pub orphaned: Vec<(RepoId, S)>,
}
84
85impl<S> FetcherService<S> {
86    /// Initiate a fetch, optionally registering a subscriber.
87    ///
88    /// Subscribers are coalesced: if the same `(rid, node)` is already being
89    /// fetched or queued, the subscriber joins the existing waiters.
90    ///
91    /// If the fetch could not be initiated, and also could not be queued, then
92    /// subscriber is returned to notify of the rejection.
93    ///
94    /// See [`FetcherState::fetch`].
95    pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
96        let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
97        let event = self.state.fetch(cmd);
98
99        let rejected = match &event {
100            event::Fetch::QueueAtCapacity { .. } => subscriber,
101            _ => {
102                if let Some(r) = subscriber {
103                    self.subscribers.entry(key).or_default().push(r);
104                }
105                None
106            }
107        };
108
109        FetchInitiated { event, rejected }
110    }
111
112    /// Mark a fetch as completed and retrieve waiting subscribers.
113    ///
114    /// See [`FetcherState::fetched`].
115    pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
116        let event = self.state.fetched(cmd);
117        match event {
118            // TODO(finto): drop subscribers with this partial key?
119            e @ event::Fetched::NotFound { .. } => FetchCompleted {
120                event: e,
121                subscribers: vec![],
122            },
123            ref e @ event::Fetched::Completed { ref refs, .. } => {
124                let key = FetchKey::new(cmd.rid, cmd.from, refs.clone());
125                let subscribers = self.subscribers.remove(&key).unwrap_or_default();
126                FetchCompleted {
127                    event: e.clone(),
128                    subscribers,
129                }
130            }
131        }
132    }
133
134    /// Cancel all fetches for a disconnected peer, returning any orphaned
135    /// subscribers.
136    ///
137    /// See [`FetcherState::cancel`].
138    pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
139        let from = cmd.from;
140        let event = self.state.cancel(cmd);
141
142        let mut orphaned = Vec::new();
143        self.subscribers.retain(|key, subscribers| {
144            if key.node == from {
145                orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
146                false
147            } else {
148                true
149            }
150        });
151
152        FetchesCancelled { event, orphaned }
153    }
154
155    /// Dequeue the next fetch for a node.
156    ///
157    /// See [`FetcherState::dequeue`].
158    pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
159        self.state.dequeue(from)
160    }
161}
162
#[cfg(test)]
mod tests {
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;
    use std::num::NonZeroUsize;
    use std::time::Duration;

    use super::*;

    use crate::fetcher::MaxQueueSize;

    /// Fetches for the same `(rid, node)` but different refs must NOT be
    /// coalesced: each set of refs gets its own key and its own subscribers.
    #[test]
    fn test_fetch_coalescing_different_refs() {
        let config = Config::new()
            .with_max_concurrency(NonZeroUsize::new(1).unwrap())
            .with_max_capacity(MaxQueueSize::new(NonZeroUsize::new(10).unwrap()));
        let mut service = FetcherService::<usize>::new(config);
        let node = arbitrary::gen(1);
        let rid = arbitrary::gen(1);
        let specific: Vec<RefsAt> = arbitrary::vec(2);
        let all: Vec<RefsAt> = vec![];
        let timeout = Duration::from_secs(30);

        // Subscriber 1 asks for a specific set of refs; concurrency is 1, so
        // this fetch starts immediately.
        let first = service.fetch(
            command::Fetch {
                from: node,
                rid,
                refs: specific.clone().into(),
                timeout,
            },
            Some(1),
        );
        assert!(matches!(first.event, event::Fetch::Started { .. }));

        // Subscriber 2 asks for all refs; the refs differ, so this is queued
        // rather than coalesced with the in-flight fetch.
        let second = service.fetch(
            command::Fetch {
                from: node,
                rid,
                refs: all.clone().into(),
                timeout,
            },
            Some(2),
        );
        assert!(matches!(second.event, event::Fetch::Queued { .. }));

        // Completing the in-flight fetch reports the specific refs...
        let done = service.fetched(command::Fetched { from: node, rid });
        match done.event {
            event::Fetched::Completed { ref refs, .. } => {
                assert_eq!(refs, &specific.into());
            }
            _ => panic!("Expected Completed event"),
        }

        // ...and releases only subscriber 1.
        assert_eq!(done.subscribers, vec![1]);

        // Subscriber 2 is still waiting under its own key.
        let waiting_key = FetchKey::new(rid, node, all.into());
        assert!(service.subscribers.contains_key(&waiting_key));
        let waiting = &service.subscribers[&waiting_key];
        assert_eq!(waiting.len(), 1);
        assert_eq!(waiting[0], 2);
    }
}
242}