radicle_protocol/fetcher/
service.rs1use std::collections::HashMap;
2
3use radicle_core::{NodeId, RepoId};
4
5use crate::fetcher::{
6 state::{
7 command::{self},
8 event, Config, FetcherState, QueuedFetch,
9 },
10 RefsToFetch,
11};
12
13#[derive(Debug)]
21pub struct FetcherService<S> {
22 state: FetcherState,
23 subscribers: HashMap<FetchKey, Vec<S>>,
24}
25
26impl<S> FetcherService<S> {
27 pub fn new(config: Config) -> Self {
29 Self {
30 state: FetcherState::new(config),
31 subscribers: HashMap::new(),
32 }
33 }
34
35 pub fn state(&self) -> &FetcherState {
37 &self.state
38 }
39}
40
41#[derive(Clone, Debug, PartialEq, Eq, Hash)]
43struct FetchKey {
44 rid: RepoId,
45 node: NodeId,
46 refs: RefsToFetch,
47}
48
49impl FetchKey {
50 fn new(rid: RepoId, node: NodeId, refs: RefsToFetch) -> Self {
51 Self { rid, node, refs }
52 }
53}
54
55#[must_use]
57#[derive(Debug)]
58pub struct FetchInitiated<S> {
59 pub event: event::Fetch,
61 pub rejected: Option<S>,
63}
64
65#[must_use]
67#[derive(Debug)]
68pub struct FetchCompleted<S> {
69 pub event: event::Fetched,
71 pub subscribers: Vec<S>,
73}
74
75#[must_use]
77#[derive(Debug)]
78pub struct FetchesCancelled<S> {
79 pub event: event::Cancel,
81 pub orphaned: Vec<(RepoId, S)>,
83}
84
85impl<S> FetcherService<S> {
86 pub fn fetch(&mut self, cmd: command::Fetch, subscriber: Option<S>) -> FetchInitiated<S> {
96 let key = FetchKey::new(cmd.rid, cmd.from, cmd.refs.clone());
97 let event = self.state.fetch(cmd);
98
99 let rejected = match &event {
100 event::Fetch::QueueAtCapacity { .. } => subscriber,
101 _ => {
102 if let Some(r) = subscriber {
103 self.subscribers.entry(key).or_default().push(r);
104 }
105 None
106 }
107 };
108
109 FetchInitiated { event, rejected }
110 }
111
112 pub fn fetched(&mut self, cmd: command::Fetched) -> FetchCompleted<S> {
116 let event = self.state.fetched(cmd);
117 match event {
118 e @ event::Fetched::NotFound { .. } => FetchCompleted {
120 event: e,
121 subscribers: vec![],
122 },
123 ref e @ event::Fetched::Completed { ref refs, .. } => {
124 let key = FetchKey::new(cmd.rid, cmd.from, refs.clone());
125 let subscribers = self.subscribers.remove(&key).unwrap_or_default();
126 FetchCompleted {
127 event: e.clone(),
128 subscribers,
129 }
130 }
131 }
132 }
133
134 pub fn cancel(&mut self, cmd: command::Cancel) -> FetchesCancelled<S> {
139 let from = cmd.from;
140 let event = self.state.cancel(cmd);
141
142 let mut orphaned = Vec::new();
143 self.subscribers.retain(|key, subscribers| {
144 if key.node == from {
145 orphaned.extend(subscribers.drain(..).map(|r| (key.rid, r)));
146 false
147 } else {
148 true
149 }
150 });
151
152 FetchesCancelled { event, orphaned }
153 }
154
155 pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
159 self.state.dequeue(from)
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use radicle::storage::refs::RefsAt;
166 use radicle::test::arbitrary;
167 use std::num::NonZeroUsize;
168 use std::time::Duration;
169
170 use super::*;
171
172 use crate::fetcher::MaxQueueSize;
173
174 #[test]
175 fn test_fetch_coalescing_different_refs() {
176 let config = Config::new()
177 .with_max_concurrency(NonZeroUsize::new(1).unwrap())
178 .with_max_capacity(MaxQueueSize::new(NonZeroUsize::new(10).unwrap()));
179 let mut service = FetcherService::<usize>::new(config);
180 let node = arbitrary::gen(1);
181 let repo = arbitrary::gen(1);
182 let refs_specific: Vec<RefsAt> = arbitrary::vec(2);
183 let refs_all = vec![];
184 let timeout = Duration::from_secs(30);
185
186 let initiated1 = service.fetch(
188 command::Fetch {
189 from: node,
190 rid: repo,
191 refs: refs_specific.clone().into(),
192 timeout,
193 },
194 Some(1),
195 );
196
197 assert!(matches!(initiated1.event, event::Fetch::Started { .. }));
198
199 let initiated2 = service.fetch(
201 command::Fetch {
202 from: node,
203 rid: repo,
204 refs: refs_all.clone().into(),
205 timeout,
206 },
207 Some(2),
208 );
209
210 assert!(matches!(initiated2.event, event::Fetch::Queued { .. }));
213
214 let completed = service.fetched(command::Fetched {
217 from: node,
218 rid: repo,
219 });
220
221 match completed.event {
222 event::Fetched::Completed { ref refs, .. } => {
223 assert_eq!(refs, &refs_specific.into());
224 }
225 _ => panic!("Expected Completed event"),
226 }
227
228 assert_eq!(completed.subscribers, vec![1]);
230
231 assert!(service.subscribers.contains_key(&FetchKey::new(
233 repo,
234 node,
235 refs_all.clone().into()
236 )));
237
238 let remaining = &service.subscribers[&FetchKey::new(repo, node, refs_all.into())];
239 assert_eq!(remaining.len(), 1);
240 assert_eq!(remaining[0], 2);
241 }
242}