use std::collections::HashMap;
use std::{path::Path, str::FromStr};
use crate::cloud::dps::{ProvisioningToken, RegistrationToken};
use anyhow::{Context, Result};
use http::Uri;
use sqlite::SdkConfiguration;
use sqlite_channel::{Receiver, Sender};
use tokio::{
select,
sync::{mpsc, watch},
};
use tokio_util::sync::CancellationToken;
use twins::Twin;
use self::sqlite::SqliteStore;
pub mod c2d;
pub mod sqlite;
pub mod sqlite_channel;
pub mod twins;
pub struct Store {
pub store: SqliteStore,
pub d2c_producer: Producer,
pub d2c_consumer: Consumer,
pub d2c_acknowledger: Acknowledger,
pub configuration_store: ConfigurationStore,
pub c2d_producer: Sender<CloudToDeviceMessage>,
pub c2d_consumer: Receiver<CloudToDeviceMessage>,
pub twins_store: TwinsStore,
}
#[derive(Debug)]
pub struct Producer {
inner: SqliteStore,
sender: watch::Sender<i32>,
}
#[derive(Debug)]
pub struct Consumer {
receiver: mpsc::Receiver<DeviceMessage>,
}
#[derive(Debug)]
pub struct Acknowledger {
inner: SqliteStore,
}
#[derive(Debug, Clone)]
pub struct ConfigurationStore {
inner: SqliteStore,
site_id: Option<String>,
}
impl Producer {
pub async fn add(&self, mut msg: DeviceMessage) -> Result<()> {
let id = self
.inner
.store_message(&msg)
.await
.context("Unable to store device to cloud message")?;
msg.id = Some(id);
self.sender
.send(id)
.context("Unable to send notification of new message")?;
Ok(())
}
pub async fn count(&self) -> Result<usize> {
self.inner.message_count().await
}
}
impl Consumer {
pub async fn get_message(&mut self) -> Option<DeviceMessage> {
self.receiver.recv().await
}
}
impl Acknowledger {
pub async fn remove_oldest(&self) -> Result<()> {
self.inner.remove_oldest_message().await
}
}
#[allow(dead_code)] impl ConfigurationStore {
pub async fn load_instance_url(&self) -> Result<Uri> {
let url = self.inner.load_instance_url().await?;
Uri::from_str(&url).context("Unable to parse the Platform instance URL from configuration.")
}
pub async fn load_provisioning_token(&self) -> Result<ProvisioningToken> {
self.inner.load_provisioning_token().await
}
pub async fn load_registration_token(&self) -> Result<RegistrationToken> {
self.inner.load_registration_token().await
}
pub async fn save_provisioning_token(&self, token: &ProvisioningToken) -> Result<()> {
self.inner.save_provisioning_token(token).await
}
pub async fn save_registration_token(&self, token: &RegistrationToken) -> Result<()> {
self.inner.save_registration_token(token).await
}
pub async fn load_requested_device_id(&self) -> Option<String> {
self.inner.load_requested_device_id().await.ok().flatten()
}
pub async fn load_workspace_id(&self) -> Result<String> {
self.inner.load_workspace_id().await
}
pub async fn save_workspace_id(&self, workspace_id: &str) -> Result<()> {
self.inner.save_workspace_id(workspace_id).await
}
pub async fn load_device_id(&self) -> Result<String> {
self.inner.load_device_id().await
}
pub async fn save_device_id(&self, device_id: &str) -> Result<()> {
self.inner.save_device_id(device_id).await
}
pub fn site_id(&self) -> Option<&str> {
self.site_id.as_deref()
}
}
#[derive(Debug, Clone)]
pub struct TwinsStore {
inner: SqliteStore,
}
impl TwinsStore {
pub async fn load_desired_properties(&self) -> Result<Option<Twin>> {
self.inner.load_desired_properties().await
}
pub async fn load_reported_properties(&self) -> Result<Option<Twin>> {
self.inner.load_reported_properties().await
}
pub async fn save_desired_properties(&self, twin: &Twin) -> Result<()> {
self.inner.save_desired_properties(twin).await
}
pub async fn save_reported_properties(&self, twin: &Twin) -> Result<()> {
self.inner.save_reported_properties(twin).await
}
}
pub async fn create(
store_path: &Path,
config: &SdkConfiguration,
cancellation_token: CancellationToken,
) -> Result<Store> {
let sqlite = SqliteStore::init(store_path, config).await?;
start(sqlite, config, cancellation_token).await
}
async fn start(
sqlite: SqliteStore,
config: &SdkConfiguration,
cancellation_token: CancellationToken,
) -> Result<Store> {
let (message_sender, message_receiver) = mpsc::channel(100);
let (latest_msg_id_sender, mut latest_msg_id_receiver) = watch::channel(-1);
{
let sqlite = sqlite.clone();
tokio::spawn(async move {
let mut last_id = -1;
loop {
let messages = sqlite
.list_messages_after(last_id)
.await
.expect("Unable to load saved device messages");
if !messages.is_empty() {
log::trace!(
"At least {} messages were persisted and are ready to be sent",
messages.len()
);
last_id = messages
.last()
.expect("We checked that the vec is not empty")
.id
.expect("ID is not empty after being stored in store");
for msg in messages {
select!(
_ = cancellation_token.cancelled() => {
return;
},
sent = message_sender.send(msg) => {
if sent.is_err() {
log::debug!("There is no one listening for messages to be sent. Finishing sender publisher.");
return;
}
},
);
}
} else if *latest_msg_id_receiver.borrow_and_update() == last_id {
select!(
_ = cancellation_token.cancelled() => {
return;
},
read = latest_msg_id_receiver.changed() => {
if read.is_err() {
return;
}
},
)
}
}
});
}
let producer = Producer {
inner: sqlite.clone(),
sender: latest_msg_id_sender,
};
let consumer = Consumer {
receiver: message_receiver,
};
let acknowledger = Acknowledger {
inner: sqlite.clone(),
};
let (c2d_producer, c2d_consumer) = sqlite_channel::channel(sqlite.clone());
let token_store = ConfigurationStore {
inner: sqlite.clone(),
site_id: config.site_id.clone(),
};
let twins_store = TwinsStore {
inner: sqlite.clone(),
};
Ok(Store {
store: sqlite,
d2c_producer: producer,
d2c_consumer: consumer,
d2c_acknowledger: acknowledger,
configuration_store: token_store,
c2d_producer,
c2d_consumer,
twins_store,
})
}
#[derive(Debug)]
pub struct DeviceMessage {
pub id: Option<i32>,
pub site_id: Option<String>,
pub stream_group: Option<String>,
pub stream: Option<String>,
pub batch_id: Option<String>,
pub message_id: Option<String>,
pub content: Vec<u8>,
pub close_option: CloseOption,
pub compression: Compression,
pub batch_slice_id: Option<String>,
pub chunk_id: Option<String>,
}
#[doc(hidden)]
#[derive(Debug)]
pub struct CloudToDeviceMessage {
pub(crate) id: Option<i32>,
pub content: Vec<u8>,
pub properties: HashMap<String, String>,
}
impl CloudToDeviceMessage {
pub fn new(content: Vec<u8>, properties: HashMap<String, String>) -> Self {
CloudToDeviceMessage {
id: None,
content,
properties,
}
}
}
#[derive(Debug, sqlx::Type)]
pub enum CloseOption {
None,
Close,
CloseOnly,
CloseMessageOnly,
}
#[derive(Debug, sqlx::Type)]
pub enum Compression {
None,
BrotliFastest,
BrotliSmallestSize,
}