#![deny(missing_docs)]
use std::time::Duration;
use reqwest::StatusCode;
use crate::NifiError;
/// HTTP status code (500 Internal Server Error) attached to the [`NifiError::Api`]
/// errors this module synthesises when a polled request reports a failure reason.
const STATUS_OPERATION_FAILED: u16 = StatusCode::INTERNAL_SERVER_ERROR.as_u16();
/// Tuning knobs shared by every waiter in this module.
#[derive(Debug, Clone)]
pub struct WaitConfig {
    /// Maximum time to keep polling, measured from after `initial_delay` has elapsed.
    pub timeout: Duration,
    /// Pause between two consecutive poll attempts.
    pub poll_interval: Duration,
    /// Delay applied once before the first poll attempt; skipped when zero.
    pub initial_delay: Duration,
    /// Whether waiters that track a server-side request delete it once polling ends.
    pub cleanup: bool,
}

impl Default for WaitConfig {
    /// Returns a 30 second timeout, a 500 ms poll interval, no initial delay
    /// and cleanup enabled.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        let poll_interval = Duration::from_millis(500);
        WaitConfig {
            timeout,
            poll_interval,
            initial_delay: Duration::ZERO,
            cleanup: true,
        }
    }
}
/// Processor state that a processor waiter polls for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessorTargetState {
/// Wait until the processor reports the running state.
Running,
/// Wait until the processor reports the stopped state.
Stopped,
/// Wait until the processor reports the disabled state.
Disabled,
}
impl ProcessorTargetState {
pub(crate) fn wire_value(&self) -> &'static str {
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProcessorDtoState;
#[cfg(not(feature = "dynamic"))]
use crate::types::ProcessorDtoState;
match self {
Self::Running => ProcessorDtoState::Running.as_str(),
Self::Stopped => ProcessorDtoState::Stopped.as_str(),
Self::Disabled => ProcessorDtoState::Disabled.as_str(),
}
}
}
/// Controller service state that a controller service waiter polls for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControllerServiceTargetState {
/// Wait until the controller service reports the enabled state.
Enabled,
/// Wait until the controller service reports the disabled state.
Disabled,
}
impl ControllerServiceTargetState {
pub(crate) fn wire_value(&self) -> &'static str {
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ControllerServiceDtoState;
#[cfg(not(feature = "dynamic"))]
use crate::types::ControllerServiceDtoState;
match self {
Self::Enabled => ControllerServiceDtoState::Enabled.as_str(),
Self::Disabled => ControllerServiceDtoState::Disabled.as_str(),
}
}
}
/// Classification of a single fetched value, as returned by the `done`
/// callback handed to `poll_until`.
#[derive(Debug)]
enum PollOutcome {
/// The target condition has not been reached yet; keep polling.
Pending,
/// The target condition has been reached; polling stops with the fetched value.
Ready,
/// The operation failed; polling stops and the carried error is returned.
Failed(NifiError),
}
/// Polls `fetch` until `done` classifies a fetched value as ready or failed,
/// or until `config.timeout` has elapsed.
///
/// * `config` - supplies the optional initial delay, the poll interval and the timeout.
/// * `operation` - label copied into [`NifiError::Timeout`] when the deadline passes.
/// * `fetch` - produces a fresh future for every poll attempt.
/// * `done` - classifies each fetched value as pending, ready or failed.
///
/// Returns the value for which `done` reported [`PollOutcome::Ready`].
///
/// # Errors
///
/// Returns the first error yielded by `fetch`, the error carried by
/// [`PollOutcome::Failed`], or [`NifiError::Timeout`] once the deadline is
/// reached while the value is still pending.
async fn poll_until<T, FetchFn, FetchFut>(
config: &WaitConfig,
operation: &str,
fetch: FetchFn,
done: impl Fn(&T) -> PollOutcome,
) -> Result<T, NifiError>
where
FetchFn: Fn() -> FetchFut,
FetchFut: core::future::Future<Output = Result<T, NifiError>>,
{
if !config.initial_delay.is_zero() {
tokio::time::sleep(config.initial_delay).await;
}
// The deadline is computed after the initial delay, so that delay does not
// count against the timeout.
let deadline = tokio::time::Instant::now() + config.timeout;
loop {
// A fetch error aborts the wait immediately.
let value = fetch().await?;
match done(&value) {
PollOutcome::Ready => return Ok(value),
PollOutcome::Failed(e) => return Err(e),
PollOutcome::Pending => {}
}
// The deadline is only checked after a fetch, so at least one attempt is
// always made, even with a zero timeout.
if tokio::time::Instant::now() >= deadline {
return Err(NifiError::Timeout {
operation: operation.to_string(),
});
}
// Never sleep past the deadline: cap the pause at the time that is left.
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let next = core::cmp::min(config.poll_interval, remaining);
tokio::time::sleep(next).await;
}
}
/// Logs a warning when a best-effort cleanup request failed; does nothing on success.
///
/// `operation` and `target` are recorded as fields on the emitted event, next to
/// the `Display` rendering of the error.
fn warn_cleanup_failure<E>(operation: &str, target: &str, result: Result<(), E>)
where
    E: std::fmt::Display,
{
    let Err(cause) = result else {
        return;
    };
    tracing::warn!(
        operation,
        target,
        error = %cause,
        "cleanup request failed (best-effort; ignored)"
    );
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ProcessorEntity;
/// Polls a processor until its reported state matches `target`.
///
/// The processor is fetched with `get_processor(processor_id)` on every attempt.
/// An entity without a component, or a component without a state, counts as
/// not having reached the target yet. On success the last fetched entity is returned.
///
/// # Errors
///
/// Propagates any error from the fetch call and returns [`NifiError::Timeout`]
/// when the deadline derived from `config` passes first.
#[cfg(not(feature = "dynamic"))]
pub async fn processor_state(
    client: &crate::NifiClient,
    processor_id: &str,
    target: ProcessorTargetState,
    config: WaitConfig,
) -> Result<ProcessorEntity, NifiError> {
    use crate::types::ProcessorDtoState;

    let wire = target.wire_value();
    let operation = format!("wait_for_processor_state({processor_id}, {wire})");
    let poll_once = || async { client.processors().get_processor(processor_id).await };
    let classify = move |entity: &ProcessorEntity| {
        let current = entity.component.as_ref().and_then(|c| c.state.as_ref());
        let reached = matches!(
            (target, current),
            (
                ProcessorTargetState::Running,
                Some(ProcessorDtoState::Running)
            ) | (
                ProcessorTargetState::Stopped,
                Some(ProcessorDtoState::Stopped)
            ) | (
                ProcessorTargetState::Disabled,
                Some(ProcessorDtoState::Disabled)
            )
        );
        match reached {
            true => PollOutcome::Ready,
            false => PollOutcome::Pending,
        }
    };
    poll_until(&config, &operation, poll_once, classify).await
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ControllerServiceEntity;
/// Polls a controller service until its reported state matches `target`.
///
/// The service is fetched with `get_controller_service(service_id, None)` on
/// every attempt. An entity without a component, or a component without a state,
/// counts as not having reached the target yet. On success the last fetched
/// entity is returned.
///
/// # Errors
///
/// Propagates any error from the fetch call and returns [`NifiError::Timeout`]
/// when the deadline derived from `config` passes first.
#[cfg(not(feature = "dynamic"))]
pub async fn controller_service_state(
    client: &crate::NifiClient,
    service_id: &str,
    target: ControllerServiceTargetState,
    config: WaitConfig,
) -> Result<ControllerServiceEntity, NifiError> {
    use crate::types::ControllerServiceDtoState;

    let wire = target.wire_value();
    let operation = format!("wait_for_controller_service_state({service_id}, {wire})");
    let poll_once = || async {
        client
            .controller_services()
            .get_controller_service(service_id, None)
            .await
    };
    let classify = move |entity: &ControllerServiceEntity| {
        let current = entity.component.as_ref().and_then(|c| c.state.as_ref());
        let reached = matches!(
            (target, current),
            (
                ControllerServiceTargetState::Enabled,
                Some(ControllerServiceDtoState::Enabled)
            ) | (
                ControllerServiceTargetState::Disabled,
                Some(ControllerServiceDtoState::Disabled)
            )
        );
        match reached {
            true => PollOutcome::Ready,
            false => PollOutcome::Pending,
        }
    };
    poll_until(&config, &operation, poll_once, classify).await
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ControllerServiceEntity;
/// Dynamic-client variant: polls a controller service until its state string
/// equals the wire value of `target`.
///
/// The service is fetched with `get_controller_service(service_id, None)` on
/// every attempt. A missing component or state counts as not having reached the
/// target yet. On success the last fetched entity is returned.
///
/// # Errors
///
/// Propagates any error from the fetch call and returns [`NifiError::Timeout`]
/// when the deadline derived from `config` passes first.
#[cfg(feature = "dynamic")]
pub async fn controller_service_state_dynamic(
    client: &crate::dynamic::DynamicClient,
    service_id: &str,
    target: ControllerServiceTargetState,
    config: WaitConfig,
) -> Result<ControllerServiceEntity, NifiError> {
    let target_wire = target.wire_value();
    let operation = format!("wait_for_controller_service_state({service_id}, {target_wire})");
    let poll_once = || async {
        client
            .controller_services()
            .get_controller_service(service_id, None)
            .await
    };
    let classify = move |entity: &ControllerServiceEntity| {
        match entity.component.as_ref().and_then(|c| c.state.as_deref()) {
            Some(current) if current == target_wire => PollOutcome::Ready,
            _ => PollOutcome::Pending,
        }
    };
    poll_until(&config, &operation, poll_once, classify).await
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProcessorEntity;
/// Dynamic-client variant: polls a processor until its state string equals the
/// wire value of `target`.
///
/// The processor is fetched with `get_processor(processor_id)` on every attempt.
/// A missing component or state counts as not having reached the target yet.
/// On success the last fetched entity is returned.
///
/// # Errors
///
/// Propagates any error from the fetch call and returns [`NifiError::Timeout`]
/// when the deadline derived from `config` passes first.
#[cfg(feature = "dynamic")]
pub async fn processor_state_dynamic(
    client: &crate::dynamic::DynamicClient,
    processor_id: &str,
    target: ProcessorTargetState,
    config: WaitConfig,
) -> Result<ProcessorEntity, NifiError> {
    let target_wire = target.wire_value();
    let operation = format!("wait_for_processor_state({processor_id}, {target_wire})");
    let poll_once = || async { client.processors().get_processor(processor_id).await };
    let classify = move |entity: &ProcessorEntity| {
        match entity.component.as_ref().and_then(|c| c.state.as_deref()) {
            Some(current) if current == target_wire => PollOutcome::Ready,
            _ => PollOutcome::Pending,
        }
    };
    poll_until(&config, &operation, poll_once, classify).await
}
/// Maps the completion flag and failure reason of a polled request onto a
/// [`PollOutcome`].
///
/// A present `failure_reason` always wins and becomes a [`NifiError::Api`] whose
/// message is prefixed with `op_kind`. Otherwise the request is ready only when
/// `terminal` is `Some(true)`; `None` and `Some(false)` both mean pending.
fn terminal_outcome(
    terminal: Option<bool>,
    failure_reason: Option<&str>,
    op_kind: &str,
) -> PollOutcome {
    match (failure_reason, terminal) {
        (Some(reason), _) => PollOutcome::Failed(NifiError::Api {
            status: STATUS_OPERATION_FAILED,
            message: format!("{op_kind} failed: {reason}"),
        }),
        (None, Some(true)) => PollOutcome::Ready,
        (None, _) => PollOutcome::Pending,
    }
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ParameterContextUpdateRequestEntity;
/// Waits for a parameter context update request to finish.
///
/// Polls `get_parameter_context_update(context_id, request_id)` until the nested
/// request reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the update request is deleted afterwards whatever the
/// polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn parameter_context_update(
    client: &crate::NifiClient,
    context_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterContextUpdateRequestEntity, NifiError> {
    let target = format!("{context_id}/{request_id}");
    let operation = format!("wait_for_parameter_context_update({target})");
    let poll_once = || async {
        client
            .parametercontexts()
            .get_parameter_context_update(context_id, request_id)
            .await
    };
    let classify = |entity: &ParameterContextUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "parameter context update")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parametercontexts()
        .delete_update_request(context_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ParameterContextUpdateRequestEntity;
/// Dynamic-client variant: waits for a parameter context update request to finish.
///
/// Polls `get_parameter_context_update(context_id, request_id)` until the nested
/// request reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the update request is deleted afterwards whatever the
/// polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn parameter_context_update_dynamic(
    client: &crate::dynamic::DynamicClient,
    context_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterContextUpdateRequestEntity, NifiError> {
    let target = format!("{context_id}/{request_id}");
    let operation = format!("wait_for_parameter_context_update({target})");
    let poll_once = || async {
        client
            .parametercontexts()
            .get_parameter_context_update(context_id, request_id)
            .await
    };
    let classify = |entity: &ParameterContextUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "parameter context update")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parametercontexts()
        .delete_update_request(context_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ProvenanceDto;
/// Waits for a provenance query to finish.
///
/// Polls `get_provenance(query_id, None, None, None)` until the DTO's `finished`
/// flag is `Some(true)`. When `config.cleanup` is set, the query is deleted
/// afterwards whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated and an expired deadline yields
/// [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn provenance_query(
    client: &crate::NifiClient,
    query_id: &str,
    config: WaitConfig,
) -> Result<ProvenanceDto, NifiError> {
    let operation = format!("wait_for_provenance_query({query_id})");
    let poll_once = || async {
        client
            .provenance()
            .get_provenance(query_id, None, None, None)
            .await
    };
    let classify = |dto: &ProvenanceDto| match dto.finished {
        Some(true) => PollOutcome::Ready,
        _ => PollOutcome::Pending,
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .provenance()
        .delete_provenance(query_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, query_id, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProvenanceDto;
/// Dynamic-client variant: waits for a provenance query to finish.
///
/// Polls `get_provenance(query_id, None, None, None)` until the DTO's `finished`
/// flag is `Some(true)`. When `config.cleanup` is set, the query is deleted
/// afterwards whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated and an expired deadline yields
/// [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn provenance_query_dynamic(
    client: &crate::dynamic::DynamicClient,
    query_id: &str,
    config: WaitConfig,
) -> Result<ProvenanceDto, NifiError> {
    let operation = format!("wait_for_provenance_query({query_id})");
    let poll_once = || async {
        client
            .provenance()
            .get_provenance(query_id, None, None, None)
            .await
    };
    let classify = |dto: &ProvenanceDto| match dto.finished {
        Some(true) => PollOutcome::Ready,
        _ => PollOutcome::Pending,
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .provenance()
        .delete_provenance(query_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, query_id, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::DropRequestDto;
/// Waits for a flowfile drop request on a queue to finish.
///
/// Polls `get_drop_request(queue_id, drop_request_id)` until the DTO reports
/// `finished` or carries a `failure_reason`. When `config.cleanup` is set, the
/// drop request is removed afterwards whatever the polling result was; a failed
/// removal is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn flowfile_drop(
    client: &crate::NifiClient,
    queue_id: &str,
    drop_request_id: &str,
    config: WaitConfig,
) -> Result<DropRequestDto, NifiError> {
    let target = format!("{queue_id}/{drop_request_id}");
    let operation = format!("wait_for_flowfile_drop({target})");
    let poll_once = || async {
        client
            .flowfilequeues()
            .get_drop_request(queue_id, drop_request_id)
            .await
    };
    let classify = |dto: &DropRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "drop request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed removal is logged and never replaces `outcome`.
    let removed = client
        .flowfilequeues()
        .remove_drop_request(queue_id, drop_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, removed);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::DropRequestDto;
/// Dynamic-client variant: waits for a flowfile drop request on a queue to finish.
///
/// Polls `get_drop_request(queue_id, drop_request_id)` until the DTO reports
/// `finished` or carries a `failure_reason`. When `config.cleanup` is set, the
/// drop request is removed afterwards whatever the polling result was; a failed
/// removal is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn flowfile_drop_dynamic(
    client: &crate::dynamic::DynamicClient,
    queue_id: &str,
    drop_request_id: &str,
    config: WaitConfig,
) -> Result<DropRequestDto, NifiError> {
    let target = format!("{queue_id}/{drop_request_id}");
    let operation = format!("wait_for_flowfile_drop({target})");
    let poll_once = || async {
        client
            .flowfilequeues()
            .get_drop_request(queue_id, drop_request_id)
            .await
    };
    let classify = |dto: &DropRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "drop request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed removal is logged and never replaces `outcome`.
    let removed = client
        .flowfilequeues()
        .remove_drop_request(queue_id, drop_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, removed);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ListingRequestDto;
/// Waits for a flowfile listing request on a queue to finish.
///
/// Polls `get_listing_request(queue_id, listing_request_id)` until the DTO
/// reports `finished` or carries a `failure_reason`. When `config.cleanup` is
/// set, the listing request is deleted afterwards whatever the polling result
/// was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn flowfile_listing(
    client: &crate::NifiClient,
    queue_id: &str,
    listing_request_id: &str,
    config: WaitConfig,
) -> Result<ListingRequestDto, NifiError> {
    let target = format!("{queue_id}/{listing_request_id}");
    let operation = format!("wait_for_flowfile_listing({target})");
    let poll_once = || async {
        client
            .flowfilequeues()
            .get_listing_request(queue_id, listing_request_id)
            .await
    };
    let classify = |dto: &ListingRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "listing request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .flowfilequeues()
        .delete_listing_request(queue_id, listing_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ListingRequestDto;
/// Dynamic-client variant: waits for a flowfile listing request on a queue to finish.
///
/// Polls `get_listing_request(queue_id, listing_request_id)` until the DTO
/// reports `finished` or carries a `failure_reason`. When `config.cleanup` is
/// set, the listing request is deleted afterwards whatever the polling result
/// was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn flowfile_listing_dynamic(
    client: &crate::dynamic::DynamicClient,
    queue_id: &str,
    listing_request_id: &str,
    config: WaitConfig,
) -> Result<ListingRequestDto, NifiError> {
    let target = format!("{queue_id}/{listing_request_id}");
    let operation = format!("wait_for_flowfile_listing({target})");
    let poll_once = || async {
        client
            .flowfilequeues()
            .get_listing_request(queue_id, listing_request_id)
            .await
    };
    let classify = |dto: &ListingRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "listing request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .flowfilequeues()
        .delete_listing_request(queue_id, listing_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Waits for a process group's "drop all flowfiles" request to finish.
///
/// Polls `get_drop_all_flowfiles_request(process_group_id, drop_request_id)`
/// until the DTO reports `finished` or carries a `failure_reason`. When
/// `config.cleanup` is set, the drop request is removed afterwards whatever the
/// polling result was; a failed removal is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn empty_all_connections(
    client: &crate::NifiClient,
    process_group_id: &str,
    drop_request_id: &str,
    config: WaitConfig,
) -> Result<DropRequestDto, NifiError> {
    let target = format!("{process_group_id}/{drop_request_id}");
    let operation = format!("wait_for_empty_all_connections({target})");
    let poll_once = || async {
        client
            .processgroups()
            .get_drop_all_flowfiles_request(process_group_id, drop_request_id)
            .await
    };
    let classify = |dto: &DropRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "empty-all-connections request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed removal is logged and never replaces `outcome`.
    let removed = client
        .processgroups()
        .remove_drop_request(process_group_id, drop_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, removed);
    outcome
}
/// Dynamic-client variant: waits for a process group's "drop all flowfiles"
/// request to finish.
///
/// Polls `get_drop_all_flowfiles_request(process_group_id, drop_request_id)`
/// until the DTO reports `finished` or carries a `failure_reason`. When
/// `config.cleanup` is set, the drop request is removed afterwards whatever the
/// polling result was; a failed removal is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn empty_all_connections_dynamic(
    client: &crate::dynamic::DynamicClient,
    process_group_id: &str,
    drop_request_id: &str,
    config: WaitConfig,
) -> Result<DropRequestDto, NifiError> {
    let target = format!("{process_group_id}/{drop_request_id}");
    let operation = format!("wait_for_empty_all_connections({target})");
    let poll_once = || async {
        client
            .processgroups()
            .get_drop_all_flowfiles_request(process_group_id, drop_request_id)
            .await
    };
    let classify = |dto: &DropRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.finished, failure, "empty-all-connections request")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed removal is logged and never replaces `outcome`.
    let removed = client
        .processgroups()
        .remove_drop_request(process_group_id, drop_request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, removed);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::LineageDto;
/// Waits for a provenance lineage query to finish.
///
/// Polls `get_lineage(lineage_id, None)` until the DTO's `finished` flag is
/// `Some(true)`. When `config.cleanup` is set, the lineage query is deleted
/// afterwards whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated and an expired deadline yields
/// [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn provenance_lineage(
    client: &crate::NifiClient,
    lineage_id: &str,
    config: WaitConfig,
) -> Result<LineageDto, NifiError> {
    let operation = format!("wait_for_provenance_lineage({lineage_id})");
    let poll_once = || async { client.provenance().get_lineage(lineage_id, None).await };
    let classify = |dto: &LineageDto| match dto.finished {
        Some(true) => PollOutcome::Ready,
        _ => PollOutcome::Pending,
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .provenance()
        .delete_lineage(lineage_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, lineage_id, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::LineageDto;
/// Dynamic-client variant: waits for a provenance lineage query to finish.
///
/// Polls `get_lineage(lineage_id, None)` until the DTO's `finished` flag is
/// `Some(true)`. When `config.cleanup` is set, the lineage query is deleted
/// afterwards whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated and an expired deadline yields
/// [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn provenance_lineage_dynamic(
    client: &crate::dynamic::DynamicClient,
    lineage_id: &str,
    config: WaitConfig,
) -> Result<LineageDto, NifiError> {
    let operation = format!("wait_for_provenance_lineage({lineage_id})");
    let poll_once = || async { client.provenance().get_lineage(lineage_id, None).await };
    let classify = |dto: &LineageDto| match dto.finished {
        Some(true) => PollOutcome::Ready,
        _ => PollOutcome::Pending,
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .provenance()
        .delete_lineage(lineage_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, lineage_id, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::VerifyConfigRequestDto;
/// Waits for a processor configuration verification request to finish.
///
/// Polls `get_verification_request(processor_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn processor_verify_config(
    client: &crate::NifiClient,
    processor_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{processor_id}/{request_id}");
    let operation = format!("wait_for_processor_verify_config({target})");
    let poll_once = || async {
        client
            .processors()
            .get_verification_request(processor_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .processors()
        .delete_verification_request(processor_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::VerifyConfigRequestDto;
/// Dynamic-client variant: waits for a processor configuration verification
/// request to finish.
///
/// Polls `get_verification_request(processor_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn processor_verify_config_dynamic(
    client: &crate::dynamic::DynamicClient,
    processor_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{processor_id}/{request_id}");
    let operation = format!("wait_for_processor_verify_config({target})");
    let poll_once = || async {
        client
            .processors()
            .get_verification_request(processor_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .processors()
        .delete_verification_request(processor_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Waits for a controller service configuration verification request to finish.
///
/// Polls `get_verification_request(service_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn controller_service_verify_config(
    client: &crate::NifiClient,
    service_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{service_id}/{request_id}");
    let operation = format!("wait_for_controller_service_verify_config({target})");
    let poll_once = || async {
        client
            .controller_services()
            .get_verification_request(service_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .controller_services()
        .delete_verification_request(service_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Dynamic-client variant: waits for a controller service configuration
/// verification request to finish.
///
/// Polls `get_verification_request(service_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn controller_service_verify_config_dynamic(
    client: &crate::dynamic::DynamicClient,
    service_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{service_id}/{request_id}");
    let operation = format!("wait_for_controller_service_verify_config({target})");
    let poll_once = || async {
        client
            .controller_services()
            .get_verification_request(service_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .controller_services()
        .delete_verification_request(service_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Waits for a reporting task configuration verification request to finish.
///
/// Polls `get_verification_request(task_id, request_id)` until the DTO reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// verification request is deleted afterwards whatever the polling result was;
/// a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn reporting_task_verify_config(
    client: &crate::NifiClient,
    task_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{task_id}/{request_id}");
    let operation = format!("wait_for_reporting_task_verify_config({target})");
    let poll_once = || async {
        client
            .reportingtasks()
            .get_verification_request(task_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .reportingtasks()
        .delete_verification_request(task_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Dynamic-client variant: waits for a reporting task configuration
/// verification request to finish.
///
/// Polls `get_verification_request(task_id, request_id)` until the DTO reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// verification request is deleted afterwards whatever the polling result was;
/// a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn reporting_task_verify_config_dynamic(
    client: &crate::dynamic::DynamicClient,
    task_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{task_id}/{request_id}");
    let operation = format!("wait_for_reporting_task_verify_config({target})");
    let poll_once = || async {
        client
            .reportingtasks()
            .get_verification_request(task_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .reportingtasks()
        .delete_verification_request(task_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Waits for a parameter provider configuration verification request to finish.
///
/// Polls `get_verification_request(provider_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn parameter_provider_verify_config(
    client: &crate::NifiClient,
    provider_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{provider_id}/{request_id}");
    let operation = format!("wait_for_parameter_provider_verify_config({target})");
    let poll_once = || async {
        client
            .parameterproviders()
            .get_verification_request(provider_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parameterproviders()
        .delete_verification_request(provider_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Dynamic-client variant: waits for a parameter provider configuration
/// verification request to finish.
///
/// Polls `get_verification_request(provider_id, request_id)` until the DTO
/// reports `complete` or carries a `failure_reason`. When `config.cleanup` is
/// set, the verification request is deleted afterwards whatever the polling
/// result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn parameter_provider_verify_config_dynamic(
    client: &crate::dynamic::DynamicClient,
    provider_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{provider_id}/{request_id}");
    let operation = format!("wait_for_parameter_provider_verify_config({target})");
    let poll_once = || async {
        client
            .parameterproviders()
            .get_verification_request(provider_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parameterproviders()
        .delete_verification_request(provider_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Waits for a flow analysis rule configuration verification request to finish.
///
/// Polls `get_flow_analysis_rule_verification_request(rule_id, request_id)`
/// until the DTO reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the verification request is deleted afterwards
/// whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn flow_analysis_rule_verify_config(
    client: &crate::NifiClient,
    rule_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{rule_id}/{request_id}");
    let operation = format!("wait_for_flow_analysis_rule_verify_config({target})");
    let poll_once = || async {
        client
            .controller()
            .get_flow_analysis_rule_verification_request(rule_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .controller()
        .delete_flow_analysis_rule_verification_request(rule_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Dynamic-client variant: waits for a flow analysis rule configuration
/// verification request to finish.
///
/// Polls `get_flow_analysis_rule_verification_request(rule_id, request_id)`
/// until the DTO reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the verification request is deleted afterwards
/// whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn flow_analysis_rule_verify_config_dynamic(
    client: &crate::dynamic::DynamicClient,
    rule_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<VerifyConfigRequestDto, NifiError> {
    let target = format!("{rule_id}/{request_id}");
    let operation = format!("wait_for_flow_analysis_rule_verify_config({target})");
    let poll_once = || async {
        client
            .controller()
            .get_flow_analysis_rule_verification_request(rule_id, request_id)
            .await
    };
    let classify = |dto: &VerifyConfigRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "verification")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .controller()
        .delete_flow_analysis_rule_verification_request(rule_id, request_id)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ParameterProviderApplyParametersRequestDto;
/// Waits for a parameter provider "apply parameters" request to finish.
///
/// Polls `get_parameter_provider_apply_parameters_request(provider_id, request_id)`
/// until the DTO reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the apply request is deleted afterwards whatever the
/// polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn parameter_provider_apply_parameters(
    client: &crate::NifiClient,
    provider_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterProviderApplyParametersRequestDto, NifiError> {
    let target = format!("{provider_id}/{request_id}");
    let operation = format!("wait_for_parameter_provider_apply_parameters({target})");
    let poll_once = || async {
        client
            .parameterproviders()
            .get_parameter_provider_apply_parameters_request(provider_id, request_id)
            .await
    };
    let classify = |dto: &ParameterProviderApplyParametersRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "apply parameters")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parameterproviders()
        .delete_apply_parameters_request(provider_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ParameterProviderApplyParametersRequestDto;
/// Dynamic-client variant: waits for a parameter provider "apply parameters"
/// request to finish.
///
/// Polls `get_parameter_provider_apply_parameters_request(provider_id, request_id)`
/// until the DTO reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the apply request is deleted afterwards whatever the
/// polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn parameter_provider_apply_parameters_dynamic(
    client: &crate::dynamic::DynamicClient,
    provider_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterProviderApplyParametersRequestDto, NifiError> {
    let target = format!("{provider_id}/{request_id}");
    let operation = format!("wait_for_parameter_provider_apply_parameters({target})");
    let poll_once = || async {
        client
            .parameterproviders()
            .get_parameter_provider_apply_parameters_request(provider_id, request_id)
            .await
    };
    let classify = |dto: &ParameterProviderApplyParametersRequestDto| {
        let failure = dto.failure_reason.as_deref();
        terminal_outcome(dto.complete, failure, "apply parameters")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parameterproviders()
        .delete_apply_parameters_request(provider_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::VersionedFlowUpdateRequestEntity;
/// Waits for a versioned flow update request to finish.
///
/// Polls `get_update_request(request_id)` until the nested request reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// update request is deleted afterwards whatever the polling result was; a
/// failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn versioned_flow_update(
    client: &crate::NifiClient,
    request_id: &str,
    config: WaitConfig,
) -> Result<VersionedFlowUpdateRequestEntity, NifiError> {
    let operation = format!("wait_for_versioned_flow_update({request_id})");
    let poll_once = || async { client.versions().get_update_request(request_id).await };
    let classify = |entity: &VersionedFlowUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "versioned flow update")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .versions()
        .delete_update_request(request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, request_id, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::VersionedFlowUpdateRequestEntity;
/// Dynamic-client variant: waits for a versioned flow update request to finish.
///
/// Polls `get_update_request(request_id)` until the nested request reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// update request is deleted afterwards whatever the polling result was; a
/// failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn versioned_flow_update_dynamic(
    client: &crate::dynamic::DynamicClient,
    request_id: &str,
    config: WaitConfig,
) -> Result<VersionedFlowUpdateRequestEntity, NifiError> {
    let operation = format!("wait_for_versioned_flow_update({request_id})");
    let poll_once = || async { client.versions().get_update_request(request_id).await };
    let classify = |entity: &VersionedFlowUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "versioned flow update")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .versions()
        .delete_update_request(request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, request_id, deleted);
    outcome
}
/// Waits for a versioned flow revert request to finish.
///
/// Polls `get_revert_request(request_id)` until the nested request reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// revert request is deleted afterwards whatever the polling result was; a
/// failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn versioned_flow_revert(
    client: &crate::NifiClient,
    request_id: &str,
    config: WaitConfig,
) -> Result<VersionedFlowUpdateRequestEntity, NifiError> {
    let operation = format!("wait_for_versioned_flow_revert({request_id})");
    let poll_once = || async { client.versions().get_revert_request(request_id).await };
    let classify = |entity: &VersionedFlowUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "versioned flow revert")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .versions()
        .delete_revert_request(request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, request_id, deleted);
    outcome
}
/// Dynamic-client variant: waits for a versioned flow revert request to finish.
///
/// Polls `get_revert_request(request_id)` until the nested request reports
/// `complete` or carries a `failure_reason`. When `config.cleanup` is set, the
/// revert request is deleted afterwards whatever the polling result was; a
/// failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn versioned_flow_revert_dynamic(
    client: &crate::dynamic::DynamicClient,
    request_id: &str,
    config: WaitConfig,
) -> Result<VersionedFlowUpdateRequestEntity, NifiError> {
    let operation = format!("wait_for_versioned_flow_revert({request_id})");
    let poll_once = || async { client.versions().get_revert_request(request_id).await };
    let classify = |entity: &VersionedFlowUpdateRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "versioned flow revert")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .versions()
        .delete_revert_request(request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, request_id, deleted);
    outcome
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ParameterContextValidationRequestEntity;
/// Waits for a parameter context validation request to finish.
///
/// Polls `get_validation_request(context_id, request_id)` until the nested
/// request reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the validation request is deleted afterwards
/// whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(not(feature = "dynamic"))]
pub async fn parameter_context_validation(
    client: &crate::NifiClient,
    context_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterContextValidationRequestEntity, NifiError> {
    let target = format!("{context_id}/{request_id}");
    let operation = format!("wait_for_parameter_context_validation({target})");
    let poll_once = || async {
        client
            .parametercontexts()
            .get_validation_request(context_id, request_id)
            .await
    };
    let classify = |entity: &ParameterContextValidationRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "parameter context validation")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parametercontexts()
        .delete_validation_request(context_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ParameterContextValidationRequestEntity;
/// Dynamic-client variant: waits for a parameter context validation request to
/// finish.
///
/// Polls `get_validation_request(context_id, request_id)` until the nested
/// request reports `complete` or carries a `failure_reason`. When
/// `config.cleanup` is set, the validation request is deleted afterwards
/// whatever the polling result was; a failed delete is only logged.
///
/// # Errors
///
/// Fetch errors are propagated, a reported failure reason becomes
/// [`NifiError::Api`], and an expired deadline yields [`NifiError::Timeout`].
#[cfg(feature = "dynamic")]
pub async fn parameter_context_validation_dynamic(
    client: &crate::dynamic::DynamicClient,
    context_id: &str,
    request_id: &str,
    config: WaitConfig,
) -> Result<ParameterContextValidationRequestEntity, NifiError> {
    let target = format!("{context_id}/{request_id}");
    let operation = format!("wait_for_parameter_context_validation({target})");
    let poll_once = || async {
        client
            .parametercontexts()
            .get_validation_request(context_id, request_id)
            .await
    };
    let classify = |entity: &ParameterContextValidationRequestEntity| {
        let request = entity.request.as_ref();
        let complete = request.and_then(|r| r.complete);
        let failure = request.and_then(|r| r.failure_reason.as_deref());
        terminal_outcome(complete, failure, "parameter context validation")
    };
    let outcome = poll_until(&config, &operation, poll_once, classify).await;
    if !config.cleanup {
        return outcome;
    }
    // Best-effort: a failed delete is logged and never replaces `outcome`.
    let deleted = client
        .parametercontexts()
        .delete_validation_request(context_id, request_id, None)
        .await
        .map(drop);
    warn_cleanup_failure(&operation, &target, deleted);
    outcome
}
/// Aggregate state a process group can be waited on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessGroupTargetState {
    /// Wait for the group to be running.
    Running,
    /// Wait for the group to be stopped.
    Stopped,
}

impl ProcessGroupTargetState {
    /// Returns the upper-case string form of this target state.
    ///
    /// The wait helpers in this file embed it in their operation labels.
    pub(crate) fn wire_value(&self) -> &'static str {
        match *self {
            ProcessGroupTargetState::Running => "RUNNING",
            ProcessGroupTargetState::Stopped => "STOPPED",
        }
    }
}
/// Waits until a process group reports the requested aggregate state.
///
/// The group is fetched with `get_process_group` on every poll. It counts as
/// settled when the counter opposing the target is exactly zero:
///
/// * `Running` requires `stopped_count == Some(0)`;
/// * `Stopped` requires `running_count == Some(0)`.
///
/// A missing counter (`None`) is treated as not yet settled.
///
/// # Errors
///
/// Returns the error produced by `poll_until`, unchanged.
#[cfg(not(feature = "dynamic"))]
pub async fn process_group_state(
    client: &crate::NifiClient,
    pg_id: &str,
    target: ProcessGroupTargetState,
    config: WaitConfig,
) -> Result<crate::types::ProcessGroupEntity, NifiError> {
    let wire = target.wire_value();
    let op = format!("wait_for_process_group_state({pg_id}, {wire})");

    let fetch = || async { client.processgroups().get_process_group(pg_id).await };
    let done = move |entity: &crate::types::ProcessGroupEntity| {
        // Inspect the counter for the state we want the group to have left behind.
        let settled = match target {
            ProcessGroupTargetState::Running => matches!(entity.stopped_count, Some(0)),
            ProcessGroupTargetState::Stopped => matches!(entity.running_count, Some(0)),
        };
        if !settled {
            return PollOutcome::Pending;
        }
        PollOutcome::Ready
    };

    poll_until(&config, &op, fetch, done).await
}
/// Waits until a process group reports the requested aggregate state, using the
/// dynamic client.
///
/// The group is fetched with `get_process_group` on every poll. It counts as
/// settled when the counter opposing the target is exactly zero:
///
/// * `Running` requires `stopped_count == Some(0)`;
/// * `Stopped` requires `running_count == Some(0)`.
///
/// A missing counter (`None`) is treated as not yet settled.
///
/// # Errors
///
/// Returns the error produced by `poll_until`, unchanged.
#[cfg(feature = "dynamic")]
pub async fn process_group_state_dynamic(
    client: &crate::dynamic::DynamicClient,
    pg_id: &str,
    target: ProcessGroupTargetState,
    config: WaitConfig,
) -> Result<crate::dynamic::types::ProcessGroupEntity, NifiError> {
    let wire = target.wire_value();
    let op = format!("wait_for_process_group_state({pg_id}, {wire})");

    let fetch = || async { client.processgroups().get_process_group(pg_id).await };
    let done = move |entity: &crate::dynamic::types::ProcessGroupEntity| {
        // Inspect the counter for the state we want the group to have left behind.
        let settled = match target {
            ProcessGroupTargetState::Running => matches!(entity.stopped_count, Some(0)),
            ProcessGroupTargetState::Stopped => matches!(entity.running_count, Some(0)),
        };
        if !settled {
            return PollOutcome::Pending;
        }
        PollOutcome::Ready
    };

    poll_until(&config, &op, fetch, done).await
}
/// Waits until every controller service in a process group has settled with
/// respect to `target`.
///
/// The services are fetched with `get_controller_services_from_group` on every
/// poll. A service counts as settled when:
///
/// * target `Enabled`: its state is `Enabled`, or its state is `Disabled` while
///   its validation status is `Invalid`;
/// * target `Disabled`: its state is `Disabled`.
///
/// A service without a component or state is not settled. A response with no
/// services (absent or empty list) is settled immediately.
///
/// # Errors
///
/// Returns the error produced by `poll_until`, unchanged.
#[cfg(not(feature = "dynamic"))]
pub async fn process_group_controller_services_state(
    client: &crate::NifiClient,
    pg_id: &str,
    target: ControllerServiceTargetState,
    config: WaitConfig,
) -> Result<crate::types::ControllerServicesEntity, NifiError> {
    use crate::types::{ControllerServiceDtoState, ControllerServiceDtoValidationStatus};

    let wire = target.wire_value();
    let op = format!("wait_for_process_group_controller_services_state({pg_id}, {wire})");

    let fetch = || async {
        client
            .flow()
            .get_controller_services_from_group(pg_id, None, None, None, None)
            .await
    };
    let done = move |entity: &crate::types::ControllerServicesEntity| {
        let all_settled = entity
            .controller_services
            .as_deref()
            .unwrap_or_default()
            .iter()
            .all(|svc| {
                let component = svc.component.as_ref();
                let state = component.and_then(|c| c.state.as_ref());
                let validation = component.and_then(|c| c.validation_status.as_ref());
                match target {
                    ControllerServiceTargetState::Enabled => match state {
                        Some(ControllerServiceDtoState::Enabled) => true,
                        // A disabled service that is invalid is accepted as settled.
                        Some(ControllerServiceDtoState::Disabled) => matches!(
                            validation,
                            Some(ControllerServiceDtoValidationStatus::Invalid)
                        ),
                        _ => false,
                    },
                    ControllerServiceTargetState::Disabled => {
                        matches!(state, Some(ControllerServiceDtoState::Disabled))
                    }
                }
            });
        if !all_settled {
            return PollOutcome::Pending;
        }
        PollOutcome::Ready
    };

    poll_until(&config, &op, fetch, done).await
}
/// Waits until every controller service in a process group has settled with
/// respect to `target`, using the dynamic client.
///
/// The services are fetched with `get_controller_services_from_group` on every
/// poll. State and validation status are compared as strings. A service counts
/// as settled when:
///
/// * target `Enabled`: its state is `"ENABLED"`, or its state is `"DISABLED"`
///   while its validation status is `"INVALID"`;
/// * target `Disabled`: its state is `"DISABLED"`.
///
/// A service without a component or state is not settled. A response with no
/// services (absent or empty list) is settled immediately.
///
/// # Errors
///
/// Returns the error produced by `poll_until`, unchanged.
#[cfg(feature = "dynamic")]
pub async fn process_group_controller_services_state_dynamic(
    client: &crate::dynamic::DynamicClient,
    pg_id: &str,
    target: ControllerServiceTargetState,
    config: WaitConfig,
) -> Result<crate::dynamic::types::ControllerServicesEntity, NifiError> {
    let wire = target.wire_value();
    let op = format!("wait_for_process_group_controller_services_state({pg_id}, {wire})");

    let fetch = || async {
        client
            .flow()
            .get_controller_services_from_group(pg_id, None, None, None, None)
            .await
    };
    let done = move |entity: &crate::dynamic::types::ControllerServicesEntity| {
        let all_settled = entity
            .controller_services
            .as_deref()
            .unwrap_or_default()
            .iter()
            .all(|svc| {
                let component = svc.component.as_ref();
                let state = component.and_then(|c| c.state.as_deref());
                let validation = component.and_then(|c| c.validation_status.as_deref());
                match target {
                    ControllerServiceTargetState::Enabled => match state {
                        Some("ENABLED") => true,
                        // A disabled service that is invalid is accepted as settled.
                        Some("DISABLED") => matches!(validation, Some("INVALID")),
                        _ => false,
                    },
                    ControllerServiceTargetState::Disabled => {
                        matches!(state, Some("DISABLED"))
                    }
                }
            });
        if !all_settled {
            return PollOutcome::Pending;
        }
        PollOutcome::Ready
    };

    poll_until(&config, &op, fetch, done).await
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a `WaitConfig` from millisecond values, with cleanup enabled.
    fn config_ms(timeout: u64, poll_interval: u64, initial_delay: u64) -> WaitConfig {
        WaitConfig {
            timeout: Duration::from_millis(timeout),
            poll_interval: Duration::from_millis(poll_interval),
            initial_delay: Duration::from_millis(initial_delay),
            cleanup: true,
        }
    }

    /// A value that is ready on the very first fetch is returned as-is.
    #[tokio::test]
    async fn poll_until_returns_ok_on_first_ready() {
        let config = config_ms(1_000, 10, 0);
        let fetch = || async { Ok::<i32, NifiError>(42) };
        let done = |value: &i32| match *value {
            42 => PollOutcome::Ready,
            _ => PollOutcome::Pending,
        };

        let fetched = poll_until(&config, "op", fetch, done).await.unwrap();

        assert_eq!(fetched, 42);
    }

    /// Polling continues while the predicate reports `Pending`.
    #[tokio::test]
    async fn poll_until_polls_until_ready() {
        let calls = Arc::new(AtomicUsize::new(0));
        let config = config_ms(1_000, 5, 0);

        let shared = Arc::clone(&calls);
        let fetch = move || {
            let shared = Arc::clone(&shared);
            // Each fetch yields the number of fetches that preceded it.
            async move { Ok::<usize, NifiError>(shared.fetch_add(1, Ordering::SeqCst)) }
        };
        let done = |seen: &usize| {
            if *seen < 3 {
                PollOutcome::Pending
            } else {
                PollOutcome::Ready
            }
        };

        let fetched = poll_until(&config, "op", fetch, done).await.unwrap();

        assert_eq!(fetched, 3);
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }

    /// A predicate that never settles ends in `Timeout` carrying the operation label.
    #[tokio::test]
    async fn poll_until_times_out() {
        let config = config_ms(50, 10, 0);
        let fetch = || async { Ok::<i32, NifiError>(0) };
        let done = |_: &i32| PollOutcome::Pending;

        let outcome = poll_until(&config, "test_op", fetch, done).await;

        match outcome.unwrap_err() {
            NifiError::Timeout { operation } => assert_eq!(operation, "test_op"),
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    /// A `Failed` outcome from the predicate surfaces its error unchanged.
    #[tokio::test]
    async fn poll_until_propagates_failed_outcome() {
        let config = WaitConfig::default();
        let fetch = || async { Ok::<i32, NifiError>(1) };
        let done = |_: &i32| {
            let failure = NifiError::Api {
                status: 500,
                message: String::from("boom"),
            };
            PollOutcome::Failed(failure)
        };

        let outcome = poll_until(&config, "op", fetch, done).await;

        match outcome.unwrap_err() {
            NifiError::Api { status, message } => {
                assert_eq!(status, 500);
                assert_eq!(message, "boom");
            }
            other => panic!("expected Api, got {other:?}"),
        }
    }

    /// An error returned by the fetch closure is propagated to the caller.
    #[tokio::test]
    async fn poll_until_propagates_fetch_error() {
        let config = WaitConfig::default();
        let fetch = || async {
            let not_found = NifiError::Api {
                status: 404,
                message: String::from("not found"),
            };
            Err::<i32, NifiError>(not_found)
        };
        let done = |_: &i32| PollOutcome::Pending;

        let outcome = poll_until(&config, "op", fetch, done).await;
        let err = outcome.unwrap_err();

        assert!(matches!(err, NifiError::Api { status: 404, .. }));
    }

    /// The timeout window starts after the initial delay, so polls still happen
    /// even when the delay exceeds the timeout.
    #[tokio::test]
    async fn poll_until_initial_delay_does_not_consume_timeout_window() {
        let config = config_ms(80, 10, 120);
        let calls = Arc::new(AtomicUsize::new(0));

        let shared = Arc::clone(&calls);
        let fetch = move || {
            let shared = Arc::clone(&shared);
            async move {
                shared.fetch_add(1, Ordering::SeqCst);
                Ok::<i32, NifiError>(0)
            }
        };
        let done = |_: &i32| PollOutcome::Pending;

        let outcome = poll_until(&config, "delayed_op", fetch, done).await;
        let err = outcome.unwrap_err();

        assert!(matches!(err, NifiError::Timeout { .. }));
        let n = calls.load(Ordering::SeqCst);
        assert!(
            n >= 3,
            "expected several polls within the timeout window after initial_delay, got {n}"
        );
    }

    /// A failure reason wins regardless of the `complete` flag.
    #[test]
    fn parameter_context_update_outcome_failure_reason_is_terminal() {
        match terminal_outcome(None, Some("boom"), "parameter context update") {
            PollOutcome::Failed(NifiError::Api { status, message }) => {
                assert_eq!(status, STATUS_OPERATION_FAILED);
                assert!(message.contains("boom"));
            }
            _ => panic!("expected Failed(Api) for (complete=None, failure=Some)"),
        }

        match terminal_outcome(Some(false), Some("nope"), "parameter context update") {
            PollOutcome::Failed(NifiError::Api { message, .. }) => {
                assert!(message.contains("nope"));
            }
            _ => panic!("expected Failed(Api) for (complete=Some(false), failure=Some)"),
        }

        match terminal_outcome(
            Some(true),
            Some("failed-after-complete"),
            "parameter context update",
        ) {
            PollOutcome::Failed(NifiError::Api { message, .. }) => {
                assert!(message.contains("failed-after-complete"));
            }
            _ => panic!("expected Failed(Api) for (complete=Some(true), failure=Some)"),
        }
    }

    /// Without a failure reason, only `complete == Some(true)` is ready.
    #[test]
    fn parameter_context_update_outcome_no_failure_reason_paths() {
        let completed = terminal_outcome(Some(true), None, "parameter context update");
        assert!(matches!(completed, PollOutcome::Ready));

        for complete in [Some(false), None] {
            let unfinished = terminal_outcome(complete, None, "parameter context update");
            assert!(matches!(unfinished, PollOutcome::Pending));
        }
    }

    /// The first fetch is not issued before the initial delay has elapsed.
    #[tokio::test]
    async fn poll_until_honors_initial_delay() {
        let config = config_ms(1_000, 10, 30);
        let calls = Arc::new(AtomicUsize::new(0));
        let started = tokio::time::Instant::now();

        let shared = Arc::clone(&calls);
        let fetch = move || {
            let shared = Arc::clone(&shared);
            async move {
                shared.fetch_add(1, Ordering::SeqCst);
                Ok::<i32, NifiError>(1)
            }
        };
        let done = |_: &i32| PollOutcome::Ready;

        let _ = poll_until(&config, "op", fetch, done).await.unwrap();

        let elapsed = started.elapsed();
        assert!(
            elapsed >= Duration::from_millis(25),
            "initial_delay not honored, elapsed = {elapsed:?}"
        );
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}
#[cfg(test)]
mod outcome_tests {
    use super::*;

    /// `complete == Some(false)` with no failure reason keeps polling.
    #[test]
    fn terminal_outcome_pending_when_not_terminal() {
        let outcome = terminal_outcome(Some(false), None, "drop request");
        assert!(
            matches!(outcome, PollOutcome::Pending),
            "expected Pending, got {outcome:?}"
        );
    }

    /// A missing `complete` flag with no failure reason keeps polling.
    #[test]
    fn terminal_outcome_pending_when_none() {
        let outcome = terminal_outcome(None, None, "drop request");
        assert!(
            matches!(outcome, PollOutcome::Pending),
            "expected Pending, got {outcome:?}"
        );
    }

    /// `complete == Some(true)` with no failure reason is ready.
    #[test]
    fn terminal_outcome_ready_when_terminal() {
        let outcome = terminal_outcome(Some(true), None, "drop request");
        assert!(
            matches!(outcome, PollOutcome::Ready),
            "expected Ready, got {outcome:?}"
        );
    }

    /// A failure reason produces an `Api` error that names the operation and reason.
    #[test]
    fn terminal_outcome_failed_when_failure_reason_set() {
        let outcome = terminal_outcome(Some(true), Some("queue empty"), "drop request");
        match outcome {
            PollOutcome::Failed(NifiError::Api { status, message }) => {
                assert_eq!(status, 500);
                for fragment in ["drop request failed:", "queue empty"] {
                    assert!(message.contains(fragment));
                }
            }
            other => panic!("expected Failed(Api), got {other:?}"),
        }
    }

    /// A failure reason takes precedence over an unfinished `complete` flag.
    #[test]
    fn terminal_outcome_failure_overrides_pending_terminal() {
        let outcome = terminal_outcome(Some(false), Some("oops"), "verification");
        assert!(
            matches!(outcome, PollOutcome::Failed(_)),
            "expected Failed, got {outcome:?}"
        );
    }
}