#![cfg(feature = "io-uring")]
use fibre::oneshot;
use crate::io_uring_backend::ops::{ProtocolConfig, UringOpCompletion, UserData};
use crate::ZmqError;
use std::collections::HashMap;
use std::os::unix::io::RawFd;
#[derive(Debug, Default)]
pub(crate) struct MultipartSendState {
pub total_parts: usize,
pub completed_parts: usize,
}
#[derive(Debug)]
pub(crate) struct ExternalOpContext {
pub reply_tx: oneshot::Sender<Result<UringOpCompletion, ZmqError>>,
pub op_name: String,
pub protocol_handler_factory_id: Option<String>, pub protocol_config: Option<ProtocolConfig>,
pub fd_created_for_connect_op: Option<RawFd>, pub listener_fd: Option<RawFd>, pub target_fd_for_shutdown: Option<RawFd>, pub multipart_state: Option<MultipartSendState>,
}
#[derive(Debug)]
pub(crate) struct ExternalOpTracker {
pub(crate) in_flight: HashMap<UserData, ExternalOpContext>,
pub(crate) next_id: UserData,
}
impl ExternalOpTracker {
pub fn new() -> Self {
Self {
in_flight: HashMap::new(),
next_id: 1, }
}
pub fn new_op_id(&mut self) -> UserData {
let id = self.next_id;
self.next_id = self.next_id.wrapping_add(1);
if self.next_id == 0 || self.next_id >= 1_000_000_000 {
self.next_id = 1;
}
id
}
pub fn add_op(&mut self, user_data: UserData, context: ExternalOpContext) {
if self.in_flight.contains_key(&user_data) {
tracing::warn!(
"ExternalOpTracker: Overwriting existing in-flight operation for UserData {}",
user_data
);
}
self.in_flight.insert(user_data, context);
}
pub fn take_op(&mut self, user_data: UserData) -> Option<ExternalOpContext> {
self.in_flight.remove(&user_data)
}
pub(crate) fn get_op_context_mut(&mut self, user_data: UserData) -> Option<&mut ExternalOpContext> {
self.in_flight.get_mut(&user_data)
}
pub fn take_op_if_shutdown_for_fd(&mut self, fd_to_check: RawFd) -> Option<(UserData, ExternalOpContext)> {
let mut found_ud: Option<UserData> = None;
for (ud, ctx) in self.in_flight.iter() {
if ctx.op_name == "ShutdownConnectionHandler" && ctx.target_fd_for_shutdown == Some(fd_to_check) {
found_ud = Some(*ud);
break;
}
}
if let Some(ud_to_remove) = found_ud {
self.in_flight.remove(&ud_to_remove).map(|ctx| (ud_to_remove, ctx))
} else {
None
}
}
pub fn is_empty(&self) -> bool {
self.in_flight.is_empty()
}
#[allow(dead_code)] pub fn drain_all(&mut self) -> Vec<(UserData, ExternalOpContext)> {
self.in_flight.drain().collect()
}
pub(crate) fn get_op_context_ref(&self, user_data: UserData) -> Option<&ExternalOpContext> {
self.in_flight.get(&user_data)
}
}