use sift_stream::{
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, TimeValue,
};
use std::{
env,
error::Error,
time::{Duration, SystemTime, UNIX_EPOCH},
};
/// Total wall-clock time the send loop in `main` keeps running (ten minutes).
const INGEST_DURATION: Duration = Duration::from_secs(10 * 60);

/// Pause inserted after each flow is sent.
const SEND_INTERVAL: Duration = Duration::from_millis(500);

/// Name shared by the flow configuration and every flow sent under it.
const FLOW_NAME: &'static str = "vehicle_metrics";
/// Builds a suffix of the form `<unix-seconds>_<hex>` used to keep asset and
/// run names distinct between invocations.
///
/// The first component is the current time in whole seconds since the UNIX
/// epoch; the second is a random `u32` rendered as lowercase hexadecimal.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the UNIX epoch.
fn make_unique_suffix() -> String {
    let unix_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before UNIX EPOCH")
        .as_secs();
    let nonce: u32 = rand::random();

    format!("{unix_secs}_{nonce:x}")
}
/// Reads the environment variable `name`, which must be set.
///
/// # Errors
///
/// Returns an error whose message includes the variable name when the
/// variable is unset or does not contain valid Unicode, so a configuration
/// mistake is reported instead of causing a panic.
fn required_env(name: &str) -> Result<String, Box<dyn Error>> {
    env::var(name).map_err(|err| format!("environment variable `{name}`: {err}").into())
}

/// Streams randomly generated `velocity` and `temperature` values under the
/// `FLOW_NAME` flow for `INGEST_DURATION`, sending one flow and then sleeping
/// for `SEND_INTERVAL` on every iteration.
///
/// Credentials come from the `SIFT_API_KEY` and `SIFT_GRPC_URL` environment
/// variables, read after `.env` has been loaded through `dotenvy`.
///
/// # Errors
///
/// Returns an error if loading `.env` fails, if either credential variable is
/// missing or not valid Unicode, or if building the stream, sending a flow, or
/// finishing the stream fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    tracing_subscriber::fmt().init();
    tracing::info!("Starting streaming session.");

    // Asset, run, and client-key names all derive from one fresh suffix so
    // separate invocations produce distinct names.
    let suffix = make_unique_suffix();
    let asset_name = format!("robot_vehicle_{suffix}");
    let run_name = format!("{asset_name}_run");

    // Any error from loading `.env` is propagated to the caller.
    dotenvy::dotenv()?;

    // Missing configuration is surfaced as an error that names the variable,
    // rather than a panic.
    let credentials = Credentials::Config {
        apikey: required_env("SIFT_API_KEY")?,
        uri: required_env("SIFT_GRPC_URL")?,
    };

    // Both channels are declared as doubles; their names must match the
    // `ChannelValue`s built in the send loop below.
    let flow_config = FlowConfig {
        name: FLOW_NAME.into(),
        channels: vec![
            ChannelConfig {
                name: "velocity".into(),
                unit: "m/s".into(),
                data_type: ChannelDataType::Double.into(),
                description: "The velocity Channel streams real-time speed measurements of the vehicle in meters per second (m/s) as double-precision numeric values.".into(),
                ..Default::default()
            },
            ChannelConfig {
                name: "temperature".into(),
                unit: "C".into(),
                data_type: ChannelDataType::Double.into(),
                description: "The temperature Channel streams real-time temperature readings of the vehicle in degrees Celsius (°C) as double-precision numeric values.".into(),
                ..Default::default()
            },
        ],
    };

    let ingestion_client_key = format!("{asset_name}_v1");
    let ingestion_config = IngestionConfigForm {
        asset_name: asset_name.clone(),
        client_key: ingestion_client_key,
        flows: vec![flow_config],
    };

    // The run name doubles as the run's client key.
    let run = RunForm {
        name: run_name.clone(),
        client_key: run_name,
        ..Default::default()
    };

    let mut sift_stream = SiftStreamBuilder::new(credentials)
        .ingestion_config(ingestion_config)
        .recovery_strategy(RecoveryStrategy::default())
        .attach_run(run)
        .build()
        .await?;

    let start = std::time::Instant::now();
    while start.elapsed() < INGEST_DURATION {
        // velocity is drawn from [0, 10); temperature from [20, 40).
        let velocity = rand::random::<f64>() * 10.0;
        let temperature = rand::random::<f64>() * 20.0 + 20.0;

        let flow = Flow::new(
            FLOW_NAME,
            TimeValue::now(),
            &[
                ChannelValue::new("velocity", velocity),
                ChannelValue::new("temperature", temperature),
            ],
        );

        sift_stream.send(flow).await?;
        tokio::time::sleep(SEND_INTERVAL).await;
    }

    sift_stream.finish().await?;
    tracing::info!("Streaming session complete.");
    Ok(())
}