zenoh-ext 1.7.1

Zenoh: extensions to the client API.
Documentation

CI Discussion Discord License License

Eclipse Zenoh

Eclipse Zenoh: Zero Overhead Pub/Sub, Store/Query and Compute.

Zenoh (pronounce /zeno/) unifies data in motion, data at rest and computations. It carefully blends traditional pub/sub with geo-distributed storages, queries and computations, while retaining a level of time and space efficiency that is well beyond any of the mainstream stacks.

Check the website zenoh.io for more information and installation instructions

See also the roadmap for more detailed technical information.

Zenoh Extension crate

The zenoh_ext crate provides some useful extensions on top of the zenoh crate.

The primary components of this crate are:

Serialization support

The library implements encoding and decoding data in simple, compact, and platform-independent binary format described in Zenoh serialization format.

The serialization functions are supported not only in the Rust library, but also in all other language bindings (C, C++, Java, Kotlin, Python, TypeScript) as well as zenoh-pico, which significantly simplifies interoperability.

Serialization example

use zenoh_ext::*;
let zbytes = z_serialize(&(42i32, vec![1u8, 2, 3]));
assert_eq!(z_deserialize::<(i32, Vec<u8>)>(&zbytes).unwrap(), (42i32, vec![1u8, 2, 3]));

Advanced publisher and subscriber

The components AdvancedPublisher and AdvancedSubscriber combine basic Zenoh functionalities to provide publishing and subscribing data with extended delivery guarantees and full control over mechanisms that provide these guarantees.

These components require the "unstable" feature to be enabled.

Advanced publisher and subscriber examples

Publisher

use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig};
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let publisher = session
    .declare_publisher("key/expression")
    .cache(CacheConfig::default().max_samples(10))
    .sample_miss_detection(
        MissDetectionConfig::default().heartbeat(std::time::Duration::from_secs(1))
    )
    .publisher_detection()
    .await
    .unwrap();
publisher.put("Value").await.unwrap();

Subscriber

use zenoh_ext::{AdvancedSubscriberBuilderExt, HistoryConfig, RecoveryConfig};
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let subscriber = session
    .declare_subscriber("key/expression")
    .history(HistoryConfig::default().detect_late_publishers())
    .recovery(RecoveryConfig::default().heartbeat())
    .subscriber_detection()
    .await
    .unwrap();
let miss_listener = subscriber.sample_miss_listener().await.unwrap();
loop {
    tokio::select! {
        sample = subscriber.recv_async() => {
            if let Ok(sample) = sample {
                // ...
            }
        },
        miss = miss_listener.recv_async() => {
            if let Ok(miss) = miss {
                // ...
            }
        },
    }
}

Documentation and examples

For more information, see its documentation: https://docs.rs/zenoh-ext and some examples of usage in https://github.com/eclipse-zenoh/zenoh/tree/main/zenoh-ext/examples/examples