use bytes::Bytes;
use reqwest::{Client, ClientBuilder, StatusCode, Url, header};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use thiserror::Error;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Stream(String);
impl Stream {
pub fn new<S: Into<String>>(s: S) -> Result<Self, HttpError> {
let s: String = s.into();
if s.trim().is_empty() {
return Err(HttpError::InvalidInput("stream must not be empty".into()));
}
Ok(Self(s))
}
}
impl TryFrom<&str> for Stream {
type Error = HttpError;
fn try_from(s: &str) -> Result<Self, Self::Error> {
Stream::new(s)
}
}
impl TryFrom<String> for Stream {
type Error = HttpError;
fn try_from(s: String) -> Result<Self, Self::Error> {
Stream::new(s)
}
}
impl AsRef<str> for Stream {
fn as_ref(&self) -> &str {
&self.0
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Topic(String);
impl Topic {
pub fn new(string: impl Into<String>) -> Self {
let s = string.into();
if s.trim().is_empty() {
Self("#".to_string()) } else {
Self(s)
}
}
}
impl From<&str> for Topic {
fn from(s: &str) -> Self {
Self::new(s)
}
}
impl From<String> for Topic {
fn from(s: String) -> Self {
Self::new(s)
}
}
impl AsRef<str> for Topic {
fn as_ref(&self) -> &str {
&self.0
}
}
#[derive(Debug)]
pub enum ResponseBody {
Text(String),
Bytes(Bytes),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Accept {
TextPlain,
ApplicationJson,
ApplicationOctetStream,
Base64,
}
impl Accept {
pub fn header_value(self) -> &'static str {
match self {
Accept::TextPlain => "text/plain",
Accept::ApplicationJson => "application/json",
Accept::ApplicationOctetStream => "application/octet-stream",
Accept::Base64 => "base64",
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContentType {
TextPlain,
ApplicationOctetStream,
Base64,
}
impl ContentType {
pub fn header_value(self) -> &'static str {
match self {
ContentType::TextPlain => "text/plain",
ContentType::ApplicationOctetStream => "application/octet-stream",
ContentType::Base64 => "base64",
}
}
}
#[derive(Debug, Error)]
pub enum HttpError {
#[error("invalid input: {0}")]
InvalidInput(String),
#[error(transparent)]
Request(#[from] reqwest::Error),
#[error("HTTP status {code}: {body}")]
Status { code: StatusCode, body: String },
#[error(transparent)]
Json(#[from] serde_json::Error),
}
#[derive(Serialize)]
struct MultiGetRequest<'a> {
#[serde(rename = "content-type")]
content_type: &'a str,
#[serde(rename = "topic-filters")]
topic_filters: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct MultiGetItem {
pub topic: String,
pub payload: String,
}
#[derive(Clone)]
pub struct HttpClient {
client: Client,
base_url: String,
}
#[derive(Debug)]
pub struct HttpClientBuilder {
base_url: String,
timeout: Duration,
}
impl HttpClientBuilder {
pub fn new(base_url: &str) -> Self {
Self {
base_url: base_url.to_string(),
timeout: Duration::from_secs(10),
}
}
pub fn timeout(mut self, duration: Duration) -> Self {
self.timeout = duration;
self
}
pub fn build(self) -> Result<HttpClient, HttpError> {
let mut parsed = Url::parse(&self.base_url)
.map_err(|e| HttpError::InvalidInput(format!("invalid base_url: {e}")))?;
parsed.set_path(""); let base_url = parsed.as_str().trim_end_matches('/').to_string();
let client = ClientBuilder::new()
.https_only(true)
.timeout(self.timeout)
.build()?;
Ok(HttpClient { client, base_url })
}
}
impl HttpClient {
pub fn builder(base_url: &str) -> HttpClientBuilder {
HttpClientBuilder::new(base_url)
}
pub fn with_client(base_url: &str, client: reqwest::Client) -> Result<Self, HttpError> {
let mut parsed = Url::parse(base_url)
.map_err(|e| HttpError::InvalidInput(format!("invalid base_url: {e}")))?;
parsed.set_path("");
let base_url = parsed.as_str().trim_end_matches('/').to_string();
Ok(Self { client, base_url })
}
pub async fn get_retained(
&self,
stream: &Stream,
topic: &Topic,
accept: Accept,
token: &str,
) -> Result<ResponseBody, HttpError> {
let url = format!(
"{}/data/v0/single/tt/{}/{}",
self.base_url,
stream.as_ref(),
topic.as_ref()
);
let resp = self
.client
.get(url)
.header(header::ACCEPT, accept.header_value())
.bearer_auth(token)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
return Err(HttpError::Status {
code: status,
body: resp.text().await.unwrap_or("<failed to read body>".into()),
});
}
let body = match accept {
Accept::ApplicationOctetStream => ResponseBody::Bytes(resp.bytes().await?),
_ => ResponseBody::Text(resp.text().await?),
};
Ok(body)
}
pub async fn post_retained_body(
&self,
stream: &Stream,
topic: &Topic,
content_type: ContentType,
token: &str,
payload: impl Into<Bytes>,
qos: Option<&str>,
retained: Option<&str>,
) -> Result<(), HttpError> {
let payload: Bytes = payload.into();
if payload.len() > 128 * 1024 {
return Err(HttpError::InvalidInput(format!(
"payload is {} bytes, exceeding the 128 KB limit",
payload.len()
)));
}
let qos_value = qos.unwrap_or("1");
let retained_value = retained.unwrap_or("true");
if qos_value != "1" && qos_value != "0" {
return Err(HttpError::InvalidInput(format!(
"Invalid QoS value: {}. Only '1' or '0' are allowed.",
qos_value
)));
}
let url = format!(
"{}/data/v0/single/tt/{}/{}?qos={}&retained={}",
self.base_url,
stream.as_ref(),
topic.as_ref(),
qos_value,
retained_value
);
let resp = self
.client
.post(url)
.header(header::CONTENT_TYPE, content_type.header_value())
.bearer_auth(token)
.body(payload)
.send()
.await?;
if !resp.status().is_success() {
return Err(HttpError::Status {
code: resp.status(),
body: resp.text().await.unwrap_or("<failed to read body>".into()),
});
}
Ok(())
}
pub async fn post_retained_file(
&self,
stream: &Stream,
topic: &Topic,
content_type: ContentType,
token: &str,
path: impl AsRef<std::path::Path>,
) -> Result<(), HttpError> {
let payload = std::fs::read(&path).map_err(|e| {
HttpError::InvalidInput(format!(
"failed to read file `{}`: {e}",
path.as_ref().display()
))
})?;
self.post_retained_body(stream, topic, content_type, token, payload, None, None)
.await
}
pub async fn delete_retained(
&self,
stream: &Stream,
topic: &Topic,
token: &str,
) -> Result<(), HttpError> {
let url = format!(
"{}/data/v0/single/tt/{}/{}",
self.base_url,
stream.as_ref(),
topic.as_ref()
);
let resp = self.client.delete(url).bearer_auth(token).send().await?;
if !resp.status().is_success() {
return Err(HttpError::Status {
code: resp.status(),
body: resp.text().await.unwrap_or("<failed to read body>".into()),
});
}
Ok(())
}
pub async fn multi_get(
&self,
stream: &Stream,
topics: &[Topic],
accept: Accept,
token: &str,
) -> Result<Vec<MultiGetItem>, HttpError> {
let inner_ct = match accept {
Accept::TextPlain => "text/plain",
Accept::Base64 => "base64",
_ => {
return Err(HttpError::InvalidInput(
"multi-get only supports Accept::TextPlain or Accept::Base64".into(),
));
}
};
let filters: Vec<String> = topics
.iter()
.map(|t| format!("/tt/{}/{}", stream.as_ref(), t.as_ref()))
.collect();
let req = MultiGetRequest {
content_type: inner_ct,
topic_filters: filters,
};
let url = format!("{}/data/v0/multi", self.base_url);
let resp = self
.client
.post(url)
.json(&req)
.bearer_auth(token)
.send()
.await?;
let status = resp.status();
if !status.is_success() {
return Err(HttpError::Status {
code: status,
body: resp.text().await.unwrap_or("<failed to read body>".into()),
});
}
let text = resp.text().await?;
let items: Vec<MultiGetItem> = serde_json::from_str(&text)?;
Ok(items)
}
}
#[cfg(all(test, feature = "http-protocol-adapter"))]
mod tests {
use super::*;
use mockito::{Matcher, Server};
#[tokio::test]
async fn multi_get_plus_returns_multiple_items() {
let mut server = Server::new_async().await;
let expected_subset = serde_json::json!({
"topic-filters": ["/tt/greenbox-test/sensors/temp/+"]
});
let mock = server
.mock("POST", "/data/v0/multi")
.match_header("authorization", "Bearer TOKEN")
.match_header("content-type", "application/json")
.match_body(Matcher::PartialJson(expected_subset))
.with_status(200)
.with_header("content-type", "application/json")
.with_body(
r#"
[
{"topic": "/tt/greenbox-test/sensors/temp/room1", "payload": "21.5"},
{"topic": "/tt/greenbox-test/sensors/temp/room2", "payload": "22.1"}
]
"#,
)
.create_async()
.await;
let client =
HttpClient::with_client(server.url().as_str(), reqwest::Client::new()).unwrap();
let stream = Stream::try_from("greenbox-test").unwrap();
let filter = Topic::try_from("sensors/temp/+").unwrap();
let items = client
.multi_get(&stream, &[filter], Accept::TextPlain, "TOKEN")
.await
.unwrap();
println!("--- DEBUG: items returned = {} ---", items.len());
for (idx, it) in items.iter().enumerate() {
println!("[{}] {} => {}", idx, it.topic, it.payload);
}
println!("----------------------------------");
mock.assert_async().await;
assert_eq!(items.len(), 2);
assert_eq!(items[0].topic, "/tt/greenbox-test/sensors/temp/room1");
assert_eq!(items[0].payload, "21.5");
assert_eq!(items[1].topic, "/tt/greenbox-test/sensors/temp/room2");
assert_eq!(items[1].payload, "22.1");
}
#[test]
fn builder_rejects_invalid_base_url() {
assert!(matches!(
HttpClientBuilder::new("not-a-valid-url").build(),
Err(HttpError::InvalidInput(_))
));
}
#[test]
fn builder_accepts_valid_base_url() {
assert!(matches!(
HttpClientBuilder::new("https://protocol-adapter.example.com").build(),
Ok(_)
));
}
#[test]
fn with_client_rejects_invalid_url() {
assert!(matches!(
HttpClient::with_client("not-a-url", reqwest::Client::new()),
Err(HttpError::InvalidInput(_))
));
}
#[test]
fn with_client_accepts_valid_url() {
assert!(matches!(
HttpClient::with_client("https://example.com", reqwest::Client::new()),
Ok(_)
));
}
#[test]
fn stream_rejects_empty() {
assert!(matches!(Stream::new(""), Err(HttpError::InvalidInput(_))));
}
#[test]
fn stream_rejects_whitespace_only() {
assert!(matches!(
Stream::new(" "),
Err(HttpError::InvalidInput(_))
));
}
#[test]
fn stream_accepts_valid_name() {
let s = Stream::new("my-stream").unwrap();
assert_eq!(s.as_ref(), "my-stream");
}
#[test]
fn stream_try_from_str() {
assert!(Stream::try_from("ok").is_ok());
assert!(Stream::try_from("").is_err());
}
#[test]
fn stream_try_from_string() {
assert!(Stream::try_from("ok".to_string()).is_ok());
assert!(Stream::try_from(String::new()).is_err());
}
#[test]
fn topic_defaults_to_wildcard_when_empty() {
assert_eq!(Topic::new("").as_ref(), "#");
}
#[test]
fn topic_defaults_to_wildcard_when_whitespace() {
assert_eq!(Topic::new(" ").as_ref(), "#");
}
#[test]
fn topic_preserves_value() {
assert_eq!(Topic::new("sensors/temp").as_ref(), "sensors/temp");
}
#[test]
fn topic_from_str() {
let t: Topic = "foo".into();
assert_eq!(t.as_ref(), "foo");
}
#[test]
fn topic_from_string() {
let t: Topic = String::from("bar").into();
assert_eq!(t.as_ref(), "bar");
}
#[test]
fn accept_header_values() {
assert_eq!(Accept::TextPlain.header_value(), "text/plain");
assert_eq!(Accept::ApplicationJson.header_value(), "application/json");
assert_eq!(
Accept::ApplicationOctetStream.header_value(),
"application/octet-stream"
);
assert_eq!(Accept::Base64.header_value(), "base64");
}
#[test]
fn content_type_header_values() {
assert_eq!(ContentType::TextPlain.header_value(), "text/plain");
assert_eq!(
ContentType::ApplicationOctetStream.header_value(),
"application/octet-stream"
);
assert_eq!(ContentType::Base64.header_value(), "base64");
}
#[tokio::test]
async fn post_retained_body_rejects_invalid_qos() {
let client = HttpClient::with_client("http://localhost", reqwest::Client::new()).unwrap();
let stream = Stream::new("s").unwrap();
let topic = Topic::new("t");
for bad in &["2", "1 ", "01", "abc", ""] {
let result = client
.post_retained_body(
&stream,
&topic,
ContentType::TextPlain,
"tok",
b"hello".as_ref(),
Some(bad),
None,
)
.await;
assert!(
matches!(result, Err(HttpError::InvalidInput(_))),
"expected InvalidInput for qos={bad:?}, got {result:?}"
);
}
}
#[tokio::test]
async fn post_retained_body_accepts_valid_qos() {
let mut server = Server::new_async().await;
let stream = Stream::new("s").unwrap();
let topic = Topic::new("t");
for qos in &["0", "1"] {
let mock = server
.mock("POST", "/data/v0/single/tt/s/t")
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("qos".into(), qos.to_string()),
Matcher::UrlEncoded("retained".into(), "true".into()),
]))
.with_status(200)
.create_async()
.await;
let client =
HttpClient::with_client(server.url().as_str(), reqwest::Client::new()).unwrap();
client
.post_retained_body(
&stream,
&topic,
ContentType::TextPlain,
"tok",
b"hello".as_ref(),
Some(qos),
None,
)
.await
.unwrap_or_else(|e| panic!("qos={qos} should be accepted, got {e:?}"));
mock.assert_async().await;
}
}
}