spotflow 0.8.1

Device SDK for Spotflow IoT Platform
Documentation
use std::{
    num::NonZeroU32,
    path::Path,
    time::{Duration, Instant},
};

use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;

use spotflow::{Compression, DeviceClientBuilder, MessageContext};

use log::{debug, error, info, warn};
use uuid::Uuid;

mod common;

fn main() {
    env_logger::Builder::from_env(
        env_logger::Env::default()
            .default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,ingress=info,spotflow=info,info"),
    )
    .init();

    let env_ctx = common::EnvironmentContext::try_load()
        .expect("Unable to load settings from environment variables.");

    let platform_caller = common::PlatformCaller::try_new(&env_ctx)
        .expect("This test needs to call the Platform automatically and it's unable to do so.");

    let device_id = Uuid::new_v4().to_string();

    let stream_group = "device-sdk";
    let stream = "rust";
    let site = "test-site";

    let batch_count = 1;
    let message_count = 10;

    let path = Path::new("./test.db");

    info!("Using device ID {}", device_id);

    info!("Initiating example of Message sending");

    info!("Creating Device Client");

    let startup = Instant::now();

    let client =
        DeviceClientBuilder::new(Some(device_id.clone()), env_ctx.provisioning_token, path)
            .with_instance(env_ctx.instance_url.to_string())
            .with_site_id(site.to_owned())
            .with_display_provisioning_operation_callback(Box::new(
                common::ProvisioningOperationApprovalHandler::new(Some(platform_caller.clone())),
            ))
            .build()
            .expect("Unable to build ingress connection");

    info!("Setting up stream configuration");

    let mut message_context =
        MessageContext::new(Some(stream_group.to_owned()), Some(stream.to_owned()));
    message_context.set_compression(Some(Compression::Fastest));

    info!("Sending messages");
    let sending = Instant::now();

    for batch_id in 0..batch_count {
        let batch_id = format!("{batch_id:0>2}");

        for message_id in 0..message_count {
            let message_id = format!("{message_id:0>2}");

            debug!("Publishing message {message_id}");
            client
                .enqueue_message(
                    &message_context,
                    Some(batch_id.clone()),
                    Some(message_id),
                    vec![b'a'; 1000],
                )
                .expect("Unable to send message");
        }

        info!("Completing batch {batch_id}");
        client
            .enqueue_batch_completion(&message_context, batch_id)
            .expect("Unable to complete batch");
    }

    let buffered = Instant::now();

    loop {
        let pending = client
            .pending_messages_count()
            .expect("Unable to obtain number of pending messages");
        if pending == 0 {
            break;
        }
        warn!("Waiting for {} more messages to be sent.", pending);
        std::thread::sleep(std::time::Duration::from_millis(500));
    }

    let sent = Instant::now();

    // Get credentials to the landing store
    let storage_sas_uri = platform_caller
        .get_workspace_storage_sas_uri()
        .expect("Unable to get storage SAS URI");
    let account_name = storage_sas_uri.host().unwrap().split('.').next().unwrap();
    let container_name = storage_sas_uri.path().trim_start_matches('/');
    let sas_token = storage_sas_uri.query().unwrap();

    // Check landing store
    let http_client = azure_core::new_http_client();
    let storage_client = StorageAccountClient::new_sas_token(http_client, account_name, sas_token)
        .unwrap()
        .as_storage_client();

    let container_client = storage_client.as_container_client(container_name);

    container_client.list_blobs();

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    runtime.block_on(async move {
        let mut blobs;
        loop {
            let iv = container_client
                .list_blobs()
                .prefix(format!("{stream_group}/{stream}/{device_id}/"))
                .max_results(NonZeroU32::new(1000).unwrap())
                .execute()
                .await
                .unwrap();

            let blob_count = iv.blobs.blobs.len();
            blobs = Some(iv);

            let expected_count = (batch_count * message_count) as usize;
            if blob_count < expected_count {
                error!(
                    "Unexpected number of blobs: Expected {}, got {}",
                    expected_count, blob_count
                );
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            } else {
                break;
            }
        }

        let blobs_ready = Instant::now();

        if let Some(blobs) = blobs {
            for cont in blobs.blobs.blobs.iter() {
                log::trace!("\t{}\t{} bytes", cont.name, cont.properties.content_length);
            }
        }

        info!("Startup: {:?}", sending - startup);
        info!("Sending: {:?}", buffered - sending);
        info!("Actual Sending: {:?}", sent - buffered);
        info!("Time until data is ready: {:?}", blobs_ready - sent);
    });
}