use astarte_device_sdk::builder::{DeviceBuilder, DeviceSdkBuild};
use astarte_device_sdk::client::{ClientDisconnect, RecvError};
use astarte_device_sdk::store::memory::MemoryStore;
use astarte_device_sdk::transport::grpc::GrpcConfig;
use astarte_device_sdk::{Client, EventLoop};
use clap::Parser;
use log::{error, info};
use std::time;
use tokio::signal::ctrl_c;
use tokio::task::JoinSet;
use uuid::Uuid;
const DEVICE_DATASTREAM: &str = include_str!(
"./interfaces/org.astarte-platform.rust.examples.datastream.DeviceDatastream.json"
);
const SERVER_DATASTREAM: &str = include_str!(
"./interfaces/org.astarte-platform.rust.examples.datastream.ServerDatastream.json"
);
#[derive(Parser, Debug)]
#[clap(version, about)]
struct Cli {
#[clap(default_value = "d1e7a6e9-cf99-4694-8fb6-997934be079c")]
uuid: String,
#[clap(default_value = "http://[::1]:50051")]
endpoint: String,
#[clap(short, long)]
count: Option<u64>,
#[clap(short, long, default_value = "3000")]
time: u64,
}
type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
#[tokio::main]
async fn main() -> Result<(), DynError> {
stable_eyre::install()?;
env_logger::try_init()?;
let args = Cli::parse();
let node_id = Uuid::parse_str(&args.uuid)?;
let grpc_cfg = GrpcConfig::from_url(node_id, args.endpoint)?;
let (client, connection) = DeviceBuilder::new()
.store(MemoryStore::new())
.interface_str(DEVICE_DATASTREAM)?
.interface_str(SERVER_DATASTREAM)?
.connect(grpc_cfg)
.await?
.build()
.await;
let mut tasks = JoinSet::<Result<(), DynError>>::new();
let client_cl = client.clone();
tasks.spawn(async move {
info!("start receiving messages from the Astarte Message Hub Server");
loop {
match client_cl.recv().await {
Ok(event) => {
info!("received {event:?}");
}
Err(RecvError::Disconnected) => break,
Err(err) => {
error!("error while receiving data from Astarte Message Hub Server: {err:?}")
}
}
}
Ok(())
});
tasks.spawn({
let client = client.clone();
async move {
let now = time::SystemTime::now();
let mut count = 0;
let mut interval = tokio::time::interval(time::Duration::from_millis(args.time));
while args.count.is_none() || Some(count) < args.count {
interval.tick().await;
info!("Publishing the uptime through the message hub.");
let elapsed = now.elapsed()?.as_secs();
let elapsed_str = format!("Uptime for node {}: {}", args.uuid, elapsed);
client
.send(
"org.astarte-platform.rust.examples.datastream.DeviceDatastream",
"/uptime",
elapsed_str,
)
.await?;
count += 1;
}
info!("Done sending messages");
Ok(())
}
});
tasks.spawn(async move {
connection.handle_events().await?;
Ok(())
});
tasks.spawn(async {
ctrl_c().await?;
Ok(())
});
while let Some(res) = tasks.join_next().await {
match res {
Ok(res) => {
res?;
tasks.abort_all();
}
Err(err) if err.is_cancelled() => {}
Err(err) => {
return Err(err.into());
}
};
}
client.disconnect().await?;
Ok(())
}