use dsh_sdk::protocol_adapters::token::api_client_token_fetcher::ApiClientTokenFetcher;
use dsh_sdk::protocol_adapters::token::data_access_token::{
DataAccessToken, RequestDataAccessToken,
};
use rumqttc::{AsyncClient, MqttOptions, TlsConfiguration, Transport};
/// DSH platform (the `Poc` variant) handed to `ApiClientTokenFetcher::new` when the token fetcher is built.
const PLATFORM: dsh_sdk::Platform = dsh_sdk::Platform::Poc;
/// Subscribes to a DSH MQTT topic and prints every event the client receives.
///
/// The flow is:
/// 1. read `TENANT` and `CLIENT_ID` from the environment,
/// 2. initialise `env_logger` (trace level for `dsh_sdk`, written to stdout),
/// 3. obtain a data access token through `ApiClientAuthenticationService`,
/// 4. build a TLS MQTT connection from the token and subscribe to the first
///    topic listed in the token's claims,
/// 5. poll the event loop, printing each event.
///
/// # Errors
/// Returns an error when `TENANT` or `CLIENT_ID` cannot be read, when the token
/// carries no claims, when subscribing fails, or when polling the event loop
/// fails. Returning the poll error (instead of breaking and returning `Ok`)
/// makes the process exit with a non-zero status after a connection failure.
///
/// # Panics
/// `ApiClientAuthenticationService::get_data_access_token` panics when `API_KEY`
/// is not set or the token cannot be fetched.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `main` already returns a `Result`, so report missing configuration as an
    // error rather than panicking.
    let tenant_name = std::env::var("TENANT").map_err(|_| "TENANT is not set")?;
    let client_id = std::env::var("CLIENT_ID").map_err(|_| "CLIENT_ID is not set")?;

    env_logger::builder()
        .filter(Some("dsh_sdk"), log::LevelFilter::Trace)
        .target(env_logger::Target::Stdout)
        .init();

    let request = RequestDataAccessToken::new(tenant_name, client_id);
    let token = ApiClientAuthenticationService::get_data_access_token(request).await;

    // The connection parameters all come from the token; the raw token is sent
    // as the password with an empty username.
    let mut mqttoptions = MqttOptions::new(token.client_id(), token.endpoint(), token.port_mqtt());
    mqttoptions.set_credentials("", token.raw_token());
    mqttoptions.set_transport(Transport::tls_with_config(TlsConfiguration::Native));
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);

    // Subscribe to the first topic the token grants access to.
    let topic = token
        .claims()
        .iter()
        .next()
        .ok_or("No available topics")?
        .full_qualified_topic_name();
    client.subscribe(topic, rumqttc::QoS::AtMostOnce).await?;

    // Runs until the first poll error, which is propagated to the caller.
    loop {
        let event = eventloop.poll().await?;
        println!("Received = {:?}", event);
    }
}
/// Stateless helper that groups the token retrieval logic used by `main`.
struct ApiClientAuthenticationService;

impl ApiClientAuthenticationService {
    /// Fetches a data access token for `request`.
    ///
    /// Builds an `ApiClientTokenFetcher` from the `API_KEY` environment
    /// variable and `PLATFORM`, then asks it for the token.
    ///
    /// # Panics
    /// Panics when `API_KEY` is not set, or when fetching the token fails.
    async fn get_data_access_token(request: RequestDataAccessToken) -> DataAccessToken {
        let fetcher = ApiClientTokenFetcher::new(
            std::env::var("API_KEY").expect("API_KEY is not set"),
            PLATFORM,
        );
        let fetched = fetcher.fetch_data_access_token(request).await;
        fetched.unwrap()
    }
}