use std::panic::RefUnwindSafe;
use std::time::Duration;
use std::{path::Path, sync::Arc};
use anyhow::Result;
use base::BaseConnection;
use c2d::CloudToDeviceMessageGuard;
use crate::cloud::drs::RegistrationResponse;
pub use crate::connection::twins::DesiredProperties;
pub use crate::connection::twins::DesiredPropertiesUpdatedCallback;
use crate::persistence::sqlite::SdkConfiguration;
mod base;
mod builder;
pub mod c2d;
pub use builder::DeviceClientBuilder;
pub use builder::ProvisioningOperation;
pub use builder::ProvisioningOperationDisplayHandler;
pub use c2d::CloudToDeviceMessage;
use crate::connection::ConnectionImplementation;
use crate::{persistence, ProcessSignalsSource};
/// Compression setting that can be attached to a [`MessageContext`].
///
/// Each variant is translated to a Brotli-based `persistence::Compression` value by
/// `Compression::to_persisted_compression`; the absence of a setting (`None`) is translated
/// to `persistence::Compression::None`.
///
/// The type is a plain, field-less enum, so it derives the full set of value-type traits:
/// it can be copied, compared with `==` and used as a key in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Compression {
    /// Persisted as `persistence::Compression::BrotliFastest`.
    Fastest,
    /// Persisted as `persistence::Compression::BrotliSmallestSize`.
    SmallestSize,
}
impl Compression {
    /// Translates the optional public compression setting into the representation used by
    /// the persistence layer.
    ///
    /// * `None` becomes `persistence::Compression::None`.
    /// * `Some(Compression::Fastest)` becomes `persistence::Compression::BrotliFastest`.
    /// * `Some(Compression::SmallestSize)` becomes
    ///   `persistence::Compression::BrotliSmallestSize`.
    fn to_persisted_compression(compression: &Option<Compression>) -> persistence::Compression {
        // `Option<Compression>` is `Copy`, so the combinator can consume a copy of the
        // borrowed value; the inner match stays exhaustive over our own variants.
        compression.map_or(persistence::Compression::None, |level| match level {
            Self::Fastest => persistence::Compression::BrotliFastest,
            Self::SmallestSize => persistence::Compression::BrotliSmallestSize,
        })
    }
}
/// Per-message settings: an optional stream group, an optional stream and an optional
/// [`Compression`] setting.
///
/// The derived `Default` leaves all three unset.
#[derive(Clone, Debug, Default)]
pub struct MessageContext {
    stream_group: Option<String>,
    stream: Option<String>,
    compression: Option<Compression>,
}

impl MessageContext {
    /// Creates a context with the given stream group and stream and no compression.
    ///
    /// Use [`MessageContext::set_compression`] afterwards to choose a compression setting.
    pub fn new(stream_group: Option<String>, stream: Option<String>) -> Self {
        // Everything not named here (currently only `compression`) keeps its default, `None`.
        Self {
            stream_group,
            stream,
            ..Self::default()
        }
    }

    /// Borrows the stream group, if one is set.
    pub fn stream_group(&self) -> Option<&str> {
        self.stream_group.as_deref()
    }

    /// Replaces the stream group; `None` clears it.
    pub fn set_stream_group(&mut self, stream_group: Option<String>) {
        self.stream_group = stream_group;
    }

    /// Borrows the stream, if one is set.
    pub fn stream(&self) -> Option<&str> {
        self.stream.as_deref()
    }

    /// Replaces the stream; `None` clears it.
    pub fn set_stream(&mut self, stream: Option<String>) {
        self.stream = stream;
    }

    /// Returns a copy of the compression setting, if one is set.
    pub fn compression(&self) -> Option<Compression> {
        self.compression
    }

    /// Replaces the compression setting; `None` clears it.
    pub fn set_compression(&mut self, compression: Option<Compression>) {
        self.compression = compression;
    }
}
/// Success half of a [`MethodResult`]: a status code plus an optional response body.
///
/// `Debug` is derived so that `Result` helpers which print the success value
/// (`unwrap_err`, `expect_err`) and `assert_eq!` are usable with [`MethodResult`];
/// `Clone`, `PartialEq` and `Eq` make the value easy to compare and reuse in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MethodReturnValue {
    /// Status code reported for the invocation.
    pub status_code: i32,
    /// Response body bytes, or `None` when there is no body.
    pub body: Option<Vec<u8>>,
}

impl MethodReturnValue {
    /// Creates a return value from its status code and optional body.
    pub fn new(status_code: i32, body: Option<Vec<u8>>) -> Self {
        Self { status_code, body }
    }
}
/// Failure half of a [`MethodResult`]: a status code plus a message.
///
/// `Debug` is derived because `Result::unwrap` and `Result::expect` require the error type
/// to implement it; without it a [`MethodResult`] cannot be unwrapped. `Clone`, `PartialEq`
/// and `Eq` make the value easy to compare and reuse in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MethodError {
    /// Status code reported for the failed invocation.
    pub status_code: i32,
    /// Message describing the failure.
    pub message: String,
}

impl MethodError {
    /// Creates an error from its status code and message.
    pub fn new(status_code: i32, message: String) -> Self {
        Self {
            status_code,
            message,
        }
    }
}
/// Outcome of handling a method invocation: `Ok` carries a [`MethodReturnValue`], `Err` a
/// [`MethodError`].
///
/// `Result` here is the `anyhow::Result` alias imported at the top of this file, used with
/// its error parameter overridden, so this is a plain `std::result::Result` whose error
/// type is `MethodError` (not `anyhow::Error`).
pub type MethodResult = Result<MethodReturnValue, MethodError>;
/// Callback interface for handling a method invocation.
///
/// Implementors must be `Send` and `RefUnwindSafe`.
pub trait MethodInvocationHandler: Send + RefUnwindSafe {
/// Handles one invocation, given its payload as raw bytes.
///
/// `Some(result)` carries the handler's [`MethodResult`].
/// NOTE(review): the meaning of `None` is not visible in this file; presumably it means
/// the handler did not handle the invocation. Confirm against the caller.
fn handle(&self, payload: &[u8]) -> Option<MethodResult>;
}
/// Handle to a device connection.
///
/// The only field is an `Arc`, so the derived `Clone` copies just the reference-counted
/// pointer: every clone refers to the same underlying `BaseConnection`.
#[derive(Clone)]
pub struct DeviceClient {
// Shared connection that the methods of `DeviceClient` in this file delegate to.
connection: Arc<BaseConnection<dyn ConnectionImplementation + Send + Sync>>,
}
impl DeviceClient {
    /// Builds a client on top of a newly initialised ingress connection.
    ///
    /// Every argument is handed unchanged to `BaseConnection::init_ingress`. Before the
    /// connection is wrapped in an `Arc`, `wait_properties_ready` is called on it; an error
    /// from either step is returned to the caller and no client is created.
    fn new(
        config: SdkConfiguration,
        path: &Path,
        desired_properties_updated_callback: Option<Box<dyn DesiredPropertiesUpdatedCallback>>,
        signals_src: Option<Box<dyn ProcessSignalsSource>>,
        initial_registration_response: Option<RegistrationResponse>,
        remote_access_allowed_for_all_ports: bool,
    ) -> Result<Self> {
        let ingress = BaseConnection::init_ingress(
            config,
            path,
            desired_properties_updated_callback,
            signals_src,
            initial_registration_response,
            remote_access_allowed_for_all_ports,
        )?;
        // Readiness is checked on the bare connection, before it becomes shared.
        ingress.wait_properties_ready()?;
        let connection = Arc::new(ingress);
        Ok(Self { connection })
    }

    /// Returns the workspace id as reported by the underlying connection.
    pub fn workspace_id(&self) -> Result<String> {
        self.connection.workspace_id()
    }

    /// Returns the device id as reported by the underlying connection.
    pub fn device_id(&self) -> Result<String> {
        self.connection.device_id()
    }

    /// Hands a message to the connection's `enqueue_message`.
    ///
    /// NOTE(review): presumably the message is queued for later delivery rather than sent
    /// immediately; confirm against `BaseConnection`.
    pub fn enqueue_message(
        &self,
        message_context: &MessageContext,
        batch_id: Option<String>,
        message_id: Option<String>,
        payload: Vec<u8>,
    ) -> Result<()> {
        self.connection
            .enqueue_message(message_context, batch_id, message_id, payload)
    }

    /// Same as [`DeviceClient::enqueue_message`], but additionally accepts a batch slice id
    /// and a chunk id; forwards to the connection's `enqueue_message_advanced`.
    pub fn enqueue_message_advanced(
        &self,
        message_context: &MessageContext,
        batch_id: Option<String>,
        batch_slice_id: Option<String>,
        message_id: Option<String>,
        chunk_id: Option<String>,
        payload: Vec<u8>,
    ) -> Result<()> {
        self.connection.enqueue_message_advanced(
            message_context,
            batch_id,
            batch_slice_id,
            message_id,
            chunk_id,
            payload,
        )
    }

    /// Forwards to the connection's `enqueue_batch_completion` for the batch `batch_id`.
    pub fn enqueue_batch_completion(
        &self,
        message_context: &MessageContext,
        batch_id: String,
    ) -> Result<()> {
        self.connection
            .enqueue_batch_completion(message_context, batch_id)
    }

    /// Forwards to the connection's `enqueue_message_completion` for the message
    /// `message_id` of the batch `batch_id`.
    pub fn enqueue_message_completion(
        &self,
        message_context: &MessageContext,
        batch_id: String,
        message_id: String,
    ) -> Result<()> {
        self.connection
            .enqueue_message_completion(message_context, batch_id, message_id)
    }

    /// Hands a message to the connection's `send_message`.
    ///
    /// NOTE(review): how this differs from [`DeviceClient::enqueue_message`] is not visible
    /// in this file; presumably it bypasses the queue. Confirm against `BaseConnection`.
    pub fn send_message(
        &self,
        message_context: &MessageContext,
        batch_id: Option<String>,
        message_id: Option<String>,
        payload: Vec<u8>,
    ) -> Result<()> {
        self.connection
            .send_message(message_context, batch_id, message_id, payload)
    }

    /// Same as [`DeviceClient::send_message`], but additionally accepts a batch slice id and
    /// a chunk id; forwards to the connection's `send_message_advanced`.
    pub fn send_message_advanced(
        &self,
        message_context: &MessageContext,
        batch_id: Option<String>,
        batch_slice_id: Option<String>,
        message_id: Option<String>,
        chunk_id: Option<String>,
        payload: Vec<u8>,
    ) -> Result<()> {
        self.connection.send_message_advanced(
            message_context,
            batch_id,
            batch_slice_id,
            message_id,
            chunk_id,
            payload,
        )
    }

    /// Returns the connection's count of pending messages.
    pub fn pending_messages_count(&self) -> Result<usize> {
        self.connection.pending_messages_count()
    }

    /// Forwards to the connection's `wait_enqueued_messages_sent`.
    pub fn wait_enqueued_messages_sent(&self) -> Result<()> {
        self.connection.wait_enqueued_messages_sent()
    }

    /// Returns the desired properties as provided by the underlying connection.
    pub fn desired_properties(&self) -> Result<DesiredProperties> {
        self.connection.desired_properties()
    }

    /// Forwards `version` to the connection's `desired_properties_if_newer`.
    ///
    /// NOTE(review): presumably `None` means nothing newer than `version` is available;
    /// confirm against `BaseConnection`.
    pub fn desired_properties_if_newer(&self, version: u64) -> Option<DesiredProperties> {
        self.connection.desired_properties_if_newer(version)
    }

    /// Passes `properties` to the connection's `update_reported_properties`.
    pub fn update_reported_properties(&self, properties: &str) -> Result<()> {
        self.connection.update_reported_properties(properties)
    }

    /// Returns the connection's answer to whether reported-properties updates are pending.
    pub fn any_pending_reported_properties_updates(&self) -> Result<bool> {
        self.connection.any_pending_reported_properties_updates()
    }

    /// Deprecated and hidden from the docs; passes `callback` to the connection's
    /// `process_c2d`.
    #[deprecated]
    #[doc(hidden)]
    pub fn process_c2d<G: Fn(&CloudToDeviceMessage) + Send + 'static>(
        &self,
        callback: G,
    ) -> Result<()> {
        self.connection.process_c2d(callback)
    }

    /// Deprecated and hidden from the docs; returns the connection's `pending_c2d` count.
    #[deprecated]
    #[doc(hidden)]
    pub fn pending_c2d(&self) -> Result<usize> {
        self.connection.pending_c2d()
    }

    /// Deprecated and hidden from the docs; forwards `timeout` to the connection's
    /// `get_c2d` and returns the resulting message guard.
    #[deprecated]
    #[doc(hidden)]
    pub fn get_c2d(&self, timeout: Duration) -> Result<CloudToDeviceMessageGuard<'_>> {
        self.connection.get_c2d(timeout)
    }

    /// Deprecated and hidden from the docs; forwards to the connection's
    /// `wait_desired_properties_changed`.
    #[deprecated]
    #[doc(hidden)]
    pub fn wait_desired_properties_changed(&self) -> Result<DesiredProperties> {
        self.connection.wait_desired_properties_changed()
    }

    /// Deprecated and hidden from the docs; returns the connection's `reported_properties`.
    #[deprecated]
    #[doc(hidden)]
    pub fn reported_properties(&self) -> Option<String> {
        self.connection.reported_properties()
    }

    /// Deprecated and hidden from the docs; passes `patch` to the connection's
    /// `patch_reported_properties`.
    #[deprecated]
    #[doc(hidden)]
    pub fn patch_reported_properties(&self, patch: &str) -> Result<()> {
        self.connection.patch_reported_properties(patch)
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Compiles only for types that are both `Send` and `Sync`; does nothing at run time.
    fn assert_send_and_sync<T: Send + Sync>() {}

    /// Guards against `DeviceClient` losing its `Send`/`Sync` auto traits: the check is
    /// made by the compiler, so a regression fails the build of this test.
    #[test]
    fn traits() {
        assert_send_and_sync::<DeviceClient>();
    }
}