//! kitsune_p2p_fetch/respond.rs — manage responding to fetch requests for op data.

1use kitsune_p2p_types::{KOpData, KSpace};
2use std::sync::Arc;
3
/// Drop this when response sending is complete.
///
/// The guard wraps a oneshot `Sender<()>`; it is never explicitly used
/// (hence `#[allow(dead_code)]`) — dropping it closes the channel, which
/// resolves the `r.await` in the queue's spawned send task and releases
/// that task's byte/concurrency permits.
pub struct FetchResponseGuard(#[allow(dead_code)] tokio::sync::oneshot::Sender<()>);
6
#[cfg(any(test, feature = "test_utils"))]
impl FetchResponseGuard {
    /// Construct a guard around the given oneshot sender.
    ///
    /// Only available to tests / the `test_utils` feature; production
    /// guards are created internally by the response queue.
    pub fn new(inner: tokio::sync::oneshot::Sender<()>) -> Self {
        FetchResponseGuard(inner)
    }
}
14
15/// Customization by code making use of the FetchResponseQueue.
16pub trait FetchResponseConfig: 'static + Clone + Send + Sync {
17    /// Data that is forwarded.
18    type User: 'static + Send;
19
20    /// Byte count allowed to be outstanding.
21    /// Any ops requested to be enqueued over this amount
22    /// will be dropped without responding.
23    fn byte_limit(&self) -> u32 {
24        64 * 1024 * 1024
25    }
26
27    /// Number of concurrent sends to allow.
28    fn concurrent_send_limit(&self) -> u32 {
29        1
30    }
31
32    /// Send this fetch response.
33    fn respond(
34        &self,
35        space: KSpace,
36        user: Self::User,
37        completion_guard: FetchResponseGuard,
38        op: KOpData,
39    );
40}
41
/// Manage responding to requests for data.
#[derive(Clone)]
pub struct FetchResponseQueue<C: FetchResponseConfig> {
    // Permit count = bytes of op data allowed to be outstanding at once;
    // sized from `config.byte_limit()` at construction and never resized.
    byte_limit: Arc<tokio::sync::Semaphore>,
    // Permit count = number of sends allowed to run concurrently; sized
    // from `config.concurrent_send_limit()` at construction.
    concurrent_send_limit: Arc<tokio::sync::Semaphore>,
    // User-supplied responder implementation and limits.
    config: Arc<C>,
    /// For testing, track the number of bytes sent.
    #[cfg(feature = "test_utils")]
    pub bytes_sent: Arc<std::sync::atomic::AtomicUsize>,
}
52
53impl<C: FetchResponseConfig> FetchResponseQueue<C> {
54    /// Construct a new response queue.
55    pub fn new(config: C) -> Self {
56        let byte_limit = Arc::new(tokio::sync::Semaphore::new(config.byte_limit() as usize));
57        let concurrent_send_limit = Arc::new(tokio::sync::Semaphore::new(
58            config.concurrent_send_limit() as usize,
59        ));
60        let config = Arc::new(config);
61        Self {
62            byte_limit,
63            concurrent_send_limit,
64            config,
65            #[cfg(feature = "test_utils")]
66            bytes_sent: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
67        }
68    }
69
70    /// Enqueue an op to be sent to a remote.
71    pub fn enqueue_op(&self, space: KSpace, user: C::User, op: KOpData) -> bool {
72        let len = op.size();
73
74        // Don't try to take more permits than the byte_limit has.
75        if len > self.config.byte_limit() as usize {
76            tracing::error!(
77                "op size is over configured limit {}",
78                self.config.byte_limit()
79            );
80            return false;
81        }
82
83        let len = len as u32;
84
85        let byte_permit = match self.byte_limit.clone().try_acquire_many_owned(len) {
86            Err(_) => {
87                tracing::warn!(%len, "fetch responder overloaded, dropping op");
88                return false;
89            }
90            Ok(permit) => permit,
91        };
92
93        #[cfg(feature = "test_utils")]
94        self.bytes_sent
95            .fetch_add(len as usize, std::sync::atomic::Ordering::SeqCst);
96
97        let c_limit = self.concurrent_send_limit.clone();
98        let config = self.config.clone();
99        tokio::task::spawn(async move {
100            let _byte_permit = byte_permit;
101
102            let _c_permit = match c_limit.acquire_owned().await {
103                Err(_) => {
104                    tracing::error!("Unexpected closed semaphore for concurrent_send_limit");
105                    return;
106                }
107                Ok(permit) => permit,
108            };
109
110            let (s, r) = tokio::sync::oneshot::channel();
111
112            let guard = FetchResponseGuard(s);
113
114            config.respond(space, user, guard, op);
115
116            // we don't care about the response... in fact
117            // it's *always* an error, because we drop it.
118            let _ = r.await;
119        });
120
121        true
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use kitsune_p2p_types::bin_types::{KitsuneBinType, KitsuneSpace};
    use std::sync::Mutex;

    // Shared state behind the test config: the configured limits plus a
    // record of every `respond` call the queue makes.
    struct TestConfInner {
        pub byte_limit: u32,
        pub concurrent_send_limit: u32,
        // Captured (space, user, guard, op) tuples. Holding the guard here
        // keeps the queue task's oneshot receiver pending.
        pub responds: Vec<(KSpace, &'static str, FetchResponseGuard, KOpData)>,
    }

    // Test implementation of `FetchResponseConfig` that records responds
    // instead of sending anything.
    #[derive(Clone)]
    struct TestConf(Arc<Mutex<TestConfInner>>);

    impl TestConf {
        // Build a config with explicit byte/concurrency limits.
        pub fn new(byte_limit: u32, concurrent_send_limit: u32) -> Self {
            Self(Arc::new(Mutex::new(TestConfInner {
                byte_limit,
                concurrent_send_limit,
                responds: Vec::new(),
            })))
        }

        // Take all recorded responds, leaving the record empty. Dropping
        // the returned guards releases the corresponding queue tasks.
        pub fn drain_responds(&self) -> Vec<(KSpace, &'static str, FetchResponseGuard, KOpData)> {
            std::mem::take(&mut self.0.lock().unwrap().responds)
        }
    }

    impl FetchResponseConfig for TestConf {
        type User = &'static str;

        fn byte_limit(&self) -> u32 {
            self.0.lock().unwrap().byte_limit
        }

        fn concurrent_send_limit(&self) -> u32 {
            self.0.lock().unwrap().concurrent_send_limit
        }

        fn respond(&self, space: KSpace, user: Self::User, g: KOpData) {
            self.0.lock().unwrap().responds.push((space, user, g, op));
        }
    }

    #[test]
    fn config_provides_defaults() {
        // Config that only implements the required method; exercises the
        // trait's default byte_limit / concurrent_send_limit bodies.
        #[derive(Clone)]
        struct DefaultConf;
        impl FetchResponseConfig for DefaultConf {
            type User = ();

            fn respond(
                &self,
                _space: KSpace,
                _user: Self::User,
                _completion_guard: FetchResponseGuard,
                _op: KOpData,
            ) {
                unreachable!()
            }
        }

        let config = DefaultConf;
        assert!(config.byte_limit() > 0);
        assert!(config.concurrent_send_limit() > 0);
    }

    #[test]
    fn queue_uses_input_config() {
        let config = TestConf::new(1024, 1);
        let queue = FetchResponseQueue::new(config.clone());

        // Check that the queue config is based on the input config.
        assert_eq!(
            config.byte_limit(),
            queue.byte_limit.available_permits() as u32
        );
        assert_eq!(
            config.concurrent_send_limit(),
            queue.concurrent_send_limit.available_permits() as u32
        );

        // Check that updating the input config DOES NOT update the queue config.
        // TODO They may as well be properties rather than functions
        config.0.lock().unwrap().byte_limit = 1;
        config.0.lock().unwrap().concurrent_send_limit = 2;

        assert_ne!(
            config.byte_limit(),
            queue.byte_limit.available_permits() as u32
        );
        assert_ne!(
            config.concurrent_send_limit(),
            queue.concurrent_send_limit.available_permits() as u32
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn enqueue_op_single() {
        let config = TestConf::new(1024, 1);

        let q = FetchResponseQueue::new(config.clone());
        assert_eq!(0, config.drain_responds().len());

        assert!(q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "noodle",
            Arc::new(b"hello".to_vec().into()),
        ));

        // Give the spawned send task time to call `respond`.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        assert_eq!(1, config.drain_responds().len());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn enqueue_op_drops_large_op() {
        let config = TestConf::new(1024, 1);
        let q = FetchResponseQueue::new(config.clone());

        // 1040 bytes > the 1024-byte limit: rejected synchronously.
        assert!(!q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "lots-of-bytes",
            Arc::new([0; 1040].to_vec().into()),
        ));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn enqueue_op_with_insufficient_capacity_remaining() {
        let config = TestConf::new(1024, 1);
        let q = FetchResponseQueue::new(config.clone());

        // First op takes 1000 of the 1024 byte permits (and is never
        // drained, so the permits stay held).
        assert!(q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "lots-of-bytes",
            Arc::new([0; 1000].to_vec().into()),
        ));

        // Only 24 permits remain, so a 100-byte op is dropped.
        assert!(!q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "lots-of-bytes",
            Arc::new([0; 100].to_vec().into()),
        ));
    }

    // TODO This situation is never communicated back to the caller because `enqueue_op` is effectively fire and forget
    //      but it is actually a fatal condition.
    #[tokio::test(flavor = "multi_thread")]
    async fn handles_closed_semaphore() {
        let config = TestConf::new(1024, 1);
        let q = FetchResponseQueue::new(config.clone());

        assert!(q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "lots-of-bytes",
            Arc::new([0; 100].to_vec().into()),
        ));

        // Give the op time to queue
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        q.concurrent_send_limit.close();

        // The semaphore is closed but we only find that out inside the inner task so the enqueue should succeed.
        assert!(q.enqueue_op(
            Arc::new(KitsuneSpace::new(vec![0; 36])),
            "lots-of-bytes",
            Arc::new([0; 100].to_vec().into()),
        ));

        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        // But there will only be one op in the queue
        assert_eq!(1, config.drain_responds().len());
    }
}