// spotflow 0.8.1
//
// Device SDK for Spotflow IoT Platform
// Documentation
use spotflow::{DeviceClientBuilder, MessageContext};
use std::{
    fs::{self, File},
    path::Path,
};

use log::{debug, info, warn};
use uuid::Uuid;

mod common;

fn main() {
    env_logger::Builder::from_env(
        env_logger::Env::default()
            .default_filter_or("sqlx=warn,ureq=warn,rumqtt=warn,spotflow=debug,info"),
    )
    .init();

    let env_ctx = common::EnvironmentContext::try_load()
        .expect("Unable to load settings from environment variables.");

    let platform_caller = common::PlatformCaller::try_new(&env_ctx)
        .map_err(|e| {
            warn!("Unable to call the Platform automatically, continuing manually. Error: {e}")
        })
        .ok();

    let device_id = Uuid::new_v4().to_string();
    let stream_group = "device-sdk";
    let stream = "rust";
    let site = "test-site";

    let batch_count = 10u32;
    let message_count = 100u32;

    let path = Path::new("./test.db");

    info!("Using device ID {}", device_id);

    info!("Initiating connection tests");

    if path.exists() {
        info!("Removing the old local database file");
        fs::remove_file(path).expect("Unable to delete the old local database file");
    }
    info!("Creating a new local database file");
    File::create(path).expect("Unable to create a new local database file");

    {
        info!("Creating ingress client");

        let client = DeviceClientBuilder::new(
            Some(device_id.clone()),
            env_ctx.provisioning_token.clone(),
            path,
        )
        .with_instance(env_ctx.instance_url.to_string())
        .with_site_id(site.to_owned())
        .with_display_provisioning_operation_callback(Box::new(
            common::ProvisioningOperationApprovalHandler::new(platform_caller.clone()),
        ))
        .build()
        .expect("Unable to build ingress connection");

        let message_context =
            MessageContext::new(Some(stream_group.to_owned()), Some(stream.to_owned()));

        for batch_id in 0..batch_count {
            let batch_id = format!("{batch_id:0>2}");

            for message_id in 0..message_count {
                let message_id = format!("{message_id:0>2}");

                debug!("Publishing message {message_id}");
                client
                    .enqueue_message(
                        &message_context,
                        Some(batch_id.clone()),
                        Some(message_id),
                        vec![b'a'; 1000],
                    )
                    .expect("Unable to send message");
            }

            info!("Completing batch {batch_id}");
            client
                .enqueue_batch_completion(&message_context, batch_id)
                .expect("Unable to complete batch");
        }

        info!("Dropping original ingress");
    }

    {
        info!("Starting new ingress client with configuration");

        let client = DeviceClientBuilder::new(
            Some(device_id.clone()),
            env_ctx.provisioning_token.clone(),
            path,
        )
        .with_instance(env_ctx.instance_url.to_string())
        .with_display_provisioning_operation_callback(Box::new(
            common::ProvisioningOperationApprovalHandler::new(platform_caller.clone()),
        ))
        .build()
        .expect("Unable to build ingress connection");

        loop {
            let pending = client
                .pending_messages_count()
                .expect("Unable to obtain number of pending messages");
            if pending < 200 {
                break;
            }
            warn!("Waiting for {} more messages to be sent.", pending);
            std::thread::sleep(std::time::Duration::from_millis(500));
        }

        info!("Dropping ingress");
    }

    {
        info!("Starting new ingress client");

        let client =
            DeviceClientBuilder::new(Some(device_id.clone()), env_ctx.provisioning_token, path)
                .with_instance(env_ctx.instance_url.to_string())
                .with_display_provisioning_operation_callback(Box::new(
                    common::ProvisioningOperationApprovalHandler::new(platform_caller),
                ))
                .build()
                .expect("Unable to build ingress connection");

        loop {
            let pending = client
                .pending_messages_count()
                .expect("Unable to obtain number of pending messages");
            if pending == 0 {
                break;
            }
            warn!("Waiting for {} more messages to be sent.", pending);
            std::thread::sleep(std::time::Duration::from_millis(500));
        }

        info!("Dropping ingress");
    }
}