use crate::error::ZmqError;
use crate::message::{Msg, MsgFlags};
use crate::socket::patterns::fair_queue::{FairQueue, PushError};
#[allow(unused_imports)]
use crate::Blob;
use dashmap::DashMap;
use std::collections::VecDeque;
use std::time::Duration;
use tokio::sync::Mutex as TokioMutex;
use tokio::time::timeout as tokio_timeout;
#[derive(Debug)]
pub(crate) struct IncomingMessageOrchestrator<QItem: Send + 'static> {
socket_core_handle: usize,
main_incoming_queue: FairQueue<QItem>,
queue_push_timeout: Option<Duration>,
partial_pipe_messages: DashMap<usize, Vec<Msg>>,
current_recv_frames_buffer: TokioMutex<VecDeque<Msg>>,
}
impl<QItem: Send + 'static> IncomingMessageOrchestrator<QItem> {
pub fn new(core_handle: usize, rcvhwm: usize) -> Self {
Self {
socket_core_handle: core_handle,
main_incoming_queue: FairQueue::new(rcvhwm.max(1)),
queue_push_timeout: Some(Duration::from_millis(1000)),
partial_pipe_messages: DashMap::new(),
current_recv_frames_buffer: TokioMutex::new(VecDeque::new()),
}
}
pub fn accumulate_pipe_frame(&self, pipe_read_id: usize, incoming_frame: Msg) -> Result<Option<Vec<Msg>>, ZmqError> {
let is_last_frame_of_zmtp_message = !incoming_frame.is_more();
if is_last_frame_of_zmtp_message {
let mut assembled_message = self
.partial_pipe_messages
.remove(&pipe_read_id)
.map(|(_key, val)| val)
.unwrap_or_else(Vec::new);
assembled_message.push(incoming_frame);
if assembled_message.is_empty() {
tracing::warn!(
"Orchestrator: Assembled an empty ZMTP message for pipe {}.",
pipe_read_id
);
}
Ok(Some(assembled_message))
} else {
let mut pipe_buffer_entry = self.partial_pipe_messages.entry(pipe_read_id).or_insert_with(Vec::new);
pipe_buffer_entry.value_mut().push(incoming_frame);
Ok(None)
}
}
pub async fn queue_item(&self, pipe_read_id_for_logging: usize, item_to_queue: QItem) -> Result<(), ZmqError> {
let push_result = match self.queue_push_timeout {
None => self.main_incoming_queue.push_item(item_to_queue).await,
Some(duration) if duration.is_zero() => match self.main_incoming_queue.try_push_item(item_to_queue) {
Ok(()) => Ok(()),
Err(PushError::Full(_returned_item)) => {
tracing::warn!(
handle = self.socket_core_handle,
pipe_id = pipe_read_id_for_logging,
"Orchestrator try_push item to main queue failed: Full. Item dropped."
);
Ok(())
}
Err(PushError::Closed(_returned_item)) => {
tracing::error!(
handle = self.socket_core_handle,
pipe_id = pipe_read_id_for_logging,
"Orchestrator try_push item to main queue failed: Closed."
);
Err(ZmqError::Internal("Main incoming queue closed".into()))
}
},
Some(duration) => match tokio_timeout(duration, self.main_incoming_queue.push_item(item_to_queue)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e),
Err(_timeout_elapsed) => {
tracing::warn!(
handle = self.socket_core_handle,
pipe_id = pipe_read_id_for_logging,
"Orchestrator timed push of item to main queue failed: Timeout. Item dropped."
);
Ok(())
}
},
};
match push_result {
Ok(()) => Ok(()),
Err(ZmqError::Internal(ref msg)) if msg.contains("FairQueue channel closed") => Err(ZmqError::Internal(
"Main incoming queue channel unexpectedly closed".into(),
)),
Err(e) => {
tracing::error!(handle = self.socket_core_handle, pipe_id = pipe_read_id_for_logging, error = %e, "Orchestrator: Unexpected error pushing item.");
Err(e)
}
}
}
pub(crate) async fn recv_item_from_main_queue(&self, rcvtimeo_opt: Option<Duration>) -> Result<QItem, ZmqError> {
let pop_future = self.main_incoming_queue.pop_item();
match rcvtimeo_opt {
Some(duration) if !duration.is_zero() => match tokio_timeout(duration, pop_future).await {
Ok(Ok(Some(item))) => Ok(item),
Ok(Ok(None)) => Err(ZmqError::Internal(
"Orchestrator: Main receive queue closed while popping item".into(),
)),
Ok(Err(e)) => Err(e),
Err(_timeout_elapsed) => Err(ZmqError::Timeout),
},
_ => {
if rcvtimeo_opt == Some(Duration::ZERO) {
match self.main_incoming_queue.try_pop_item() {
Ok(Some(item)) => Ok(item),
Ok(None) => Err(ZmqError::ResourceLimitReached),
Err(e) => Err(e),
}
} else {
match pop_future.await? {
Some(item) => Ok(item),
None => Err(ZmqError::Internal(
"Orchestrator: Main receive queue closed (inf wait)".into(),
)),
}
}
}
}
}
pub async fn recv_logical_message(
&self,
rcvtimeo_opt: Option<Duration>,
qitem_to_app_frames: impl FnOnce(QItem) -> Vec<Msg>,
) -> Result<Vec<Msg>, ZmqError> {
let mut buffer_guard = self.current_recv_frames_buffer.lock().await;
if !buffer_guard.is_empty() {
tracing::warn!(
handle = self.socket_core_handle,
"recv_logical_message called while recv_message buffer was active. Clearing buffer."
);
*buffer_guard = VecDeque::new();
}
drop(buffer_guard);
let q_item = self.recv_item_from_main_queue(rcvtimeo_opt).await?;
let app_frames = qitem_to_app_frames(q_item);
Ok(app_frames)
}
pub async fn recv_message(
&self,
rcvtimeo_opt: Option<Duration>,
qitem_to_app_frames: impl FnOnce(QItem) -> Vec<Msg>,
) -> Result<Msg, ZmqError> {
let mut buffer_guard = self.current_recv_frames_buffer.lock().await;
if let Some(mut frame) = buffer_guard.pop_front() {
if !buffer_guard.is_empty() {
frame.set_flags(frame.flags() | MsgFlags::MORE);
} else {
frame.set_flags(frame.flags() & !MsgFlags::MORE);
}
return Ok(frame);
}
drop(buffer_guard);
let q_item = self.recv_item_from_main_queue(rcvtimeo_opt).await?;
let app_frames_vec = qitem_to_app_frames(q_item);
let mut buffer_guard = self.current_recv_frames_buffer.lock().await;
if app_frames_vec.is_empty() {
tracing::warn!(
handle = self.socket_core_handle,
"recv_message: Transformed QItem resulted in empty app_frames. Returning empty Msg."
);
return Ok(Msg::new()); }
let mut app_frames_deque = VecDeque::from(app_frames_vec);
let first_frame = app_frames_deque.pop_front().unwrap();
*buffer_guard = app_frames_deque;
Ok(first_frame)
}
pub async fn reset_recv_message_buffer(&self) {
let mut buffer_guard = self.current_recv_frames_buffer.lock().await;
if !buffer_guard.is_empty() {
tracing::trace!(
handle = self.socket_core_handle,
"Orchestrator: Resetting current_recv_frames_buffer."
);
*buffer_guard = VecDeque::new();
}
}
pub async fn clear_pipe_state(&self, pipe_read_id: usize) {
if self.partial_pipe_messages.remove(&pipe_read_id).is_some() {
tracing::debug!(
handle = self.socket_core_handle,
pipe_id = pipe_read_id,
"Orchestrator: Cleared partial ZMTP message buffer for detached pipe"
);
}
self.reset_recv_message_buffer().await;
}
pub async fn close(&self) {
self.main_incoming_queue.close();
self.partial_pipe_messages.clear();
let mut buffer_guard = self.current_recv_frames_buffer.lock().await;
*buffer_guard = VecDeque::new();
}
}