use crate::channel::handler_internal::{ContextInternal, HandlerInternal};
use log::{trace, warn};
use std::any::Any;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::{error::Error, time::Instant};
pub trait Handler {
type Rin: 'static;
type Rout: 'static;
type Win: 'static;
type Wout: 'static;
fn name(&self) -> &str;
#[doc(hidden)]
#[allow(clippy::type_complexity)]
fn generate(
self,
) -> (
String,
Rc<RefCell<dyn HandlerInternal>>,
Rc<RefCell<dyn ContextInternal>>,
)
where
Self: Sized + 'static,
{
let handler_name = self.name().to_owned();
let context: Context<Self::Rin, Self::Rout, Self::Win, Self::Wout> =
Context::new(self.name());
let handler: Box<
dyn Handler<Rin = Self::Rin, Rout = Self::Rout, Win = Self::Win, Wout = Self::Wout>,
> = Box::new(self);
(
handler_name,
Rc::new(RefCell::new(handler)),
Rc::new(RefCell::new(context)),
)
}
fn transport_active(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
ctx.fire_transport_active();
}
fn transport_inactive(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
ctx.fire_transport_inactive();
}
fn handle_read(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
msg: Self::Rin,
);
fn poll_write(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
) -> Option<Self::Wout>;
fn handle_timeout(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
now: Instant,
) {
ctx.fire_timeout(now);
}
fn poll_timeout(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
eto: &mut Instant,
) {
ctx.fire_poll_timeout(eto);
}
fn handle_read_eof(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
ctx.fire_read_eof();
}
fn handle_exception(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
err: Box<dyn Error>,
) {
ctx.fire_exception(err);
}
fn handle_close(&mut self, ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>) {
ctx.fire_close();
}
}
impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> HandlerInternal
for Box<dyn Handler<Rin = Rin, Rout = Rout, Win = Win, Wout = Wout>>
{
fn transport_active_internal(&mut self, ctx: &dyn ContextInternal) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.transport_active(ctx);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn transport_inactive_internal(&mut self, ctx: &dyn ContextInternal) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.transport_inactive(ctx);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn handle_read_internal(&mut self, ctx: &dyn ContextInternal, msg: Box<dyn Any>) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
if let Ok(msg) = msg.downcast::<Rin>() {
self.handle_read(ctx, *msg);
} else {
panic!("msg can't downcast::<Rin> in {} handler", ctx.name());
}
} else {
panic!(
"ctx can't downcast::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn poll_write_internal(&mut self, ctx: &dyn ContextInternal) -> Option<Box<dyn Any>> {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
if let Some(msg) = self.poll_write(ctx) {
Some(Box::new(msg))
} else {
None
}
} else {
panic!(
"ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn handle_timeout_internal(&mut self, ctx: &dyn ContextInternal, now: Instant) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.handle_timeout(ctx, now);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn poll_timeout_internal(&mut self, ctx: &dyn ContextInternal, eto: &mut Instant) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.poll_timeout(ctx, eto);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn handle_read_eof_internal(&mut self, ctx: &dyn ContextInternal) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.handle_read_eof(ctx);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn handle_exception_internal(&mut self, ctx: &dyn ContextInternal, err: Box<dyn Error>) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.handle_exception(ctx, err);
} else {
panic!(
"ctx can't downcast_ref::<Context<Rin, Rout, Win, Wout>> in {} handler",
ctx.name()
);
}
}
fn handle_close_internal(&mut self, ctx: &dyn ContextInternal) {
if let Some(ctx) = ctx.as_any().downcast_ref::<Context<Rin, Rout, Win, Wout>>() {
self.handle_close(ctx);
} else {
panic!(
"ctx can't downcast_ref::<OutboundContext<Win, Wout>> in {} handler",
ctx.name()
);
}
}
}
pub struct Context<Rin, Rout, Win, Wout> {
name: String,
next_context: Option<Rc<RefCell<dyn ContextInternal>>>,
next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>,
phantom: PhantomData<(Rin, Rout, Win, Wout)>,
}
impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> Context<Rin, Rout, Win, Wout> {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
next_context: None,
next_handler: None,
phantom: PhantomData,
}
}
pub fn fire_transport_active(&self) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.transport_active_internal(&*next_context);
}
}
pub fn fire_transport_inactive(&self) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.transport_inactive_internal(&*next_context);
}
}
pub fn fire_read(&self, msg: Rout) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.handle_read_internal(&*next_context, Box::new(msg));
} else {
warn!("handle_read reached end of pipeline");
}
}
pub fn fire_poll_write(&self) -> Option<Win> {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
if let Some(msg) = next_handler.poll_write_internal(&*next_context) {
if let Ok(msg) = msg.downcast::<Win>() {
Some(*msg)
} else {
panic!(
"msg can't downcast::<Win> in {} handler",
next_context.name()
);
}
} else {
None
}
} else {
warn!("poll_write reached end of pipeline");
None
}
}
pub fn fire_timeout(&self, now: Instant) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.handle_timeout_internal(&*next_context, now);
} else {
warn!("handle_timeout reached end of pipeline");
}
}
pub fn fire_poll_timeout(&self, eto: &mut Instant) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.poll_timeout_internal(&*next_context, eto);
} else {
trace!("poll_timeout reached end of pipeline");
}
}
pub fn fire_read_eof(&self) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.handle_read_eof_internal(&*next_context);
} else {
warn!("handle_read_eof reached end of pipeline");
}
}
pub fn fire_exception(&self, err: Box<dyn Error>) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.handle_exception_internal(&*next_context, err);
} else {
warn!("handle_exception reached end of pipeline");
}
}
pub fn fire_close(&self) {
if let (Some(next_handler), Some(next_context)) = (&self.next_handler, &self.next_context) {
let (mut next_handler, next_context) =
(next_handler.borrow_mut(), next_context.borrow());
next_handler.handle_close_internal(&*next_context);
} else {
warn!("handle_close reached end of pipeline");
}
}
}
impl<Rin: 'static, Rout: 'static, Win: 'static, Wout: 'static> ContextInternal
for Context<Rin, Rout, Win, Wout>
{
fn fire_transport_active_internal(&self) {
self.fire_transport_active();
}
fn fire_transport_inactive_internal(&self) {
self.fire_transport_inactive();
}
fn fire_read_internal(&self, msg: Box<dyn Any>) {
if let Ok(msg) = msg.downcast::<Rout>() {
self.fire_read(*msg);
} else {
panic!("msg can't downcast::<Rout> in {} handler", self.name());
}
}
fn fire_poll_write_internal(&self) -> Option<Box<dyn Any>> {
if let Some(msg) = self.fire_poll_write() {
Some(Box::new(msg))
} else {
None
}
}
fn fire_timeout_internal(&self, now: Instant) {
self.fire_timeout(now);
}
fn fire_poll_timeout_internal(&self, eto: &mut Instant) {
self.fire_poll_timeout(eto);
}
fn fire_read_eof_internal(&self) {
self.fire_read_eof();
}
fn fire_exception_internal(&self, err: Box<dyn Error>) {
self.fire_exception(err);
}
fn fire_close_internal(&self) {
self.fire_close();
}
fn name(&self) -> &str {
self.name.as_str()
}
fn as_any(&self) -> &dyn Any {
self
}
fn set_next_context(&mut self, next_context: Option<Rc<RefCell<dyn ContextInternal>>>) {
self.next_context = next_context;
}
fn set_next_handler(&mut self, next_handler: Option<Rc<RefCell<dyn HandlerInternal>>>) {
self.next_handler = next_handler;
}
}