Crate astarte_message_hub_proto


§Astarte message hub protocol buffers for Rust

This crate provides access to the Astarte message hub protocol buffers through a Rust API.

§Documentation

§Requirements

  • protobuf >= 3.15
  • Rust version >= 1.72.0

§Client Example

use std::time;

use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use astarte_message_hub_proto::pbjson_types::Empty;
use clap::Parser;
use tonic::metadata::MetadataValue;
use tonic::transport::channel::Endpoint;
use log::info;
use uuid::Uuid;

/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
    /// UUID to be used when registering the client as an Astarte message hub node.
    uuid: String,

    /// Stop after sending COUNT messages.
    #[clap(short, long)]
    count: Option<u64>,

    /// Milliseconds to wait between messages.
    #[clap(short, long, default_value = "3000")]
    time: u64,
}

async fn run_example_client() {
    env_logger::init();
    let args = Cli::parse();
    
    let uuid = Uuid::parse_str(&args.uuid).unwrap();

    let channel = Endpoint::from_static("http://[::1]:50051")
        .connect()
        .await
        .unwrap();
    
    // Adding the interceptor layer includes the node ID in the metadata of every request
    let mut client =
        MessageHubClient::with_interceptor(channel, move |mut req: tonic::Request<()>| {
            req.metadata_mut()
                .insert_bin("node-id-bin", MetadataValue::from_bytes(uuid.as_ref()));
            Ok(req)
        });

    let device_datastream_interface: &str = r#"{
        "interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
        "version_major": 0,
        "version_minor": 1,
        "type": "datastream",
        "ownership": "device",
        "mappings": [
            {
                "endpoint": "/uptime",
                "type": "string",
                "explicit_timestamp": true
            }
        ]
    }"#;
    
    let interfaces_json = vec![device_datastream_interface.to_string()];
    let node = Node::new(interfaces_json);

    let mut stream = client.attach(node.clone()).await.unwrap().into_inner();

    // Start a separate task to handle incoming data
    let reply_handle = tokio::spawn(async move {
        info!("Waiting for messages from the message hub.");

        while let Some(astarte_message) = stream.message().await.unwrap() {
            println!("Received AstarteMessage = {:?}", astarte_message);
        }

        info!("Done receiving messages, closing the connection.");
    });

    // Start a separate task to publish data
    let send_handle = tokio::spawn(async move {
        let now = time::SystemTime::now();
        let mut count = 0;
        // Tick at a fixed interval of `args.time` milliseconds (3000 by default)
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.time));

        while args.count.is_none() || Some(count) < args.count {
            // Wait a little
            interval.tick().await;

            println!("Publishing the uptime through the message hub.");

            let elapsed = now.elapsed().unwrap().as_secs();

            let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
            let msg = AstarteMessage {
                interface_name: "org.astarte-platform.rust.examples.datastream.DeviceDatastream"
                    .to_string(),
                path: "/uptime".to_string(),
                timestamp: None,
                payload: Some(Payload::AstarteData(elapsed_str.into())),
            };
            client.send(msg).await.unwrap();

            count += 1;
        }

        info!("Done sending messages, closing the connection.");
        client.detach(Empty {}).await.expect("Detach failed");
    });

    let res = tokio::join!(reply_handle, send_handle);

    match res {
        (Ok(_), Ok(_)) => (),
        (Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
        (Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
    }
}
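
The send loop above stops according to how `Option` values compare: `None` orders below every `Some`, so the `is_none()` check must come first to cover the unbounded case. A minimal std-only sketch of that condition follows. `should_send` is a hypothetical helper name used for illustration and is not part of this crate:

```rust
/// Returns true while the client should publish another message.
///
/// `limit` mirrors the optional `--count` argument and `sent` is the number
/// of messages already published. `should_send` is a hypothetical helper,
/// not an item exported by astarte_message_hub_proto.
fn should_send(limit: Option<u64>, sent: u64) -> bool {
    // No limit: keep sending. Otherwise stop once `sent` reaches the limit.
    // Equivalent to `limit.is_none() || Some(sent) < limit`.
    limit.map_or(true, |max| sent < max)
}
```

With this helper the loop header would read `while should_send(args.count, count)`, which keeps the two cases (no limit, limit not yet reached) explicit.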

Re-exports§

Modules§

Structs§

  • An array of bytes for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of booleans for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • A generic data type to be used nested in an AstarteMessage.
  • An individual data type for transmission with protobuf. To be used nested inside an AstarteDataType.
  • An aggregated object data type for transmission with protobuf. To be used nested inside an AstarteDataType.
  • An array of timestamps for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of doubles for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of int32 for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of int64 for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • Astarte message to be used when sending data to Astarte.
  • An array of strings for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • Null payload for an AstarteMessage.
  • Configuration message to be used to send configuration to the Astarte message hub.
  • This message defines a list of JSON interfaces to be added to or removed from the Astarte message hub.
  • This message defines a list of interfaces’ names to be removed from the Astarte message hub.
  • MessageHub error type
  • MessageHubEvent is a message type for returning and propagating errors. It is an enum with two variants: AstarteMessage(message), representing success and containing an Astarte message value, and MessageHubError(E), representing an error and containing an error value.
  • This message defines a node to be attached to the Astarte message hub.
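
The MessageHubEvent description above follows the same success-or-error shape as Rust's `Result`. The sketch below illustrates that idea with simplified stand-in types. `Message`, `HubError`, `Event`, and `into_result` are all hypothetical names chosen for this example; the layout of the generated protobuf types is not shown on this page and is probably different:

```rust
/// Simplified stand-in for an Astarte message (hypothetical, illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Message {
    path: String,
}

/// Simplified stand-in for a message hub error (hypothetical, illustration only).
#[derive(Debug, Clone, PartialEq)]
struct HubError {
    description: String,
}

/// An event that carries either a message (success) or an error.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Message(Message),
    Error(HubError),
}

impl Event {
    /// Converts the event into a `Result` so callers can use `?` and the
    /// usual `Result` combinators.
    fn into_result(self) -> Result<Message, HubError> {
        match self {
            Event::Message(message) => Ok(message),
            Event::Error(error) => Err(error),
        }
    }
}
```

Converting to `Result` at the edge of the receiving task is one possible design: the rest of the code then handles hub errors the same way it handles any other fallible call.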