Crate astarte_message_hub_proto

§Astarte message hub protocol buffers for Rust

This crate provides a Rust API for the Astarte message hub protocol buffers.

§Documentation

§Requirements

  • protobuf >= 3.15
  • Rust version >= 1.72.0

§Client Example

use std::time;

use clap::Parser;

use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use log::info;

/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
    /// UUID to be used when registering the client as an Astarte message hub node.
    uuid: String,

    /// Stop after sending COUNT messages.
    #[clap(short, long)]
    count: Option<u64>,

    /// Milliseconds to wait between messages.
    #[clap(short, long, default_value = "3000")]
    time: u64,
}

async fn run_example_client() {
    env_logger::init();
    let args = Cli::parse();

    let mut client = MessageHubClient::connect("http://[::1]:50051")
        .await
        .unwrap();

    let device_datastream_interface: &str = r#"{
        "interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
        "version_major": 0,
        "version_minor": 1,
        "type": "datastream",
        "ownership": "device",
        "mappings": [
            {
                "endpoint": "/uptime",
                "type": "string",
                "explicit_timestamp": true
            }
        ]
    }"#;

    let interface_jsons = [device_datastream_interface];

    let node = Node::new(&args.uuid, &interface_jsons);

    let mut stream = client.attach(node.clone()).await.unwrap().into_inner();

    // Start a separate task to handle incoming data
    let reply_handle = tokio::spawn(async move {
        info!("Waiting for messages from the message hub.");

        while let Some(astarte_message) = stream.message().await.unwrap() {
            println!("Received AstarteMessage = {:?}", astarte_message);
        }

        info!("Done receiving messages, closing the connection.");
    });

    // Start a separate task to publish data
    let send_handle = tokio::spawn(async move {
        let now = time::SystemTime::now();
        let mut count = 0;
        // Tick every `args.time` milliseconds (3000 by default)
        let mut interval = tokio::time::interval(time::Duration::from_millis(args.time));

        while args.count.is_none() || Some(count) < args.count {
            // The first tick completes immediately; later ticks wait for the interval
            interval.tick().await;

            println!("Publishing the uptime through the message hub.");

            let elapsed = now.elapsed().unwrap().as_secs();

            let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
            let msg = AstarteMessage {
                interface_name: "org.astarte-platform.rust.examples.datastream.DeviceDatastream"
                    .to_string(),
                path: "/uptime".to_string(),
                timestamp: None,
                payload: Some(Payload::AstarteData(elapsed_str.into())),
            };
            client.send(msg).await.unwrap();

            count += 1;
        }

        info!("Done sending messages, closing the connection.");
        client.detach(node).await.expect("Detach failed");
    });

    let res = tokio::join!(reply_handle, send_handle);

    match res {
        (Ok(_), Ok(_)) => (),
        (Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
        (Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
    }
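}

// Entry point. Assumes tokio is built with the `macros` feature and a runtime feature.
#[tokio::main]
async fn main() {
    run_example_client().await;
}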
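
The send loop in the example keeps running while `args.count.is_none() || Some(count) < args.count`. The sketch below isolates that condition using only the standard library. `should_send` and `should_send_as_written` are hypothetical helper names, not part of this crate. It illustrates why the `is_none()` guard is needed: `Option` orders `None` below every `Some`, so `Some(count) < None` is always false and the loop would otherwise never run when no limit is given.

```rust
/// Returns true while another message should be published.
/// `sent` plays the role of `count`, `limit` the role of `args.count`.
fn should_send(sent: u64, limit: Option<u64>) -> bool {
    match limit {
        // No limit requested: keep sending until the task is stopped.
        None => true,
        // Limit requested: stop once `sent` reaches it.
        Some(max) => sent < max,
    }
}

/// The same condition, spelled the way the example writes it.
fn should_send_as_written(sent: u64, limit: Option<u64>) -> bool {
    limit.is_none() || Some(sent) < limit
}
```

Both functions agree for every input. The `match` form may be easier to read, while the original form is more compact.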

§Re-exports

§Modules

§Structs

  • An array of bytes for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of booleans for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • A generic data type to be used nested in an AstarteMessage.
  • An individual data type for transmission with protobuf. To be used nested inside an AstarteDataType.
  • An aggregated object data type for transmission with protobuf. To be used nested inside an AstarteDataType.
  • An array of timestamps for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of doubles for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of int32 for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • An array of int64 for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • Astarte message to be used when sending data to Astarte.
  • An array of strings for transmission with protobuf. To be used nested inside an AstarteDataTypeIndividual.
  • Null payload for an AstarteMessage.
  • Configuration message to be used to send configuration to the Astarte message hub.
  • This message defines a node to be attached/detached to the Astarte message hub.
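
The descriptions above imply a nesting: a message carries a payload, the payload is either data or null, data is either an individual value or an aggregated object, and an individual value is either a scalar or an array. The following is a simplified, standard-library-only model of that nesting, offered as a reading aid. Every `Sketch*` type and the `describe` function are hypothetical stand-ins, not the generated types of this crate, and the string-keyed layout of objects is an assumption.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for an individual value: a scalar or an array of scalars.
#[derive(Debug, Clone, PartialEq)]
enum SketchIndividual {
    Double(f64),
    Text(String),
    DoubleArray(Vec<f64>),
    TextArray(Vec<String>),
}

/// Hypothetical stand-in for the generic data type: individual or aggregated object.
#[derive(Debug, Clone, PartialEq)]
enum SketchDataType {
    Individual(SketchIndividual),
    // Assumption: an object maps field names to individual values.
    Object(BTreeMap<String, SketchIndividual>),
}

/// Hypothetical stand-in for a message payload: data or an explicit null.
#[derive(Debug, Clone, PartialEq)]
enum SketchPayload {
    Data(SketchDataType),
    Unset,
}

// A conversion like this is what lets a call site write `value.into()`,
// as the client example does with `elapsed_str.into()`.
impl From<String> for SketchDataType {
    fn from(value: String) -> Self {
        SketchDataType::Individual(SketchIndividual::Text(value))
    }
}

/// Summarises which level of the nesting a payload uses.
fn describe(payload: &SketchPayload) -> String {
    match payload {
        SketchPayload::Unset => "unset".to_string(),
        SketchPayload::Data(SketchDataType::Object(fields)) => {
            format!("object with {} field(s)", fields.len())
        }
        SketchPayload::Data(SketchDataType::Individual(value)) => match value {
            SketchIndividual::Double(_) | SketchIndividual::Text(_) => {
                "individual scalar".to_string()
            }
            SketchIndividual::DoubleArray(items) => {
                format!("individual array of {}", items.len())
            }
            SketchIndividual::TextArray(items) => {
                format!("individual array of {}", items.len())
            }
        },
    }
}
```

With this model, `SketchPayload::Data(String::from("42").into())` builds a scalar text payload through the `From<String>` conversion, mirroring how the client example wraps its uptime string.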