radicle_protocol/fetcher/
service.rs1use std::collections::HashMap;
2
3use radicle_core::{NodeId, RepoId};
4
5use crate::fetcher::{
6 RefsToFetch,
7 state::{
8 Config, FetcherState, QueuedFetch,
9 command::{self},
10 event,
11 },
12};
13
14#[derive(Debug)]
22pub struct FetcherService<S> {
23 state: FetcherState,
24 subscribers: HashMap<FetchKey, Vec<S>>,
25}
26
27impl<S> FetcherService<S> {
28 pub fn new(config: Config) -> Self {
30 Self {
31 state: FetcherState::new(config),
32 subscribers: HashMap::new(),
33 }
34 }
35
36 pub fn state(&self) -> &FetcherState {
38 &self.state
39 }
40}
41
42#[derive(Clone, Debug, PartialEq, Eq, Hash)]
44struct FetchKey {
45 rid: RepoId,
46 node: NodeId,
47 refs: RefsToFetch,
48}
49
50impl FetchKey {
51 fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
52 Self { rid, node, refs }
53 }
54}
55
56#[must_use]
58#[derive(Debug)]
59pub struct FetchInitiated<S> {
60 pub event: event::Fetch,
62 pub rejected: Option<S>,
64}
65
66#[must_use]
68#[derive(Debug)]
69pub struct FetchCompleted<S> {
70 pub event: event::Fetched,
72 pub subscribers: Vec<S>,
74}
75
76#[must_use]
78#[derive(Debug)]
79pub struct FetchesCancelled<S> {
80 pub event: event::Cancel,
82 pub orphaned: Vec<(RepoId, S)>,
84}
85
86impl<S> FetcherService<S> {
87 pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
97 let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
98 let event = self.state.fetch(cmd);
99
100 let rejected = match &event {
101 event::Fetch::QueueAtCapacity { .. } => subscriber,
102 _ => {
103 if let Some(r) = subscriber {
104 self.subscribers.entry(key).or_default().push(r);
105 }
106 None
107 }
108 };
109
110 FetchInitiated { event, rejected }
111 }
112
113 pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
117 let event = self.state.fetched(cmd);
118 match event {
119 e @ event::Fetched::NotFound { .. } => FetchCompleted {
121 event: e,
122 subscribers: vec![],
123 },
124 ref e @ event::Fetched::Completed { ref refs, .. } => {
125 let key = FetchKey::new(cmd.rid, cmd.from, refs.clone());
126 let subscribers = self.subscribers.remove(&key).unwrap_or_default();
127 FetchCompleted {
128 event: e.clone(),
129 subscribers,
130 }
131 }
132 }
133 }
134
135 pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
140 let from = cmd.from;
141 let event = self.state.cancel(cmd);
142
143 let mut orphaned = Vec::new();
144 self.subscribers.retain(|key, subscribers| {
145 if key.node == from {
146 orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
147 false
148 } else {
149 true
150 }
151 });
152
153 FetchesCancelled { event, orphaned }
154 }
155
156 pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
160 self.state.dequeue(from)
161 }
162}
163
#[cfg(test)]
mod tests {
    use radicle::storage::refs::RefsAt;
    use radicle::test::arbitrary;
    use std::num::NonZeroUsize;

    use super::*;

    use crate::fetcher::{FetchConfig, MaxQueueSize};

    /// Two fetches for the same repository and node but with different refs
    /// keep separate subscriber lists: completing the first releases only its
    /// own subscriber and leaves the queued fetch's subscriber registered.
    #[test]
    fn test_fetch_coalescing_different_refs() {
        // One concurrent fetch at most, so the second request must queue.
        let max_concurrency = NonZeroUsize::new(1).unwrap();
        let max_capacity = MaxQueueSize::new(NonZeroUsize::new(10).unwrap());
        let service_config = Config::new()
            .with_max_concurrency(max_concurrency)
            .with_max_capacity(max_capacity);
        let mut service = FetcherService::<usize>::new(service_config);

        let node: NodeId = arbitrary::r#gen(1);
        let repo: RepoId = arbitrary::r#gen(1);
        let refs_specific: Vec<RefsAt> = arbitrary::vec(2);
        let refs_all = vec![];
        let fetch_config = FetchConfig::default();

        // First fetch: specific refs, subscriber 1.
        let specific_cmd = command::Fetch {
            from: node,
            rid: repo,
            refs: refs_specific.clone().into(),
            config: fetch_config,
        };
        let first = service.fetch(specific_cmd, Some(1));
        assert!(matches!(first.event, event::Fetch::Started { .. }));

        // Second fetch: same repository and node, empty ref list, subscriber 2.
        let all_cmd = command::Fetch {
            from: node,
            rid: repo,
            refs: refs_all.clone().into(),
            config: fetch_config,
        };
        let second = service.fetch(all_cmd, Some(2));
        assert!(matches!(second.event, event::Fetch::Queued { .. }));

        let completed = service.fetched(command::Fetched {
            from: node,
            rid: repo,
        });

        // The completed fetch is the one with the specific refs.
        match completed.event {
            event::Fetched::Completed { ref refs, .. } => {
                assert_eq!(refs, &refs_specific.into());
            }
            _ => panic!("Expected Completed event"),
        }

        // Only the subscriber of the completed fetch is released.
        assert_eq!(completed.subscribers, vec![1]);

        // The queued fetch still holds its subscriber.
        let queued_key = FetchKey::new(repo, node, refs_all.into());
        assert!(service.subscribers.contains_key(&queued_key));

        let remaining = &service.subscribers[&queued_key];
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0], 2);
    }
}