use crate::cbconfig;
use crate::configparser::ConfigParser;
use crate::configwatcher::ConfigWatcherMemd;
use crate::error::{Error, ErrorKind};
use crate::kv_orchestration::KvClientManagerClientType;
use crate::kvclient::{KvClient, StdKvClient};
use crate::kvclient_ops::KvClientOps;
use crate::kvclientpool::KvClientPool;
use crate::kvendpointclientmanager::KvEndpointClientManager;
use crate::memdx::hello_feature::HelloFeature;
use crate::memdx::request::{GetClusterConfigKnownVersion, GetClusterConfigRequest};
use crate::parsedconfig::ParsedConfig;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{timeout, timeout_at};
use tracing::{debug, trace};
/// Fetches cluster configurations through KV clients handed out by a
/// [`KvEndpointClientManager`].
#[derive(Clone)]
pub(crate) struct ConfigFetcherMemd<M>
where
    M: KvEndpointClientManager,
{
    /// Manager that supplies the per-endpoint client used for each fetch.
    kv_client_manager: Arc<M>,
    /// Maximum time a single cluster-config request is allowed to take.
    fetch_timeout: Duration,
}
/// Construction options for [`ConfigFetcherMemd`].
pub(crate) struct ConfigFetcherMemdOptions<M>
where
    M: KvEndpointClientManager,
{
    /// Manager that supplies the per-endpoint clients the fetcher will use.
    pub kv_client_manager: Arc<M>,
    /// Maximum time a single cluster-config request is allowed to take.
    pub fetch_timeout: Duration,
}
impl<M: KvEndpointClientManager> ConfigFetcherMemd<M> {
pub fn new(opts: ConfigFetcherMemdOptions<M>) -> Self {
Self {
kv_client_manager: opts.kv_client_manager.clone(),
fetch_timeout: opts.fetch_timeout,
}
}
pub(crate) async fn poll_one(
&self,
endpoint: &str,
rev_id: i64,
rev_epoch: i64,
skip_fetch_cb: impl FnOnce(Arc<KvClientManagerClientType<M>>) -> bool,
) -> crate::error::Result<Option<ParsedConfig>> {
let client = self.kv_client_manager.get_endpoint_client(endpoint).await?;
if skip_fetch_cb(client.clone()) {
return Ok(None);
}
debug!("Fetching config from {}", &endpoint);
let hostname = client.remote_hostname();
let known_version = {
if rev_id > 0 && client.has_feature(HelloFeature::ClusterMapKnownVersion) {
Some(GetClusterConfigKnownVersion { rev_epoch, rev_id })
} else {
None
}
};
let resp = timeout(
self.fetch_timeout,
client.get_cluster_config(GetClusterConfigRequest { known_version }),
)
.await
.map_err(|e| Error::new_message_error("get cluster config timed out"))?
.map_err(Error::new_contextual_memdx_error)?;
if resp.config.is_empty() {
return Ok(None);
}
let config = cbconfig::parse::parse_terse_config(&resp.config, hostname)?;
if env::var("RSCBC_DEBUG_CONFIG").is_ok() {
trace!("Fetcher fetched new config {:?}", &config);
}
Ok(Some(ConfigParser::parse_terse_config(config, hostname)?))
}
}