zenoh 1.9.0

Zenoh: The Zero Overhead Pub/Sub/Query Protocol.
Documentation
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{any::Any, cell::OnceCell, sync::Arc};

use arc_swap::ArcSwapOption;
use zenoh_link::Link;
use zenoh_protocol::{
    core::ZenohIdProto,
    network::{
        ext, Declare, DeclareBody, DeclareFinal, NetworkBodyMut, NetworkMessageExt as _,
        NetworkMessageMut, ResponseFinal,
    },
};
use zenoh_result::ZResult;
use zenoh_transport::{unicast::TransportUnicast, TransportPeerEventHandler};

use super::Primitives;
use crate::net::routing::{
    dispatcher::face::Face,
    gateway::{InterceptorCacheValueType, Resource},
    hat::{DispatcherContext, HatTrait},
    interceptor::{has_interceptor, InterceptorContext, InterceptorTrait, InterceptorsChain},
    RoutingContext,
};

pub struct DeMux {
    pub(crate) face: Face,
    pub(crate) transport: Option<TransportUnicast>,
    pub(crate) interceptor: Arc<ArcSwapOption<InterceptorsChain>>,
    zid: ZenohIdProto,
}

impl DeMux {
    pub(crate) fn new(
        face: Face,
        transport: Option<TransportUnicast>,
        interceptor: Arc<ArcSwapOption<InterceptorsChain>>,
        zid: ZenohIdProto,
    ) -> Self {
        Self {
            face,
            transport,
            interceptor,
            zid,
        }
    }
}

struct DeMuxContext<'a> {
    demux: &'a DeMux,
    cache: OnceCell<InterceptorCacheValueType>,
    expr: OnceCell<String>,
}

impl DeMuxContext<'_> {
    fn prefix(&self, msg: &NetworkMessageMut) -> Option<Arc<Resource>> {
        if let Some(wire_expr) = msg.wire_expr() {
            let wire_expr = wire_expr.to_owned();
            let rtables = zread!(self.demux.face.tables.tables);
            if let Some(prefix) = rtables
                .data
                .get_mapping(&self.demux.face.state, &wire_expr.scope, wire_expr.mapping)
                .cloned()
            {
                return Some(prefix);
            }
        }
        None
    }
}

impl InterceptorContext for DeMuxContext<'_> {
    fn face(&self) -> Option<Face> {
        Some(self.demux.face.clone())
    }

    fn full_expr(&self, msg: &NetworkMessageMut) -> Option<&str> {
        if self.expr.get().is_none() {
            if let Some(wire_expr) = msg.wire_expr() {
                if let Some(prefix) = self.prefix(msg) {
                    self.expr
                        .set(prefix.expr().to_string() + wire_expr.suffix.as_ref())
                        .ok();
                }
            }
        }
        self.expr.get().map(|x| x.as_str())
    }
    fn get_cache(&self, msg: &NetworkMessageMut) -> Option<&Box<dyn Any + Send + Sync>> {
        if self.cache.get().is_none() && msg.wire_expr().is_some_and(|we| !we.has_suffix()) {
            if let Some(prefix) = self.prefix(msg) {
                // TODO interceptor can change between the initial load and the cache load
                if let Some(cache) = self
                    .demux
                    .interceptor
                    .load()
                    .as_ref()
                    .and_then(|i| prefix.get_ingress_cache(&self.demux.face, i))
                {
                    self.cache.set(cache).ok();
                }
            }
        }
        self.cache.get().and_then(|c| c.get_ref().as_ref())
    }
}

impl TransportPeerEventHandler for DeMux {
    fn handle_message(&self, mut msg: NetworkMessageMut) -> ZResult<()> {
        let _span = tracing::enabled!(tracing::Level::DEBUG).then(|| {
            tracing::debug_span!(
                "demux",
                zid = %self.zid,
                src = %self.face
            )
            .entered()
        });

        if has_interceptor(&self.interceptor) {
            if let Some(interceptor) = self.interceptor.load().as_ref() {
                let mut ctx = DeMuxContext {
                    demux: self,
                    cache: OnceCell::new(),
                    expr: OnceCell::new(),
                };

                match &msg.body {
                    NetworkBodyMut::Request(request) => {
                        let request_id = request.id;
                        let qos = request.ext_qos;
                        if !interceptor.intercept(&mut msg, &mut ctx) {
                            // request was blocked by an interceptor, we need to send response final to avoid timeout error
                            self.face
                                .state
                                .primitives
                                .send_response_final(&mut ResponseFinal {
                                    rid: request_id,
                                    ext_qos: qos,
                                    ext_tstamp: None,
                                });
                            return Ok(());
                        }
                    }
                    NetworkBodyMut::Interest(interest) => {
                        let interest_id = interest.id;
                        if !interceptor.intercept(&mut msg, &mut ctx) {
                            // request was blocked by an interceptor, we need to send declare final to avoid timeout error
                            self.face.state.primitives.send_declare(RoutingContext::new(
                                &mut Declare {
                                    interest_id: Some(interest_id),
                                    ext_qos: ext::QoSType::DECLARE,
                                    ext_tstamp: None,
                                    ext_nodeid: ext::NodeIdType::DEFAULT,
                                    body: DeclareBody::DeclareFinal(DeclareFinal),
                                },
                            ));
                            return Ok(());
                        }
                    }
                    _ => {
                        if !interceptor.intercept(&mut msg, &mut ctx) {
                            return Ok(());
                        }
                    }
                };
            }
        }

        match msg.body {
            NetworkBodyMut::Push(m) => self.face.send_push(m, msg.reliability),
            NetworkBodyMut::Declare(m) => self.face.send_declare(m),
            NetworkBodyMut::Interest(m) => self.face.send_interest(m),
            NetworkBodyMut::Request(m) => self.face.send_request(m),
            NetworkBodyMut::Response(m) => self.face.send_response(m),
            NetworkBodyMut::ResponseFinal(m) => self.face.send_response_final(m),
            NetworkBodyMut::OAM(m) => {
                if self.transport.is_none() {
                    bail!("Received network OAM from face w/o transport");
                }

                let mut declares = vec![];
                let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
                let mut wtables = zwrite!(self.face.tables.tables);
                let tables = &mut *wtables;

                let ctx = DispatcherContext {
                    tables_lock: &self.face.tables,
                    tables: &mut tables.data,
                    src_face: &mut self.face.state.clone(),
                    send_declare: &mut |p, m| declares.push((p.clone(), m)),
                };

                let (owner_hat, other_hats) = tables
                    .hats
                    .partition_mut(&self.face.state.region)
                    .expect("face region should have a corresponding hat");

                owner_hat.handle_oam(
                    ctx,
                    m,
                    other_hats.map(|hat| &mut **hat as &mut dyn HatTrait),
                )?;

                drop(wtables);
                drop(ctrl_lock);

                for (p, m) in declares {
                    m.with_mut(|m| p.send_declare(m));
                }
            }
        }

        Ok(())
    }

    fn new_link(&self, _link: Link) {}

    fn del_link(&self, _link: Link) {}

    fn closed(&self) {
        self.face.send_close();
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}