#![cfg(not(feature = "dynamic"))]
use std::time::Duration;
use nifi_rust_client::wait::{
self, ControllerServiceTargetState, ProcessorTargetState, WaitConfig,
};
use nifi_rust_client::{NifiClientBuilder, NifiError};
use serde_json::json;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
/// Builds a [`WaitConfig`] suited to fast tests: the caller-supplied timeout
/// (in milliseconds), a 10 ms poll interval, no initial delay, and cleanup
/// enabled.
fn fast_config(timeout_ms: u64) -> WaitConfig {
    let timeout = Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(10);
    WaitConfig {
        timeout,
        poll_interval,
        initial_delay: Duration::ZERO,
        cleanup: true,
    }
}
/// Returns a JSON body of the form
/// `{"component": {"id": "abc", "state": <state>}}`.
fn processor_entity(state: &str) -> serde_json::Value {
    let component = json!({ "id": "abc", "state": state });
    json!({ "component": component })
}
/// Returns a JSON body of the form
/// `{"component": {"id": "cs-1", "state": <state>}}`.
fn controller_service_entity(state: &str) -> serde_json::Value {
    let component = json!({ "id": "cs-1", "state": state });
    json!({ "component": component })
}
/// `wait::processor_state` should return successfully once the processor
/// reports `RUNNING`.
///
/// The first mock answers at most two GETs with `STOPPED`; the second mock
/// answers with `RUNNING`.
#[tokio::test]
async fn processor_state_reaches_target() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/processors/abc";

    let stopped = ResponseTemplate::new(200).set_body_json(processor_entity("STOPPED"));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(stopped)
        .up_to_n_times(2)
        .mount(&server)
        .await;

    let running = ResponseTemplate::new(200).set_body_json(processor_entity("RUNNING"));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(running)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ProcessorTargetState::Running;
    let entity = wait::processor_state(&client, "abc", target, fast_config(1000))
        .await
        .unwrap();
    assert!(entity.component.is_some());
}
/// When the processor always reports `STOPPED`, waiting for `Running` with a
/// 50 ms timeout must fail with `NifiError::Timeout` whose `operation` text
/// mentions both `wait_for_processor_state` and `RUNNING`.
#[tokio::test]
async fn processor_state_times_out() {
    let server = MockServer::start().await;

    let always_stopped = ResponseTemplate::new(200).set_body_json(processor_entity("STOPPED"));
    Mock::given(method("GET"))
        .and(path("/nifi-api/processors/abc"))
        .respond_with(always_stopped)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ProcessorTargetState::Running;
    let err = wait::processor_state(&client, "abc", target, fast_config(50))
        .await
        .unwrap_err();

    match err {
        NifiError::Timeout { operation } => {
            assert!(operation.contains("wait_for_processor_state"));
            assert!(operation.contains("RUNNING"));
        }
        other => panic!("expected Timeout, got {other:?}"),
    }
}
/// `wait::controller_service_state` should succeed once the service reports
/// `ENABLED`.
///
/// The first mock answers at most one GET with `ENABLING`; the second mock
/// answers with `ENABLED`.
#[tokio::test]
async fn controller_service_state_reaches_target() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/controller-services/cs-1";

    let enabling = ResponseTemplate::new(200).set_body_json(controller_service_entity("ENABLING"));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(enabling)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let enabled = ResponseTemplate::new(200).set_body_json(controller_service_entity("ENABLED"));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(enabled)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let entity = wait::controller_service_state(&client, "cs-1", target, fast_config(1000))
        .await
        .unwrap();
    assert!(entity.component.is_some());
}
/// When the service always reports `DISABLED`, waiting for `Enabled` with a
/// 50 ms timeout must fail with `NifiError::Timeout`.
#[tokio::test]
async fn controller_service_state_times_out() {
    let server = MockServer::start().await;

    let always_disabled =
        ResponseTemplate::new(200).set_body_json(controller_service_entity("DISABLED"));
    Mock::given(method("GET"))
        .and(path("/nifi-api/controller-services/cs-1"))
        .respond_with(always_disabled)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let err = wait::controller_service_state(&client, "cs-1", target, fast_config(50))
        .await
        .unwrap_err();
    assert!(matches!(err, NifiError::Timeout { .. }));
}
/// Returns `{"request": {"complete": <complete>}}`, with an additional
/// `"failureReason"` entry inside `request` when `failure` is `Some`.
fn update_request_entity(complete: bool, failure: Option<&str>) -> serde_json::Value {
    let request = match failure {
        Some(reason) => json!({ "complete": complete, "failureReason": reason }),
        None => json!({ "complete": complete }),
    };
    json!({ "request": request })
}
/// `wait::parameter_context_update` should return the completed request and
/// issue the cleanup DELETE.
///
/// The first GET mock (limited to one response) reports an incomplete
/// request, the second reports completion; the DELETE mock expects exactly
/// one call.
#[tokio::test]
async fn parameter_context_update_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-contexts/ctx-1/update-requests/req-1";

    let pending = ResponseTemplate::new(200).set_body_json(update_request_entity(false, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(pending)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let completed = ResponseTemplate::new(200).set_body_json(update_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(update_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let entity = wait::parameter_context_update(&client, "ctx-1", "req-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(entity.request.and_then(|r| r.complete), Some(true));
}
/// A completed update request carrying a `failureReason` must surface as
/// `NifiError::Api` with status 500 and a message containing the reason.
/// The DELETE mock still expects exactly one call.
#[tokio::test]
async fn parameter_context_update_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-contexts/ctx-1/update-requests/req-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(update_request_entity(true, Some("validation failed")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(update_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::parameter_context_update(&client, "ctx-1", "req-1", fast_config(1000))
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("validation failed"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
#[tokio::test]
async fn parameter_context_update_no_cleanup_when_disabled() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path(
"/nifi-api/parameter-contexts/ctx-1/update-requests/req-1",
))
.respond_with(ResponseTemplate::new(200).set_body_json(update_request_entity(true, None)))
.mount(&mock_server)
.await;
Mock::given(method("DELETE"))
.and(path(
"/nifi-api/parameter-contexts/ctx-1/update-requests/req-1",
))
.respond_with(ResponseTemplate::new(200).set_body_json(update_request_entity(true, None)))
.expect(0)
.mount(&mock_server)
.await;
let client = NifiClientBuilder::new(&mock_server.uri())
.unwrap()
.build()
.unwrap();
client.set_token("jwt".to_string()).await;
let config = WaitConfig {
timeout: Duration::from_millis(500),
poll_interval: Duration::from_millis(10),
initial_delay: Duration::ZERO,
cleanup: false,
};
let entity = wait::parameter_context_update(&client, "ctx-1", "req-1", config)
.await
.unwrap();
assert!(entity.request.and_then(|r| r.complete).unwrap_or(false));
}
/// Returns `{"provenance": {"id": "q-1", "finished": <finished>,
/// "percentCompleted": 100 | 50}}`; the percentage is 100 when `finished` is
/// true and 50 otherwise.
fn provenance_entity(finished: bool) -> serde_json::Value {
    let percent_completed = if finished { 100 } else { 50 };
    let provenance = json!({
        "id": "q-1",
        "finished": finished,
        "percentCompleted": percent_completed,
    });
    json!({ "provenance": provenance })
}
/// `wait::provenance_query` should return the finished query and issue the
/// cleanup DELETE.
///
/// The first GET mock (limited to one response) reports an unfinished query,
/// the second a finished one; the DELETE mock expects exactly one call.
#[tokio::test]
async fn provenance_query_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/provenance/q-1";

    let unfinished = ResponseTemplate::new(200).set_body_json(provenance_entity(false));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(unfinished)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let finished = ResponseTemplate::new(200).set_body_json(provenance_entity(true));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(finished)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(provenance_entity(true));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::provenance_query(&client, "q-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.finished, Some(true));
}
#[tokio::test]
async fn provenance_query_propagates_fetch_error() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/nifi-api/provenance/q-1"))
.respond_with(ResponseTemplate::new(500).set_body_string("internal error"))
.mount(&mock_server)
.await;
Mock::given(method("DELETE"))
.and(path("/nifi-api/provenance/q-1"))
.respond_with(ResponseTemplate::new(200).set_body_json(provenance_entity(true)))
.mount(&mock_server)
.await;
let client = NifiClientBuilder::new(&mock_server.uri())
.unwrap()
.build()
.unwrap();
client.set_token("jwt".to_string()).await;
let err = wait::provenance_query(&client, "q-1", fast_config(200))
.await
.unwrap_err();
assert!(
matches!(err, NifiError::Api { status: 500, .. })
|| matches!(err, NifiError::Timeout { .. }),
"expected Api(500) or Timeout, got: {err:?}"
);
}
/// Returns `{"dropRequest": {"id": "drop-1", "finished": <finished>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside
/// `dropRequest` when `failure` is `Some`.
fn drop_request_entity(finished: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if finished { 100 } else { 50 };
    let mut drop_request = json!({
        "id": "drop-1",
        "finished": finished,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => drop_request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "dropRequest": drop_request })
}
/// `wait::flowfile_drop` should return the finished drop request and issue
/// the cleanup DELETE.
///
/// The first GET mock (limited to one response) reports an unfinished
/// request, the second a finished one; the DELETE mock expects exactly one
/// call.
#[tokio::test]
async fn flowfile_drop_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/flowfile-queues/q-1/drop-requests/drop-1";

    let unfinished = ResponseTemplate::new(200).set_body_json(drop_request_entity(false, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(unfinished)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let finished = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(finished)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::flowfile_drop(&client, "q-1", "drop-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.finished, Some(true));
}
/// A finished drop request carrying a `failureReason` must surface as
/// `NifiError::Api` with status 500 and a message containing both
/// `drop request failed` and the reason. The DELETE mock expects exactly one
/// call.
#[tokio::test]
async fn flowfile_drop_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/flowfile-queues/q-1/drop-requests/drop-1";

    let failed =
        ResponseTemplate::new(200).set_body_json(drop_request_entity(true, Some("queue locked")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::flowfile_drop(&client, "q-1", "drop-1", fast_config(1000))
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("drop request failed"));
            assert!(message.contains("queue locked"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
#[tokio::test]
async fn flowfile_drop_no_cleanup_when_disabled() {
let mock_server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/nifi-api/flowfile-queues/q-1/drop-requests/drop-1"))
.respond_with(ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None)))
.mount(&mock_server)
.await;
Mock::given(method("DELETE"))
.and(path("/nifi-api/flowfile-queues/q-1/drop-requests/drop-1"))
.respond_with(ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None)))
.expect(0)
.mount(&mock_server)
.await;
let client = NifiClientBuilder::new(&mock_server.uri())
.unwrap()
.build()
.unwrap();
client.set_token("jwt".to_string()).await;
let config = WaitConfig {
timeout: Duration::from_millis(500),
poll_interval: Duration::from_millis(10),
initial_delay: Duration::ZERO,
cleanup: false,
};
let dto = wait::flowfile_drop(&client, "q-1", "drop-1", config)
.await
.unwrap();
assert_eq!(dto.finished, Some(true));
}
/// Returns `{"listingRequest": {"id": "list-1", "finished": <finished>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside
/// `listingRequest` when `failure` is `Some`.
fn listing_request_entity(finished: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if finished { 100 } else { 50 };
    let mut listing_request = json!({
        "id": "list-1",
        "finished": finished,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => listing_request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "listingRequest": listing_request })
}
/// `wait::flowfile_listing` should return the finished listing request and
/// issue the cleanup DELETE (the DELETE mock expects exactly one call).
#[tokio::test]
async fn flowfile_listing_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/flowfile-queues/q-1/listing-requests/list-1";

    let finished = ResponseTemplate::new(200).set_body_json(listing_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(finished)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(listing_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::flowfile_listing(&client, "q-1", "list-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.finished, Some(true));
}
/// A finished listing request carrying a `failureReason` must surface as
/// `NifiError::Api` with status 500 and a message containing both
/// `listing request failed` and the reason. The DELETE mock expects exactly
/// one call.
#[tokio::test]
async fn flowfile_listing_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/flowfile-queues/q-1/listing-requests/list-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(listing_request_entity(true, Some("queue too large")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(listing_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::flowfile_listing(&client, "q-1", "list-1", fast_config(1000))
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("listing request failed"));
            assert!(message.contains("queue too large"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
/// `wait::empty_all_connections` should return the finished drop request and
/// issue the cleanup DELETE (the DELETE mock expects exactly one call).
#[tokio::test]
async fn empty_all_connections_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/process-groups/pg-1/empty-all-connections-requests/drop-1";

    let finished = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(finished)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::empty_all_connections(&client, "pg-1", "drop-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.finished, Some(true));
}
/// A finished empty-all-connections request carrying a `failureReason` must
/// surface as `NifiError::Api` with status 500. The DELETE mock expects
/// exactly one call.
#[tokio::test]
async fn empty_all_connections_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/process-groups/pg-1/empty-all-connections-requests/drop-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(drop_request_entity(true, Some("connection in use")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(drop_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::empty_all_connections(&client, "pg-1", "drop-1", fast_config(1000))
        .await
        .unwrap_err();
    assert!(matches!(err, NifiError::Api { status: 500, .. }));
}
/// Returns `{"lineage": {"id": "lin-1", "finished": <finished>,
/// "percentCompleted": 100 | 50}}`; the percentage is 100 when `finished` is
/// true and 50 otherwise.
fn lineage_entity(finished: bool) -> serde_json::Value {
    let percent_completed = if finished { 100 } else { 50 };
    let lineage = json!({
        "id": "lin-1",
        "finished": finished,
        "percentCompleted": percent_completed,
    });
    json!({ "lineage": lineage })
}
/// `wait::provenance_lineage` should return the finished lineage query and
/// issue the cleanup DELETE.
///
/// The first GET mock (limited to one response) reports an unfinished query,
/// the second a finished one; the DELETE mock expects exactly one call.
#[tokio::test]
async fn provenance_lineage_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/provenance/lineage/lin-1";

    let unfinished = ResponseTemplate::new(200).set_body_json(lineage_entity(false));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(unfinished)
        .up_to_n_times(1)
        .mount(&server)
        .await;

    let finished = ResponseTemplate::new(200).set_body_json(lineage_entity(true));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(finished)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(lineage_entity(true));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::provenance_lineage(&client, "lin-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.finished, Some(true));
}
/// Returns `{"request": {"requestId": "req-1", "complete": <complete>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside `request`
/// when `failure` is `Some`.
fn verify_config_request_entity(complete: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if complete { 100 } else { 50 };
    let mut request = json!({
        "requestId": "req-1",
        "complete": complete,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "request": request })
}
/// `wait::processor_verify_config` should return the completed verification
/// request and issue the cleanup DELETE (the DELETE mock expects exactly one
/// call).
#[tokio::test]
async fn processor_verify_config_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/processors/p-1/config/verification-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let dto = wait::processor_verify_config(&client, "p-1", "req-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// A completed verification request carrying a `failureReason` must surface
/// as `NifiError::Api` with status 500 and a message containing both
/// `verification failed` and the reason. The DELETE mock expects exactly one
/// call.
#[tokio::test]
async fn processor_verify_config_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/processors/p-1/config/verification-requests/req-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(verify_config_request_entity(true, Some("invalid prop")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::processor_verify_config(&client, "p-1", "req-1", fast_config(1000))
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("verification failed"));
            assert!(message.contains("invalid prop"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
/// `wait::controller_service_verify_config` should return the completed
/// verification request and issue the cleanup DELETE (the DELETE mock expects
/// exactly one call).
#[tokio::test]
async fn controller_service_verify_config_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/controller-services/cs-1/config/verification-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let dto = wait::controller_service_verify_config(&client, "cs-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// `wait::reporting_task_verify_config` should return the completed
/// verification request and issue the cleanup DELETE (the DELETE mock expects
/// exactly one call).
#[tokio::test]
async fn reporting_task_verify_config_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/reporting-tasks/rt-1/config/verification-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let dto = wait::reporting_task_verify_config(&client, "rt-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// `wait::parameter_provider_verify_config` should return the completed
/// verification request and issue the cleanup DELETE (the DELETE mock expects
/// exactly one call).
#[tokio::test]
async fn parameter_provider_verify_config_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-providers/pp-1/config/verification-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let dto = wait::parameter_provider_verify_config(&client, "pp-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// `wait::flow_analysis_rule_verify_config` should return the completed
/// verification request and issue the cleanup DELETE (the DELETE mock expects
/// exactly one call).
#[tokio::test]
async fn flow_analysis_rule_verify_config_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint =
        "/nifi-api/controller/flow-analysis-rules/far-1/config/verification-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(verify_config_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let dto = wait::flow_analysis_rule_verify_config(&client, "far-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// Returns `{"request": {"requestId": "req-1", "complete": <complete>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside `request`
/// when `failure` is `Some`.
fn apply_parameters_request_entity(complete: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if complete { 100 } else { 50 };
    let mut request = json!({
        "requestId": "req-1",
        "complete": complete,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "request": request })
}
/// `wait::parameter_provider_apply_parameters` should return the completed
/// request and issue the cleanup DELETE (the DELETE mock expects exactly one
/// call).
#[tokio::test]
async fn parameter_provider_apply_parameters_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-providers/pp-1/apply-parameters-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(apply_parameters_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(apply_parameters_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let dto = wait::parameter_provider_apply_parameters(&client, "pp-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(dto.complete, Some(true));
}
/// A completed apply-parameters request carrying a `failureReason` must
/// surface as `NifiError::Api` with status 500 and a message containing both
/// `apply parameters failed` and the reason. The DELETE mock expects exactly
/// one call.
#[tokio::test]
async fn parameter_provider_apply_parameters_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-providers/pp-1/apply-parameters-requests/req-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(apply_parameters_request_entity(true, Some("conflict")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(apply_parameters_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let err = wait::parameter_provider_apply_parameters(&client, "pp-1", "req-1", config)
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("apply parameters failed"));
            assert!(message.contains("conflict"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
/// Returns `{"request": {"requestId": "req-1", "complete": <complete>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside `request`
/// when `failure` is `Some`.
fn versioned_flow_update_entity(complete: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if complete { 100 } else { 50 };
    let mut request = json!({
        "requestId": "req-1",
        "complete": complete,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "request": request })
}
/// `wait::versioned_flow_update` should return the completed request and
/// issue the cleanup DELETE (the DELETE mock expects exactly one call).
#[tokio::test]
async fn versioned_flow_update_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/versions/update-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(versioned_flow_update_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(versioned_flow_update_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let entity = wait::versioned_flow_update(&client, "req-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(entity.request.and_then(|r| r.complete), Some(true));
}
/// A completed versioned-flow update carrying a `failureReason` must surface
/// as `NifiError::Api` with status 500 and a message containing both
/// `versioned flow update failed` and the reason. The DELETE mock expects
/// exactly one call.
#[tokio::test]
async fn versioned_flow_update_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/versions/update-requests/req-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(versioned_flow_update_entity(true, Some("conflicting changes")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(versioned_flow_update_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let err = wait::versioned_flow_update(&client, "req-1", fast_config(1000))
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("versioned flow update failed"));
            assert!(message.contains("conflicting changes"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
/// `wait::versioned_flow_revert` should return the completed request and
/// issue the cleanup DELETE (the DELETE mock expects exactly one call).
/// Response bodies are built with `versioned_flow_update_entity`.
#[tokio::test]
async fn versioned_flow_revert_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/versions/revert-requests/req-1";

    let completed =
        ResponseTemplate::new(200).set_body_json(versioned_flow_update_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted =
        ResponseTemplate::new(200).set_body_json(versioned_flow_update_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let entity = wait::versioned_flow_revert(&client, "req-1", fast_config(1000))
        .await
        .unwrap();
    assert_eq!(entity.request.and_then(|r| r.complete), Some(true));
}
/// Returns `{"request": {"requestId": "req-1", "complete": <complete>,
/// "percentCompleted": 100 | 50}}`, adding `"failureReason"` inside `request`
/// when `failure` is `Some`.
fn validation_request_entity(complete: bool, failure: Option<&str>) -> serde_json::Value {
    let percent_completed = if complete { 100 } else { 50 };
    let mut request = json!({
        "requestId": "req-1",
        "complete": complete,
        "percentCompleted": percent_completed,
    });
    match failure {
        Some(reason) => request["failureReason"] = json!(reason),
        None => {}
    }
    json!({ "request": request })
}
/// `wait::parameter_context_validation` should return the completed request
/// and issue the cleanup DELETE (the DELETE mock expects exactly one call).
#[tokio::test]
async fn parameter_context_validation_succeeds_and_cleans_up() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-contexts/ctx-1/validation-requests/req-1";

    let completed = ResponseTemplate::new(200).set_body_json(validation_request_entity(true, None));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(completed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(validation_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let entity = wait::parameter_context_validation(&client, "ctx-1", "req-1", config)
        .await
        .unwrap();
    assert_eq!(entity.request.and_then(|r| r.complete), Some(true));
}
/// A completed validation request carrying a `failureReason` must surface as
/// `NifiError::Api` with status 500 and a message containing both
/// `parameter context validation failed` and the reason. The DELETE mock
/// expects exactly one call.
#[tokio::test]
async fn parameter_context_validation_reports_failure() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/parameter-contexts/ctx-1/validation-requests/req-1";

    let failed = ResponseTemplate::new(200)
        .set_body_json(validation_request_entity(true, Some("missing param")));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(failed)
        .mount(&server)
        .await;

    let deleted = ResponseTemplate::new(200).set_body_json(validation_request_entity(true, None));
    Mock::given(method("DELETE"))
        .and(path(endpoint))
        .respond_with(deleted)
        .expect(1)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let config = fast_config(1000);
    let err = wait::parameter_context_validation(&client, "ctx-1", "req-1", config)
        .await
        .unwrap_err();

    match err {
        NifiError::Api { status, message } => {
            assert_eq!(status, 500);
            assert!(message.contains("parameter context validation failed"));
            assert!(message.contains("missing param"));
        }
        other => panic!("expected Api, got {other:?}"),
    }
}
/// Returns `{"id": "pg-1", "runningCount": <running>, "stoppedCount":
/// <stopped>}`.
fn process_group_entity(running: i64, stopped: i64) -> serde_json::Value {
    json!({
        "id": "pg-1",
        "runningCount": running,
        "stoppedCount": stopped,
    })
}
/// `wait::process_group_state` targeting `Running` should succeed once the
/// group reports a stopped count of zero.
///
/// The first mock answers at most two GETs with 3 running / 2 stopped; the
/// second mock answers with 5 running / 0 stopped.
#[tokio::test]
async fn process_group_state_reaches_running() {
    let server = MockServer::start().await;
    let endpoint = "/nifi-api/process-groups/pg-1";

    let partially_running = ResponseTemplate::new(200).set_body_json(process_group_entity(3, 2));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(partially_running)
        .up_to_n_times(2)
        .mount(&server)
        .await;

    let fully_running = ResponseTemplate::new(200).set_body_json(process_group_entity(5, 0));
    Mock::given(method("GET"))
        .and(path(endpoint))
        .respond_with(fully_running)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = wait::ProcessGroupTargetState::Running;
    let entity = wait::process_group_state(&client, "pg-1", target, fast_config(1000))
        .await
        .unwrap();
    assert_eq!(entity.stopped_count, Some(0));
}
/// An invalid component must not prevent a process group from counting as `Running`.
///
/// Every poll returns `runningCount: 4`, `stoppedCount: 0`, `invalidCount: 1`; the wait is
/// expected to succeed within the 1000 ms timeout and hand back that entity.
#[tokio::test]
async fn process_group_state_running_ignores_invalid() {
    let server = MockServer::start().await;

    let group_with_invalid =
        json!({ "id": "pg-1", "runningCount": 4, "stoppedCount": 0, "invalidCount": 1 });
    let reply = ResponseTemplate::new(200).set_body_json(group_with_invalid);
    Mock::given(method("GET"))
        .and(path("/nifi-api/process-groups/pg-1"))
        .respond_with(reply)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = wait::ProcessGroupTargetState::Running;
    let outcome = wait::process_group_state(&client, "pg-1", target, fast_config(1000)).await;

    let entity = outcome.unwrap();
    assert_eq!(entity.running_count, Some(4));
}
/// Waiting for `Stopped` must time out while the group keeps reporting running components.
///
/// Every poll returns 2 running / 0 stopped, so the `Stopped` target is never met within the
/// 50 ms timeout. The resulting `NifiError::Timeout` must name the wait operation and the
/// target state in its `operation` field.
#[tokio::test]
async fn process_group_state_times_out_when_never_stops() {
    let server = MockServer::start().await;

    let still_running = ResponseTemplate::new(200).set_body_json(process_group_entity(2, 0));
    Mock::given(method("GET"))
        .and(path("/nifi-api/process-groups/pg-1"))
        .respond_with(still_running)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = wait::ProcessGroupTargetState::Stopped;
    let outcome = wait::process_group_state(&client, "pg-1", target, fast_config(50)).await;

    match outcome.unwrap_err() {
        NifiError::Timeout { operation } => {
            assert!(operation.contains("wait_for_process_group_state"));
            assert!(operation.contains("STOPPED"));
        }
        other => panic!("expected Timeout, got {other:?}"),
    }
}
/// Wraps `items` in a JSON object under the `controllerServices` key.
fn cs_list(items: serde_json::Value) -> serde_json::Value {
    let mut body = serde_json::Map::new();
    body.insert("controllerServices".to_string(), items);
    serde_json::Value::Object(body)
}
/// Builds one controller-service JSON entry whose `component` carries the given `state` and
/// `validationStatus`.
fn cs(state: &str, validation: &str) -> serde_json::Value {
    let component = json!({ "state": state, "validationStatus": validation });
    json!({ "component": component })
}
/// Waits until every controller service of a process group reports `ENABLED`.
///
/// The first listing (served once, via `up_to_n_times(1)`) still contains a service in
/// `ENABLING`; every later listing shows both services `ENABLED`. The entity returned by the
/// wait must contain only services whose component state is `Enabled`.
#[tokio::test]
async fn pg_controller_services_state_reaches_enabled() {
    use nifi_rust_client::types::ControllerServiceDtoState;

    const SERVICES_PATH: &str = "/nifi-api/flow/process-groups/pg-1/controller-services";

    let server = MockServer::start().await;

    // First poll: one service is still transitioning.
    let one_enabling = cs_list(json!([
        cs("ENABLED", "VALID"),
        cs("ENABLING", "VALID")
    ]));
    Mock::given(method("GET"))
        .and(path(SERVICES_PATH))
        .respond_with(ResponseTemplate::new(200).set_body_json(one_enabling))
        .up_to_n_times(1)
        .mount(&server)
        .await;

    // Subsequent polls: everything is enabled.
    let both_enabled = cs_list(json!([
        cs("ENABLED", "VALID"),
        cs("ENABLED", "VALID")
    ]));
    Mock::given(method("GET"))
        .and(path(SERVICES_PATH))
        .respond_with(ResponseTemplate::new(200).set_body_json(both_enabled))
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let outcome =
        wait::process_group_controller_services_state(&client, "pg-1", target, fast_config(1000))
            .await;

    let services = outcome.unwrap().controller_services.unwrap();
    let all_enabled = services.iter().all(|service| {
        let state = service.component.as_ref().and_then(|c| c.state.as_ref());
        matches!(state, Some(ControllerServiceDtoState::Enabled))
    });
    assert!(all_enabled);
}
/// A process group without controller services counts as already settled.
///
/// The listing endpoint always returns an empty `controllerServices` array; the wait for
/// `Enabled` must succeed and return an entity whose service list is present and empty.
#[tokio::test]
async fn pg_controller_services_empty_list_is_settled() {
    let server = MockServer::start().await;

    let no_services = ResponseTemplate::new(200).set_body_json(cs_list(json!([])));
    Mock::given(method("GET"))
        .and(path("/nifi-api/flow/process-groups/pg-1/controller-services"))
        .respond_with(no_services)
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let outcome =
        wait::process_group_controller_services_state(&client, "pg-1", target, fast_config(1000))
            .await;

    let entity = outcome.unwrap();
    let service_count = entity.controller_services.as_deref().map(|list| list.len());
    assert_eq!(service_count, Some(0));
}
/// An invalid, disabled controller service must not block the wait for `Enabled`.
///
/// The listing always contains one `ENABLED`/`VALID` service and one `DISABLED`/`INVALID`
/// service; the wait is expected to succeed within the 1000 ms timeout and return an entity
/// that carries a service list.
#[tokio::test]
async fn pg_controller_services_enabled_ignores_invalid() {
    let server = MockServer::start().await;

    let listing = cs_list(json!([
        cs("ENABLED", "VALID"),
        cs("DISABLED", "INVALID")
    ]));
    Mock::given(method("GET"))
        .and(path("/nifi-api/flow/process-groups/pg-1/controller-services"))
        .respond_with(ResponseTemplate::new(200).set_body_json(listing))
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let outcome =
        wait::process_group_controller_services_state(&client, "pg-1", target, fast_config(1000))
            .await;

    let entity = outcome.unwrap();
    assert!(entity.controller_services.is_some());
}
/// A valid but still disabled controller service keeps the wait for `Enabled` pending.
///
/// The listing always contains a single `DISABLED`/`VALID` service, so the target is never
/// reached within the 50 ms timeout. The resulting `NifiError::Timeout` must name the wait
/// operation and the target state in its `operation` field.
#[tokio::test]
async fn pg_controller_services_enabled_times_out_while_valid_disabled() {
    let server = MockServer::start().await;

    let listing = cs_list(json!([cs("DISABLED", "VALID")]));
    Mock::given(method("GET"))
        .and(path("/nifi-api/flow/process-groups/pg-1/controller-services"))
        .respond_with(ResponseTemplate::new(200).set_body_json(listing))
        .mount(&server)
        .await;

    let base_url = server.uri();
    let client = NifiClientBuilder::new(&base_url).unwrap().build().unwrap();
    client.set_token(String::from("jwt")).await;

    let target = ControllerServiceTargetState::Enabled;
    let outcome =
        wait::process_group_controller_services_state(&client, "pg-1", target, fast_config(50))
            .await;

    match outcome.unwrap_err() {
        NifiError::Timeout { operation } => {
            assert!(operation.contains("wait_for_process_group_controller_services_state"));
            assert!(operation.contains("ENABLED"));
        }
        other => panic!("expected Timeout, got {other:?}"),
    }
}