use crate::{NifiClient, NifiError};
/// Wrapper around a `NifiClient` that lazily caches the server version
/// detected from `/flow/about` and a cluster node id found during cluster
/// discovery.
pub struct DynamicClient {
/// The wrapped client; every request issued by this type goes through it.
client: NifiClient,
/// Detected server version. Filled in by `detect_version` and read by the
/// `detected_version*` / `supports` accessors.
version: tokio::sync::OnceCell<super::availability::DetectedVersion>,
/// Strategy handed to `resolve_version` when interpreting the version
/// string reported by the server.
strategy: crate::dynamic::strategy::VersionResolutionStrategy,
/// Outcome of cluster discovery: the cell is empty until discovery has
/// stored a result; the stored value is `None` when no node id was found.
cluster_node_id: tokio::sync::OnceCell<Option<String>>,
}
impl std::fmt::Debug for DynamicClient {
    /// Writes the cached version, the resolution strategy and the cached
    /// cluster node id. The wrapped client is deliberately not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot the lazily-initialised cells; each is `None` until it has been set.
        let detected = self.version.get();
        let node_id = self.cluster_node_id.get();

        let mut out = f.debug_struct("DynamicClient");
        out.field("version", &detected);
        out.field("strategy", &self.strategy);
        out.field("cluster_node_id", &node_id);
        out.finish()
    }
}
/// Top-level shape deserialized from the `/flow/about` response in
/// `DynamicClient::detect_version`.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct AboutResponse {
/// Nested object carrying the version string.
about: AboutInner,
}
/// The `about` object of an `AboutResponse`; `version` is the only field
/// declared here.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct AboutInner {
/// Version string as reported by the server; passed to `resolve_version`.
version: String,
}
/// Top-level shape deserialized from the `/flow/cluster/summary` response
/// during cluster discovery.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClusterSummaryResponse {
/// Nested summary object (JSON key `clusterSummary` under the camelCase rename).
cluster_summary: ClusterSummaryInner,
}
/// The `clusterSummary` object of a `ClusterSummaryResponse`; `clustered`
/// is the only field declared here.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClusterSummaryInner {
/// When `true`, cluster discovery goes on to query `/controller/cluster`.
clustered: bool,
}
/// Top-level shape deserialized from the `/controller/cluster` response
/// during cluster discovery.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClusterResponse {
/// Nested object holding the node list.
cluster: ClusterInner,
}
/// The `cluster` object of a `ClusterResponse`; `nodes` is the only field
/// declared here.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClusterInner {
/// Nodes listed in the response; scanned for a `CONNECTED` entry.
nodes: Vec<ClusterNode>,
}
/// One entry of `ClusterInner::nodes`. Both fields are optional, so an
/// entry that lacks either of them still deserializes.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClusterNode {
/// Node identifier (JSON key `nodeId` under the camelCase rename).
node_id: Option<String>,
/// Node status string; cluster discovery compares it with `"CONNECTED"`.
status: Option<String>,
}
impl DynamicClient {
pub fn new(client: NifiClient) -> Self {
Self {
client,
version: tokio::sync::OnceCell::new(),
strategy: crate::dynamic::strategy::VersionResolutionStrategy::default(),
cluster_node_id: tokio::sync::OnceCell::new(),
}
}
pub fn with_strategy(
client: NifiClient,
strategy: crate::dynamic::strategy::VersionResolutionStrategy,
) -> Self {
Self {
client,
version: tokio::sync::OnceCell::new(),
strategy,
cluster_node_id: tokio::sync::OnceCell::new(),
}
}
pub async fn from_client(client: NifiClient) -> Result<Self, NifiError> {
let dc = Self::new(client);
dc.detect_version().await?;
dc.discover_cluster().await;
Ok(dc)
}
pub fn inner(&self) -> &NifiClient {
&self.client
}
pub fn strategy(&self) -> crate::dynamic::strategy::VersionResolutionStrategy {
self.strategy
}
pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
self.client.login(username, password).await?;
self.detect_version().await?;
self.discover_cluster().await;
Ok(())
}
pub async fn logout(&self) -> Result<(), NifiError> {
self.client.logout().await
}
pub async fn token(&self) -> Option<String> {
self.client.token().await
}
pub async fn set_token(&self, token: String) {
self.client.set_token(token).await;
}
pub async fn authenticate(&self) -> Result<(), NifiError> {
self.client.authenticate().await
}
pub async fn detect_version(
&self,
) -> Result<super::availability::DetectedVersion, NifiError> {
let strategy = self.strategy;
self.version
.get_or_try_init(|| async {
let resp: AboutResponse = self.client.get("/flow/about", &[]).await?;
crate::dynamic::strategy::resolve_version(
&resp.about.version,
strategy,
super::availability::version_from_str,
super::availability::SUPPORTED_VERSIONS,
)
})
.await
.copied()
}
pub fn detected_version(&self) -> Option<super::availability::DetectedVersion> {
self.version.get().copied()
}
pub fn detected_version_str(&self) -> String {
self.version.get().map(|v| v.to_string()).unwrap_or_default()
}
pub fn cluster_node_id(&self) -> Option<&str> {
self.cluster_node_id.get().and_then(|opt| opt.as_deref())
}
async fn discover_cluster(&self) {
let _ = self
.cluster_node_id
.get_or_init(|| async {
let summary: Result<ClusterSummaryResponse, NifiError> =
self.client.get("/flow/cluster/summary", &[]).await;
match summary {
Ok(s) if s.cluster_summary.clustered => {
let cluster: Result<ClusterResponse, NifiError> =
self.client.get("/controller/cluster", &[]).await;
match cluster {
Ok(c) => c
.cluster
.nodes
.iter()
.find(|n| n.status.as_deref() == Some("CONNECTED"))
.and_then(|n| n.node_id.clone()),
Err(_) => None,
}
}
_ => None,
}
})
.await;
}
pub async fn require_endpoint(
&self,
endpoint: super::availability::Endpoint,
) -> Result<(), NifiError> {
let version = self.detect_version().await?.to_string();
if Self::supports_with(endpoint, &version) {
Ok(())
} else {
Err(NifiError::UnsupportedEndpoint {
endpoint: endpoint.as_str().to_string(),
version,
})
}
}
pub fn supports(&self, endpoint: super::availability::Endpoint) -> bool {
match self.version.get() {
Some(v) => Self::supports_with(endpoint, &v.to_string()),
None => false,
}
}
fn supports_with(endpoint: super::availability::Endpoint, version: &str) -> bool {
super::availability::ENDPOINT_AVAILABILITY
.iter()
.find(|(e, _)| *e == endpoint)
.map(|(_, versions)| versions.contains(&version))
.unwrap_or(false)
}
}