Crate zenoh_backend_traits[][src]

Expand description

This crate provides the traits to be implemented by a zenoh backend library:

Such library must also declare a create_backend() operation with the #[no_mangle] attribute as an entrypoint to be called for the Backend creation.

Example

use async_trait::async_trait;
use zenoh::net::Sample;
use zenoh::{utils, ChangeKind, Properties, Value, ZResult};
use zenoh_backend_traits::*;

#[no_mangle]
pub fn create_backend(properties: &Properties) -> ZResult<Box<dyn Backend>> {
    // The properties are the ones passed via a PUT in the admin space for Backend creation.
    // Here we re-expose them in the admin space for GET operations, adding the PROP_BACKEND_TYPE entry.
    let mut p = properties.clone();
    p.insert(PROP_BACKEND_TYPE.into(), "my_backend_type".into());
    let admin_status = utils::properties_to_json_value(&p);
    Ok(Box::new(MyBackend { admin_status }))
}

// Your Backend implementation
struct MyBackend {
    admin_status: Value,
}

#[async_trait]
impl Backend for MyBackend {
    async fn get_admin_status(&self) -> Value {
        // This operation is called on GET operation on the admin space for the Backend
        // Here we reply with a static status (containing the configuration properties).
        // But we could add dynamic properties for Backend monitoring.
        self.admin_status.clone()
    }

    async fn create_storage(&mut self, properties: Properties) -> ZResult<Box<dyn Storage>> {
        // The properties are the ones passed via a PUT in the admin space for Storage creation.
        Ok(Box::new(MyStorage::new(properties).await?))
    }

    fn incoming_data_interceptor(&self) -> Option<Box<dyn IncomingDataInterceptor>> {
        // No interception point for incoming data (on PUT operations)
        None
    }

    fn outgoing_data_interceptor(&self) -> Option<Box<dyn OutgoingDataInterceptor>> {
        // No interception point for outgoing data (on GET operations)
        None
    }
}

// Your Storage implementation
struct MyStorage {
    admin_status: Value,
}

impl MyStorage {
    async fn new(properties: Properties) -> ZResult<MyStorage> {
        // The properties are the ones passed via a PUT in the admin space for Storage creation.
        // They contain at least a PROP_STORAGE_PATH_EXPR entry (i.e. "path_expr").
        // Here we choose to re-expose them as they are in the admin space for GET operations.
        let admin_status = utils::properties_to_json_value(&properties);
        Ok(MyStorage { admin_status })
    }
}

#[async_trait]
impl Storage for MyStorage {
    async fn get_admin_status(&self) -> Value {
        // This operation is called on GET operation on the admin space for the Storage
        // Here we reply with a static status (containing the configuration properties).
        // But we could add dynamic properties for Storage monitoring.
        self.admin_status.clone()
    }

    // When receiving a Sample (i.e. on PUT or DELETE operations)
    async fn on_sample(&mut self, sample: Sample) -> ZResult<()> {
        // extract ChangeKind and Timestamp from sample.data_info
        let (kind, _timestamp) = if let Some(ref info) = sample.data_info {
            (
                info.kind.map_or(ChangeKind::Put, ChangeKind::from),
                match &info.timestamp {
                    Some(ts) => ts.clone(),
                    None => zenoh::utils::new_reception_timestamp(),
                },
            )
        } else {
            (ChangeKind::Put, zenoh::utils::new_reception_timestamp())
        };
        // Store or delete the sample depending the ChangeKind
        match kind {
            ChangeKind::Put => {
                let _key = sample.res_name;
                // @TODO:
                //  - check if timestamp is newer than the stored one for the same key
                //  - if yes: store (key, sample)
                //  - if not: drop the sample
            }
            ChangeKind::Delete => {
                let _key = sample.res_name;
                // @TODO:
                //  - check if timestamp is newer than the stored one for the same key
                //  - if yes: mark key as deleted (possibly scheduling definitive removal for later)
                //  - if not: drop the sample
            }
            ChangeKind::Patch => {
                println!("Received PATCH for {}: not yet supported", sample.res_name);
            }
        }
        Ok(())
    }

    // When receiving a Query (i.e. on GET operations)
    async fn on_query(&mut self, query: Query) -> ZResult<()> {
        let _path_expr = query.res_name();
        // @TODO:
        //  - test if path expression contains *
        //  - if not: just get the sample with key==path_expr and call: query.reply(sample.clone()).await;
        //  - if yes: get all the samples with key matching path_expr and call for each: query.reply(sample.clone()).await;
        //
        // NOTE: in case query.predicate() is not empty something smarter should be done with returned samples...
        Ok(())
    }
}

Modules

Some useful functions for Backend/Storage implementations.

Structs

A wrapper around the zenoh::net::Query allowing to call the OutgoingDataInterceptor (if any) before to send the reply

Constants

The "type" property key to be used in admin status reported by Backends.

The "path_expr" property key to be used for configuration of each storage.

The "path_prefix" property key that could be used to specify the common path prefix to be stripped from Paths before storing them as keys in the Storage.

Traits

Trait to be implemented by a Backend.

An interceptor allowing to modify the data pushed into a storage before it’s actually stored.

An interceptor allowing to modify the data going out of a storage before it’s sent as a reply to a query.

Trait to be implemented by a Storage.