use std::{
num::NonZeroU32,
path::Path,
time::{Duration, Instant},
};
use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;
use spotflow::{Compression, DeviceClientBuilder, MessageContext};
use log::{debug, error, info, warn};
use uuid::Uuid;
mod common;
/// Example / integration check: sends a batch of messages through a freshly
/// built device client and then waits until the matching blobs are listed in
/// the workspace storage container.
///
/// Steps, in order:
/// 1. Initialise logging and load settings via `common::EnvironmentContext`.
/// 2. Build a device client with a random device ID.
/// 3. Enqueue `batch_count * message_count` messages of 1000 bytes each and
///    complete every batch.
/// 4. Poll `pending_messages_count()` until it reports zero.
/// 5. Poll the blob listing under `{stream_group}/{stream}/{device_id}/` until
///    at least the expected number of blobs is present.
/// 6. Log how long each phase took.
///
/// # Panics
///
/// Panics (via `expect`/`unwrap`) if any setup step, enqueue call, SAS URI
/// component lookup or blob listing request fails.
fn main() {
    env_logger::Builder::from_env(
        env_logger::Env::default()
            .default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,ingress=info,spotflow=info,info"),
    )
    .init();

    let env_ctx = common::EnvironmentContext::try_load()
        .expect("Unable to load settings from environment variables.");
    let platform_caller = common::PlatformCaller::try_new(&env_ctx)
        .expect("This test needs to call the Platform automatically and it's unable to do so.");

    let device_id = Uuid::new_v4().to_string();
    let stream_group = "device-sdk";
    let stream = "rust";
    let site = "test-site";
    // Typed as `usize` so the expected blob count can be compared with
    // `Vec::len()` without a lossy `as` cast.
    let batch_count: usize = 1;
    let message_count: usize = 10;
    let expected_count = batch_count * message_count;
    let path = Path::new("./test.db");

    info!("Using device ID {}", device_id);
    info!("Initiating example of Message sending");
    info!("Creating Device Client");

    let startup = Instant::now();
    let client =
        DeviceClientBuilder::new(Some(device_id.clone()), env_ctx.provisioning_token, path)
            .with_instance(env_ctx.instance_url.to_string())
            .with_site_id(site.to_owned())
            .with_display_provisioning_operation_callback(Box::new(
                common::ProvisioningOperationApprovalHandler::new(Some(platform_caller.clone())),
            ))
            .build()
            .expect("Unable to build ingress connection");

    info!("Setting up stream configuration");
    let mut message_context =
        MessageContext::new(Some(stream_group.to_owned()), Some(stream.to_owned()));
    message_context.set_compression(Some(Compression::Fastest));

    info!("Sending messages");
    let sending = Instant::now();
    for batch_id in 0..batch_count {
        // Zero-padded, two-character identifiers (e.g. "00", "01").
        let batch_id = format!("{batch_id:0>2}");
        for message_id in 0..message_count {
            let message_id = format!("{message_id:0>2}");
            debug!("Publishing message {message_id}");
            client
                .enqueue_message(
                    &message_context,
                    Some(batch_id.clone()),
                    Some(message_id),
                    vec![b'a'; 1000],
                )
                .expect("Unable to send message");
        }
        info!("Completing batch {batch_id}");
        client
            .enqueue_batch_completion(&message_context, batch_id)
            .expect("Unable to complete batch");
    }
    let buffered = Instant::now();

    // Wait until the client reports that nothing is pending any more.
    // NOTE(review): this loop has no upper bound; it spins forever if the
    // pending count never reaches zero.
    loop {
        let pending = client
            .pending_messages_count()
            .expect("Unable to obtain number of pending messages");
        if pending == 0 {
            break;
        }
        warn!("Waiting for {} more messages to be sent.", pending);
        std::thread::sleep(Duration::from_millis(500));
    }
    let sent = Instant::now();

    // Split the SAS URI into the pieces the storage client constructor takes:
    // first host label -> account name, path -> container, query -> SAS token.
    let storage_sas_uri = platform_caller
        .get_workspace_storage_sas_uri()
        .expect("Unable to get storage SAS URI");
    let account_name = storage_sas_uri.host().unwrap().split('.').next().unwrap();
    let container_name = storage_sas_uri.path().trim_start_matches('/');
    let sas_token = storage_sas_uri.query().unwrap();

    let http_client = azure_core::new_http_client();
    let storage_client = StorageAccountClient::new_sas_token(http_client, account_name, sas_token)
        .unwrap()
        .as_storage_client();
    let container_client = storage_client.as_container_client(container_name);

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async move {
        // Poll the listing until enough blobs are present; the loop yields the
        // last listing so it can be traced below.
        // NOTE(review): no timeout here either - the loop repeats every 5 s
        // for as long as fewer than `expected_count` blobs are listed.
        let listing = loop {
            let response = container_client
                .list_blobs()
                .prefix(format!("{stream_group}/{stream}/{device_id}/"))
                .max_results(NonZeroU32::new(1000).unwrap())
                .execute()
                .await
                .unwrap();

            let blob_count = response.blobs.blobs.len();
            if blob_count >= expected_count {
                break response;
            }

            error!(
                "Unexpected number of blobs: Expected {}, got {}",
                expected_count, blob_count
            );
            tokio::time::sleep(Duration::from_secs(5)).await;
        };
        let blobs_ready = Instant::now();

        for blob in listing.blobs.blobs.iter() {
            log::trace!("\t{}\t{} bytes", blob.name, blob.properties.content_length);
        }

        info!("Startup: {:?}", sending.duration_since(startup));
        info!("Sending: {:?}", buffered.duration_since(sending));
        info!("Actual Sending: {:?}", sent.duration_since(buffered));
        info!("Time until data is ready: {:?}", blobs_ready.duration_since(sent));
    });
}