use async_trait::async_trait;
use fe2o3_amqp::link::DetachError;
use std::{sync::Arc, time::Duration as StdDuration};
use tokio::sync::Mutex;
use url::Url;
use crate::{
administration::RuleProperties,
amqp::amqp_request_message::add_rule::CreateRuleFilter,
core::{RecoverableTransport, TransportRuleManager},
primitives::{error::RetryError, service_bus_retry_policy::run_operation},
sealed::Sealed,
ServiceBusRetryPolicy,
};
use super::{
amqp_connection_scope::AmqpConnectionScope,
amqp_management_link::AmqpManagementLink,
amqp_request_message::{
add_rule::AddRuleRequest, enumerate_rules::EnumerateRulesRequest,
remove_rule::RemoveRuleRequest,
},
amqp_response_message::{
add_rule::AddRuleResponse, enumerate_rules::EnumerateRulesResponse,
remove_rule::RemoveRuleResponse,
},
error::{AmqpRequestResponseError, CreateRuleError, OpenRuleManagerError},
};
#[derive(Debug)]
pub struct AmqpRuleManager {
pub(crate) identifier_str: String,
pub(crate) service_endpoint: Arc<Url>,
pub(crate) subscription_path: String,
pub(crate) management_link: AmqpManagementLink,
pub(crate) retry_policy: Box<dyn ServiceBusRetryPolicy>,
pub(crate) connection_scope: Arc<Mutex<AmqpConnectionScope>>,
}
#[async_trait]
impl RecoverableTransport for AmqpRuleManager {
type RecoverError = OpenRuleManagerError;
async fn recover(&mut self) -> Result<(), Self::RecoverError> {
let mut scope = self.connection_scope.lock().await;
scope
.recover()
.await
.map_err(|conn_scope_error| {
log::error!("Unable to recover connection scope: {:?}", conn_scope_error);
Self::RecoverError::ConnectionScopeDisposed
})?;
self.management_link = scope
.open_management_link(
&self.service_endpoint,
&self.subscription_path,
&self.identifier_str,
)
.await?;
Ok(())
}
}
impl AmqpRuleManager {
async fn create_rule(
&mut self,
request: &mut AddRuleRequest,
try_timeout: &StdDuration,
) -> Result<AddRuleResponse, AmqpRequestResponseError> {
let server_timeout = try_timeout.as_millis() as u32;
request.set_server_timeout(Some(server_timeout));
let response = self.management_link.client_mut().call(request).await?;
Ok(response)
}
async fn delete_rule(
&mut self,
request: &mut RemoveRuleRequest,
try_timeout: &StdDuration,
) -> Result<RemoveRuleResponse, AmqpRequestResponseError> {
let server_timeout = try_timeout.as_millis() as u32;
request.set_server_timeout(Some(server_timeout));
let response = self.management_link.client_mut().call(request).await?;
Ok(response)
}
async fn get_rules(
&mut self,
request: &mut EnumerateRulesRequest,
try_timeout: &StdDuration,
) -> Result<EnumerateRulesResponse, AmqpRequestResponseError> {
let server_timeout = try_timeout.as_millis() as u32;
request.set_server_timeout(Some(server_timeout));
let response = self.management_link.client_mut().call(request).await?;
Ok(response)
}
}
impl Sealed for AmqpRuleManager {}
#[async_trait]
impl TransportRuleManager for AmqpRuleManager {
type CreateRuleError = RetryError<CreateRuleError>;
type DeleteRuleError = RetryError<AmqpRequestResponseError>;
type GetRulesError = RetryError<AmqpRequestResponseError>;
type CloseError = DetachError;
fn identifier(&self) -> &str {
&self.identifier_str
}
fn subscription_path(&self) -> &str {
&self.subscription_path
}
async fn create_rule(
&mut self,
rule_name: String,
filter: CreateRuleFilter,
) -> Result<(), Self::CreateRuleError> {
let mut request = AddRuleRequest::new(rule_name, filter, None)
.map_err(CreateRuleError::from)
.map_err(RetryError::Operation)?;
let mut try_timeout = self.retry_policy.calculate_try_timeout(0);
let _response = run_operation!(
{ &self.retry_policy },
CreateRuleError,
try_timeout,
self.create_rule(&mut request, &try_timeout),
self.recover()
)?;
Ok(())
}
async fn delete_rule(&mut self, rule_name: String) -> Result<(), Self::DeleteRuleError> {
let mut request = RemoveRuleRequest::new(rule_name, None);
let mut try_timeout = self.retry_policy.calculate_try_timeout(0);
let _response = run_operation!(
{ &self.retry_policy },
AmqpRequestResponseError,
try_timeout,
self.delete_rule(&mut request, &try_timeout),
self.recover()
)?;
Ok(())
}
async fn get_rules(
&mut self,
skip: i32,
top: i32,
) -> Result<Vec<RuleProperties>, Self::GetRulesError> {
let mut request = EnumerateRulesRequest::new(skip, top, None);
let mut try_timeout = self.retry_policy.calculate_try_timeout(0);
let response = run_operation!(
{ &self.retry_policy },
AmqpRequestResponseError,
try_timeout,
self.get_rules(&mut request, &try_timeout),
self.recover()
)?;
Ok(response.into_get_rules_response())
}
async fn close(mut self) -> Result<(), Self::CloseError> {
self.management_link.close().await
}
}