use std::collections::hash_map::{Entry, HashMap};
use futures::channel::{mpsc, oneshot};
use jsonrpc_types::*;
use crate::error::WsClientError;
type PendingMethodCall = oneshot::Sender<Result<Output, WsClientError>>;
type PendingBatchMethodCall = oneshot::Sender<Result<Vec<Output>, WsClientError>>;
type PendingSubscription = oneshot::Sender<Result<(Id, mpsc::Receiver<SubscriptionNotification>), WsClientError>>;
type ActiveSubscription = mpsc::Sender<SubscriptionNotification>;
type PendingUnsubscribe = oneshot::Sender<Result<bool, WsClientError>>;
#[derive(Debug)]
enum RequestKind {
PendingMethodCall(PendingMethodCall),
PendingBatchMethodCall(PendingBatchMethodCall),
PendingSubscription(PendingSubscription),
ActiveSubscription(ActiveSubscription),
PendingUnsubscribe((Id, PendingUnsubscribe)),
}
pub enum RequestStatus {
PendingMethodCall,
PendingBatchMethodCall,
PendingSubscription,
ActiveSubscription,
PendingUnsubscribe,
Invalid,
}
#[derive(Debug)]
pub struct TaskManager {
requests: HashMap<u64, RequestKind>,
subscriptions: HashMap<Id, u64>,
pub(crate) max_capacity_per_subscription: usize,
}
impl TaskManager {
pub fn new(max_capacity_per_subscription: usize) -> Self {
Self {
requests: HashMap::new(),
subscriptions: HashMap::new(),
max_capacity_per_subscription,
}
}
pub fn insert_pending_method_call(
&mut self,
request_id: u64,
send_back: PendingMethodCall,
) -> Result<(), PendingMethodCall> {
match self.requests.entry(request_id) {
Entry::Vacant(request) => {
request.insert(RequestKind::PendingMethodCall(send_back));
Ok(())
}
Entry::Occupied(_) => Err(send_back),
}
}
pub fn complete_pending_method_call(&mut self, request_id: u64) -> Option<PendingMethodCall> {
match self.requests.entry(request_id) {
Entry::Occupied(request) if matches!(request.get(), RequestKind::PendingMethodCall(_)) => {
if let (_req_id, RequestKind::PendingMethodCall(send_back)) = request.remove_entry() {
Some(send_back)
} else {
unreachable!("Kind must be PendingMethodCall; qed");
}
}
_ => None,
}
}
pub fn insert_pending_batch_method_call(
&mut self,
min_request_id: u64,
send_back: PendingBatchMethodCall,
) -> Result<(), PendingBatchMethodCall> {
match self.requests.entry(min_request_id) {
Entry::Vacant(request) => {
request.insert(RequestKind::PendingBatchMethodCall(send_back));
Ok(())
}
Entry::Occupied(_) => Err(send_back),
}
}
pub fn complete_pending_batch_method_call(&mut self, min_request_id: u64) -> Option<PendingBatchMethodCall> {
match self.requests.entry(min_request_id) {
Entry::Occupied(request) if matches!(request.get(), RequestKind::PendingBatchMethodCall(_)) => {
if let (_min_req_id, RequestKind::PendingBatchMethodCall(send_back)) = request.remove_entry() {
Some(send_back)
} else {
unreachable!("Kind must be PendingMethodCall; qed");
}
}
_ => None,
}
}
pub fn insert_pending_subscription(
&mut self,
request_id: u64,
send_back: PendingSubscription,
) -> Result<(), PendingSubscription> {
match self.requests.entry(request_id) {
Entry::Vacant(request) => {
request.insert(RequestKind::PendingSubscription(send_back));
Ok(())
}
Entry::Occupied(_) => Err(send_back),
}
}
pub fn complete_pending_subscription(&mut self, request_id: u64) -> Option<PendingSubscription> {
match self.requests.entry(request_id) {
Entry::Occupied(request) if matches!(request.get(), RequestKind::PendingSubscription(_)) => {
if let (_id, RequestKind::PendingSubscription(send_back)) = request.remove_entry() {
Some(send_back)
} else {
unreachable!("Kind must be PendingSubscription; qed");
}
}
_ => None,
}
}
pub fn insert_active_subscription(
&mut self,
request_id: u64,
subscription_id: Id,
send_back: ActiveSubscription,
) -> Result<(), ActiveSubscription> {
match (
self.requests.entry(request_id),
self.subscriptions.entry(subscription_id),
) {
(Entry::Vacant(request), Entry::Vacant(subscription)) => {
request.insert(RequestKind::ActiveSubscription(send_back));
subscription.insert(request_id);
Ok(())
}
_ => Err(send_back),
}
}
pub fn remove_active_subscription(&mut self, request_id: u64, subscription_id: Id) -> Option<ActiveSubscription> {
match (
self.requests.entry(request_id),
self.subscriptions.entry(subscription_id),
) {
(Entry::Occupied(request), Entry::Occupied(subscription)) => {
let (_req_id, kind) = request.remove_entry();
let (_sub_id, _req_id) = subscription.remove_entry();
if let RequestKind::ActiveSubscription(send_back) = kind {
Some(send_back)
} else {
unreachable!("Kind must be ActiveSubscription; qed");
}
}
_ => None,
}
}
pub fn insert_pending_unsubscribe(
&mut self,
request_id: u64,
subscription_id: Id,
send_back: PendingUnsubscribe,
) -> Result<(), PendingUnsubscribe> {
match self.requests.entry(request_id) {
Entry::Vacant(request) => {
request.insert(RequestKind::PendingUnsubscribe((subscription_id, send_back)));
Ok(())
}
Entry::Occupied(_) => Err(send_back),
}
}
pub fn complete_pending_unsubscribe(&mut self, request_id: u64) -> Option<(Id, PendingUnsubscribe)> {
match self.requests.entry(request_id) {
Entry::Occupied(request) if matches!(request.get(), RequestKind::PendingUnsubscribe(_)) => {
if let (_req_id, RequestKind::PendingUnsubscribe(send_back)) = request.remove_entry() {
Some(send_back)
} else {
unreachable!("Kind must be PendingUnsubscribe; qed");
}
}
_ => None,
}
}
pub fn get_request_id_by(&self, subscription_id: &Id) -> Option<u64> {
self.subscriptions.get(subscription_id).copied()
}
pub fn request_status(&mut self, request_id: &u64) -> RequestStatus {
self.requests
.get(request_id)
.map_or(RequestStatus::Invalid, |kind| match kind {
RequestKind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
RequestKind::PendingBatchMethodCall(_) => RequestStatus::PendingBatchMethodCall,
RequestKind::PendingSubscription(_) => RequestStatus::PendingSubscription,
RequestKind::ActiveSubscription(_) => RequestStatus::ActiveSubscription,
RequestKind::PendingUnsubscribe(_) => RequestStatus::PendingUnsubscribe,
})
}
pub fn as_active_subscription_mut(&mut self, request_id: &u64) -> Option<&mut ActiveSubscription> {
let kind = self.requests.get_mut(request_id);
if let Some(RequestKind::ActiveSubscription(sink)) = kind {
Some(sink)
} else {
None
}
}
}