use core::fmt::Debug;
use std::fmt::Display;
use azure_iot_operations_protocol::{
common::{aio_protocol_error::AIOProtocolError, hybrid_logical_clock::HybridLogicalClock},
rpc_command,
};
use thiserror::Error;
mod client;
mod resp3;
pub use client::{Client, ClientOptions, ClientOptionsBuilder, KeyObservation};
pub use resp3::{Operation, SetCondition, SetOptions};
const FENCING_TOKEN_USER_PROPERTY: &str = "__ft";
const PERSIST_USER_PROPERTY: &str = "aio-persistence";
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(#[from] ErrorKind);
impl Error {
#[must_use]
pub fn kind(&self) -> &ErrorKind {
&self.0
}
#[must_use]
#[allow(dead_code)]
pub(crate) fn consuming_kind(self) -> ErrorKind {
self.0
}
}
#[derive(Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum ErrorKind {
#[error(transparent)]
AIOProtocolError(#[from] AIOProtocolError),
#[error(transparent)]
ServiceError(#[from] ServiceError),
#[error("{0}")]
SerializationError(String),
#[error("{0}")]
InvalidArgument(String),
#[error("Unexpected response payload for the request type: {0}")]
UnexpectedPayload(String),
#[error("key may only be observed once at a time")]
DuplicateObserve,
}
#[derive(Error, Debug)]
pub enum ServiceError {
#[error(
"the request timestamp is too far in the future; ensure that the client and broker system clocks are synchronized"
)]
TimestampSkew,
#[error("a fencing token is required for this request")]
MissingFencingToken,
#[error(
"the request fencing token timestamp is too far in the future; ensure that the client and broker system clocks are synchronized"
)]
FencingTokenSkew,
#[error(
"the request fencing token is a lower version than the fencing token protecting the resource"
)]
FencingTokenLowerVersion,
#[error("the quota has been exceeded")]
KeyQuotaExceeded,
#[error("syntax error")]
SyntaxError,
#[error("not authorized")]
NotAuthorized,
#[error("unknown command")]
UnknownCommand,
#[error("wrong number of arguments")]
WrongNumberOfArguments,
#[error("missing timestamp")]
TimestampMissing,
#[error("malformed timestamp")]
TimestampMalformed,
#[error("the key length is zero")]
KeyLengthZero,
#[error("{0}")]
Unknown(String),
}
impl From<Vec<u8>> for ServiceError {
fn from(s: Vec<u8>) -> Self {
let s_bytes: &[u8] = &s;
match s_bytes {
b"the request timestamp is too far in the future; ensure that the client and broker system clocks are synchronized" => ServiceError::TimestampSkew,
b"a fencing token is required for this request" => ServiceError::MissingFencingToken,
b"the request fencing token timestamp is too far in the future; ensure that the client and broker system clocks are synchronized" => ServiceError::FencingTokenSkew,
b"the request fencing token is a lower version than the fencing token protecting the resource" => ServiceError::FencingTokenLowerVersion,
b"the quota has been exceeded" => ServiceError::KeyQuotaExceeded,
b"syntax error" => ServiceError::SyntaxError,
b"not authorized" => ServiceError::NotAuthorized,
b"unknown command" => ServiceError::UnknownCommand,
b"wrong number of arguments" => ServiceError::WrongNumberOfArguments,
b"missing timestamp" => ServiceError::TimestampMissing,
b"malformed timestamp" => ServiceError::TimestampMalformed,
other => ServiceError::Unknown(std::str::from_utf8(other).unwrap_or_default().to_string()),
}
}
}
#[derive(Debug)]
pub struct Response<T>
where
T: Debug,
{
pub version: Option<HybridLogicalClock>,
pub response: T,
}
fn convert_response<T, F>(
resp: rpc_command::invoker::Response<resp3::Response>,
f: F,
) -> Result<Response<T>, Error>
where
F: FnOnce(resp3::Response) -> Result<T, ()>,
T: Debug,
{
match resp.payload {
resp3::Response::Error(e) => Err(Error(ErrorKind::ServiceError(e.into()))),
payload => match f(payload.clone()) {
Ok(response) => Ok(Response {
response,
version: resp.timestamp,
}),
Err(()) => Err(Error(ErrorKind::UnexpectedPayload(format!("{payload:?}")))),
},
}
}
#[derive(Debug, Clone)]
pub struct KeyNotification {
pub key: Vec<u8>,
pub operation: Operation,
pub version: HybridLogicalClock,
}
impl Display for KeyNotification {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"KeyNotification {{ key: {:?}, operation: {}, version: {} }}",
self.key, self.operation, self.version
)
}
}