Crate astarte_message_hub_proto
source ·Expand description
§Astarte message hub protocol buffers for Rust
This module provides access to the Astarte message hub protocol buffers through a Rust API.
§Documentation
§Requirements
- protobuf >= 3.15
- Rust version >= 1.72.0
§Client Example
use std::time;
use astarte_message_hub_proto::astarte_message::Payload;
use astarte_message_hub_proto::message_hub_client::MessageHubClient;
use astarte_message_hub_proto::AstarteMessage;
use astarte_message_hub_proto::Node;
use astarte_message_hub_proto::pbjson_types::Empty;
use clap::Parser;
use tonic::metadata::MetadataValue;
use tonic::transport::channel::Endpoint;
use log::info;
use uuid::Uuid;
/// Create a ProtoBuf client for the Astarte message hub.
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
/// UUID to be used when registering the client as an Astarte message hub node.
uuid: String,
/// Stop after sending COUNT messages.
#[clap(short, long)]
count: Option<u64>,
/// Milliseconds to wait between messages.
#[clap(short, long, default_value = "3000")]
time: u64,
}
async fn run_example_client() {
env_logger::init();
let args = Cli::parse();
let uuid = Uuid::parse_str(&args.uuid).unwrap();
let channel = Endpoint::from_static("http://[::1]:50051")
.connect()
.await
.unwrap();
// adding the interceptor layer will include the Node ID inside the metadata
let mut client =
MessageHubClient::with_interceptor(channel, move |mut req: tonic::Request<()>| {
req.metadata_mut()
.insert_bin("node-id-bin", MetadataValue::from_bytes(uuid.as_ref()));
Ok(req)
});
let device_datastream_interface: &str = r#"{
"interface_name": "org.astarte-platform.rust.examples.datastream.DeviceDatastream",
"version_major": 0,
"version_minor": 1,
"type": "datastream",
"ownership": "device",
"mappings": [
{
"endpoint": "/uptime",
"type": "string",
"explicit_timestamp": true
}
]
}"#;
let interfaces_json = vec![device_datastream_interface.to_string()];
let node = Node::new(interfaces_json);
let mut stream = client.attach(node.clone()).await.unwrap().into_inner();
// Start a separate task to handle incoming data
let reply_handle = tokio::spawn(async move {
info!("Waiting for messages from the message hub.");
while let Some(astarte_message) = stream.message().await.unwrap() {
println!("Received AstarteMessage = {:?}", astarte_message);
}
info!("Done receiving messages, closing the connection.");
});
// Start a separate task to publish data
let send_handle = tokio::spawn(async move {
let now = time::SystemTime::now();
let mut count = 0;
// Consistent interval of 3 seconds
let mut interval = tokio::time::interval(std::time::Duration::from_millis(args.time));
while args.count.is_none() || Some(count) < args.count {
// Wait a little
interval.tick().await;
println!("Publishing the uptime through the message hub.");
let elapsed = now.elapsed().unwrap().as_secs();
let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
let msg = AstarteMessage {
interface_name: "org.astarte-platform.rust.examples.datastream.DeviceDatastream"
.to_string(),
path: "/uptime".to_string(),
timestamp: None,
payload: Some(Payload::AstarteData(elapsed_str.into())),
};
client.send(msg).await.unwrap();
count += 1;
}
info!("Done sending messages, closing the connection.");
client.detach(Empty {}).await.expect("Detach failed");
});
let res = tokio::join!(reply_handle, send_handle);
match res {
(Ok(_), Ok(_)) => (),
(Err(e), Ok(_)) | (Ok(_), Err(e)) => panic!("Error: {}", e),
(Err(e1), Err(e2)) => panic!("Error:\n\t{}\n\t{}", e1, e2),
}
}
Re-exports§
pub use pbjson_types;
pub use prost;
pub use serde;
pub use serde_json;
pub use tonic;
Modules§
- Nested message and enum types in
AstarteDataType
. - Nested message and enum types in
AstarteDataTypeIndividual
. - Nested message and enum types in
AstarteMessage
. - Generated client implementations.
- Generated client implementations.
- Generated server implementations.
- Nested message and enum types in
MessageHubEvent
. - Generated server implementations.
Structs§
- An array of bytes for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - An array of booleans for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - A generic data type to be used nested in an
AstarteMessage
. - An individual data type for transmission with protobuf. To be used nested inside an
AstarteDataType
. - An aggregated object data type for transmission with protobuf. To be used nested inside an
AstarteDataType
. - An array of timestamps for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - An array of doubles for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - An array of int32 for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - An array of int64 for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - Astarte message to be used when sending data to Astarte.
- An array of strings for transmission with protobuf. To be used nested inside an
AstarteDataTypeIndividual
. - Null payload for an
AstarteMessage
. - Configuration message to be used to send configuration to the Astarte message hub.
- This message defines a list of json interfaces to be added/removed to the Astarte message hub.
- This message defines a list of interfaces’ names to be removed from the Astarte message hub.
- MessageHub error type
- MessageHubEvent is a type of message for returning and propagating errors. It is an enum with the variants, AstarteMessage(message), representing success and containing an astarte message value, and MessageHubError(E) representing error and containing an error value.
- This message defines a node to be attached to the Astarte message hub.