use std::{collections::HashMap, sync::mpsc::SyncSender};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use crate::core::supported_message::SupportedMessage;
pub(crate) struct MessageQueue {
inflight_requests: HashMap<u32, Option<SyncSender<SupportedMessage>>>,
responses: HashMap<u32, SupportedMessage>,
sender: Option<UnboundedSender<Message>>,
}
#[derive(Debug)]
pub enum Message {
Quit,
SupportedMessage(SupportedMessage),
}
impl MessageQueue {
pub fn new() -> MessageQueue {
MessageQueue {
inflight_requests: HashMap::new(),
responses: HashMap::new(),
sender: None,
}
}
pub(crate) fn clear(&mut self) {
self.inflight_requests.clear();
self.responses.clear();
}
pub(crate) fn make_request_channel(&mut self) -> UnboundedReceiver<Message> {
let (tx, rx) = mpsc::unbounded_channel();
self.sender = Some(tx.clone());
rx
}
pub(crate) fn request_was_processed(&mut self, request_handle: u32) {
debug!("Request {} was processed by the server", request_handle);
}
fn send_message(&self, message: Message) -> bool {
let sender = self.sender.as_ref().expect(
"MessageQueue::send_message should never be called before make_request_channel",
);
if sender.is_closed() {
error!("Send message will fail because sender has been closed");
false
} else if let Err(err) = sender.send(message) {
debug!("Cannot send message to message receiver, error {}", err);
false
} else {
true
}
}
pub(crate) fn add_request(
&mut self,
request: SupportedMessage,
sender: Option<SyncSender<SupportedMessage>>,
) {
let request_handle = request.request_handle();
trace!("Sending request {:?} to be sent", request);
self.inflight_requests.insert(request_handle, sender);
let _ = self.send_message(Message::SupportedMessage(request));
}
pub(crate) fn quit(&self) {
debug!("Sending a quit to the message receiver");
let _ = self.send_message(Message::Quit);
}
pub(crate) fn request_has_timed_out(&mut self, request_handle: u32) {
info!(
"Request {} has timed out and any response will be ignored",
request_handle
);
let _ = self.inflight_requests.remove(&request_handle);
}
pub(crate) fn store_response(&mut self, response: SupportedMessage) {
let request_handle = response.request_handle();
trace!("Received response {:?}", response);
debug!("Response to Request {} has been stored", request_handle);
if let Some(sender) = self.inflight_requests.remove(&request_handle) {
if let Some(sender) = sender {
if let Err(e) = sender.send(response) {
error!(
"Cannot send a response to a synchronous request {} because send failed, error = {}",
request_handle,
e
);
}
} else {
self.responses.insert(request_handle, response);
}
} else {
error!("A response with request handle {} doesn't belong to any request and will be ignored, inflight requests = {:?}, request = {:?}", request_handle, self.inflight_requests, response);
if let SupportedMessage::ServiceFault(response) = response {
error!(
"Unhandled response is a service fault, service result = {}",
response.response_header.service_result
)
}
}
}
pub(crate) fn async_responses(&mut self) -> Vec<SupportedMessage> {
let mut async_handles = self.responses.keys().copied().collect::<Vec<_>>();
async_handles.sort();
async_handles
.iter()
.map(|k| self.responses.remove(k).unwrap())
.collect()
}
}