#![deny(missing_docs)]
use std::time::Duration;
use reqwest::StatusCode;
use crate::NifiError;
const STATUS_OPERATION_FAILED: u16 = StatusCode::INTERNAL_SERVER_ERROR.as_u16();
#[derive(Debug, Clone)]
pub struct WaitConfig {
pub timeout: Duration,
pub poll_interval: Duration,
pub initial_delay: Duration,
pub cleanup: bool,
}
impl Default for WaitConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(30),
poll_interval: Duration::from_millis(500),
initial_delay: Duration::ZERO,
cleanup: true,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessorTargetState {
Running,
Stopped,
Disabled,
}
impl ProcessorTargetState {
pub(crate) fn wire_value(&self) -> &'static str {
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProcessorDtoState;
#[cfg(not(feature = "dynamic"))]
use crate::types::ProcessorDtoState;
match self {
Self::Running => ProcessorDtoState::Running.as_str(),
Self::Stopped => ProcessorDtoState::Stopped.as_str(),
Self::Disabled => ProcessorDtoState::Disabled.as_str(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ControllerServiceTargetState {
Enabled,
Disabled,
}
impl ControllerServiceTargetState {
pub(crate) fn wire_value(&self) -> &'static str {
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ControllerServiceDtoState;
#[cfg(not(feature = "dynamic"))]
use crate::types::ControllerServiceDtoState;
match self {
Self::Enabled => ControllerServiceDtoState::Enabled.as_str(),
Self::Disabled => ControllerServiceDtoState::Disabled.as_str(),
}
}
}
enum PollOutcome {
Pending,
Ready,
Failed(NifiError),
}
async fn poll_until<T, FetchFn, FetchFut>(
config: &WaitConfig,
operation: &str,
fetch: FetchFn,
done: impl Fn(&T) -> PollOutcome,
) -> Result<T, NifiError>
where
FetchFn: Fn() -> FetchFut,
FetchFut: core::future::Future<Output = Result<T, NifiError>>,
{
let deadline = tokio::time::Instant::now() + config.timeout;
if !config.initial_delay.is_zero() {
tokio::time::sleep(config.initial_delay).await;
}
loop {
let value = fetch().await?;
match done(&value) {
PollOutcome::Ready => return Ok(value),
PollOutcome::Failed(e) => return Err(e),
PollOutcome::Pending => {}
}
if tokio::time::Instant::now() >= deadline {
return Err(NifiError::Timeout {
operation: operation.to_string(),
});
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let next = core::cmp::min(config.poll_interval, remaining);
tokio::time::sleep(next).await;
}
}
fn warn_cleanup_failure<E: std::fmt::Display>(
operation: &str,
target: &str,
result: Result<(), E>,
) {
if let Err(err) = result {
tracing::warn!(
operation,
target,
error = %err,
"cleanup request failed (best-effort; ignored)"
);
}
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ProcessorEntity;
#[cfg(not(feature = "dynamic"))]
pub async fn processor_state(
client: &crate::NifiClient,
processor_id: &str,
target: ProcessorTargetState,
config: WaitConfig,
) -> Result<ProcessorEntity, NifiError> {
use crate::types::ProcessorDtoState;
let op = format!(
"wait_for_processor_state({processor_id}, {})",
target.wire_value()
);
let fetch = || async { client.processors().get_processor(processor_id).await };
let done = move |entity: &ProcessorEntity| {
let matches = entity
.component
.as_ref()
.and_then(|c| c.state.as_ref())
.is_some_and(|s| {
matches!(
(target, s),
(ProcessorTargetState::Running, ProcessorDtoState::Running)
| (ProcessorTargetState::Stopped, ProcessorDtoState::Stopped)
| (ProcessorTargetState::Disabled, ProcessorDtoState::Disabled)
)
});
if matches {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
poll_until(&config, &op, fetch, done).await
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ControllerServiceEntity;
#[cfg(not(feature = "dynamic"))]
pub async fn controller_service_state(
client: &crate::NifiClient,
service_id: &str,
target: ControllerServiceTargetState,
config: WaitConfig,
) -> Result<ControllerServiceEntity, NifiError> {
use crate::types::ControllerServiceDtoState;
let op = format!(
"wait_for_controller_service_state({service_id}, {})",
target.wire_value()
);
let fetch = || async {
client
.controller_services()
.get_controller_service(service_id, None)
.await
};
let done = move |entity: &ControllerServiceEntity| {
let matches = entity
.component
.as_ref()
.and_then(|c| c.state.as_ref())
.is_some_and(|s| {
matches!(
(target, s),
(
ControllerServiceTargetState::Enabled,
ControllerServiceDtoState::Enabled
) | (
ControllerServiceTargetState::Disabled,
ControllerServiceDtoState::Disabled
)
)
});
if matches {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
poll_until(&config, &op, fetch, done).await
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ControllerServiceEntity;
#[cfg(feature = "dynamic")]
pub async fn controller_service_state_dynamic(
client: &crate::dynamic::DynamicClient,
service_id: &str,
target: ControllerServiceTargetState,
config: WaitConfig,
) -> Result<ControllerServiceEntity, NifiError> {
let target_wire = target.wire_value();
let op = format!("wait_for_controller_service_state({service_id}, {target_wire})");
let fetch = || async {
client
.controller_services()
.get_controller_service(service_id, None)
.await
};
let done = move |entity: &ControllerServiceEntity| {
let state = entity.component.as_ref().and_then(|c| c.state.as_deref());
if state == Some(target_wire) {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
poll_until(&config, &op, fetch, done).await
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProcessorEntity;
#[cfg(feature = "dynamic")]
pub async fn processor_state_dynamic(
client: &crate::dynamic::DynamicClient,
processor_id: &str,
target: ProcessorTargetState,
config: WaitConfig,
) -> Result<ProcessorEntity, NifiError> {
let target_wire = target.wire_value();
let op = format!("wait_for_processor_state({processor_id}, {target_wire})");
let fetch = || async { client.processors().get_processor(processor_id).await };
let done = move |entity: &ProcessorEntity| {
let state = entity.component.as_ref().and_then(|c| c.state.as_deref());
if state == Some(target_wire) {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
poll_until(&config, &op, fetch, done).await
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ParameterContextUpdateRequestEntity;
#[cfg(not(feature = "dynamic"))]
pub async fn parameter_context_update(
client: &crate::NifiClient,
context_id: &str,
request_id: &str,
config: WaitConfig,
) -> Result<ParameterContextUpdateRequestEntity, NifiError> {
let op = format!("wait_for_parameter_context_update({context_id}/{request_id})");
let fetch = || async {
client
.parametercontexts()
.get_parameter_context_update(context_id, request_id)
.await
};
let done = |entity: &ParameterContextUpdateRequestEntity| {
let req = entity.request.as_ref();
let complete = req.and_then(|r| r.complete).unwrap_or(false);
let failure = req.and_then(|r| r.failure_reason.as_ref());
match (complete, failure) {
(true, Some(reason)) => PollOutcome::Failed(NifiError::Api {
status: STATUS_OPERATION_FAILED,
message: format!("parameter context update failed: {reason}"),
}),
(true, None) => PollOutcome::Ready,
(false, _) => PollOutcome::Pending,
}
};
let result = poll_until(&config, &op, fetch, done).await;
if config.cleanup {
let res = client
.parametercontexts()
.delete_update_request(context_id, request_id, None)
.await
.map(|_| ());
warn_cleanup_failure(&op, &format!("{context_id}/{request_id}"), res);
}
result
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ParameterContextUpdateRequestEntity;
#[cfg(feature = "dynamic")]
pub async fn parameter_context_update_dynamic(
client: &crate::dynamic::DynamicClient,
context_id: &str,
request_id: &str,
config: WaitConfig,
) -> Result<ParameterContextUpdateRequestEntity, NifiError> {
let op = format!("wait_for_parameter_context_update({context_id}/{request_id})");
let fetch = || async {
client
.parametercontexts()
.get_parameter_context_update(context_id, request_id)
.await
};
let done = |entity: &ParameterContextUpdateRequestEntity| {
let req = entity.request.as_ref();
let complete = req.and_then(|r| r.complete).unwrap_or(false);
let failure = req.and_then(|r| r.failure_reason.as_ref());
match (complete, failure) {
(true, Some(reason)) => PollOutcome::Failed(NifiError::Api {
status: STATUS_OPERATION_FAILED,
message: format!("parameter context update failed: {reason}"),
}),
(true, None) => PollOutcome::Ready,
(false, _) => PollOutcome::Pending,
}
};
let result = poll_until(&config, &op, fetch, done).await;
if config.cleanup {
let res = client
.parametercontexts()
.delete_update_request(context_id, request_id, None)
.await
.map(|_| ());
warn_cleanup_failure(&op, &format!("{context_id}/{request_id}"), res);
}
result
}
#[cfg(not(feature = "dynamic"))]
use crate::types::ProvenanceDto;
#[cfg(not(feature = "dynamic"))]
pub async fn provenance_query(
client: &crate::NifiClient,
query_id: &str,
config: WaitConfig,
) -> Result<ProvenanceDto, NifiError> {
let op = format!("wait_for_provenance_query({query_id})");
let fetch = || async {
client
.provenance()
.get_provenance(query_id, None, None, None)
.await
};
let done = |dto: &ProvenanceDto| {
if dto.finished.unwrap_or(false) {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
let result = poll_until(&config, &op, fetch, done).await;
if config.cleanup {
let res = client
.provenance()
.delete_provenance(query_id, None)
.await
.map(|_| ());
warn_cleanup_failure(&op, query_id, res);
}
result
}
#[cfg(feature = "dynamic")]
use crate::dynamic::types::ProvenanceDto;
#[cfg(feature = "dynamic")]
pub async fn provenance_query_dynamic(
client: &crate::dynamic::DynamicClient,
query_id: &str,
config: WaitConfig,
) -> Result<ProvenanceDto, NifiError> {
let op = format!("wait_for_provenance_query({query_id})");
let fetch = || async {
client
.provenance()
.get_provenance(query_id, None, None, None)
.await
};
let done = |dto: &ProvenanceDto| {
if dto.finished.unwrap_or(false) {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
let result = poll_until(&config, &op, fetch, done).await;
if config.cleanup {
let res = client
.provenance()
.delete_provenance(query_id, None)
.await
.map(|_| ());
warn_cleanup_failure(&op, query_id, res);
}
result
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[tokio::test]
async fn poll_until_returns_ok_on_first_ready() {
let config = WaitConfig {
timeout: Duration::from_secs(1),
poll_interval: Duration::from_millis(10),
initial_delay: Duration::ZERO,
cleanup: true,
};
let fetch = || async { Ok::<i32, NifiError>(42) };
let done = |v: &i32| {
if *v == 42 {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
let result = poll_until(&config, "op", fetch, done).await.unwrap();
assert_eq!(result, 42);
}
#[tokio::test]
async fn poll_until_polls_until_ready() {
let counter = Arc::new(AtomicUsize::new(0));
let config = WaitConfig {
timeout: Duration::from_secs(1),
poll_interval: Duration::from_millis(5),
initial_delay: Duration::ZERO,
cleanup: true,
};
let c = Arc::clone(&counter);
let fetch = move || {
let c = Arc::clone(&c);
async move {
let n = c.fetch_add(1, Ordering::SeqCst);
Ok::<usize, NifiError>(n)
}
};
let done = |v: &usize| {
if *v >= 3 {
PollOutcome::Ready
} else {
PollOutcome::Pending
}
};
let result = poll_until(&config, "op", fetch, done).await.unwrap();
assert_eq!(result, 3);
assert_eq!(counter.load(Ordering::SeqCst), 4);
}
#[tokio::test]
async fn poll_until_times_out() {
let config = WaitConfig {
timeout: Duration::from_millis(50),
poll_interval: Duration::from_millis(10),
initial_delay: Duration::ZERO,
cleanup: true,
};
let fetch = || async { Ok::<i32, NifiError>(0) };
let done = |_: &i32| PollOutcome::Pending;
let err = poll_until(&config, "test_op", fetch, done)
.await
.unwrap_err();
match err {
NifiError::Timeout { operation } => assert_eq!(operation, "test_op"),
other => panic!("expected Timeout, got {other:?}"),
}
}
#[tokio::test]
async fn poll_until_propagates_failed_outcome() {
let config = WaitConfig::default();
let fetch = || async { Ok::<i32, NifiError>(1) };
let done = |_: &i32| {
PollOutcome::Failed(NifiError::Api {
status: 500,
message: "boom".to_string(),
})
};
let err = poll_until(&config, "op", fetch, done).await.unwrap_err();
match err {
NifiError::Api { status, message } => {
assert_eq!(status, 500);
assert_eq!(message, "boom");
}
other => panic!("expected Api, got {other:?}"),
}
}
#[tokio::test]
async fn poll_until_propagates_fetch_error() {
let config = WaitConfig::default();
let fetch = || async {
Err::<i32, NifiError>(NifiError::Api {
status: 404,
message: "not found".to_string(),
})
};
let done = |_: &i32| PollOutcome::Pending;
let err = poll_until(&config, "op", fetch, done).await.unwrap_err();
assert!(matches!(err, NifiError::Api { status: 404, .. }));
}
#[tokio::test]
async fn poll_until_honors_initial_delay() {
let config = WaitConfig {
timeout: Duration::from_secs(1),
poll_interval: Duration::from_millis(10),
initial_delay: Duration::from_millis(30),
cleanup: true,
};
let counter = Arc::new(AtomicUsize::new(0));
let start = tokio::time::Instant::now();
let c = Arc::clone(&counter);
let fetch = move || {
let c = Arc::clone(&c);
async move {
c.fetch_add(1, Ordering::SeqCst);
Ok::<i32, NifiError>(1)
}
};
let done = |_: &i32| PollOutcome::Ready;
let _ = poll_until(&config, "op", fetch, done).await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(25),
"initial_delay not honored, elapsed = {elapsed:?}"
);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
}