use std::env;
use std::time::Duration;
use dsh_sdk::protocol_adapters::http_protocol::{
Accept, ContentType, HttpClient, ResponseBody, Stream, Topic,
};
use dsh_sdk::protocol_adapters::token::api_client_token_fetcher::ApiClientTokenFetcher;
use dsh_sdk::protocol_adapters::token::data_access_token::{
DataAccessToken, RequestDataAccessToken,
};
use dsh_sdk::Platform;
const PLATFORM: Platform = Platform::NpLz;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let stream_str = env::var("STREAM").expect("STREAM environment variable is required");
let topic_str = env::var("TOPIC").unwrap_or_else(|_| "".into());
let stream = Stream::try_from(stream_str)?;
let topic = Topic::try_from(topic_str)?;
let base_url = PLATFORM.http_protocol_base_url();
let mqtt_token_str = if let Ok(token) = env::var("MQTT_TOKEN") {
token
} else {
let tenant_name = env::var("TENANT")?;
let client_id = env::var("CLIENT_ID")?;
let request = RequestDataAccessToken::new(tenant_name, client_id);
let data_access_token =
ApiClientAuthenticationService::get_data_access_token(request).await;
data_access_token.raw_token().to_string()
};
let client = HttpClient::builder(&base_url)
.timeout(Duration::from_secs(10))
.build()?;
println!(
"\n--- STEP 1: Initial GET ---
[CALL] .get_retained() with
input: stream='{}', topic='{}', accept=TextPlain\n",
stream.as_ref(),
topic.as_ref()
);
match client
.get_retained(&stream, &topic, Accept::TextPlain, &mqtt_token_str)
.await
{
Ok(body) => {
print!("Existing retained message: ");
match body {
ResponseBody::Text(t) => println!("{t}"),
ResponseBody::Bytes(b) => println!("(bytes) {:?}", b),
}
}
Err(e) => {
println!("No existing retained message (or error): {e}");
}
}
println!("\n--- STEP 2: POST retained message ---
[CALL] .post_retained_body() with
input: stream='{}', topic='{}', content_type=TextPlain, payload='{{\"test\": true, \"value\": 123}}'\n",
stream.as_ref(),
topic.as_ref()
);
let json_payload = r#"{"test": true, "value": 123}"#;
client
.post_retained_body(
&stream,
&topic,
ContentType::TextPlain,
&mqtt_token_str,
json_payload.as_bytes().to_vec(),
None,
None,
)
.await?;
println!("POST retained: OK");
println!(
"\n--- STEP 3: GET retained after using POST
[CALL] .get_retained() with
input: stream='{}', topic='{}', accept=TextPlain\n",
stream.as_ref(),
topic.as_ref()
);
let fetched = client
.get_retained(&stream, &topic, Accept::TextPlain, &mqtt_token_str)
.await?;
match fetched {
ResponseBody::Text(t) => println!("Fetched retained: {t}"),
ResponseBody::Bytes(b) => println!("Fetched retained (bytes): {:?}", b),
}
println!(
"\n--- STEP 4: DELETE retained message ---
[CALL] .delete_retained() with
input: stream='{}', topic='{}'\n",
stream.as_ref(),
topic.as_ref()
);
client
.delete_retained(&stream, &topic, &mqtt_token_str)
.await?;
println!("DELETE retained: OK");
println!("\n--- STEP 5: Posting multiple sensor values for multi-get test ---");
client
.post_retained_body(
&stream,
&Topic::try_from("sensors/temp/room1")?,
ContentType::TextPlain,
&mqtt_token_str,
b"21.5".to_vec(),
None,
None,
)
.await?;
client
.post_retained_body(
&stream,
&Topic::try_from("sensors/temp/room2")?,
ContentType::TextPlain,
&mqtt_token_str,
b"22.1".to_vec(),
None,
None,
)
.await?;
client
.post_retained_body(
&stream,
&Topic::try_from("sensors/humidity/room1")?,
ContentType::TextPlain,
&mqtt_token_str,
b"46".to_vec(),
None,
None,
)
.await?;
client
.post_retained_body(
&stream,
&Topic::try_from("sensors/meta/info")?,
ContentType::TextPlain,
&mqtt_token_str,
b"metadata".to_vec(),
None,
None,
)
.await?;
println!(
"✔ Seeded all test topics:
sensors/temp/room1
sensors/temp/room2
sensors/humidity/room1
sensors/meta/info"
);
println!("\n=== STEP 6: MULTI-GET WILDCARD TESTS ===\n");
let accept = Accept::TextPlain;
println!(
"-- Exact match:
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/temp/room1'], accept=TextPlain\n",
stream.as_ref()
);
let exact = client
.multi_get(
&stream,
&[Topic::try_from("sensors/temp/room1")?],
accept,
&mqtt_token_str,
)
.await?;
for item in exact {
println!("EXACT: {} => {}", item.topic, item.payload);
}
println!("\n-- Multiple filters:
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/temp/room1', 'sensors/temp/room2'], accept=TextPlain\n",
stream.as_ref()
);
let multi_filters = client
.multi_get(
&stream,
&[
Topic::try_from("sensors/temp/room1")?,
Topic::try_from("sensors/temp/room2")?,
],
accept,
&mqtt_token_str,
)
.await?;
for item in multi_filters {
println!(" MULTI: {} => {}", item.topic, item.payload);
}
println!(
"\n-- + (single-level wildcard):
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/temp/+'], accept=TextPlain\n",
stream.as_ref()
);
let plus = client
.multi_get(
&stream,
&[Topic::try_from("sensors/temp/+")?],
accept,
&mqtt_token_str,
)
.await?;
for item in plus {
println!(" PLUS: {} => {}", item.topic, item.payload);
}
println!(
"\n-- # (multi-level wildcard):
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/#'], accept=TextPlain\n",
stream.as_ref()
);
let hash = client
.multi_get(
&stream,
&[Topic::try_from("sensors/#")?],
accept,
&mqtt_token_str,
)
.await?;
for item in hash {
println!(" HASH: {} => {}", item.topic, item.payload);
}
println!("\n-- Multiple filters:
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/temp/+', 'sensors/humidity/#'], accept=TextPlain\n",
stream.as_ref()
);
let multi_filters = client
.multi_get(
&stream,
&[
Topic::try_from("sensors/temp/+")?,
Topic::try_from("sensors/humidity/#")?,
],
accept,
&mqtt_token_str,
)
.await?;
for item in multi_filters {
println!(" MULTI: {} => {}", item.topic, item.payload);
}
println!(
"\n-- Empty result test:
[CALL] .multi_get() with
input: stream='{}', topics=['sensors/unknown/#'], accept=TextPlain\n",
stream.as_ref()
);
let empty = client
.multi_get(
&stream,
&[Topic::try_from("sensors/unknown/#")?],
accept,
&mqtt_token_str,
)
.await?;
println!(" EMPTY RESULT OK — {} items returned", empty.len());
println!("\n=== END OF TESTS ===\n");
Ok(())
}
struct ApiClientAuthenticationService;
impl ApiClientAuthenticationService {
async fn get_data_access_token(request: RequestDataAccessToken) -> DataAccessToken {
let api_key = std::env::var("API_KEY").expect("API_KEY is not set");
let token_fetcher = ApiClientTokenFetcher::new(api_key, PLATFORM);
token_fetcher
.fetch_data_access_token(request)
.await
.unwrap()
}
}