#![cfg(feature = "dynamic")]
use nifi_rust_client::NifiClientBuilder;
use nifi_rust_client::dynamic::types::{FlowMetricsReportingStrategy, IncludedRegistries};
use wiremock::matchers::{method, path, query_param, query_param_is_missing};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Builds a `DynamicClient` that talks to `mock`.
///
/// A `GET /nifi-api/flow/about` stub reporting `version` is mounted on `mock`
/// first, then a client pointed at the mock server's URI is wrapped with
/// `DynamicClient::from_client`.
///
/// # Panics
///
/// Panics if the builder rejects the mock URI, the client cannot be built, or
/// `DynamicClient::from_client` returns an error.
// Test-only helper: a failure here should abort the calling test immediately,
// so `unwrap` is intentional.
#[allow(clippy::unwrap_used)]
async fn dynamic_client_on(
    mock: &MockServer,
    version: &str,
) -> nifi_rust_client::dynamic::DynamicClient {
    use nifi_rust_client::dynamic::DynamicClient;

    let about_body = serde_json::json!({
        "about": { "title": "NiFi", "version": version }
    });
    let about_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/flow/about"))
        .respond_with(ResponseTemplate::new(200).set_body_json(about_body));
    about_stub.mount(mock).await;

    let base_url = mock.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    DynamicClient::from_client(client).await.unwrap()
}
/// Checks that `flow().get_about_info()` exposes the version, title and
/// timezone served by the mocked `GET /nifi-api/flow/about` endpoint.
#[tokio::test]
async fn dynamic_about_returns_fields() {
    use nifi_rust_client::dynamic::DynamicClient;

    let server = MockServer::start().await;

    // This test mounts its own, fully populated about payload because the
    // assertions below read fields beyond title and version.
    let about_body = serde_json::json!({
        "about": {
            "title": "NiFi",
            "version": "2.8.0",
            "uri": "http://localhost/nifi-api/",
            "contentViewerUrl": "/nifi-content-viewer/",
            "timezone": "UTC",
            "buildTag": "nifi-2.8.0",
            "buildTimestamp": "01/01/2026 00:00:00 UTC"
        }
    });
    let about_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/flow/about"))
        .respond_with(ResponseTemplate::new(200).set_body_json(about_body));
    about_stub.mount(&server).await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    let dynamic = DynamicClient::from_client(client).await.unwrap();

    let info = dynamic.flow().get_about_info().await.unwrap();
    assert_eq!(info.version.as_deref(), Some("2.8.0"));
    assert_eq!(info.title.as_deref(), Some("NiFi"));
    assert_eq!(info.timezone.as_deref(), Some("UTC"));
}
/// Checks that `flow().get_current_user()` returns the identity and the
/// non-anonymous flag served by the mocked `GET /nifi-api/flow/current-user`.
#[tokio::test]
async fn dynamic_current_user_returns_identity() {
    let server = MockServer::start().await;
    let dynamic = dynamic_client_on(&server, "2.8.0").await;

    let user_body = serde_json::json!({
        "identity": "admin",
        "anonymous": false,
        "canVersionFlows": false
    });
    let user_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/flow/current-user"))
        .respond_with(ResponseTemplate::new(200).set_body_json(user_body));
    user_stub.mount(&server).await;

    let current = dynamic.flow().get_current_user().await.unwrap();
    assert_eq!(current.identity.as_deref(), Some("admin"));

    // A missing `anonymous` field is treated as anonymous so that it fails
    // the check instead of passing silently.
    let is_anonymous = current.anonymous.unwrap_or(true);
    assert!(!is_anonymous);
}
/// Checks that `resources().get_resources()` returns the resource list served
/// by the mocked `GET /nifi-api/resources` endpoint.
#[tokio::test]
async fn dynamic_get_resources() {
    let mock = MockServer::start().await;
    let dynamic = dynamic_client_on(&mock, "2.8.0").await;
    Mock::given(method("GET"))
        .and(path("/nifi-api/resources"))
        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
            "resources": [
                { "identifier": "/flow", "name": "Flow" },
                { "identifier": "/system", "name": "System" }
            ]
        })))
        .mount(&mock)
        .await;

    // `expect` panics with the error value included, which a bare
    // `assert!(result.is_ok())` would hide from the failure output.
    let entity = dynamic
        .resources()
        .get_resources()
        .await
        .expect("GET /nifi-api/resources should succeed against the mock");

    // An absent list counts as empty and therefore fails the length check.
    let resources = entity.resources.as_deref().unwrap_or_default();
    assert!(
        resources.len() >= 2,
        "expected at least the 2 mocked resources, got {}",
        resources.len()
    );
}
#[tokio::test]
async fn dynamic_patch_version_detection() {
let mock = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/nifi-api/flow/about"))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"about": { "title": "NiFi", "version": "2.8.1" }
})))
.mount(&mock)
.await;
let client = NifiClientBuilder::new(&mock.uri())
.unwrap()
.build()
.unwrap();
let dynamic = nifi_rust_client::dynamic::DynamicClient::from_client(client)
.await
.unwrap();
assert_eq!(
dynamic.detected_version(),
Some(nifi_rust_client::dynamic::DetectedVersion::V2_8_0)
);
assert_eq!(dynamic.detected_version().unwrap().to_string(), "2.8.0");
}
#[tokio::test]
async fn dynamic_flow_metrics_v2_6_0_errors_on_extra_param() {
let mock = MockServer::start().await;
let dynamic = dynamic_client_on(&mock, "2.6.0").await;
let err = dynamic
.flow()
.get_flow_metrics(
"prometheus",
Some(IncludedRegistries::Jvm),
None,
None,
None,
Some(FlowMetricsReportingStrategy::AllProcessGroups),
)
.await
.unwrap_err();
assert!(
matches!(
err,
nifi_rust_client::NifiError::UnsupportedQueryParam { .. }
),
"expected UnsupportedQueryParam, got: {err:?}"
);
}
/// Checks that, against a server reporting `2.8.0`, `flow().get_flow_metrics`
/// sends `includedRegistries=JVM` and
/// `flowMetricsReportingStrategy=ALL_PROCESS_GROUPS` as query parameters.
#[tokio::test]
async fn dynamic_flow_metrics_v2_8_0_passes_all_params() {
    let server = MockServer::start().await;
    let dynamic = dynamic_client_on(&server, "2.8.0").await;

    // The stub only matches when both query parameters are present, and it
    // declares an expectation of exactly one matching request.
    let metrics_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/flow/metrics/prometheus"))
        .and(query_param("includedRegistries", "JVM"))
        .and(query_param(
            "flowMetricsReportingStrategy",
            "ALL_PROCESS_GROUPS",
        ))
        .respond_with(ResponseTemplate::new(200))
        .expect(1);
    metrics_stub.mount(&server).await;

    let registries = Some(IncludedRegistries::Jvm);
    let strategy = Some(FlowMetricsReportingStrategy::AllProcessGroups);
    dynamic
        .flow()
        .get_flow_metrics("prometheus", registries, None, None, None, strategy)
        .await
        .unwrap();
}
/// Checks that, against a server reporting `2.7.2`, `flow().get_flow_metrics`
/// sends `includedRegistries=NIFI`, `sampleName=my_sample` and
/// `flowMetricsReportingStrategy=ALL_COMPONENTS` as query parameters.
#[tokio::test]
async fn dynamic_flow_metrics_v2_7_2_passes_all_params() {
    let server = MockServer::start().await;
    let dynamic = dynamic_client_on(&server, "2.7.2").await;

    // The stub only matches when all three query parameters are present, and
    // it declares an expectation of exactly one matching request.
    let metrics_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/flow/metrics/prometheus"))
        .and(query_param("includedRegistries", "NIFI"))
        .and(query_param("sampleName", "my_sample"))
        .and(query_param(
            "flowMetricsReportingStrategy",
            "ALL_COMPONENTS",
        ))
        .respond_with(ResponseTemplate::new(200))
        .expect(1);
    metrics_stub.mount(&server).await;

    let registries = Some(IncludedRegistries::Nifi);
    let strategy = Some(FlowMetricsReportingStrategy::AllComponents);
    dynamic
        .flow()
        .get_flow_metrics(
            "prometheus",
            registries,
            Some("my_sample"),
            None,
            None,
            strategy,
        )
        .await
        .unwrap();
}
/// Checks the shape of a default `ConnectableDto`: `id`, `group_id` and
/// `type` are plain values that default to the empty string, while `name`
/// and `comments` are optional and default to `None`.
// Nothing here awaits, so a plain synchronous `#[test]` is enough and no
// async runtime needs to be started.
#[test]
fn dynamic_universal_fields_not_optional() {
    let dto = nifi_rust_client::dynamic::types::ConnectableDto::default();
    assert_eq!(dto.id, "");
    assert_eq!(dto.group_id, "");
    assert_eq!(dto.r#type, "");
    assert!(dto.name.is_none());
    assert!(dto.comments.is_none());
}
/// Builds a `DynamicClient` against `mock` after stubbing a clustered server.
///
/// Three `GET` stubs are mounted on `mock`: `/nifi-api/flow/about` reporting
/// `version`, `/nifi-api/flow/cluster/summary` reporting a connected cluster,
/// and `/nifi-api/controller/cluster` listing a single connected node
/// `node-abc`. The resulting client must report that node id.
///
/// # Panics
///
/// Panics if the client cannot be built, `DynamicClient::from_client` returns
/// an error, or `cluster_node_id()` is not `Some("node-abc")`.
// Test-only helper: a failure here should abort the calling test immediately,
// so `unwrap` is intentional.
#[allow(clippy::unwrap_used)]
async fn clustered_dynamic_client_on(
    mock: &MockServer,
    version: &str,
) -> nifi_rust_client::dynamic::DynamicClient {
    use nifi_rust_client::dynamic::DynamicClient;

    // Route/body pairs, mounted in exactly this order.
    let stubs = [
        (
            "/nifi-api/flow/about",
            serde_json::json!({
                "about": { "title": "NiFi", "version": version }
            }),
        ),
        (
            "/nifi-api/flow/cluster/summary",
            serde_json::json!({
                "clusterSummary": { "clustered": true, "connectedToCluster": true }
            }),
        ),
        (
            "/nifi-api/controller/cluster",
            serde_json::json!({
                "cluster": {
                    "nodes": [
                        { "nodeId": "node-abc", "status": "CONNECTED" }
                    ]
                }
            }),
        ),
    ];
    for (route, body) in stubs {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(ResponseTemplate::new(200).set_body_json(body))
            .mount(mock)
            .await;
    }

    let base_url = mock.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    let dynamic = DynamicClient::from_client(client).await.unwrap();

    // Guard the precondition callers rely on before handing the client back.
    assert_eq!(
        dynamic.cluster_node_id(),
        Some("node-abc"),
        "cluster discovery must populate cluster_node_id for this test to be meaningful",
    );
    dynamic
}
/// Checks that, on a clustered client, requesting system diagnostics with
/// `nodewise = Some(true)` sends `nodewise=true` and no `clusterNodeId`
/// query parameter, and that the aggregate snapshot is returned.
#[tokio::test]
async fn dynamic_system_diagnostics_nodewise_true_omits_cluster_node_id() {
    let server = MockServer::start().await;
    let dynamic = clustered_dynamic_client_on(&server, "2.8.0").await;

    let diagnostics_body = serde_json::json!({
        "systemDiagnostics": {
            "aggregateSnapshot": { "availableProcessors": 8 }
        }
    });
    // The stub matches only requests that carry `nodewise=true` and lack
    // `clusterNodeId`; any other request shape is left unmatched.
    let diagnostics_stub = Mock::given(method("GET"))
        .and(path("/nifi-api/system-diagnostics"))
        .and(query_param("nodewise", "true"))
        .and(query_param_is_missing("clusterNodeId"))
        .respond_with(ResponseTemplate::new(200).set_body_json(diagnostics_body));
    diagnostics_stub.mount(&server).await;

    let diag = dynamic
        .systemdiagnostics()
        .get_system_diagnostics(Some(true), None, None)
        .await
        .expect("nodewise=true must not trigger cluster_node_id auto-injection");

    let processors = diag
        .aggregate_snapshot
        .as_ref()
        .and_then(|snapshot| snapshot.available_processors);
    assert_eq!(processors, Some(8));
}