use std::time::Duration;
use crate::{AnyResult, Event, Request, Response};
use parking_lot::Mutex;
use zng_task::channel::{self, ChannelError, IpcReceiver, IpcSender};
use zng_txt::Txt;
/// Payload sent once through the init channel.
///
/// Carries, in order, the request receiver, the response sender and the event
/// sender; these are the ends that `AppInit::connect` does not keep for itself.
type AppInitMsg = (
channel::IpcReceiver<Request>,
channel::IpcSender<Response>,
channel::IpcSender<Event>,
);
/// Owner of the named init channel used to hand the IPC channel ends over to
/// the connecting side.
///
/// NOTE(review): presumably the peer is the process that calls
/// `connect_view_process` with the value of `name` — confirm against callers.
pub(crate) struct AppInit {
/// Named sender for the single `AppInitMsg`; consumed by `connect`.
init_sender: channel::NamedIpcSender<AppInitMsg>,
}
impl AppInit {
pub fn new() -> Self {
AppInit {
init_sender: channel::NamedIpcSender::new().expect("failed to create init channel"),
}
}
pub fn name(&self) -> &str {
self.init_sender.name()
}
pub fn connect(self) -> AnyResult<(RequestSender, ResponseReceiver, EventReceiver)> {
let mut init_sender = self
.init_sender
.connect_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
let (req_sender, req_recv) = channel::ipc_unbounded()?;
let (rsp_sender, rsp_recv) = channel::ipc_unbounded()?;
let (evt_sender, evt_recv) = channel::ipc_unbounded()?;
init_sender.send_blocking((req_recv, rsp_sender, evt_sender))?;
Ok((
RequestSender(Mutex::new(req_sender)),
ResponseReceiver(Mutex::new(rsp_recv)),
EventReceiver(Mutex::new(evt_recv)),
))
}
}
pub fn connect_view_process(ipc_sender_name: Txt) -> Result<ViewChannels, channel::ChannelError> {
let _s = tracing::trace_span!("connect_view_process").entered();
let mut init_recv = channel::IpcReceiver::<AppInitMsg>::connect(ipc_sender_name)?;
let (req_recv, rsp_sender, evt_sender) = init_recv.recv_deadline_blocking(std::time::Duration::from_secs(crate::view_timeout()))?;
Ok(ViewChannels {
request_receiver: RequestReceiver(Mutex::new(req_recv)),
response_sender: ResponseSender(Mutex::new(rsp_sender)),
event_sender: EventSender(Mutex::new(evt_sender)),
})
}
/// Channel ends produced by `connect_view_process`.
pub struct ViewChannels {
/// Receiving end of the request channel.
pub request_receiver: RequestReceiver,
/// Sending end of the response channel.
pub response_sender: ResponseSender,
/// Sending end of the event channel.
pub event_sender: EventSender,
}
/// Shorthand for a `Result` whose error type is `ChannelError`.
type IpcResult<T> = Result<T, ChannelError>;
/// Sending end of the request channel, created by `AppInit::connect`.
///
/// NOTE(review): the mutex is only reached through `get_mut` in this file, so
/// it is never locked here; presumably it exists to make the type `Sync` — confirm.
pub(crate) struct RequestSender(Mutex<IpcSender<Request>>);
impl RequestSender {
    /// Sends `req` through the request channel using the blocking send.
    ///
    /// # Errors
    ///
    /// Returns the channel error unchanged; it is also logged at debug level.
    pub fn send(&mut self, req: Request) -> IpcResult<()> {
        match self.0.get_mut().send_blocking(req) {
            Err(e) => {
                tracing::debug!("request sender error, {e}");
                Err(e)
            }
            sent => sent,
        }
    }
}
/// Emits a trace-level log event when the sender is dropped.
impl Drop for RequestSender {
fn drop(&mut self) {
tracing::trace!("dropped RequestSender");
}
}
/// Receiving end of the request channel, produced by `connect_view_process`.
pub struct RequestReceiver(Mutex<IpcReceiver<Request>>);

impl RequestReceiver {
    /// Receives the next request using the blocking receive.
    ///
    /// # Errors
    ///
    /// Returns the channel error unchanged; it is also logged at debug level.
    pub fn recv(&mut self) -> IpcResult<Request> {
        match self.0.get_mut().recv_blocking() {
            Err(e) => {
                tracing::debug!("request receiver error, {e}");
                Err(e)
            }
            received => received,
        }
    }
}
/// Emits a trace-level log event when the receiver is dropped.
impl Drop for RequestReceiver {
fn drop(&mut self) {
tracing::trace!("dropped RequestReceiver");
}
}
/// Sending end of the response channel, produced by `connect_view_process`.
pub struct ResponseSender(Mutex<IpcSender<Response>>);

impl ResponseSender {
    /// Sends `rsp` through the response channel using the blocking send.
    ///
    /// # Panics
    ///
    /// Panics if `rsp.must_be_send()` returns `false`.
    ///
    /// # Errors
    ///
    /// Returns the channel error unchanged; it is also logged at debug level.
    pub fn send(&mut self, rsp: Response) -> IpcResult<()> {
        assert!(rsp.must_be_send());

        match self.0.get_mut().send_blocking(rsp) {
            Err(e) => {
                tracing::debug!("response sender error, {e}");
                Err(e)
            }
            sent => sent,
        }
    }
}
/// Emits a trace-level log event when the sender is dropped.
impl Drop for ResponseSender {
fn drop(&mut self) {
tracing::trace!("dropped ResponseSender");
}
}
/// Receiving end of the response channel, created by `AppInit::connect`.
pub(crate) struct ResponseReceiver(Mutex<IpcReceiver<Response>>);
impl ResponseReceiver {
    /// Receives the next response using the blocking receive.
    ///
    /// # Errors
    ///
    /// Returns the channel error unchanged; it is also logged at debug level.
    pub fn recv(&mut self) -> IpcResult<Response> {
        match self.0.get_mut().recv_blocking() {
            Err(e) => {
                tracing::debug!("response receiver error, {e}");
                Err(e)
            }
            received => received,
        }
    }
}
/// Emits a trace-level log event when the receiver is dropped.
impl Drop for ResponseReceiver {
fn drop(&mut self) {
tracing::trace!("dropped ResponseReceiver");
}
}
/// Sending end of the event channel, produced by `connect_view_process`.
pub struct EventSender(Mutex<IpcSender<Event>>);
impl EventSender {
    /// Sends `ev` through the event channel using the blocking send.
    ///
    /// # Errors
    ///
    /// Returns the channel error unchanged; it is also logged at debug level.
    pub fn send(&mut self, ev: Event) -> IpcResult<()> {
        match self.0.get_mut().send_blocking(ev) {
            Err(e) => {
                tracing::debug!("event sender error, {e}");
                Err(e)
            }
            sent => sent,
        }
    }
}
/// Receiving end of the event channel, created by `AppInit::connect`.
pub(crate) struct EventReceiver(Mutex<IpcReceiver<Event>>);
impl EventReceiver {
pub fn recv(&mut self) -> IpcResult<Event> {
let r = self.0.get_mut().recv_blocking();
if let Err(e) = &r {
tracing::debug!("event receiver error, {e}");
}
r
}
pub fn recv_timeout(&mut self, duration: Duration) -> IpcResult<Event> {
let r = self.0.get_mut().recv_deadline_blocking(duration);
if let Err(e) = &r {
match e {
ChannelError::Timeout => {}
e => tracing::debug!("event receiver error, {e}"),
}
}
r
}
}