use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::BoxFuture;
use mssf_core::client::FabricClient;
use mssf_core::client::svc_mgmt_client::{PartitionKeyType, ResolvedServicePartition};
use mssf_core::types::Uri as FabricUri;
use crate::resolve::ServicePartitionResolver;
use crate::retry::OperationRetryer;
use super::resolver::{BoxError, TargetResolver};
use super::selector::{DialTarget, SelectError, TargetSelector};
/// A [`TargetResolver`] that asks a [`ServicePartitionResolver`] for the
/// partition of one service and then lets a caller-supplied selector turn the
/// resolved partition into a [`DialTarget`].
///
/// Instances are assembled by `FabricTargetResolverBuilder::build`.
pub struct FabricTargetResolver {
/// Performs the actual partition lookup; constructed in `build` from the
/// `FabricClient` and the `OperationRetryer`.
inner: ServicePartitionResolver,
/// Service URI passed to every lookup and attached to every log event.
uri: FabricUri,
/// Partition key passed to every lookup.
key: PartitionKeyType,
/// Optional timeout forwarded unchanged to every lookup.
timeout: Option<Duration>,
/// Callback applied to the resolved partition to choose the dial target.
selector: TargetSelector,
/// Partition retained from earlier `resolve` calls. It starts out empty and
/// is handed to the next lookup as the previous result.
cached: ArcSwapOption<ResolvedServicePartition>,
}
impl TargetResolver for FabricTargetResolver {
    /// Looks up the service partition and picks a dial target from it.
    ///
    /// The partition retained from an earlier call (if any) is passed to the
    /// underlying resolver as the previous result. The freshly returned
    /// partition is then compared against the retained one via `partial_cmp`:
    ///
    /// * nothing retained                  -> store the new one (`"first"`)
    /// * retained is `Less` than new       -> store the new one (`"advanced"`)
    /// * retained is `Equal` or `Greater`  -> keep the retained one (`"kept"`)
    /// * the two are not comparable        -> store the new one (`"hard-reset"`)
    ///
    /// The quoted label is reported in the `cache` log field.
    ///
    /// # Errors
    ///
    /// * the boxed lookup error when the underlying resolver fails;
    /// * `"no matching endpoint"` when the selector reports `SelectError::NoMatch`;
    /// * the selector's own error when it reports `SelectError::Fatal`.
    fn resolve(&self) -> BoxFuture<'_, Result<DialTarget, BoxError>> {
        Box::pin(async move {
            let previous = self.cached.load_full();
            let had_cache = previous.is_some();

            let lookup = self
                .inner
                .resolve(&self.uri, &self.key, previous.as_deref(), self.timeout, None)
                .await;
            let fresh = match lookup {
                Ok(partition) => partition,
                Err(err) => {
                    tracing::warn!(
                        uri = %self.uri,
                        had_cache,
                        error = ?err,
                        "FabricTargetResolver: SF resolve_service_partition failed",
                    );
                    return Err(Box::new(err) as BoxError);
                }
            };

            // Outer `None`: nothing was retained. Inner `None`: the retained and
            // fresh partitions cannot be ordered against each other.
            let comparison = previous.as_deref().map(|old| old.partial_cmp(&fresh));
            let (replace, cache_label) = match comparison {
                None => (true, "first"),
                Some(None) => (true, "hard-reset"),
                Some(Some(std::cmp::Ordering::Less)) => (true, "advanced"),
                Some(Some(std::cmp::Ordering::Equal | std::cmp::Ordering::Greater)) => {
                    (false, "kept")
                }
            };

            let current = match previous {
                // Only reachable for "kept", which implies a retained partition exists.
                Some(old) if !replace => old,
                _ => {
                    let shared = Arc::new(fresh);
                    self.cached.store(Some(shared.clone()));
                    shared
                }
            };

            let picked = (self.selector)(&current);
            match picked {
                Err(SelectError::Fatal(cause)) => {
                    tracing::warn!(
                        uri = %self.uri,
                        had_cache,
                        cache = cache_label,
                        error = %cause,
                        "FabricTargetResolver: selector returned fatal error",
                    );
                    Err(cause)
                }
                Err(SelectError::NoMatch) => {
                    tracing::warn!(
                        uri = %self.uri,
                        had_cache,
                        cache = cache_label,
                        endpoint_count = current.endpoints.len(),
                        "FabricTargetResolver: selector found no matching endpoint",
                    );
                    Err("no matching endpoint".into())
                }
                Ok(target) => {
                    tracing::info!(
                        uri = %self.uri,
                        had_cache,
                        cache = cache_label,
                        host = %target.host,
                        port = target.port,
                        "FabricTargetResolver: resolved dial target",
                    );
                    Ok(target)
                }
            }
        })
    }
}
/// Builder for [`FabricTargetResolver`].
///
/// `service_uri` and `target_selector` must be supplied before calling
/// `build`; every other setting has a default.
pub struct FabricTargetResolverBuilder {
/// Client handed to `ServicePartitionResolver::new` in `build`.
fc: FabricClient,
/// Service URI to resolve; required, `build` panics when it is unset.
uri: Option<FabricUri>,
/// Partition key; `new` initialises it to `PartitionKeyType::None`.
key: PartitionKeyType,
/// Optional resolve timeout; `None` unless `resolve_timeout` is called.
timeout: Option<Duration>,
/// Optional retryer; when unset, `build` uses `OperationRetryer::builder().build()`.
retryer: Option<OperationRetryer>,
/// Endpoint selector; required, `build` panics when it is unset.
selector: Option<TargetSelector>,
}
impl FabricTargetResolverBuilder {
    /// Creates a builder around `fc` with no URI, no selector, no timeout, no
    /// custom retryer, and a partition key of `PartitionKeyType::None`.
    pub fn new(fc: FabricClient) -> Self {
        Self {
            fc,
            key: PartitionKeyType::None,
            uri: None,
            timeout: None,
            retryer: None,
            selector: None,
        }
    }

    /// Sets the service URI to resolve (required).
    pub fn service_uri(self, uri: impl Into<FabricUri>) -> Self {
        Self {
            uri: Some(uri.into()),
            ..self
        }
    }

    /// Sets the partition key used for every lookup.
    pub fn partition_key(self, key: PartitionKeyType) -> Self {
        Self { key, ..self }
    }

    /// Sets the timeout forwarded to every lookup.
    pub fn resolve_timeout(self, t: Duration) -> Self {
        Self {
            timeout: Some(t),
            ..self
        }
    }

    /// Replaces the default retryer with `r`.
    pub fn retryer(self, r: OperationRetryer) -> Self {
        Self {
            retryer: Some(r),
            ..self
        }
    }

    /// Sets the callback that picks a dial target out of a resolved partition
    /// (required).
    pub fn target_selector<F>(self, f: F) -> Self
    where
        F: Fn(&ResolvedServicePartition) -> Result<DialTarget, SelectError> + Send + Sync + 'static,
    {
        Self {
            selector: Some(Arc::new(f)),
            ..self
        }
    }

    /// Consumes the builder and returns the shared resolver, starting with an
    /// empty partition cache.
    ///
    /// # Panics
    ///
    /// Panics when `service_uri` or `target_selector` was never called; the
    /// URI is checked first.
    pub fn build(self) -> Arc<FabricTargetResolver> {
        let Self {
            fc,
            uri,
            key,
            timeout,
            retryer,
            selector,
        } = self;

        let uri = uri.expect("FabricTargetResolverBuilder::service_uri is required");
        let selector = selector.expect("FabricTargetResolverBuilder::target_selector is required");
        // Fall back to the retryer's own defaults only when none was supplied.
        let retryer = match retryer {
            Some(custom) => custom,
            None => OperationRetryer::builder().build(),
        };
        let inner = ServicePartitionResolver::new(fc, retryer);

        Arc::new(FabricTargetResolver {
            inner,
            uri,
            key,
            timeout,
            selector,
            cached: ArcSwapOption::empty(),
        })
    }
}