use crate::client::MqttClient;
use crate::error::MqttError;
use crate::time::Duration;
use crate::types::Message;
use crate::QoS;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
#[derive(Debug, Clone)]
pub struct TurmoilClientConfig {
pub client_id: String,
pub clean_start: bool,
pub keep_alive: Duration,
}
impl Default for TurmoilClientConfig {
fn default() -> Self {
Self {
client_id: "turmoil-client".to_string(),
clean_start: true,
keep_alive: Duration::from_secs(60),
}
}
}
impl TurmoilClientConfig {
#[must_use]
pub fn new(client_id: &str) -> Self {
Self {
client_id: client_id.to_string(),
clean_start: true,
keep_alive: Duration::from_secs(60),
}
}
#[must_use]
pub fn with_clean_start(mut self, clean_start: bool) -> Self {
self.clean_start = clean_start;
self
}
#[must_use]
pub fn with_keep_alive(mut self, keep_alive: Duration) -> Self {
self.keep_alive = keep_alive;
self
}
#[must_use]
pub fn with_connect_timeout(self, _timeout: Duration) -> Self {
self
}
}
pub struct TurmoilClient {
inner: MqttClient,
received: Arc<Mutex<mpsc::UnboundedReceiver<Message>>>,
sender: mpsc::UnboundedSender<Message>,
}
impl TurmoilClient {
#[must_use]
pub fn new(config: &TurmoilClientConfig) -> Self {
let inner = MqttClient::new(&config.client_id);
let (sender, receiver) = mpsc::unbounded_channel();
let receiver_arc = Arc::new(Mutex::new(receiver));
Self {
inner,
received: receiver_arc,
sender,
}
}
#[must_use]
pub fn with_config(config: &TurmoilClientConfig) -> Self {
Self::new(config)
}
#[must_use]
pub fn with_id(client_id: &str) -> Self {
let config = TurmoilClientConfig {
client_id: client_id.to_string(),
..Default::default()
};
Self::new(&config)
}
pub async fn connect(&self, address: &str) -> Result<(), MqttError> {
self.inner.connect(address).await
}
pub async fn wait_for_connection(&self, timeout: Duration) -> Result<(), MqttError> {
tokio::time::sleep(timeout).await;
Ok(())
}
pub async fn subscribe(&self, topic: &str, _qos: QoS) -> Result<(), MqttError> {
let sender = self.sender.clone();
self.inner
.subscribe(topic, move |msg| {
let _ = sender.send(msg);
})
.await
.map(|_| ())
}
pub async fn publish(&self, topic: &str, payload: &[u8], qos: QoS) -> Result<(), MqttError> {
self.inner
.publish_qos(topic, payload, qos)
.await
.map(|_| ())
}
pub async fn publish_retained(
&self,
topic: &str,
payload: &[u8],
qos: QoS,
) -> Result<(), MqttError> {
self.publish(topic, payload, qos).await
}
pub async fn receive_timeout(&self, timeout: Duration) -> Result<Message, MqttError> {
let mut receiver = self.received.lock().await;
tokio::time::timeout(timeout, receiver.recv())
.await
.map_err(|_| MqttError::Timeout)?
.ok_or(MqttError::NotConnected)
}
pub async fn disconnect(&self) -> Result<(), MqttError> {
self.inner.disconnect().await
}
pub async fn is_connected(&self) -> bool {
self.inner.is_connected().await
}
pub async fn connect_tls(&self, address: &str) -> Result<(), MqttError> {
self.connect(address).await
}
}