use crate::runtime::MailboxSender;
use crate::socket::connection_iface::ISocketConnection;
use crate::socket::events::MonitorSender;
use crate::socket::options::SocketOptions;
use crate::socket::types::SocketType;
use crate::socket::SocketEvent;
use crate::Msg;
use fibre::mpmc::AsyncSender;
use std::collections::{HashMap, HashSet};
#[cfg(feature = "io-uring")]
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::time::Instant;
use tokio::task::JoinHandle;
/// Bookkeeping record for a single endpoint tracked by the socket core.
///
/// Instances are stored in `CoreState::endpoints`.
#[derive(Debug)]
pub(crate) struct EndpointInfo {
    /// Mailbox sender associated with this endpoint
    /// (presumably addresses the endpoint's actor — TODO confirm).
    pub mailbox: MailboxSender,
    /// Join handle of a spawned task tied to this endpoint, if there is one.
    pub task_handle: Option<JoinHandle<()>>,
    /// Whether this entry is a listener or a session.
    pub endpoint_type: EndpointType,
    /// URI string identifying this endpoint.
    pub endpoint_uri: String,
    /// Pair of pipe ids, when pipes are attached
    /// (presumably `(write_id, read_id)` — verify against the code that fills it).
    pub pipe_ids: Option<(usize, usize)>,
    /// Numeric handle id associated with this endpoint.
    pub handle_id: usize,
    /// Optional target endpoint URI
    /// (presumably the URI the user asked to connect to — TODO confirm).
    pub target_endpoint_uri: Option<String>,
    /// `true` when this entry represents an outbound connection.
    pub is_outbound_connection: bool,
    /// Socket type of the peer as a string, when available.
    pub peer_socket_type: Option<String>,
    /// Shared connection interface object for this endpoint.
    pub connection_iface: Arc<dyn ISocketConnection>,
}
/// Discriminates the two kinds of endpoint recorded in `EndpointInfo`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum EndpointType {
    /// The endpoint is a listener.
    Listener,
    /// The endpoint is a session.
    Session,
}
/// Mutable bookkeeping owned by a socket core: its options, pipes, reader
/// tasks, endpoints and optional monitor channel.
#[derive(Debug)]
pub(crate) struct CoreState {
    /// Handle of the owning socket core; attached to log events as
    /// `core_handle` / `socket_handle`.
    pub(crate) handle: usize,
    /// Socket options, shared behind an `Arc`.
    pub options: Arc<SocketOptions>,
    /// The type of this socket.
    pub socket_type: SocketType,
    /// Pipe senders, keyed by pipe write id.
    pub pipes_tx: HashMap<usize, AsyncSender<Vec<Msg>>>,
    /// Join handles of pipe reader tasks, keyed by pipe read id.
    pub pipe_reader_task_handles: HashMap<usize, JoinHandle<()>>,
    /// Known endpoints (presumably keyed by endpoint URI — TODO confirm).
    pub endpoints: HashMap<String, EndpointInfo>,
    /// Reverse lookup from a pipe read id to an endpoint URI.
    pub pipe_read_id_to_endpoint_uri: HashMap<usize, String>,
    /// Lookup from a raw file descriptor to an endpoint URI
    /// (only compiled with the `io-uring` feature).
    #[cfg(feature = "io-uring")]
    pub uring_fd_to_endpoint_uri: HashMap<RawFd, String>,
    /// Set of bound in-process names (only compiled with the `inproc` feature).
    #[cfg(feature = "inproc")]
    pub(crate) bound_inproc_names: HashSet<String>,
    /// Sender for socket monitor events; `None` when no monitor is installed.
    pub(crate) monitor_tx: Option<MonitorSender>,
    /// Most recently bound endpoint, if any
    /// (presumably the resolved URI of the last bind — verify against the writer).
    pub(crate) last_bound_endpoint: Option<String>,
}
impl CoreState {
pub(crate) fn new(handle: usize, socket_type: SocketType, options: SocketOptions) -> Self {
Self {
handle,
options: Arc::new(options), socket_type,
pipes_tx: HashMap::new(),
pipe_reader_task_handles: HashMap::new(),
endpoints: HashMap::new(),
pipe_read_id_to_endpoint_uri: HashMap::new(),
#[cfg(feature = "io-uring")]
uring_fd_to_endpoint_uri: HashMap::new(),
#[cfg(feature = "inproc")]
bound_inproc_names: HashSet::new(),
monitor_tx: None,
last_bound_endpoint: None,
}
}
pub(crate) fn get_pipe_sender(&self, pipe_write_id: usize) -> Option<AsyncSender<Vec<Msg>>> {
self.pipes_tx.get(&pipe_write_id).cloned()
}
#[allow(dead_code)]
pub(crate) fn get_reader_task_handle(&self, pipe_read_id: usize) -> Option<&JoinHandle<()>> {
self.pipe_reader_task_handles.get(&pipe_read_id)
}
pub(crate) fn remove_pipe_state(&mut self, pipe_write_id: usize, pipe_read_id: usize) -> bool {
let tx_removed = self.pipes_tx.remove(&pipe_write_id).is_some();
if tx_removed {
tracing::trace!(
core_handle = self.handle,
pipe_id = pipe_write_id,
"CoreState: Removed pipe sender"
);
}
let reader_removed = if let Some(handle) = self.pipe_reader_task_handles.remove(&pipe_read_id) {
tracing::trace!(
core_handle = self.handle,
pipe_id = pipe_read_id,
"CoreState: Aborting pipe reader task"
);
handle.abort();
true
} else {
false
};
if reader_removed {
tracing::trace!(
core_handle = self.handle,
pipe_id = pipe_read_id,
"CoreState: Removed pipe reader task handle"
);
}
let map_removed = self.pipe_read_id_to_endpoint_uri.remove(&pipe_read_id).is_some();
if map_removed {
tracing::trace!(
core_handle = self.handle,
pipe_id = pipe_read_id,
"CoreState: Removed pipe_read_id_to_endpoint_uri mapping"
);
}
tx_removed || reader_removed || map_removed
}
pub(crate) fn send_monitor_event(&self, event: SocketEvent) {
if let Some(ref tx) = self.monitor_tx {
if tx.clone().to_sync().send(event).is_err() {
tracing::warn!(
socket_handle = self.handle,
"Failed to send event to monitor channel (full or closed)"
);
}
}
}
pub(crate) fn get_monitor_sender_clone(&self) -> Option<MonitorSender> {
self.monitor_tx.clone()
}
}
/// Phase marker stored in `ShutdownCoordinator::state`.
///
/// NOTE(review): the variants appear to be declared in the order the shutdown
/// sequence progresses — confirm against the code that advances the phase.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ShutdownPhase {
    /// No shutdown in progress; this is the coordinator's default phase.
    Running,
    /// Child actors are being stopped.
    StoppingChildren,
    /// Lingering phase (presumably bounded by the coordinator's
    /// `linger_deadline` — TODO confirm).
    Lingering,
    /// Pipes are being cleaned up.
    CleaningPipes,
    /// Shutdown has finished.
    Finished,
}
/// Tracks the progress of a socket shutdown and the work still outstanding.
#[derive(Debug)]
pub(crate) struct ShutdownCoordinator {
    /// Current shutdown phase.
    pub(crate) state: ShutdownPhase,
    /// Child actors still pending, keyed by a numeric id
    /// (presumably handle id mapped to endpoint URI — TODO confirm).
    pub(crate) pending_child_actors: HashMap<usize, String>,
    /// Connections still waiting to be closed, keyed by a numeric id; each
    /// value pairs a string (presumably the endpoint URI — TODO confirm) with
    /// the shared connection interface.
    pub(crate) pending_connections_to_close: HashMap<usize, (String, Arc<dyn ISocketConnection>)>,
    /// In-process connections awaiting cleanup (only compiled with the
    /// `inproc` feature). Presumably `(pipe_write_id, pipe_read_id, uri)` —
    /// verify against the code that fills it.
    #[cfg(feature = "inproc")]
    pub(crate) inproc_connections_to_cleanup: Vec<(usize, usize, String)>,
    /// Deadline for the linger period; `None` when no deadline is set.
    pub(crate) linger_deadline: Option<Instant>,
}
impl Default for ShutdownCoordinator {
fn default() -> Self {
Self {
state: ShutdownPhase::Running,
pending_child_actors: HashMap::new(),
pending_connections_to_close: HashMap::new(),
#[cfg(feature = "inproc")]
inproc_connections_to_cleanup: Vec::new(),
linger_deadline: None,
}
}
}