use std::{any::Any, cell::OnceCell, sync::Arc};
use arc_swap::ArcSwapOption;
use zenoh_link::Link;
use zenoh_protocol::{
core::ZenohIdProto,
network::{
ext, Declare, DeclareBody, DeclareFinal, NetworkBodyMut, NetworkMessageExt as _,
NetworkMessageMut, ResponseFinal,
},
};
use zenoh_result::ZResult;
use zenoh_transport::{unicast::TransportUnicast, TransportPeerEventHandler};
use super::Primitives;
use crate::net::routing::{
dispatcher::face::Face,
gateway::{InterceptorCacheValueType, Resource},
hat::{DispatcherContext, HatTrait},
interceptor::{has_interceptor, InterceptorContext, InterceptorTrait, InterceptorsChain},
RoutingContext,
};
pub struct DeMux {
pub(crate) face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) interceptor: Arc<ArcSwapOption<InterceptorsChain>>,
zid: ZenohIdProto,
}
impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
interceptor: Arc<ArcSwapOption<InterceptorsChain>>,
zid: ZenohIdProto,
) -> Self {
Self {
face,
transport,
interceptor,
zid,
}
}
}
struct DeMuxContext<'a> {
demux: &'a DeMux,
cache: OnceCell<InterceptorCacheValueType>,
expr: OnceCell<String>,
}
impl DeMuxContext<'_> {
fn prefix(&self, msg: &NetworkMessageMut) -> Option<Arc<Resource>> {
if let Some(wire_expr) = msg.wire_expr() {
let wire_expr = wire_expr.to_owned();
let rtables = zread!(self.demux.face.tables.tables);
if let Some(prefix) = rtables
.data
.get_mapping(&self.demux.face.state, &wire_expr.scope, wire_expr.mapping)
.cloned()
{
return Some(prefix);
}
}
None
}
}
impl InterceptorContext for DeMuxContext<'_> {
fn face(&self) -> Option<Face> {
Some(self.demux.face.clone())
}
fn full_expr(&self, msg: &NetworkMessageMut) -> Option<&str> {
if self.expr.get().is_none() {
if let Some(wire_expr) = msg.wire_expr() {
if let Some(prefix) = self.prefix(msg) {
self.expr
.set(prefix.expr().to_string() + wire_expr.suffix.as_ref())
.ok();
}
}
}
self.expr.get().map(|x| x.as_str())
}
fn get_cache(&self, msg: &NetworkMessageMut) -> Option<&Box<dyn Any + Send + Sync>> {
if self.cache.get().is_none() && msg.wire_expr().is_some_and(|we| !we.has_suffix()) {
if let Some(prefix) = self.prefix(msg) {
if let Some(cache) = self
.demux
.interceptor
.load()
.as_ref()
.and_then(|i| prefix.get_ingress_cache(&self.demux.face, i))
{
self.cache.set(cache).ok();
}
}
}
self.cache.get().and_then(|c| c.get_ref().as_ref())
}
}
impl TransportPeerEventHandler for DeMux {
fn handle_message(&self, mut msg: NetworkMessageMut) -> ZResult<()> {
let _span = tracing::enabled!(tracing::Level::DEBUG).then(|| {
tracing::debug_span!(
"demux",
zid = %self.zid,
src = %self.face
)
.entered()
});
if has_interceptor(&self.interceptor) {
if let Some(interceptor) = self.interceptor.load().as_ref() {
let mut ctx = DeMuxContext {
demux: self,
cache: OnceCell::new(),
expr: OnceCell::new(),
};
match &msg.body {
NetworkBodyMut::Request(request) => {
let request_id = request.id;
let qos = request.ext_qos;
if !interceptor.intercept(&mut msg, &mut ctx) {
self.face
.state
.primitives
.send_response_final(&mut ResponseFinal {
rid: request_id,
ext_qos: qos,
ext_tstamp: None,
});
return Ok(());
}
}
NetworkBodyMut::Interest(interest) => {
let interest_id = interest.id;
if !interceptor.intercept(&mut msg, &mut ctx) {
self.face.state.primitives.send_declare(RoutingContext::new(
&mut Declare {
interest_id: Some(interest_id),
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareFinal(DeclareFinal),
},
));
return Ok(());
}
}
_ => {
if !interceptor.intercept(&mut msg, &mut ctx) {
return Ok(());
}
}
};
}
}
match msg.body {
NetworkBodyMut::Push(m) => self.face.send_push(m, msg.reliability),
NetworkBodyMut::Declare(m) => self.face.send_declare(m),
NetworkBodyMut::Interest(m) => self.face.send_interest(m),
NetworkBodyMut::Request(m) => self.face.send_request(m),
NetworkBodyMut::Response(m) => self.face.send_response(m),
NetworkBodyMut::ResponseFinal(m) => self.face.send_response_final(m),
NetworkBodyMut::OAM(m) => {
if self.transport.is_none() {
bail!("Received network OAM from face w/o transport");
}
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut wtables = zwrite!(self.face.tables.tables);
let tables = &mut *wtables;
let ctx = DispatcherContext {
tables_lock: &self.face.tables,
tables: &mut tables.data,
src_face: &mut self.face.state.clone(),
send_declare: &mut |p, m| declares.push((p.clone(), m)),
};
let (owner_hat, other_hats) = tables
.hats
.partition_mut(&self.face.state.region)
.expect("face region should have a corresponding hat");
owner_hat.handle_oam(
ctx,
m,
other_hats.map(|hat| &mut **hat as &mut dyn HatTrait),
)?;
drop(wtables);
drop(ctrl_lock);
for (p, m) in declares {
m.with_mut(|m| p.send_declare(m));
}
}
}
Ok(())
}
fn new_link(&self, _link: Link) {}
fn del_link(&self, _link: Link) {}
fn closed(&self) {
self.face.send_close();
}
fn as_any(&self) -> &dyn Any {
self
}
}