kitsune_p2p_fetch/
respond.rs1use kitsune_p2p_types::{KOpData, KSpace};
2use std::sync::Arc;
3
4pub struct FetchResponseGuard(#[allow(dead_code)] tokio::sync::oneshot::Sender<()>);
6
7#[cfg(any(test, feature = "test_utils"))]
8impl FetchResponseGuard {
9 pub fn new(inner: tokio::sync::oneshot::Sender<()>) -> Self {
11 Self(inner)
12 }
13}
14
15pub trait FetchResponseConfig: 'static + Clone + Send + Sync {
17 type User: 'static + Send;
19
20 fn byte_limit(&self) -> u32 {
24 64 * 1024 * 1024
25 }
26
27 fn concurrent_send_limit(&self) -> u32 {
29 1
30 }
31
32 fn respond(
34 &self,
35 space: KSpace,
36 user: Self::User,
37 completion_guard: FetchResponseGuard,
38 op: KOpData,
39 );
40}
41
42#[derive(Clone)]
44pub struct FetchResponseQueue<C: FetchResponseConfig> {
45 byte_limit: Arc<tokio::sync::Semaphore>,
46 concurrent_send_limit: Arc<tokio::sync::Semaphore>,
47 config: Arc<C>,
48 #[cfg(feature = "test_utils")]
50 pub bytes_sent: Arc<std::sync::atomic::AtomicUsize>,
51}
52
53impl<C: FetchResponseConfig> FetchResponseQueue<C> {
54 pub fn new(config: C) -> Self {
56 let byte_limit = Arc::new(tokio::sync::Semaphore::new(config.byte_limit() as usize));
57 let concurrent_send_limit = Arc::new(tokio::sync::Semaphore::new(
58 config.concurrent_send_limit() as usize,
59 ));
60 let config = Arc::new(config);
61 Self {
62 byte_limit,
63 concurrent_send_limit,
64 config,
65 #[cfg(feature = "test_utils")]
66 bytes_sent: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
67 }
68 }
69
70 pub fn enqueue_op(&self, space: KSpace, user: C::User, op: KOpData) -> bool {
72 let len = op.size();
73
74 if len > self.config.byte_limit() as usize {
76 tracing::error!(
77 "op size is over configured limit {}",
78 self.config.byte_limit()
79 );
80 return false;
81 }
82
83 let len = len as u32;
84
85 let byte_permit = match self.byte_limit.clone().try_acquire_many_owned(len) {
86 Err(_) => {
87 tracing::warn!(%len, "fetch responder overloaded, dropping op");
88 return false;
89 }
90 Ok(permit) => permit,
91 };
92
93 #[cfg(feature = "test_utils")]
94 self.bytes_sent
95 .fetch_add(len as usize, std::sync::atomic::Ordering::SeqCst);
96
97 let c_limit = self.concurrent_send_limit.clone();
98 let config = self.config.clone();
99 tokio::task::spawn(async move {
100 let _byte_permit = byte_permit;
101
102 let _c_permit = match c_limit.acquire_owned().await {
103 Err(_) => {
104 tracing::error!("Unexpected closed semaphore for concurrent_send_limit");
105 return;
106 }
107 Ok(permit) => permit,
108 };
109
110 let (s, r) = tokio::sync::oneshot::channel();
111
112 let guard = FetchResponseGuard(s);
113
114 config.respond(space, user, guard, op);
115
116 let _ = r.await;
119 });
120
121 true
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128 use kitsune_p2p_types::bin_types::{KitsuneBinType, KitsuneSpace};
129 use std::sync::Mutex;
130
131 struct TestConfInner {
132 pub byte_limit: u32,
133 pub concurrent_send_limit: u32,
134 pub responds: Vec<(KSpace, &'static str, FetchResponseGuard, KOpData)>,
135 }
136
137 #[derive(Clone)]
138 struct TestConf(Arc<Mutex<TestConfInner>>);
139
140 impl TestConf {
141 pub fn new(byte_limit: u32, concurrent_send_limit: u32) -> Self {
142 Self(Arc::new(Mutex::new(TestConfInner {
143 byte_limit,
144 concurrent_send_limit,
145 responds: Vec::new(),
146 })))
147 }
148
149 pub fn drain_responds(&self) -> Vec<(KSpace, &'static str, FetchResponseGuard, KOpData)> {
150 std::mem::take(&mut self.0.lock().unwrap().responds)
151 }
152 }
153
154 impl FetchResponseConfig for TestConf {
155 type User = &'static str;
156
157 fn byte_limit(&self) -> u32 {
158 self.0.lock().unwrap().byte_limit
159 }
160
161 fn concurrent_send_limit(&self) -> u32 {
162 self.0.lock().unwrap().concurrent_send_limit
163 }
164
165 fn respond(&self, space: KSpace, user: Self::User, g: FetchResponseGuard, op: KOpData) {
166 self.0.lock().unwrap().responds.push((space, user, g, op));
167 }
168 }
169
170 #[test]
171 fn config_provides_defaults() {
172 #[derive(Clone)]
173 struct DefaultConf;
174 impl FetchResponseConfig for DefaultConf {
175 type User = ();
176
177 fn respond(
178 &self,
179 _space: KSpace,
180 _user: Self::User,
181 _completion_guard: FetchResponseGuard,
182 _op: KOpData,
183 ) {
184 unreachable!()
185 }
186 }
187
188 let config = DefaultConf;
189 assert!(config.byte_limit() > 0);
190 assert!(config.concurrent_send_limit() > 0);
191 }
192
193 #[test]
194 fn queue_uses_input_config() {
195 let config = TestConf::new(1024, 1);
196 let queue = FetchResponseQueue::new(config.clone());
197
198 assert_eq!(
200 config.byte_limit(),
201 queue.byte_limit.available_permits() as u32
202 );
203 assert_eq!(
204 config.concurrent_send_limit(),
205 queue.concurrent_send_limit.available_permits() as u32
206 );
207
208 config.0.lock().unwrap().byte_limit = 1;
211 config.0.lock().unwrap().concurrent_send_limit = 2;
212
213 assert_ne!(
214 config.byte_limit(),
215 queue.byte_limit.available_permits() as u32
216 );
217 assert_ne!(
218 config.concurrent_send_limit(),
219 queue.concurrent_send_limit.available_permits() as u32
220 );
221 }
222
223 #[tokio::test(flavor = "multi_thread")]
224 async fn enqueue_op_single() {
225 let config = TestConf::new(1024, 1);
226
227 let q = FetchResponseQueue::new(config.clone());
228 assert_eq!(0, config.drain_responds().len());
229
230 assert!(q.enqueue_op(
231 Arc::new(KitsuneSpace::new(vec![0; 36])),
232 "noodle",
233 Arc::new(b"hello".to_vec().into()),
234 ));
235
236 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
237
238 assert_eq!(1, config.drain_responds().len());
239 }
240
241 #[tokio::test(flavor = "multi_thread")]
242 async fn enqueue_op_drops_large_op() {
243 let config = TestConf::new(1024, 1);
244 let q = FetchResponseQueue::new(config.clone());
245
246 assert!(!q.enqueue_op(
247 Arc::new(KitsuneSpace::new(vec![0; 36])),
248 "lots-of-bytes",
249 Arc::new([0; 1040].to_vec().into()),
250 ));
251 }
252
253 #[tokio::test(flavor = "multi_thread")]
254 async fn enqueue_op_with_insufficient_capacity_remaining() {
255 let config = TestConf::new(1024, 1);
256 let q = FetchResponseQueue::new(config.clone());
257
258 assert!(q.enqueue_op(
259 Arc::new(KitsuneSpace::new(vec![0; 36])),
260 "lots-of-bytes",
261 Arc::new([0; 1000].to_vec().into()),
262 ));
263
264 assert!(!q.enqueue_op(
265 Arc::new(KitsuneSpace::new(vec![0; 36])),
266 "lots-of-bytes",
267 Arc::new([0; 100].to_vec().into()),
268 ));
269 }
270
271 #[tokio::test(flavor = "multi_thread")]
274 async fn handles_closed_semaphore() {
275 let config = TestConf::new(1024, 1);
276 let q = FetchResponseQueue::new(config.clone());
277
278 assert!(q.enqueue_op(
279 Arc::new(KitsuneSpace::new(vec![0; 36])),
280 "lots-of-bytes",
281 Arc::new([0; 100].to_vec().into()),
282 ));
283
284 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
286
287 q.concurrent_send_limit.close();
288
289 assert!(q.enqueue_op(
291 Arc::new(KitsuneSpace::new(vec![0; 36])),
292 "lots-of-bytes",
293 Arc::new([0; 100].to_vec().into()),
294 ));
295
296 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
297
298 assert_eq!(1, config.drain_responds().len());
300 }
301}