zenoh 1.9.0

Zenoh: The Zero Overhead Pub/Sub/Query Protocol.
Documentation
use std::{
    any::Any,
    sync::{Arc, Mutex},
    time::Duration,
};

use zenoh_core::ztimeout;
use zenoh_protocol::{
    core::{Reliability, WireExpr, ZenohIdProto},
    network::{Declare, Interest, Mapping, Push, Request, Response, ResponseFinal},
};

use crate::{
    api::{
        queryable::{Query, QueryInner, ReplyPrimitives},
        sample::QoS,
    },
    net::primitives::Primitives,
};

// Sixty-second time limit for this test module.
// NOTE(review): `TIMEOUT` is not referenced by name anywhere in this file; it is
// presumably picked up from the enclosing scope by the `ztimeout!` macro imported
// from `zenoh_core` — confirm against the macro definition.
const TIMEOUT: Duration = Duration::from_secs(60);

/// Test double for [`Primitives`] that remembers the wire expression of the
/// most recent response passed to `send_response`.
struct ReplyTestPrimitives {
    /// Wire expression cloned from the last response seen; `None` until the
    /// first response arrives. Guarded by a mutex because the trait methods
    /// only receive `&self`.
    wire_expr: Arc<Mutex<Option<WireExpr<'static>>>>,
}

impl ReplyTestPrimitives {
    fn new() -> Self {
        ReplyTestPrimitives {
            wire_expr: Arc::new(Mutex::new(None)),
        }
    }

    fn wire_expr(&self) -> Option<WireExpr<'_>> {
        self.wire_expr.lock().unwrap().clone()
    }
}

/// Every message kind except responses is ignored; a response has its wire
/// expression stored so the test can inspect it afterwards.
impl Primitives for ReplyTestPrimitives {
    // Not relevant to the test: intentionally a no-op.
    fn send_interest(&self, _msg: &mut Interest) {}

    // Not relevant to the test: intentionally a no-op.
    fn send_declare(&self, _msg: &mut Declare) {}

    // Not relevant to the test: intentionally a no-op.
    fn send_push(&self, _msg: &mut Push, _reliability: Reliability) {}

    // Not relevant to the test: intentionally a no-op.
    fn send_push_consume(&self, _msg: &mut Push, _reliability: Reliability, _consume: bool) {}

    // Not relevant to the test: intentionally a no-op.
    fn send_request(&self, _msg: &mut Request) {}

    /// Overwrites the stored wire expression with a clone of the one carried
    /// by `msg`. Panics if the mutex has been poisoned.
    fn send_response(&self, msg: &mut Response) {
        let mut slot = self.wire_expr.lock().unwrap();
        *slot = Some(msg.wire_expr.clone());
    }

    // Not relevant to the test: intentionally a no-op.
    fn send_response_final(&self, _msg: &mut ResponseFinal) {}

    // Not relevant to the test: intentionally a no-op.
    fn send_close(&self) {}

    /// Exposes the recorder as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Replies to a hand-built remote query twice and inspects the wire expression
/// that reaches the recording primitives:
///
/// 1. replying with a key expression obtained from `declare_keyexpr` must
///    produce an empty suffix, a non-zero scope and `Mapping::Sender`;
/// 2. replying with the plain string key must produce the full key as suffix
///    and a scope of zero.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_reply_preserves_optimized_ke() {
    use crate::Config;

    let ke = "test/reply_declared_ke";

    let session = ztimeout!(crate::open(Config::default())).unwrap();
    let recorder = Arc::new(ReplyTestPrimitives::new());

    // Assemble the query by hand so that its replies are routed to `recorder`.
    let query = Query {
        inner: Arc::new(QueryInner {
            key_expr: "test/**".try_into().unwrap(),
            parameters: "".into(),
            qid: 1,
            zid: ZenohIdProto::default(),
            qos: QoS::default(),
            #[cfg(feature = "unstable")]
            source_info: None,
            primitives: ReplyPrimitives::new_remote(Some(session.downgrade()), recorder.clone()),
        }),
        eid: 1,
        value: None,
        attachment: None,
    };

    // First reply: use the key expression declared on the session.
    let declared_ke = ztimeout!(session.declare_keyexpr(ke)).unwrap();
    ztimeout!(query.reply(declared_ke, "payload")).unwrap();

    let declared_we = recorder.wire_expr().unwrap();
    assert!(declared_we.suffix.is_empty());
    assert!(declared_we.scope != 0);
    assert!(declared_we.mapping == Mapping::Sender);

    // Second reply: use the undeclared, plain string key.
    ztimeout!(query.reply(ke, "payload")).unwrap();

    let plain_we = recorder.wire_expr().unwrap();
    assert_eq!(&plain_we.suffix, &ke);
    assert!(plain_we.scope == 0);
}