1use std::{
36 collections::{HashMap, hash_map::Entry},
37 ops::Range,
38};
39
40use crate::{
41 client::{Error, RawResponseOwned, SubscriptionReceiver, SubscriptionSender},
42 error::RegisterMethodError,
43};
44use jsonrpsee_types::{Id, InvalidRequestId, SubscriptionId};
45use rustc_hash::FxHashMap;
46use tokio::sync::oneshot;
47
48#[derive(Debug)]
49enum Kind {
50 PendingMethodCall(PendingCallOneshot),
51 PendingSubscription((RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)),
52 Subscription((RequestId, SubscriptionSink, UnsubscribeMethod)),
53}
54
55#[derive(Debug, Clone)]
56pub(crate) enum RequestStatus {
58 PendingMethodCall,
60 PendingSubscription,
62 Subscription,
64 Invalid,
66}
67
68type PendingCallOneshot = Option<oneshot::Sender<Result<RawResponseOwned, InvalidRequestId>>>;
69type PendingBatchOneshot = oneshot::Sender<Result<Vec<RawResponseOwned>, InvalidRequestId>>;
70type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
71type SubscriptionSink = SubscriptionSender;
72type UnsubscribeMethod = String;
73type RequestId = Id<'static>;
74
75#[derive(Debug)]
76pub(crate) struct BatchState {
78 pub(crate) send_back: PendingBatchOneshot,
80}
81
82#[derive(Debug, Default)]
83pub(crate) struct RequestManager {
85 requests: FxHashMap<RequestId, Kind>,
89 subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
92 batches: FxHashMap<Range<u64>, BatchState>,
94 notification_handlers: HashMap<String, SubscriptionSink>,
96}
97
98impl RequestManager {
99 #[allow(unused)]
101 pub(crate) fn new() -> Self {
102 Self::default()
103 }
104
105 pub(crate) fn insert_pending_call(
109 &mut self,
110 id: RequestId,
111 send_back: PendingCallOneshot,
112 ) -> Result<(), PendingCallOneshot> {
113 if let Entry::Vacant(v) = self.requests.entry(id) {
114 v.insert(Kind::PendingMethodCall(send_back));
115 Ok(())
116 } else {
117 Err(send_back)
118 }
119 }
120
121 pub(crate) fn insert_pending_batch(
125 &mut self,
126 batch: Range<u64>,
127 send_back: PendingBatchOneshot,
128 ) -> Result<(), PendingBatchOneshot> {
129 if let Entry::Vacant(v) = self.batches.entry(batch) {
130 v.insert(BatchState { send_back });
131 Ok(())
132 } else {
133 Err(send_back)
134 }
135 }
136
137 pub(crate) fn insert_pending_subscription(
141 &mut self,
142 sub_req_id: RequestId,
143 unsub_req_id: RequestId,
144 send_back: PendingSubscriptionOneshot,
145 unsubscribe_method: UnsubscribeMethod,
146 ) -> Result<(), PendingSubscriptionOneshot> {
147 if !self.requests.contains_key(&sub_req_id)
149 && !self.requests.contains_key(&unsub_req_id)
150 && sub_req_id != unsub_req_id
151 {
152 self.requests
153 .insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
154 self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
155 Ok(())
156 } else {
157 Err(send_back)
158 }
159 }
160
161 pub(crate) fn insert_subscription(
165 &mut self,
166 sub_req_id: RequestId,
167 unsub_req_id: RequestId,
168 subscription_id: SubscriptionId<'static>,
169 send_back: SubscriptionSink,
170 unsubscribe_method: UnsubscribeMethod,
171 ) -> Result<(), SubscriptionSink> {
172 if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
173 (self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
174 {
175 request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
176 subscription.insert(sub_req_id);
177 Ok(())
178 } else {
179 Err(send_back)
180 }
181 }
182
183 pub(crate) fn insert_notification_handler(
185 &mut self,
186 method: &str,
187 send_back: SubscriptionSink,
188 ) -> Result<(), RegisterMethodError> {
189 if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
190 handle.insert(send_back);
191 Ok(())
192 } else {
193 Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
194 }
195 }
196
197 pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
199 self.notification_handlers.remove(method)
200 }
201
202 pub(crate) fn complete_pending_subscription(
206 &mut self,
207 request_id: RequestId,
208 ) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> {
209 match self.requests.entry(request_id) {
210 Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => {
211 let (_req_id, kind) = request.remove_entry();
212 if let Kind::PendingSubscription(send_back) = kind {
213 Some(send_back)
214 } else {
215 unreachable!("Pending subscription is Pending subscription checked above; qed");
216 }
217 }
218 _ => None,
219 }
220 }
221
222 pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
226 match self.batches.entry(batch) {
227 Entry::Occupied(request) => {
228 let (_digest, state) = request.remove_entry();
229 Some(state)
230 }
231 _ => None,
232 }
233 }
234
235 pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
239 match self.requests.entry(request_id) {
240 Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => {
241 let (_req_id, kind) = request.remove_entry();
242 if let Kind::PendingMethodCall(send_back) = kind {
243 Some(send_back)
244 } else {
245 unreachable!("Pending call is Pending call checked above; qed");
246 }
247 }
248 _ => None,
249 }
250 }
251
252 pub(crate) fn remove_subscription(
256 &mut self,
257 request_id: RequestId,
258 subscription_id: SubscriptionId<'static>,
259 ) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId<'_>)> {
260 match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
261 (Entry::Occupied(request), Entry::Occupied(subscription))
262 if matches!(request.get(), Kind::Subscription(_)) =>
263 {
264 let (_req_id, kind) = request.remove_entry();
266 let (sub_id, _req_id) = subscription.remove_entry();
267 if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
268 Some((unsub_req_id, send_back, unsub, sub_id))
269 } else {
270 unreachable!("Subscription is Subscription checked above; qed");
271 }
272 }
273 _ => None,
274 }
275 }
276
277 pub(crate) fn unsubscribe(
282 &mut self,
283 request_id: RequestId,
284 subscription_id: SubscriptionId<'static>,
285 ) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId<'_>)> {
286 match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
287 (Entry::Occupied(mut request), Entry::Occupied(subscription))
288 if matches!(request.get(), Kind::Subscription(_)) =>
289 {
290 let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
293 let (sub_id, _req_id) = subscription.remove_entry();
294 if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
295 Some((unsub_req_id, send_back, unsub, sub_id))
296 } else {
297 unreachable!("Subscription is Subscription checked above; qed");
298 }
299 }
300 _ => None,
301 }
302 }
303
304 pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
306 self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
307 Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
308 Kind::PendingSubscription(_) => RequestStatus::PendingSubscription,
309 Kind::Subscription(_) => RequestStatus::Subscription,
310 })
311 }
312
313 pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> {
317 if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) { Some(sink) } else { None }
318 }
319
320 pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> {
324 self.notification_handlers.get_mut(&method)
325 }
326
327 pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
331 self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
332 }
333}
334
335#[cfg(test)]
336mod tests {
337 use crate::client::subscription_channel;
338
339 use super::RequestManager;
340 use jsonrpsee_types::{Id, SubscriptionId};
341 use tokio::sync::oneshot;
342
343 #[test]
344 fn insert_remove_pending_request_works() {
345 let (request_tx, _) = oneshot::channel();
346
347 let mut manager = RequestManager::new();
348 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
349 assert!(manager.complete_pending_call(Id::Number(0)).is_some());
350 }
351
352 #[test]
353 fn insert_remove_subscription_works() {
354 let (pending_sub_tx, _) = oneshot::channel();
355 let (sub_tx, _) = subscription_channel(1);
356 let mut manager = RequestManager::new();
357 assert!(
358 manager
359 .insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
360 .is_ok()
361 );
362 let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
363 manager.complete_pending_subscription(Id::Number(1)).unwrap();
364 assert_eq!(unsub_req_id, Id::Number(2));
365 assert!(
366 manager
367 .insert_subscription(
368 Id::Number(1),
369 Id::Number(2),
370 SubscriptionId::Str("uniq_id_from_server".into()),
371 sub_tx,
372 unsubscribe_method
373 )
374 .is_ok()
375 );
376
377 assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
378 assert!(
379 manager.remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into())).is_some()
380 );
381 }
382
383 #[test]
384 fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
385 let (tx1, _) = oneshot::channel();
386 let (tx2, _) = oneshot::channel();
387 let (tx3, _) = oneshot::channel();
388 let (tx4, _) = oneshot::channel();
389 let mut manager = RequestManager::new();
390 assert!(
391 manager
392 .insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
393 .is_err()
394 );
395 assert!(
396 manager
397 .insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
398 .is_ok()
399 );
400 assert!(
401 manager
402 .insert_pending_subscription(
403 Id::Str("99".into()),
404 Id::Str("0".into()),
405 tx3,
406 "unsubscribe_method".into()
407 )
408 .is_err(),
409 "unsub request ID already occupied"
410 );
411 assert!(
412 manager
413 .insert_pending_subscription(
414 Id::Str("99".into()),
415 Id::Str("1".into()),
416 tx4,
417 "unsubscribe_method".into()
418 )
419 .is_err(),
420 "sub request ID already occupied"
421 );
422 }
423
424 #[test]
425 fn pending_method_call_faulty() {
426 let (request_tx1, _) = oneshot::channel();
427 let (request_tx2, _) = oneshot::channel();
428 let (pending_sub_tx, _) = oneshot::channel();
429 let (sub_tx, _) = subscription_channel(1);
430
431 let mut manager = RequestManager::new();
432 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
433 assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
434 assert!(
435 manager
436 .insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
437 .is_err()
438 );
439 assert!(
440 manager
441 .insert_subscription(
442 Id::Number(0),
443 Id::Number(99),
444 SubscriptionId::Num(137),
445 sub_tx,
446 "bibimbap".to_string()
447 )
448 .is_err()
449 );
450
451 assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
452 assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
453 assert!(manager.complete_pending_call(Id::Number(0)).is_some());
454 }
455
456 #[test]
457 fn pending_subscription_faulty() {
458 let (request_tx, _) = oneshot::channel();
459 let (pending_sub_tx1, _) = oneshot::channel();
460 let (pending_sub_tx2, _) = oneshot::channel();
461 let (sub_tx, _) = subscription_channel(1);
462
463 let mut manager = RequestManager::new();
464 assert!(
465 manager
466 .insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
467 .is_ok()
468 );
469 assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
470 assert!(
471 manager
472 .insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
473 .is_err()
474 );
475
476 assert!(
477 manager
478 .insert_subscription(
479 Id::Number(99),
480 Id::Number(100),
481 SubscriptionId::Num(0),
482 sub_tx,
483 "bibimbap".to_string()
484 )
485 .is_err()
486 );
487
488 assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
489 assert!(manager.complete_pending_call(Id::Number(99)).is_none());
490 assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
491 }
492
493 #[test]
494 fn active_subscriptions_faulty() {
495 let (request_tx, _) = oneshot::channel();
496 let (pending_sub_tx, _) = oneshot::channel();
497 let (sub_tx1, _) = subscription_channel(1);
498 let (sub_tx2, _) = subscription_channel(1);
499
500 let mut manager = RequestManager::new();
501
502 assert!(
503 manager
504 .insert_subscription(
505 Id::Number(3),
506 Id::Number(4),
507 SubscriptionId::Num(0),
508 sub_tx1,
509 "bibimbap".to_string()
510 )
511 .is_ok()
512 );
513 assert!(
514 manager
515 .insert_subscription(
516 Id::Number(3),
517 Id::Number(4),
518 SubscriptionId::Num(1),
519 sub_tx2,
520 "bibimbap".to_string()
521 )
522 .is_err()
523 );
524 assert!(
525 manager
526 .insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
527 .is_err()
528 );
529 assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());
530
531 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
532 assert!(manager.complete_pending_call(Id::Number(3)).is_none());
533 assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
534 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
535 assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());
536
537 assert!(manager.requests.is_empty());
538 assert!(manager.subscriptions.is_empty());
539 }
540}