use std::fmt;
use std::str::FromStr;
use chrono::{DateTime, Duration, Utc};
use log::{error, info};
use serde_derive::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{
serde::{
duration_milliseconds_string_option, session_ids_list, ts_milliseconds_string,
ts_milliseconds_string_option,
},
AccountId, Addressable, AgentId, Authenticable, Destination, Error, EventSubscription,
RequestSubscription, ResponseSubscription, SharedGroup, Source,
};
#[cfg(feature = "queue-counter")]
use crate::queue_counter::QueueCounterHandle;
/// Deserializable connection settings consumed by `AgentBuilder::start`.
///
/// Only `uri` is mandatory; the optional fields are applied to the MQTT
/// options by `AgentBuilder::mqtt_options` as described on each field.
#[derive(Debug, Clone, Deserialize)]
pub struct AgentConfig {
/// Broker address. It is parsed as an `http::Uri` and must contain both a host and a port.
uri: String,
/// Forwarded to `MqttOptions::set_clean_session` when present.
clean_session: Option<bool>,
/// Forwarded to `MqttOptions::set_keep_alive` when present.
/// NOTE(review): the unit is not visible in this file (presumably seconds) — confirm against rumqtt.
keep_alive_interval: Option<u64>,
/// When present, reconnection uses `ReconnectOptions::Always(value)`;
/// when absent, `ReconnectOptions::Never` is configured.
reconnect_interval: Option<u64>,
/// Forwarded to `MqttOptions::set_inflight` when present.
outgoing_message_queue_size: Option<usize>,
/// Forwarded to `MqttOptions::set_notification_channel_capacity` when present.
incoming_message_queue_size: Option<usize>,
/// MQTT password; an empty string is sent when it is absent.
password: Option<String>,
/// Forwarded to `MqttOptions::set_max_packet_size` when present.
max_message_size: Option<usize>,
}
impl AgentConfig {
    /// Overrides the password stored in the configuration.
    ///
    /// Returns `self` so that further calls can be chained.
    pub fn set_password(&mut self, value: &str) -> &mut Self {
        let password = String::from(value);
        self.password = Some(password);
        self
    }
}
/// Builder that collects connection parameters and starts an `Agent`.
#[derive(Debug)]
pub struct AgentBuilder {
/// Connection identity (agent id, connection version and mode) used to build the MQTT options.
connection: Connection,
/// API version handed to the created `Agent` and stored in its `Address`.
api_version: String,
}
impl AgentBuilder {
pub fn new(agent_id: AgentId, api_version: &str) -> Self {
Self {
connection: Connection::new(agent_id),
api_version: api_version.to_owned(),
}
}
pub fn connection_version(self, version: &str) -> Self {
let mut connection = self.connection;
connection.set_version(version);
Self { connection, ..self }
}
pub fn connection_mode(self, mode: ConnectionMode) -> Self {
let mut connection = self.connection;
connection.set_mode(mode);
Self { connection, ..self }
}
pub fn start(
self,
config: &AgentConfig,
) -> Result<(Agent, crossbeam_channel::Receiver<AgentNotification>), Error> {
let options = Self::mqtt_options(&self.connection, &config)?;
let (mqtt_tx, mqtt_rx) = rumqtt::MqttClient::start(options)
.map_err(|e| Error::new(&format!("error starting MQTT client, {}", e)))?;
let (tx, rx) = crossbeam_channel::unbounded::<AgentNotification>();
#[cfg(feature = "queue-counter")]
let queue_counter = QueueCounterHandle::start();
#[cfg(feature = "queue-counter")]
let queue_counter_ = queue_counter.clone();
std::thread::spawn(move || {
for message in mqtt_rx {
if let Notification::Publish(ref content) = message {
info!("Incoming message = '{:?}'", content);
}
let msg: AgentNotification = message.into();
#[cfg(feature = "queue-counter")]
{
if let AgentNotification::Message(Ok(ref content), _) = msg {
queue_counter_.add_incoming_message(content);
};
}
if let Err(e) = tx.send(msg) {
error!("Failed to transmit message, reason = {}", e);
};
}
});
let agent = Agent::new(
self.connection.agent_id,
&self.api_version,
mqtt_tx,
#[cfg(feature = "queue-counter")]
queue_counter,
);
Ok((agent, rx))
}
fn mqtt_options(
connection: &Connection,
config: &AgentConfig,
) -> Result<rumqtt::MqttOptions, Error> {
let uri = config
.uri
.parse::<http::Uri>()
.map_err(|e| Error::new(&format!("error parsing MQTT connection URL, {}", e)))?;
let host = uri.host().ok_or_else(|| Error::new("missing MQTT host"))?;
let port = uri
.port_part()
.ok_or_else(|| Error::new("missing MQTT port"))?;
let username = format!("{}::{}", connection.version, connection.mode);
let password = config
.password
.to_owned()
.unwrap_or_else(|| String::from(""));
let mut opts =
rumqtt::MqttOptions::new(connection.agent_id.to_string(), host, port.as_u16());
opts = match config.clean_session {
Some(value) => opts.set_clean_session(value),
_ => opts,
};
opts = match config.keep_alive_interval {
Some(value) => opts.set_keep_alive(value as u16),
_ => opts,
};
opts = match config.reconnect_interval {
Some(value) => opts.set_reconnect_opts(rumqtt::ReconnectOptions::Always(value)),
None => opts.set_reconnect_opts(rumqtt::ReconnectOptions::Never),
};
opts = match config.incoming_message_queue_size {
Some(value) => opts.set_notification_channel_capacity(value),
_ => opts,
};
opts = match config.outgoing_message_queue_size {
Some(value) => opts.set_inflight(value),
_ => opts,
};
opts = match config.max_message_size {
Some(value) => opts.set_max_packet_size(value),
_ => opts,
};
opts = opts.set_security_opts(rumqtt::SecurityOptions::UsernamePassword(
username, password,
));
Ok(opts)
}
}
/// Identity of a publisher: an agent id paired with an API version string.
#[derive(Clone, Debug)]
pub struct Address {
/// Agent identifier.
id: AgentId,
/// API version string.
version: String,
}
impl Address {
    /// Creates an address from an agent id and a version string (the string is copied).
    pub fn new(id: AgentId, version: &str) -> Self {
        let version = String::from(version);
        Self { id, version }
    }

    /// Agent id of this address.
    pub fn id(&self) -> &AgentId {
        &self.id
    }

    /// Version string of this address.
    pub fn version(&self) -> &str {
        self.version.as_str()
    }
}
/// Handle used to publish messages and create subscriptions over MQTT.
#[derive(Clone)]
pub struct Agent {
/// Address (agent id and API version) used when building topics.
address: Address,
/// MQTT client used for publishing and subscribing.
tx: rumqtt::MqttClient,
/// Counter handle notified about outgoing messages (only with the `queue-counter` feature).
#[cfg(feature = "queue-counter")]
queue_counter: QueueCounterHandle,
}
impl Agent {
#[cfg(feature = "queue-counter")]
fn new(
id: AgentId,
api_version: &str,
tx: rumqtt::MqttClient,
queue_counter: QueueCounterHandle,
) -> Self {
Self {
address: Address::new(id, api_version),
tx,
queue_counter,
}
}
#[cfg(not(feature = "queue-counter"))]
fn new(id: AgentId, api_version: &str, tx: rumqtt::MqttClient) -> Self {
Self {
address: Address::new(id, api_version),
tx,
}
}
pub fn address(&self) -> &Address {
&self.address
}
pub fn id(&self) -> &AgentId {
&self.address.id()
}
pub fn publish<T: serde::Serialize>(
&mut self,
message: OutgoingMessage<T>,
) -> Result<(), Error> {
let dump = Box::new(message).into_dump(&self.address)?;
self.publish_dump(dump)
}
pub fn publish_publishable(
&mut self,
message: Box<dyn IntoPublishableMessage>,
) -> Result<(), Error> {
let dump = message.into_dump(&self.address)?;
self.publish_dump(dump)
}
fn publish_dump(&mut self, dump: PublishableMessage) -> Result<(), Error> {
#[cfg(feature = "queue-counter")]
self.queue_counter.add_outgoing_message(&dump);
let dump = match dump {
PublishableMessage::Event(dump) => dump,
PublishableMessage::Request(dump) => dump,
PublishableMessage::Response(dump) => dump,
};
info!(
"Outgoing message = '{}' sending to the topic = '{}'",
dump.payload(),
dump.topic(),
);
self.tx
.publish(dump.topic, dump.qos, false, dump.payload)
.map_err(|e| Error::new(&format!("error publishing MQTT message, {}", &e)))
}
pub fn subscribe<S>(
&mut self,
subscription: &S,
qos: QoS,
maybe_group: Option<&SharedGroup>,
) -> Result<(), Error>
where
S: SubscriptionTopic,
{
let mut topic = subscription.subscription_topic(self.id(), self.address.version())?;
if let Some(ref group) = maybe_group {
topic = format!("$share/{group}/{topic}", group = group, topic = topic);
};
self.tx
.subscribe(topic, qos)
.map_err(|e| Error::new(&format!("error creating MQTT subscription, {}", e)))?;
Ok(())
}
#[cfg(feature = "queue-counter")]
pub fn get_queue_counter(&self) -> QueueCounterHandle {
self.queue_counter.clone()
}
}
/// Mode of a connection.
///
/// The textual forms used by `Display` and `FromStr` are `default`,
/// `service`, `observer` and `bridge`.
#[derive(Debug, Clone)]
pub enum ConnectionMode {
/// Textual form `default`; used by `Connection::new`.
Default,
/// Textual form `service`.
Service,
/// Textual form `observer`.
Observer,
/// Textual form `bridge`.
Bridge,
}
impl fmt::Display for ConnectionMode {
    /// Writes the lowercase name of the mode.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let name = match *self {
            ConnectionMode::Default => "default",
            ConnectionMode::Service => "service",
            ConnectionMode::Observer => "observer",
            ConnectionMode::Bridge => "bridge",
        };
        fmt.write_str(name)
    }
}
impl FromStr for ConnectionMode {
type Err = Error;
fn from_str(val: &str) -> Result<Self, Self::Err> {
match val {
"default" => Ok(ConnectionMode::Default),
"service" => Ok(ConnectionMode::Service),
"observer" => Ok(ConnectionMode::Observer),
"bridge" => Ok(ConnectionMode::Bridge),
_ => Err(Error::new(&format!(
"invalid value for the connection mode: {}",
val
))),
}
}
}
/// Connection identity: agent id, connection version and connection mode.
///
/// Its textual form (see the `Display` and `FromStr` impls) is
/// `{version}/{mode}/{agent_id}`.
#[derive(Debug, Clone)]
pub struct Connection {
/// Agent that owns the connection.
agent_id: AgentId,
/// Connection version; `Connection::new` starts with `"v2"`.
version: String,
/// Connection mode; `Connection::new` starts with `ConnectionMode::Default`.
mode: ConnectionMode,
}
impl Connection {
    /// Creates a connection for `agent_id` with version `"v2"` and the default mode.
    fn new(agent_id: AgentId) -> Self {
        let version = "v2".to_owned();
        let mode = ConnectionMode::Default;
        Self {
            agent_id,
            version,
            mode,
        }
    }

    /// Replaces the version; returns `self` for chaining.
    fn set_version(&mut self, value: &str) -> &mut Self {
        self.version = String::from(value);
        self
    }

    /// Replaces the mode; returns `self` for chaining.
    fn set_mode(&mut self, value: ConnectionMode) -> &mut Self {
        self.mode = value;
        self
    }

    /// Agent id of the connection.
    pub fn agent_id(&self) -> &AgentId {
        &self.agent_id
    }

    /// Version of the connection.
    pub fn version(&self) -> &str {
        self.version.as_str()
    }

    /// Mode of the connection.
    pub fn mode(&self) -> &ConnectionMode {
        &self.mode
    }
}
impl fmt::Display for Connection {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}/{}/{}", self.version, self.mode, self.agent_id,)
}
}
impl FromStr for Connection {
type Err = Error;
fn from_str(val: &str) -> Result<Self, Self::Err> {
match val.split('/').collect::<Vec<&str>>().as_slice() {
[version_str, mode_str, agent_id_str] => {
let version = (*version_str).to_string();
let mode = ConnectionMode::from_str(mode_str)?;
let agent_id = AgentId::from_str(agent_id_str)?;
Ok(Self {
version,
mode,
agent_id,
})
}
_ => Err(Error::new(&format!(
"invalid value for connection: {}",
val
))),
}
}
}
/// Serializable connection-related message properties.
///
/// It is embedded with `#[serde(flatten)]` into the incoming properties structs.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConnectionProperties {
/// Agent id of the connection.
agent_id: AgentId,
/// Connection version; (de)serialized under the key `connection_version`.
#[serde(rename = "connection_version")]
version: String,
/// Connection mode; (de)serialized under the key `connection_mode`.
#[serde(rename = "connection_mode")]
mode: ConnectionMode,
}
impl ConnectionProperties {
fn to_connection(&self) -> Connection {
let mut connection = Connection::new(self.agent_id.clone());
connection.set_version(&self.version);
connection.set_mode(self.mode.clone());
connection
}
}
impl Authenticable for ConnectionProperties {
    /// Account id of the agent owning the connection.
    fn as_account_id(&self) -> &AccountId {
        let agent_id = &self.agent_id;
        agent_id.as_account_id()
    }
}
impl Addressable for ConnectionProperties {
fn as_agent_id(&self) -> &AgentId {
&self.agent_id
}
}
/// Timing properties that are carried over from an incoming message to the
/// outgoing messages derived from it (see `update_cumulative_timings`).
///
/// NOTE(review): the exact wire encoding is defined by the helper modules in
/// `crate::serde` named in the attributes below, which are not visible here.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LongTermTimingProperties {
/// Optional; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
local_initial_timediff: Option<Duration>,
/// Optional; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "ts_milliseconds_string_option"
)]
initial_timestamp: Option<DateTime<Utc>>,
/// Mandatory timestamp.
#[serde(with = "ts_milliseconds_string")]
broker_timestamp: DateTime<Utc>,
/// Mandatory timestamp.
#[serde(with = "ts_milliseconds_string")]
broker_processing_timestamp: DateTime<Utc>,
/// Mandatory timestamp.
#[serde(with = "ts_milliseconds_string")]
broker_initial_processing_timestamp: DateTime<Utc>,
/// Running total of authorization time; grown by `update_cumulative_timings`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
cumulative_authorization_time: Option<Duration>,
/// Running total of processing time; grown by `update_cumulative_timings`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
cumulative_processing_time: Option<Duration>,
}
impl LongTermTimingProperties {
pub fn update_cumulative_timings(
self,
short_timing: &OutgoingShortTermTimingProperties,
) -> Self {
let cumulative_authorization_time = short_timing
.authorization_time
.map(|increment| {
self.cumulative_authorization_time
.map_or(increment, |initial| initial + increment)
})
.or(self.cumulative_authorization_time);
let cumulative_processing_time = short_timing
.processing_time
.map(|increment| {
self.cumulative_processing_time
.map_or(increment, |initial| initial + increment)
})
.or(self.cumulative_processing_time);
Self {
cumulative_authorization_time,
cumulative_processing_time,
..self
}
}
}
/// Timing properties attached to a single outgoing message.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct OutgoingShortTermTimingProperties {
/// Mandatory timestamp supplied to `new` (`until_now` uses the current time).
#[serde(with = "ts_milliseconds_string")]
timestamp: DateTime<Utc>,
/// Optional processing time; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
processing_time: Option<Duration>,
/// Optional authorization time; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
authorization_time: Option<Duration>,
}
impl OutgoingShortTermTimingProperties {
    /// Creates timing stamped with the current time whose processing time is
    /// the span elapsed since `start_timestamp`.
    pub fn until_now(start_timestamp: DateTime<Utc>) -> Self {
        let now = Utc::now();
        Self {
            timestamp: now,
            processing_time: Some(now - start_timestamp),
            authorization_time: None,
        }
    }

    /// Creates timing with the given timestamp and no durations.
    pub fn new(timestamp: DateTime<Utc>) -> Self {
        let (processing_time, authorization_time) = (None, None);
        Self {
            timestamp,
            processing_time,
            authorization_time,
        }
    }

    /// Stores the processing time; returns `self` for chaining.
    pub fn set_processing_time(&mut self, processing_time: Duration) -> &mut Self {
        let value = Some(processing_time);
        self.processing_time = value;
        self
    }

    /// Stores the authorization time; returns `self` for chaining.
    pub fn set_authorization_time(&mut self, authorization_time: Duration) -> &mut Self {
        let value = Some(authorization_time);
        self.authorization_time = value;
        self
    }
}
/// Alternative name for `OutgoingShortTermTimingProperties`.
pub type ShortTermTimingProperties = OutgoingShortTermTimingProperties;
/// Timing properties received with a single incoming message.
///
/// Unlike the outgoing counterpart, every field (including the timestamp) is optional.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct IncomingShortTermTimingProperties {
/// Optional timestamp; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "ts_milliseconds_string_option"
)]
timestamp: Option<DateTime<Utc>>,
/// Optional processing time; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
processing_time: Option<Duration>,
/// Optional authorization time; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "duration_milliseconds_string_option"
)]
authorization_time: Option<Duration>,
}
/// Session identifier made of two UUIDs.
///
/// Its textual form is `{agent_session_label}.{broker_session_label}`.
#[derive(Clone, Debug)]
pub struct SessionId {
/// First UUID of the textual form.
agent_session_label: Uuid,
/// Second UUID of the textual form.
broker_session_label: Uuid,
}
impl FromStr for SessionId {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let components = s.splitn(2, '.').collect::<Vec<&str>>();
match components[..] {
[agent_session_label_str, broker_session_label_str] => {
let agent_session_label =
Uuid::parse_str(agent_session_label_str).map_err(|err| {
let msg = format!("Failed to parse agent session label UUID: {}", err);
Error::new(&msg)
})?;
let broker_session_label =
Uuid::parse_str(broker_session_label_str).map_err(|err| {
let msg = format!("Failed to parse broker session label UUID: {}", err);
Error::new(&msg)
})?;
Ok(Self {
agent_session_label,
broker_session_label,
})
}
_ => Err(Error::new(
"Failed to parse SessionId. Expected 2 UUIDs separated by .",
)),
}
}
}
impl fmt::Display for SessionId {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(
f,
"{}.{}",
self.agent_session_label, self.broker_session_label
)
}
}
/// Tracking identifier: a UUID label followed by a `SessionId`.
///
/// Its textual form is `{label}.{session_id}`, i.e. three UUIDs separated by `.`.
#[derive(Clone, Debug)]
pub struct TrackingId {
/// First UUID of the textual form.
label: Uuid,
/// Session id that makes up the rest of the textual form.
session_id: SessionId,
}
impl FromStr for TrackingId {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let components = s.splitn(2, '.').collect::<Vec<&str>>();
match components[..] {
[label_str, session_id_str] => {
let label = Uuid::parse_str(label_str).map_err(|err| {
let msg = format!("Failed to parse tracking id label UUID: {}", err);
Error::new(&msg)
})?;
Ok(Self {
label,
session_id: SessionId::from_str(session_id_str)?,
})
}
_ => Err(Error::new(
"Failed to parse TrackingId. Expected 3 UUIDs separated by .",
)),
}
}
}
impl fmt::Display for TrackingId {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f, "{}.{}", self.label, self.session_id)
}
}
/// Tracking-related message properties.
///
/// Incoming properties copy this value unchanged into the outgoing
/// properties they produce.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TrackingProperties {
/// Tracking id of the message.
tracking_id: TrackingId,
/// List of session ids; (de)serialized through the `session_ids_list` helper.
#[serde(with = "session_ids_list")]
session_tracking_label: Vec<SessionId>,
/// Optional label; omitted from the output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
local_tracking_label: Option<String>,
}
/// Properties of an incoming event.
///
/// The connection, timing and tracking groups are flattened into the same
/// level as `label` when (de)serialized.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IncomingEventProperties {
/// Connection the event came from.
#[serde(flatten)]
conn: ConnectionProperties,
/// Optional event label.
label: Option<String>,
/// Timing carried across messages.
#[serde(flatten)]
long_term_timing: LongTermTimingProperties,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: IncomingShortTermTimingProperties,
/// Tracking information.
#[serde(flatten)]
tracking: TrackingProperties,
}
impl IncomingEventProperties {
    /// Builds a `Connection` from the connection properties.
    pub fn to_connection(&self) -> Connection {
        self.conn.to_connection()
    }

    /// Event label, when one is present.
    pub fn label(&self) -> Option<&str> {
        self.label.as_ref().map(String::as_str)
    }

    /// Long term timing of the event.
    pub fn long_term_timing(&self) -> &LongTermTimingProperties {
        &self.long_term_timing
    }

    /// Short term timing of the event.
    pub fn short_term_timing(&self) -> &IncomingShortTermTimingProperties {
        &self.short_term_timing
    }

    /// Tracking information of the event.
    pub fn tracking(&self) -> &TrackingProperties {
        &self.tracking
    }

    /// Creates properties for an outgoing event caused by this event.
    ///
    /// The result carries the long term timing updated with
    /// `short_term_timing` and a copy of the tracking information.
    pub fn to_event(
        &self,
        label: &'static str,
        short_term_timing: OutgoingShortTermTimingProperties,
    ) -> OutgoingEventProperties {
        // Must be computed before `short_term_timing` is moved into the new properties.
        let updated_timing = self.update_long_term_timing(&short_term_timing);

        let mut props = OutgoingEventProperties::new(label, short_term_timing);
        props
            .set_long_term_timing(updated_timing)
            .set_tracking(self.tracking.clone());
        props
    }

    /// Returns a copy of the long term timing with the cumulative totals
    /// increased by `short_term_timing`.
    fn update_long_term_timing(
        &self,
        short_term_timing: &OutgoingShortTermTimingProperties,
    ) -> LongTermTimingProperties {
        let snapshot = self.long_term_timing.clone();
        snapshot.update_cumulative_timings(short_term_timing)
    }
}
impl Authenticable for IncomingEventProperties {
    /// Account id taken from the connection properties.
    fn as_account_id(&self) -> &AccountId {
        let conn = &self.conn;
        conn.as_account_id()
    }
}
impl Addressable for IncomingEventProperties {
    /// Agent id taken from the connection properties.
    fn as_agent_id(&self) -> &AgentId {
        let conn = &self.conn;
        conn.as_agent_id()
    }
}
/// Properties of an incoming request.
///
/// The connection, timing and tracking groups are flattened into the same
/// level as the other fields when (de)serialized.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IncomingRequestProperties {
/// Requested method name.
method: String,
/// Correlation data; copied into the response properties by `to_response`.
correlation_data: String,
/// Response topic; copied into the response properties by `to_response`.
response_topic: String,
/// Connection the request came from.
#[serde(flatten)]
conn: ConnectionProperties,
/// Agent id of the broker.
broker_agent_id: AgentId,
/// Timing carried across messages.
#[serde(flatten)]
long_term_timing: LongTermTimingProperties,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: IncomingShortTermTimingProperties,
/// Tracking information.
#[serde(flatten)]
tracking: TrackingProperties,
}
impl IncomingRequestProperties {
    /// Requested method name.
    pub fn method(&self) -> &str {
        self.method.as_str()
    }

    /// Correlation data of the request.
    pub fn correlation_data(&self) -> &str {
        self.correlation_data.as_str()
    }

    /// Topic the response is expected on.
    pub fn response_topic(&self) -> &str {
        self.response_topic.as_str()
    }

    /// Agent id of the broker.
    pub fn broker_agent_id(&self) -> &AgentId {
        &self.broker_agent_id
    }

    /// Long term timing of the request.
    pub fn long_term_timing(&self) -> &LongTermTimingProperties {
        &self.long_term_timing
    }

    /// Short term timing of the request.
    pub fn short_term_timing(&self) -> &IncomingShortTermTimingProperties {
        &self.short_term_timing
    }

    /// Tracking information of the request.
    pub fn tracking(&self) -> &TrackingProperties {
        &self.tracking
    }

    /// Builds a `Connection` from the connection properties.
    pub fn to_connection(&self) -> Connection {
        self.conn.to_connection()
    }

    /// Creates properties for an outgoing event caused by this request.
    ///
    /// The result carries the long term timing updated with
    /// `short_term_timing` and a copy of the tracking information.
    pub fn to_event(
        &self,
        label: &'static str,
        short_term_timing: OutgoingShortTermTimingProperties,
    ) -> OutgoingEventProperties {
        // Must be computed before `short_term_timing` is moved into the new properties.
        let updated_timing = self.update_long_term_timing(&short_term_timing);

        let mut props = OutgoingEventProperties::new(label, short_term_timing);
        props
            .set_long_term_timing(updated_timing)
            .set_tracking(self.tracking.clone());
        props
    }

    /// Creates properties for an outgoing request caused by this request.
    ///
    /// The result carries the long term timing updated with
    /// `short_term_timing` and a copy of the tracking information.
    pub fn to_request(
        &self,
        method: &str,
        response_topic: &str,
        correlation_data: &str,
        short_term_timing: OutgoingShortTermTimingProperties,
    ) -> OutgoingRequestProperties {
        let updated_timing = self.update_long_term_timing(&short_term_timing);

        let mut props = OutgoingRequestProperties::new(
            method,
            response_topic,
            correlation_data,
            short_term_timing,
        );
        props
            .set_long_term_timing(updated_timing)
            .set_tracking(self.tracking.clone());
        props
    }

    /// Creates properties for the response to this request.
    ///
    /// The correlation data, the tracking information and the response topic
    /// of the request are copied into the response properties.
    pub fn to_response(
        &self,
        status: ResponseStatus,
        short_term_timing: OutgoingShortTermTimingProperties,
    ) -> OutgoingResponseProperties {
        let updated_timing = self.update_long_term_timing(&short_term_timing);
        let tracking = self.tracking.clone();

        let mut props = OutgoingResponseProperties::new(
            status,
            &self.correlation_data,
            updated_timing,
            short_term_timing,
            tracking,
        );
        props.response_topic = Some(self.response_topic.clone());
        props
    }

    /// Returns a copy of the long term timing with the cumulative totals
    /// increased by `short_term_timing`.
    fn update_long_term_timing(
        &self,
        short_term_timing: &OutgoingShortTermTimingProperties,
    ) -> LongTermTimingProperties {
        let snapshot = self.long_term_timing.clone();
        snapshot.update_cumulative_timings(short_term_timing)
    }
}
impl Authenticable for IncomingRequestProperties {
    /// Account id taken from the connection properties.
    fn as_account_id(&self) -> &AccountId {
        let conn = &self.conn;
        conn.as_account_id()
    }
}
impl Addressable for IncomingRequestProperties {
    /// Agent id taken from the connection properties.
    fn as_agent_id(&self) -> &AgentId {
        let conn = &self.conn;
        conn.as_agent_id()
    }
}
/// Properties of an incoming response.
///
/// The connection, timing and tracking groups are flattened into the same
/// level as the other fields when (de)serialized.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct IncomingResponseProperties {
/// Response status; (de)serialized through `crate::serde::HttpStatusCodeRef`.
#[serde(with = "crate::serde::HttpStatusCodeRef")]
status: ResponseStatus,
/// Correlation data of the response.
correlation_data: String,
/// Connection the response came from.
#[serde(flatten)]
conn: ConnectionProperties,
/// Timing carried across messages.
#[serde(flatten)]
long_term_timing: LongTermTimingProperties,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: IncomingShortTermTimingProperties,
/// Tracking information.
#[serde(flatten)]
tracking: TrackingProperties,
}
impl IncomingResponseProperties {
pub fn status(&self) -> ResponseStatus {
self.status
}
pub fn correlation_data(&self) -> &str {
&self.correlation_data
}
pub fn long_term_timing(&self) -> &LongTermTimingProperties {
&self.long_term_timing
}
pub fn short_term_timing(&self) -> &IncomingShortTermTimingProperties {
&self.short_term_timing
}
pub fn tracking(&self) -> &TrackingProperties {
&self.tracking
}
pub fn to_connection(&self) -> Connection {
self.conn.to_connection()
}
}
impl Authenticable for IncomingResponseProperties {
    /// Account id taken from the connection properties.
    fn as_account_id(&self) -> &AccountId {
        let conn = &self.conn;
        conn.as_account_id()
    }
}
impl Addressable for IncomingResponseProperties {
    /// Agent id taken from the connection properties.
    fn as_agent_id(&self) -> &AgentId {
        let conn = &self.conn;
        conn.as_agent_id()
    }
}
/// An incoming message of one of the three supported kinds, with payload type `T`.
#[derive(Debug, Clone)]
pub enum IncomingMessage<T> {
/// An incoming event.
Event(IncomingEvent<T>),
/// An incoming request.
Request(IncomingRequest<T>),
/// An incoming response.
Response(IncomingResponse<T>),
}
/// Payload of an incoming message together with its properties.
///
/// `IncomingEvent`, `IncomingRequest` and `IncomingResponse` are aliases of
/// this type with a fixed properties type `P`.
#[derive(Debug, Clone)]
pub struct IncomingMessageContent<T, P>
where
P: Addressable + serde::Serialize,
{
/// Message payload.
payload: T,
/// Message properties.
properties: P,
}
impl<T, P> IncomingMessageContent<T, P>
where
P: Addressable + serde::Serialize,
{
pub fn new(payload: T, properties: P) -> Self {
Self {
payload,
properties,
}
}
pub fn payload(&self) -> &T {
&self.payload
}
pub fn extract_payload(self) -> T {
self.payload
}
pub fn properties(&self) -> &P {
&self.properties
}
}
impl<T> IncomingRequest<T> {
    /// Builds the response message for this request.
    ///
    /// The response properties are derived from the request properties and the
    /// message is addressed (unicast) to the agent the request came from,
    /// using `api_version`.
    pub fn to_response<R>(
        &self,
        data: R,
        status: ResponseStatus,
        timing: OutgoingShortTermTimingProperties,
        api_version: &str,
    ) -> OutgoingMessage<R>
    where
        R: serde::Serialize,
    {
        let properties = self.properties.to_response(status, timing);
        let recipient = self.properties.as_agent_id().clone();
        let destination = Destination::Unicast(recipient, api_version.to_owned());

        let response = OutgoingResponse::new(data, properties, destination);
        OutgoingMessage::Response(response)
    }
}
/// Incoming message content whose properties are `IncomingEventProperties`.
pub type IncomingEvent<T> = IncomingMessageContent<T, IncomingEventProperties>;
/// Incoming message content whose properties are `IncomingRequestProperties`.
pub type IncomingRequest<T> = IncomingMessageContent<T, IncomingRequestProperties>;
/// Incoming message content whose properties are `IncomingResponseProperties`.
pub type IncomingResponse<T> = IncomingMessageContent<T, IncomingResponseProperties>;
impl<String: std::ops::Deref<Target = str>> IncomingEvent<String> {
pub fn convert_payload<T>(message: &IncomingEvent<String>) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(payload)
}
pub fn convert<T>(message: IncomingEvent<String>) -> Result<IncomingEvent<T>, Error>
where
T: serde::de::DeserializeOwned,
{
let props = message.properties().to_owned();
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(IncomingEvent::new(payload, props))
}
}
impl<String: std::ops::Deref<Target = str>> IncomingRequest<String> {
pub fn convert_payload<T>(message: &IncomingRequest<String>) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(payload)
}
pub fn convert<T>(message: IncomingRequest<String>) -> Result<IncomingRequest<T>, Error>
where
T: serde::de::DeserializeOwned,
{
let props = message.properties().to_owned();
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(IncomingRequest::new(payload, props))
}
}
impl<String: std::ops::Deref<Target = str>> IncomingResponse<String> {
pub fn convert_payload<T>(message: &IncomingResponse<String>) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(payload)
}
pub fn convert<T>(message: IncomingResponse<String>) -> Result<IncomingResponse<T>, Error>
where
T: serde::de::DeserializeOwned,
{
let props = message.properties().to_owned();
let payload = serde_json::from_str::<T>(&message.payload()).map_err(|e| {
Error::new(&format!(
"error deserializing payload of an envelope, {}",
&e
))
})?;
Ok(IncomingResponse::new(payload, props))
}
}
/// Properties of an outgoing event.
///
/// The timing and tracking groups are flattened into the same level as
/// `label` when serialized.
#[derive(Debug, Serialize)]
pub struct OutgoingEventProperties {
/// Event label.
label: &'static str,
/// Optional agent id; omitted from the output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
agent_id: Option<AgentId>,
/// Optional long term timing; `None` until `set_long_term_timing` is called.
#[serde(flatten)]
long_term_timing: Option<LongTermTimingProperties>,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: OutgoingShortTermTimingProperties,
/// Optional tracking information; `None` until `set_tracking` is called.
#[serde(flatten)]
tracking: Option<TrackingProperties>,
}
impl OutgoingEventProperties {
pub fn new(label: &'static str, short_term_timing: OutgoingShortTermTimingProperties) -> Self {
Self {
label,
long_term_timing: None,
short_term_timing,
tracking: None,
agent_id: None,
}
}
pub fn set_agent_id(&mut self, agent_id: AgentId) -> &mut Self {
self.agent_id = Some(agent_id);
self
}
pub fn set_long_term_timing(&mut self, timing: LongTermTimingProperties) -> &mut Self {
self.long_term_timing = Some(timing);
self
}
pub fn set_tracking(&mut self, tracking: TrackingProperties) -> &mut Self {
self.tracking = Some(tracking);
self
}
}
/// Properties of an outgoing request.
///
/// The timing and tracking groups are flattened into the same level as the
/// other fields when serialized.
#[derive(Debug, Serialize)]
pub struct OutgoingRequestProperties {
/// Requested method name.
method: String,
/// Correlation data of the request.
correlation_data: String,
/// Topic the response is expected on.
response_topic: String,
/// Optional agent id; omitted from the output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
agent_id: Option<AgentId>,
/// Optional long term timing; `None` until `set_long_term_timing` is called.
#[serde(flatten)]
long_term_timing: Option<LongTermTimingProperties>,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: OutgoingShortTermTimingProperties,
/// Optional tracking information; `None` until `set_tracking` is called.
#[serde(flatten)]
tracking: Option<TrackingProperties>,
/// Optional local timestamp; omitted from the output when `None`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "ts_milliseconds_string_option"
)]
local_timestamp: Option<DateTime<Utc>>,
}
impl OutgoingRequestProperties {
pub fn new(
method: &str,
response_topic: &str,
correlation_data: &str,
short_term_timing: OutgoingShortTermTimingProperties,
) -> Self {
Self {
method: method.to_owned(),
response_topic: response_topic.to_owned(),
correlation_data: correlation_data.to_owned(),
agent_id: None,
long_term_timing: None,
short_term_timing,
tracking: None,
local_timestamp: None,
}
}
pub fn set_agent_id(&mut self, agent_id: AgentId) -> &mut Self {
self.agent_id = Some(agent_id);
self
}
pub fn set_long_term_timing(&mut self, timing: LongTermTimingProperties) -> &mut Self {
self.long_term_timing = Some(timing);
self
}
pub fn set_tracking(&mut self, tracking: TrackingProperties) -> &mut Self {
self.tracking = Some(tracking);
self
}
pub fn set_local_timestamp(&mut self, local_timestamp: DateTime<Utc>) -> &mut Self {
self.local_timestamp = Some(local_timestamp);
self
}
pub fn correlation_data(&self) -> &str {
&self.correlation_data
}
}
/// Properties of an outgoing response.
///
/// The timing and tracking groups are flattened into the same level as the
/// other fields when serialized.
#[derive(Debug, Serialize)]
pub struct OutgoingResponseProperties {
/// Response status; serialized through `crate::serde::HttpStatusCodeRef`.
#[serde(with = "crate::serde::HttpStatusCodeRef")]
status: ResponseStatus,
/// Correlation data of the request being answered.
correlation_data: String,
/// Never serialized. When set, `Publishable::destination_topic` for
/// responses returns it instead of deriving a topic from the destination.
#[serde(skip)]
response_topic: Option<String>,
/// Timing carried across messages.
#[serde(flatten)]
long_term_timing: LongTermTimingProperties,
/// Timing of this particular message.
#[serde(flatten)]
short_term_timing: OutgoingShortTermTimingProperties,
/// Tracking information.
#[serde(flatten)]
tracking: TrackingProperties,
}
impl OutgoingResponseProperties {
pub fn new(
status: ResponseStatus,
correlation_data: &str,
long_term_timing: LongTermTimingProperties,
short_term_timing: OutgoingShortTermTimingProperties,
tracking: TrackingProperties,
) -> Self {
Self {
status,
correlation_data: correlation_data.to_owned(),
response_topic: None,
long_term_timing,
short_term_timing,
tracking,
}
}
fn response_topic(&self) -> Option<&str> {
self.response_topic.as_ref().map(|t| &**t)
}
}
/// Response status, represented by `http::StatusCode`.
pub type ResponseStatus = http::StatusCode;
/// An outgoing message of one of the three supported kinds, with a
/// serializable payload of type `T`.
#[derive(Debug)]
pub enum OutgoingMessage<T>
where
T: serde::Serialize,
{
/// An outgoing event.
Event(OutgoingEvent<T>),
/// An outgoing request.
Request(OutgoingRequest<T>),
/// An outgoing response.
Response(OutgoingResponse<T>),
}
/// Outgoing message content whose properties are `OutgoingEventProperties`.
pub type OutgoingEvent<T> = OutgoingMessageContent<T, OutgoingEventProperties>;
/// Outgoing message content whose properties are `OutgoingRequestProperties`.
pub type OutgoingRequest<T> = OutgoingMessageContent<T, OutgoingRequestProperties>;
/// Outgoing message content whose properties are `OutgoingResponseProperties`.
pub type OutgoingResponse<T> = OutgoingMessageContent<T, OutgoingResponseProperties>;
/// Payload of an outgoing message together with its properties and destination.
///
/// `OutgoingEvent`, `OutgoingRequest` and `OutgoingResponse` are aliases of
/// this type with a fixed properties type `P`.
#[derive(Debug)]
pub struct OutgoingMessageContent<T, P>
where
T: serde::Serialize,
{
/// Serializable message payload.
payload: T,
/// Message properties.
properties: P,
/// Where the message is sent; used to derive the publication topic.
destination: Destination,
}
impl<T, P> OutgoingMessageContent<T, P>
where
T: serde::Serialize,
{
fn new(payload: T, properties: P, destination: Destination) -> Self {
Self {
payload,
properties,
destination,
}
}
pub fn properties(&self) -> &P {
&self.properties
}
}
impl<T> OutgoingEvent<T>
where
    T: serde::Serialize,
{
    /// Creates an event message with a broadcast destination built from `to_uri`.
    pub fn broadcast(
        payload: T,
        properties: OutgoingEventProperties,
        to_uri: &str,
    ) -> OutgoingMessage<T> {
        let destination = Destination::Broadcast(String::from(to_uri));
        let event = Self::new(payload, properties, destination);
        OutgoingMessage::Event(event)
    }

    /// Creates an event message with a multicast destination built from the
    /// account id of `to`.
    pub fn multicast<A>(
        payload: T,
        properties: OutgoingEventProperties,
        to: &A,
    ) -> OutgoingMessage<T>
    where
        A: Authenticable,
    {
        let destination = Destination::Multicast(to.as_account_id().clone());
        let event = Self::new(payload, properties, destination);
        OutgoingMessage::Event(event)
    }
}
impl<T> OutgoingRequest<T>
where
    T: serde::Serialize,
{
    /// Creates a request message with a multicast destination built from the
    /// account id of `to`.
    pub fn multicast<A>(
        payload: T,
        properties: OutgoingRequestProperties,
        to: &A,
    ) -> OutgoingMessage<T>
    where
        A: Authenticable,
    {
        let destination = Destination::Multicast(to.as_account_id().clone());
        let request = Self::new(payload, properties, destination);
        OutgoingMessage::Request(request)
    }

    /// Creates a request message with a unicast destination built from the
    /// agent id of `to` and `version`.
    pub fn unicast<A>(
        payload: T,
        properties: OutgoingRequestProperties,
        to: &A,
        version: &str,
    ) -> OutgoingMessage<T>
    where
        A: Addressable,
    {
        let recipient = to.as_agent_id().clone();
        let destination = Destination::Unicast(recipient, String::from(version));
        let request = Self::new(payload, properties, destination);
        OutgoingMessage::Request(request)
    }
}
impl<T> OutgoingResponse<T>
where
    T: serde::Serialize,
{
    /// Creates a response message with a unicast destination built from the
    /// agent id of `to` and `version`.
    pub fn unicast<A>(
        payload: T,
        properties: OutgoingResponseProperties,
        to: &A,
        version: &str,
    ) -> OutgoingMessage<T>
    where
        A: Addressable,
    {
        let recipient = to.as_agent_id().clone();
        let destination = Destination::Unicast(recipient, String::from(version));
        let response = Self::new(payload, properties, destination);
        OutgoingMessage::Response(response)
    }
}
impl<T> compat::IntoEnvelope for OutgoingEvent<T>
where
T: serde::Serialize,
{
fn into_envelope(self) -> Result<compat::OutgoingEnvelope, Error> {
let payload = serde_json::to_string(&self.payload)
.map_err(|e| Error::new(&format!("error serializing payload of an envelope, {}", e)))?;
let envelope = compat::OutgoingEnvelope::new(
&payload,
compat::OutgoingEnvelopeProperties::Event(self.properties),
self.destination,
);
Ok(envelope)
}
}
impl<T> compat::IntoEnvelope for OutgoingRequest<T>
where
T: serde::Serialize,
{
fn into_envelope(self) -> Result<compat::OutgoingEnvelope, Error> {
let payload = serde_json::to_string(&self.payload)
.map_err(|e| Error::new(&format!("error serializing payload of an envelope, {}", e)))?;
let envelope = compat::OutgoingEnvelope::new(
&payload,
compat::OutgoingEnvelopeProperties::Request(self.properties),
self.destination,
);
Ok(envelope)
}
}
impl<T> compat::IntoEnvelope for OutgoingResponse<T>
where
T: serde::Serialize,
{
fn into_envelope(self) -> Result<compat::OutgoingEnvelope, Error> {
let payload = serde_json::to_string(&self.payload)
.map_err(|e| Error::new(&format!("error serializing payload of an envelope, {}", e)))?;
let envelope = compat::OutgoingEnvelope::new(
&payload,
compat::OutgoingEnvelopeProperties::Response(self.properties),
self.destination,
);
Ok(envelope)
}
}
impl<T> compat::IntoEnvelope for OutgoingMessage<T>
where
    T: serde::Serialize,
{
    /// Delegates to the envelope conversion of the wrapped message.
    fn into_envelope(self) -> Result<compat::OutgoingEnvelope, Error> {
        match self {
            OutgoingMessage::Event(event) => event.into_envelope(),
            OutgoingMessage::Request(request) => request.into_envelope(),
            OutgoingMessage::Response(response) => response.into_envelope(),
        }
    }
}
/// Something that can be published: it knows its topic and quality of service.
pub trait Publishable {
/// Returns the topic the message is published to, as seen from `publisher`.
///
/// Implementations in this file return an error when the destination is
/// incompatible with the message type.
fn destination_topic(&self, publisher: &Address) -> Result<String, Error>;
/// Returns the quality of service used for the publication.
fn qos(&self) -> QoS;
}
impl<T: serde::Serialize> Publishable for OutgoingEvent<T> {
fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
match self.destination {
Destination::Broadcast(ref uri) => Ok(format!(
"apps/{app}/api/{version}/{uri}",
app = publisher.id().as_account_id(),
version = publisher.version(),
uri = uri,
)),
Destination::Multicast(ref account_id) => Ok(format!(
"agents/{agent_id}/api/{version}/out/{app}",
agent_id = publisher.id(),
version = publisher.version(),
app = account_id,
)),
_ => Err(Error::new(&format!(
"destination = '{:?}' is incompatible with event message type",
self.destination,
))),
}
}
fn qos(&self) -> QoS {
QoS::AtLeastOnce
}
}
impl<T: serde::Serialize> Publishable for OutgoingRequest<T> {
fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
match self.destination {
Destination::Unicast(ref agent_id, ref version) => Ok(format!(
"agents/{agent_id}/api/{version}/in/{app}",
agent_id = agent_id,
version = version,
app = publisher.id().as_account_id(),
)),
Destination::Multicast(ref account_id) => Ok(format!(
"agents/{agent_id}/api/{version}/out/{app}",
agent_id = publisher.id(),
version = publisher.version(),
app = account_id,
)),
_ => Err(Error::new(&format!(
"destination = '{:?}' is incompatible with request message type",
self.destination,
))),
}
}
fn qos(&self) -> QoS {
QoS::AtMostOnce
}
}
impl<T: serde::Serialize> Publishable for OutgoingResponse<T> {
fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
match self.properties().response_topic() {
Some(response_topic) => Ok(response_topic.to_owned()),
None => match self.destination {
Destination::Unicast(ref agent_id, ref version) => Ok(format!(
"agents/{agent_id}/api/{version}/in/{app}",
agent_id = agent_id,
version = version,
app = publisher.id().as_account_id(),
)),
_ => Err(Error::new(&format!(
"destination = '{:?}' is incompatible with response message type",
self.destination,
))),
},
}
}
fn qos(&self) -> QoS {
QoS::AtLeastOnce
}
}
/// Forwards both `Publishable` queries to the concrete message held by the
/// enum variant.
impl<T: serde::Serialize> Publishable for OutgoingMessage<T> {
    fn destination_topic(&self, publisher: &Address) -> Result<String, Error> {
        match self {
            Self::Event(event) => event.destination_topic(publisher),
            Self::Response(response) => response.destination_topic(publisher),
            Self::Request(request) => request.destination_topic(publisher),
        }
    }

    fn qos(&self) -> QoS {
        match self {
            Self::Event(event) => event.qos(),
            Self::Response(response) => response.qos(),
            Self::Request(request) => request.qos(),
        }
    }
}
/// A message reduced to what is needed to publish it: topic, QoS and the
/// payload as a string.
#[derive(Clone, Debug)]
pub struct PublishableDump {
// Topic to publish to.
topic: String,
// Quality of service level to publish with.
qos: QoS,
// Message body as a string.
payload: String,
}
/// Read-only accessors for the dumped message parts.
impl PublishableDump {
    /// Topic the message is to be published to.
    pub fn topic(&self) -> &str {
        self.topic.as_str()
    }

    /// Quality of service level to publish with.
    pub fn qos(&self) -> QoS {
        self.qos
    }

    /// Message payload as a string slice.
    pub fn payload(&self) -> &str {
        self.payload.as_str()
    }
}
/// A dumped outgoing message tagged with its kind: event, request or response.
///
/// Derives `Clone` and `Debug` so values can be duplicated and logged, the
/// same as the `PublishableDump` every variant wraps.
#[derive(Clone, Debug)]
pub enum PublishableMessage {
    /// A dumped event.
    Event(PublishableDump),
    /// A dumped request.
    Request(PublishableDump),
    /// A dumped response.
    Response(PublishableDump),
}
/// Accessors that read through to the wrapped `PublishableDump`, whichever
/// variant holds it.
impl PublishableMessage {
    /// Topic of the wrapped dump.
    pub fn topic(&self) -> &str {
        match self {
            Self::Event(dump) | Self::Request(dump) | Self::Response(dump) => dump.topic(),
        }
    }

    /// QoS of the wrapped dump.
    pub fn qos(&self) -> QoS {
        match self {
            Self::Event(dump) | Self::Request(dump) | Self::Response(dump) => dump.qos(),
        }
    }

    /// Payload of the wrapped dump.
    pub fn payload(&self) -> &str {
        match self {
            Self::Event(dump) | Self::Request(dump) | Self::Response(dump) => dump.payload(),
        }
    }
}
/// Conversion of a boxed message into a `PublishableMessage`.
pub trait IntoPublishableMessage {
/// Consumes the boxed message and produces its publishable form as sent by
/// `publisher`, or an error if the conversion fails.
fn into_dump(self: Box<Self>, publisher: &Address) -> Result<PublishableMessage, Error>;
}
impl<T: serde::Serialize> IntoPublishableMessage for OutgoingMessage<T> {
fn into_dump(self: Box<Self>, publisher: &Address) -> Result<PublishableMessage, Error> {
use crate::mqtt::compat::{IntoEnvelope, OutgoingEnvelopeProperties};
let topic = self.destination_topic(&publisher)?;
let qos = self.qos();
let envelope = &self.into_envelope()?;
let payload = serde_json::to_string(envelope)
.map_err(|e| Error::new(&format!("error serializing an envelope, {}", &e)))?;
let dump = PublishableDump {
topic,
qos,
payload,
};
let message = match envelope.properties {
OutgoingEnvelopeProperties::Event(_) => PublishableMessage::Event(dump),
OutgoingEnvelopeProperties::Request(_) => PublishableMessage::Request(dump),
OutgoingEnvelopeProperties::Response(_) => PublishableMessage::Response(dump),
};
Ok(message)
}
}
/// Something that can be turned into a topic string to subscribe to.
pub trait SubscriptionTopic {
/// Builds the subscription topic for the subscriber identified by
/// `agent_id` with API version `me_version`.
///
/// Implementations in this file return an error when the subscription's
/// source does not fit the subscription type.
fn subscription_topic<A>(&self, agent_id: &A, me_version: &str) -> Result<String, Error>
where
A: Addressable;
}
/// A static string is used as the subscription topic verbatim.
impl SubscriptionTopic for &'static str {
    /// Returns an owned copy of the string; both arguments are ignored.
    fn subscription_topic<A>(&self, _me: &A, _me_version: &str) -> Result<String, Error>
    where
        A: Addressable,
    {
        Ok(String::from(*self))
    }
}
impl<'a> SubscriptionTopic for EventSubscription<'a> {
fn subscription_topic<A>(&self, _me: &A, _me_version: &str) -> Result<String, Error>
where
A: Addressable,
{
match self.source {
Source::Broadcast(ref from_account_id, ref version, ref uri) => Ok(format!(
"apps/{app}/api/{version}/{uri}",
app = from_account_id,
version = version,
uri = uri,
)),
_ => Err(Error::new(&format!(
"source = '{:?}' is incompatible with event subscription",
self.source,
))),
}
}
}
impl<'a> SubscriptionTopic for RequestSubscription<'a> {
fn subscription_topic<A>(&self, me: &A, me_version: &str) -> Result<String, Error>
where
A: Addressable,
{
match self.source {
Source::Multicast(Some(ref from_agent_id), ver) => Ok(format!(
"agents/{agent_id}/api/{version}/out/{app}",
agent_id = from_agent_id,
version = ver.unwrap_or("+"),
app = me.as_account_id(),
)),
Source::Multicast(None, ver) => Ok(format!(
"agents/+/api/{version}/out/{app}",
version = ver.unwrap_or("+"),
app = me.as_account_id(),
)),
Source::Unicast(Some(ref from_account_id)) => Ok(format!(
"agents/{agent_id}/api/{version}/in/{app}",
agent_id = me.as_agent_id(),
version = me_version,
app = from_account_id,
)),
Source::Unicast(None) => Ok(format!(
"agents/{agent_id}/api/{version}/in/+",
agent_id = me.as_agent_id(),
version = me_version,
)),
_ => Err(Error::new(&format!(
"source = '{:?}' is incompatible with request subscription",
self.source,
))),
}
}
}
impl<'a> SubscriptionTopic for ResponseSubscription<'a> {
fn subscription_topic<A>(&self, me: &A, me_version: &str) -> Result<String, Error>
where
A: Addressable,
{
match self.source {
Source::Unicast(Some(ref from_account_id)) => Ok(format!(
"agents/{agent_id}/api/{version}/in/{app}",
agent_id = me.as_agent_id(),
version = me_version,
app = from_account_id,
)),
Source::Unicast(None) => Ok(format!(
"agents/{agent_id}/api/{version}/in/+",
agent_id = me.as_agent_id(),
version = me_version,
)),
_ => Err(Error::new(&format!(
"source = '{:?}' is incompatible with response subscription",
self.source,
))),
}
}
}
/// Envelope types: a string payload bundled with typed properties, plus the
/// conversions between envelopes and messages.
pub mod compat {
    use serde_derive::{Deserialize, Serialize};

    use super::{
        Destination, IncomingEvent, IncomingEventProperties, IncomingMessage, IncomingRequest,
        IncomingRequestProperties, IncomingResponse, IncomingResponseProperties,
        OutgoingEventProperties, OutgoingRequestProperties, OutgoingResponseProperties,
    };
    use crate::Error;

    /// Properties of an incoming envelope, selected by the lowercase `type` tag.
    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "lowercase")]
    #[serde(tag = "type")]
    pub(crate) enum IncomingEnvelopeProperties {
        Event(IncomingEventProperties),
        Request(IncomingRequestProperties),
        Response(IncomingResponseProperties),
    }

    /// An incoming envelope: a string payload and its properties.
    #[derive(Debug, Deserialize)]
    pub(crate) struct IncomingEnvelope {
        payload: String,
        properties: IncomingEnvelopeProperties,
    }

    impl IncomingEnvelope {
        /// Borrows the envelope properties.
        pub(crate) fn properties(&self) -> &IncomingEnvelopeProperties {
            &self.properties
        }
    }

    /// Unwraps an envelope carrying event properties into an incoming event
    /// message; fails for any other kind of properties.
    pub(crate) fn into_event(envelope: IncomingEnvelope) -> Result<IncomingMessage<String>, Error> {
        let IncomingEnvelope {
            payload,
            properties,
        } = envelope;

        if let IncomingEnvelopeProperties::Event(props) = properties {
            return Ok(IncomingMessage::Event(IncomingEvent::new(payload, props)));
        }
        Err(Error::new("error serializing an envelope into event"))
    }

    /// Unwraps an envelope carrying request properties into an incoming
    /// request message; fails for any other kind of properties.
    pub(crate) fn into_request(
        envelope: IncomingEnvelope,
    ) -> Result<IncomingMessage<String>, Error> {
        let IncomingEnvelope {
            payload,
            properties,
        } = envelope;

        if let IncomingEnvelopeProperties::Request(props) = properties {
            return Ok(IncomingMessage::Request(IncomingRequest::new(
                payload, props,
            )));
        }
        Err(Error::new("error serializing an envelope into request"))
    }

    /// Unwraps an envelope carrying response properties into an incoming
    /// response message; fails for any other kind of properties.
    pub(crate) fn into_response(
        envelope: IncomingEnvelope,
    ) -> Result<IncomingMessage<String>, Error> {
        let IncomingEnvelope {
            payload,
            properties,
        } = envelope;

        if let IncomingEnvelopeProperties::Response(props) = properties {
            return Ok(IncomingMessage::Response(IncomingResponse::new(
                payload, props,
            )));
        }
        Err(Error::new("error serializing an envelope into response"))
    }

    /// Properties of an outgoing envelope, serialized with a lowercase `type` tag.
    #[derive(Debug, Serialize)]
    #[serde(rename_all = "lowercase")]
    #[serde(tag = "type")]
    pub enum OutgoingEnvelopeProperties {
        Event(OutgoingEventProperties),
        Request(OutgoingRequestProperties),
        Response(OutgoingResponseProperties),
    }

    /// An outgoing envelope: a string payload, its properties and the
    /// destination. The destination is skipped during serialization.
    #[derive(Debug, Serialize)]
    pub struct OutgoingEnvelope {
        payload: String,
        pub(crate) properties: OutgoingEnvelopeProperties,
        #[serde(skip)]
        destination: Destination,
    }

    impl OutgoingEnvelope {
        /// Creates an envelope, storing an owned copy of `payload`.
        pub fn new(
            payload: &str,
            properties: OutgoingEnvelopeProperties,
            destination: Destination,
        ) -> Self {
            let payload = String::from(payload);
            Self {
                payload,
                properties,
                destination,
            }
        }
    }

    /// Conversion of a message into an outgoing envelope.
    pub trait IntoEnvelope {
        /// Consumes the value and wraps it into an envelope, or fails with an error.
        fn into_envelope(self) -> Result<OutgoingEnvelope, Error>;
    }
}
use rumqtt::client::Notification;
use rumqtt::PacketIdentifier;
/// Notification delivered to the agent's user; built from a
/// `rumqtt::client::Notification`.
#[derive(Debug)]
pub enum AgentNotification {
// A published message: either the parsed incoming message or a string
// describing why parsing failed, plus data copied from the publish packet.
Message(Result<IncomingMessage<String>, String>, MessageData),
Reconnection,
Disconnection,
PubAck(PacketIdentifier),
PubRec(PacketIdentifier),
PubRel(PacketIdentifier),
PubComp(PacketIdentifier),
SubAck(PacketIdentifier),
None,
}
/// Data of a received publish packet other than its payload.
#[derive(Debug, Clone, PartialEq)]
pub struct MessageData {
// Copied from the publish packet's `dup` field.
pub dup: bool,
// Copied from the publish packet's `qos` field.
pub qos: QoS,
// Copied from the publish packet's `retain` field.
pub retain: bool,
// Taken from the publish packet's `topic_name` field.
pub topic: String,
// Copied from the publish packet's `pkid` field.
pub pkid: Option<PacketIdentifier>,
}
impl From<Notification> for AgentNotification {
fn from(notification: Notification) -> Self {
match notification {
Notification::Publish(message) => {
let message_data = MessageData {
dup: message.dup,
qos: message.qos,
retain: message.retain,
topic: message.topic_name,
pkid: message.pkid,
};
let env_result =
serde_json::from_slice::<compat::IncomingEnvelope>(&message.payload)
.map_err(|err| format!("Failed to parse incoming envelope: {}", err))
.and_then(|env| match env.properties() {
compat::IncomingEnvelopeProperties::Request(_) => {
compat::into_request(env)
.map_err(|e| format!("Failed to convert into request: {}", e))
}
compat::IncomingEnvelopeProperties::Response(_) => {
compat::into_response(env)
.map_err(|e| format!("Failed to convert into response: {}", e))
}
compat::IncomingEnvelopeProperties::Event(_) => compat::into_event(env)
.map_err(|e| format!("Failed to convert into event: {}", e)),
});
Self::Message(env_result, message_data)
}
Notification::Reconnection => Self::Reconnection,
Notification::Disconnection => Self::Disconnection,
Notification::PubAck(p) => Self::PubAck(p),
Notification::PubRec(p) => Self::PubRec(p),
Notification::PubRel(p) => Self::PubRel(p),
Notification::PubComp(p) => Self::PubComp(p),
Notification::SubAck(p) => Self::SubAck(p),
Notification::None => Self::None,
}
}
}
pub use rumqtt::QoS;