Skip to main content

nifi_rust_client/
client.rs

1#![deny(missing_docs)]
2use std::sync::Arc;
3
4use reqwest::header::{CONTENT_TYPE, HeaderName};
5use reqwest::{Client, Method, StatusCode};
6use serde::de::DeserializeOwned;
7use snafu::ResultExt as _;
8use tokio::sync::RwLock;
9use url::Url;
10
11use crate::NifiError;
12use crate::config::auth::AuthProvider;
13use crate::error::{AuthSnafu, HttpSnafu};
14
/// `application/octet-stream` MIME type for binary upload bodies.
///
/// Sent as the `Content-Type` header value by the octet-stream POST helpers
/// on [`NifiClient`].
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
17
/// HTTP header NiFi reads behind a trusted proxy to identify the end user.
///
/// Stored lowercased because [`HeaderName::from_static`] requires it; HTTP
/// header names are case-insensitive on the wire.
///
/// NOTE(review): presumably attached to outgoing requests when the client's
/// `proxied_entities_chain` field is set — the use site is outside this part
/// of the file; confirm.
const PROXIED_ENTITIES_CHAIN: HeaderName = HeaderName::from_static("x-proxiedentitieschain");
22
/// Client for the Apache NiFi REST API.
///
/// Clones share the bearer token and the re-authentication lock (both are
/// held behind `Arc`), so a token obtained through one clone is visible to
/// all the others.
pub struct NifiClient {
    /// Base URL of the NiFi instance.
    /// NOTE(review): presumably the root that request paths are resolved
    /// against — confirm against `api_url`.
    base_url: Url,
    /// Underlying `reqwest` client used to issue every HTTP request.
    http: Client,
    /// Current bearer token, if any. Wrapped in `Zeroizing` so the in-client
    /// copy is wiped when it is replaced, cleared or dropped.
    token: Arc<RwLock<Option<zeroize::Zeroizing<String>>>>,
    /// Optional provider used by `authenticate` and by the automatic
    /// re-authentication that follows an `Unauthorized` error.
    auth_provider: Option<Arc<dyn AuthProvider>>,
    /// Optional proxied-entities chain value.
    /// NOTE(review): presumably sent in the `X-ProxiedEntitiesChain` header —
    /// confirm against the request-building code.
    proxied_entities_chain: Option<String>,
    /// Optional policy controlling retries of transient (retryable) errors.
    retry_policy: Option<crate::config::retry::RetryPolicy>,
    /// Optional request-id header setting.
    /// NOTE(review): presumably the header name under which a request id is
    /// sent — confirm against `apply_request_id`.
    request_id_header: Option<String>,
    /// Serializes re-authentication so that concurrent 401 responses trigger
    /// a single token refresh.
    auth_lock: Arc<tokio::sync::Mutex<()>>,
}
34
35impl Clone for NifiClient {
36    fn clone(&self) -> Self {
37        Self {
38            base_url: self.base_url.clone(),
39            http: self.http.clone(),
40            token: Arc::clone(&self.token),
41            auth_provider: self.auth_provider.clone(),
42            proxied_entities_chain: self.proxied_entities_chain.clone(),
43            retry_policy: self.retry_policy.clone(),
44            request_id_header: self.request_id_header.clone(),
45            auth_lock: Arc::clone(&self.auth_lock),
46        }
47    }
48}
49
50impl std::fmt::Debug for NifiClient {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        f.debug_struct("NifiClient")
53            .field("base_url", &self.base_url)
54            .field(
55                "auth_provider",
56                &self.auth_provider.as_ref().map(|c| format!("{c:?}")),
57            )
58            .field("proxied_entities_chain", &self.proxied_entities_chain)
59            .field("retry_policy", &self.retry_policy)
60            .field("request_id_header", &self.request_id_header)
61            .finish_non_exhaustive()
62    }
63}
64
65impl NifiClient {
66    /// Construct a client from pre-built parts. Used by [`crate::NifiClientBuilder`].
67    pub(crate) fn from_parts(
68        base_url: Url,
69        http: Client,
70        auth_provider: Option<Arc<dyn AuthProvider>>,
71        proxied_entities_chain: Option<String>,
72        retry_policy: Option<crate::config::retry::RetryPolicy>,
73        request_id_header: Option<String>,
74    ) -> Self {
75        Self {
76            base_url,
77            http,
78            token: Arc::new(RwLock::new(None)),
79            auth_provider,
80            proxied_entities_chain,
81            retry_policy,
82            request_id_header,
83            auth_lock: Arc::new(tokio::sync::Mutex::new(())),
84        }
85    }
86
87    /// Return the current bearer token, if one has been set.
88    ///
89    /// The token is a NiFi-issued JWT. The returned `String` is a clone that is
90    /// **not** zeroized on drop — it is your responsibility to persist or destroy
91    /// it securely. The in-client copy is zeroized when cleared or when the
92    /// client is dropped.
93    pub async fn token(&self) -> Option<String> {
94        self.token.read().await.as_ref().map(|t| (**t).clone())
95    }
96
97    /// Restore a previously obtained bearer token.
98    ///
99    /// Useful for CLI tools that persist the token in a file between sessions.
100    /// If the token has expired, the next API call will return
101    /// [`NifiError::Unauthorized`]; re-call [`login`][Self::login]
102    /// to obtain a fresh one.
103    pub async fn set_token(&self, token: String) {
104        *self.token.write().await = Some(zeroize::Zeroizing::new(token));
105    }
106
107    /// Invalidate the current bearer token and clear it from the client.
108    ///
109    /// Sends `DELETE /nifi-api/access/logout` to invalidate the token server-side,
110    /// then clears the local token unconditionally so that subsequent requests are
111    /// not sent with a stale credential.
112    ///
113    /// If the server returns an error (e.g. `401` because the token had already
114    /// expired) the local token is still cleared and the error is returned to the
115    /// caller.
116    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
117    pub async fn logout(&self) -> Result<(), NifiError> {
118        let result = self.delete_inner("/access/logout").await;
119        *self.token.write().await = None;
120        if result.is_ok() {
121            tracing::info!("NiFi logout successful");
122        }
123        result
124    }
125
126    /// Authenticate with NiFi using single-user credentials.
127    ///
128    /// Obtains a JWT token from `/nifi-api/access/token` and stores it on the
129    /// client for all subsequent requests.
130    ///
131    /// # Token lifetime and expiry
132    ///
133    /// NiFi JWTs expire after 12 hours by default (configurable server-side via
134    /// `nifi.security.user.login.identity.provider.expiration`). Once expired,
135    /// any API call returns [`NifiError::Unauthorized`]. Configure an
136    /// [`AuthProvider`] on the builder to enable
137    /// automatic token refresh on 401 responses.
138    #[tracing::instrument(skip(self, username, password), fields(request_id = tracing::field::Empty))]
139    pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
140        let method = Method::POST;
141        tracing::debug!(method = %method, path = "/access/token", "NiFi API request");
142        let url = self.api_url("/access/token");
143        let req = self.apply_request_id(self.http.post(url));
144        let resp = req
145            .form(&[("username", username), ("password", password)])
146            .send()
147            .await
148            .context(HttpSnafu)?;
149
150        let status = resp.status();
151        tracing::debug!(
152            method = %method,
153            path = "/access/token",
154            status = status.as_u16(),
155            "NiFi API response"
156        );
157        if !status.is_success() {
158            let body = resp.text().await.unwrap_or_else(|_| status.to_string());
159            tracing::debug!(
160                method = %method,
161                path = "/access/token",
162                status = status.as_u16(),
163                %body,
164                "NiFi API raw error body"
165            );
166            let message = extract_error_message(&body);
167            tracing::warn!(
168                method = %method,
169                path = "/access/token",
170                status = status.as_u16(),
171                %message,
172                "NiFi API error"
173            );
174            return AuthSnafu { message }.fail();
175        }
176
177        let token = resp.text().await.context(HttpSnafu)?;
178        *self.token.write().await = Some(zeroize::Zeroizing::new(token));
179        tracing::info!("NiFi login successful for {username}");
180        Ok(())
181    }
182
183    /// Authenticate using the configured [`AuthProvider`].
184    ///
185    /// Returns [`NifiError::Auth`] if no auth provider has been configured.
186    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
187    pub async fn authenticate(&self) -> Result<(), NifiError> {
188        let provider = self.auth_provider.as_ref().ok_or_else(|| NifiError::Auth {
189            message: "no auth provider configured".to_string(),
190        })?;
191        provider.authenticate(self).await
192    }
193
194    // ── Auth-retry wrapper ────────────────────────────────────────────────────
195
196    /// Execute `f`, and if it returns `NifiError::Unauthorized` and a credential
197    /// provider is configured, refresh the token and retry once.
198    ///
199    /// Uses a mutex + token-snapshot check to ensure that concurrent 401
200    /// responses only trigger a single re-authentication: whichever task wins
201    /// the lock re-auths; tasks that arrive later skip re-auth because they
202    /// observe a changed token.
203    #[tracing::instrument(skip_all)]
204    async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
205    where
206        F: Fn() -> Fut,
207        Fut: std::future::Future<Output = Result<T, NifiError>>,
208    {
209        // Snapshot the token at entry so we can detect whether a concurrent
210        // task already re-authed while we were waiting on the lock.
211        let token_before = self.token.read().await.as_ref().map(|t| (**t).clone());
212
213        match f().await {
214            Err(NifiError::Unauthorized { .. }) if self.auth_provider.is_some() => {
215                let _guard = self.auth_lock.lock().await;
216                let token_now = self.token.read().await.as_ref().map(|t| (**t).clone());
217                if token_now == token_before {
218                    tracing::info!("received 401, refreshing token via auth provider");
219                    self.authenticate().await?;
220                } else {
221                    tracing::debug!("token already refreshed by concurrent task, skipping re-auth");
222                }
223                // Release the auth lock BEFORE retrying the request — otherwise
224                // the retry's `f().await` would hold the lock across its entire
225                // HTTP round-trip, serializing every concurrent request through
226                // a single critical section.
227                drop(_guard);
228                f().await
229            }
230            other => other,
231        }
232    }
233
234    // ── Transient-error retry wrapper ──────────────────────────────────────────
235
236    /// Execute `f` with optional transient-error retry using exponential backoff.
237    ///
238    /// When a [`RetryPolicy`](crate::config::retry::RetryPolicy) is configured, retries
239    /// [retryable](NifiError::is_retryable) errors up to `max_retries` times.
240    /// Each attempt goes through [`with_auth_retry`] so 401 handling still works.
241    #[tracing::instrument(skip_all)]
242    async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
243    where
244        F: Fn() -> Fut,
245        Fut: std::future::Future<Output = Result<T, NifiError>>,
246    {
247        let Some(policy) = &self.retry_policy else {
248            return self.with_auth_retry(&f).await;
249        };
250
251        let mut last_err: Option<NifiError> = None;
252        for attempt in 0..=policy.max_retries {
253            if attempt > 0 {
254                let backoff = policy.backoff_for(attempt - 1);
255                tracing::info!(
256                    attempt,
257                    backoff_ms = backoff.as_millis() as u64,
258                    "retrying after transient error"
259                );
260                tokio::time::sleep(backoff).await;
261            }
262            match self.with_auth_retry(&f).await {
263                Ok(v) => return Ok(v),
264                Err(e) if e.is_retryable() => {
265                    tracing::warn!(attempt, error = %e, "transient error, will retry");
266                    last_err = Some(e);
267                }
268                Err(e) => return Err(e),
269            }
270        }
271        // Safety: the loop always executes at least once (attempt 0..=max_retries),
272        // and every iteration that reaches here sets `last_err`.
273        match last_err {
274            Some(e) => Err(e),
275            // unreachable: loop runs at least once and non-retryable errors return early
276            None => self.with_auth_retry(&f).await,
277        }
278    }
279
280    // ── Private helpers ───────────────────────────────────────────────────────
281
282    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
283    pub(crate) async fn get<T: DeserializeOwned>(
284        &self,
285        path: &str,
286        extra_headers: &[(&str, &str)],
287    ) -> Result<T, NifiError> {
288        self.with_retry(|| async {
289            let req = self
290                .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
291                .await;
292            let req = apply_extra_headers(req, extra_headers);
293            let resp = req.send().await.context(HttpSnafu)?;
294            Self::deserialize(&Method::GET, path, resp).await
295        })
296        .await
297    }
298
299    #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
300    pub(crate) async fn post<B, T>(
301        &self,
302        path: &str,
303        extra_headers: &[(&str, &str)],
304        body: &B,
305    ) -> Result<T, NifiError>
306    where
307        B: serde::Serialize,
308        T: DeserializeOwned,
309    {
310        self.with_retry(|| async {
311            let req = self
312                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
313                .await;
314            let req = apply_extra_headers(req, extra_headers);
315            let resp = req.json(body).send().await.context(HttpSnafu)?;
316            Self::deserialize(&Method::POST, path, resp).await
317        })
318        .await
319    }
320
321    #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
322    pub(crate) async fn put<B, T>(
323        &self,
324        path: &str,
325        extra_headers: &[(&str, &str)],
326        body: &B,
327    ) -> Result<T, NifiError>
328    where
329        B: serde::Serialize,
330        T: DeserializeOwned,
331    {
332        self.with_retry(|| async {
333            let req = self
334                .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
335                .await;
336            let req = apply_extra_headers(req, extra_headers);
337            let resp = req.json(body).send().await.context(HttpSnafu)?;
338            Self::deserialize(&Method::PUT, path, resp).await
339        })
340        .await
341    }
342
343    /// POST with no request body; deserializes the JSON response.
344    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
345    pub(crate) async fn post_no_body<T: DeserializeOwned>(
346        &self,
347        path: &str,
348        extra_headers: &[(&str, &str)],
349    ) -> Result<T, NifiError> {
350        self.with_retry(|| async {
351            let req = self
352                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
353                .await;
354            let req = apply_extra_headers(req, extra_headers);
355            let resp = req.send().await.context(HttpSnafu)?;
356            Self::deserialize(&Method::POST, path, resp).await
357        })
358        .await
359    }
360
361    /// POST with no request body; ignores the response body.
362    ///
363    /// Called by both the static per-version emitter (for POST endpoints
364    /// with no body and an empty response) and the dynamic canonical
365    /// emitter. No current NiFi 2.x spec triggers the static path, so
366    /// this helper is only reached via generated code in `$OUT_DIR` that
367    /// clippy's dead-code lint cannot see. Kept available via
368    /// `#[allow(dead_code)]` rather than deleted.
369    #[allow(dead_code)]
370    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
371    pub(crate) async fn post_void_no_body(
372        &self,
373        path: &str,
374        extra_headers: &[(&str, &str)],
375    ) -> Result<(), NifiError> {
376        self.with_retry(|| async {
377            let req = self
378                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
379                .await;
380            let req = apply_extra_headers(req, extra_headers);
381            let resp = req.send().await.context(HttpSnafu)?;
382            Self::check_void(&Method::POST, path, resp).await
383        })
384        .await
385    }
386
387    /// POST with a JSON body; ignores the response body.
388    ///
389    /// Kept available for forward compatibility — the emitter dispatch table at
390    /// `emit::method` references this helper for the `(POST, Json body, Empty response)`
391    /// combination, but no current NiFi 2.x spec triggers that path.
392    #[allow(dead_code)]
393    #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
394    pub(crate) async fn post_void<B: serde::Serialize>(
395        &self,
396        path: &str,
397        extra_headers: &[(&str, &str)],
398        body: &B,
399    ) -> Result<(), NifiError> {
400        self.with_retry(|| async {
401            let req = self
402                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
403                .await;
404            let req = apply_extra_headers(req, extra_headers);
405            let resp = req.json(body).send().await.context(HttpSnafu)?;
406            Self::check_void(&Method::POST, path, resp).await
407        })
408        .await
409    }
410
411    /// PUT with no request body; deserializes the JSON response.
412    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
413    pub(crate) async fn put_no_body<T: DeserializeOwned>(
414        &self,
415        path: &str,
416        extra_headers: &[(&str, &str)],
417    ) -> Result<T, NifiError> {
418        self.with_retry(|| async {
419            let req = self
420                .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
421                .await;
422            let req = apply_extra_headers(req, extra_headers);
423            let resp = req.send().await.context(HttpSnafu)?;
424            Self::deserialize(&Method::PUT, path, resp).await
425        })
426        .await
427    }
428
429    /// PUT with a JSON body; ignores the response body.
430    ///
431    /// Kept available for forward compatibility — the emitter dispatch table at
432    /// `emit::method` references this helper for the `(PUT, Json body, Empty response)`
433    /// combination, but no current NiFi 2.x spec triggers that path.
434    #[allow(dead_code)]
435    #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
436    pub(crate) async fn put_void<B: serde::Serialize>(
437        &self,
438        path: &str,
439        extra_headers: &[(&str, &str)],
440        body: &B,
441    ) -> Result<(), NifiError> {
442        self.with_retry(|| async {
443            let req = self
444                .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
445                .await;
446            let req = apply_extra_headers(req, extra_headers);
447            let resp = req.json(body).send().await.context(HttpSnafu)?;
448            Self::check_void(&Method::PUT, path, resp).await
449        })
450        .await
451    }
452
453    /// PUT with no request body; ignores the response body.
454    ///
455    /// Kept available for forward compatibility — the emitter dispatch table at
456    /// `emit::method` references this helper for the `(PUT, no body, Empty response)`
457    /// combination, but no current NiFi 2.x spec triggers that path.
458    #[allow(dead_code)]
459    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
460    pub(crate) async fn put_void_no_body(
461        &self,
462        path: &str,
463        extra_headers: &[(&str, &str)],
464    ) -> Result<(), NifiError> {
465        self.with_retry(|| async {
466            let req = self
467                .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
468                .await;
469            let req = apply_extra_headers(req, extra_headers);
470            let resp = req.send().await.context(HttpSnafu)?;
471            Self::check_void(&Method::PUT, path, resp).await
472        })
473        .await
474    }
475
476    /// POST with `application/octet-stream` body.
477    ///
478    /// Used for binary upload endpoints (e.g. NAR upload).
479    /// Pass `("Filename", name)` in `extra_headers` when the endpoint
480    /// requires a filename header.
481    #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
482    pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
483        &self,
484        path: &str,
485        extra_headers: &[(&str, &str)],
486        data: bytes::Bytes,
487    ) -> Result<T, NifiError> {
488        self.with_retry(|| async {
489            let req = self
490                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
491                .await
492                .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
493                .body(data.clone());
494            let req = apply_extra_headers(req, extra_headers);
495            let resp = req.send().await.context(HttpSnafu)?;
496            Self::deserialize(&Method::POST, path, resp).await
497        })
498        .await
499    }
500
501    /// POST with `multipart/form-data` body.
502    ///
503    /// Used for file-upload endpoints such as
504    /// `POST /process-groups/{id}/process-groups/upload`. Sends a single form
505    /// part named `"file"` carrying the given filename and raw bytes. The
506    /// `Content-Type` header (including the generated boundary) is set by
507    /// reqwest when `.multipart(form)` is called.
508    #[allow(dead_code)]
509    #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
510    pub(crate) async fn post_multipart<T: DeserializeOwned>(
511        &self,
512        path: &str,
513        extra_headers: &[(&str, &str)],
514        filename: &str,
515        data: bytes::Bytes,
516    ) -> Result<T, NifiError> {
517        self.with_retry(|| async {
518            let req = self
519                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
520                .await;
521            let req = apply_extra_headers(req, extra_headers);
522            let len = data.len() as u64;
523            let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
524                .file_name(filename.to_string());
525            let form = reqwest::multipart::Form::new().part("file", part);
526            let resp = req.multipart(form).send().await.context(HttpSnafu)?;
527            Self::deserialize(&Method::POST, path, resp).await
528        })
529        .await
530    }
531
532    /// POST `multipart/form-data` with additional text fields alongside
533    /// the `file` part.
534    ///
535    /// Used by endpoints whose multipart schema declares required text
536    /// properties beyond the file part — for example
537    /// `POST /process-groups/{id}/process-groups/upload` requires
538    /// `clientId`, `groupName`, `positionX`, and `positionY`.
539    ///
540    /// Each `(name, value)` tuple is emitted as a `form.text(...)` part
541    /// before the `file` part. The order mirrors the slice order; the
542    /// generator emits it in alphabetic order by wire name for
543    /// determinism across regenerations.
544    #[tracing::instrument(
545        skip(self, text_fields, data),
546        fields(request_id = tracing::field::Empty)
547    )]
548    pub(crate) async fn post_multipart_with_fields<T: DeserializeOwned>(
549        &self,
550        path: &str,
551        extra_headers: &[(&str, &str)],
552        text_fields: &[(&str, String)],
553        filename: &str,
554        data: bytes::Bytes,
555    ) -> Result<T, NifiError> {
556        self.with_retry(|| async {
557            let req = self
558                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
559                .await;
560            let req = apply_extra_headers(req, extra_headers);
561            let mut form = reqwest::multipart::Form::new();
562            for (name, value) in text_fields {
563                form = form.text((*name).to_string(), value.clone());
564            }
565            let len = data.len() as u64;
566            let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
567                .file_name(filename.to_string());
568            form = form.part("file", part);
569            let resp = req.multipart(form).send().await.context(HttpSnafu)?;
570            Self::deserialize(&Method::POST, path, resp).await
571        })
572        .await
573    }
574
575    /// GET that ignores the response body (for endpoints with no JSON response).
576    ///
577    /// Treats 302 as success in addition to 2xx: NiFi's `GET /access/logout/complete`
578    /// responds with a redirect once the logout is complete.
579    ///
580    /// Called from generated static-emitter code; clippy cannot see those
581    /// call sites, and dynamic-only builds skip per-version emission so the
582    /// helper appears unused there.
583    #[allow(dead_code)]
584    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
585    pub(crate) async fn get_void(
586        &self,
587        path: &str,
588        extra_headers: &[(&str, &str)],
589    ) -> Result<(), NifiError> {
590        self.with_retry(|| async {
591            let req = self
592                .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
593                .await;
594            let req = apply_extra_headers(req, extra_headers);
595            let resp = req.send().await.context(HttpSnafu)?;
596            Self::check_void_with_redirect(&Method::GET, path, resp).await
597        })
598        .await
599    }
600
601    /// GET with query parameters; ignores the response body.
602    ///
603    /// Called by both the static per-version emitter (for GET endpoints
604    /// with query params and an empty response) and the dynamic canonical
605    /// emitter. No current NiFi 2.x spec triggers the static path, so
606    /// this helper is only reached via generated code in `$OUT_DIR` that
607    /// clippy's dead-code lint cannot see. Kept available via
608    /// `#[allow(dead_code)]` rather than deleted.
609    #[allow(dead_code)]
610    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
611    pub(crate) async fn get_void_with_query(
612        &self,
613        path: &str,
614        extra_headers: &[(&str, &str)],
615        query: &[(&str, String)],
616    ) -> Result<(), NifiError> {
617        self.with_retry(|| async {
618            let req = self
619                .build_request(
620                    &Method::GET,
621                    path,
622                    self.http.get(self.api_url(path)).query(query),
623                )
624                .await;
625            let req = apply_extra_headers(req, extra_headers);
626            let resp = req.send().await.context(HttpSnafu)?;
627            Self::check_void_with_redirect(&Method::GET, path, resp).await
628        })
629        .await
630    }
631
632    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
633    pub(crate) async fn get_with_query<T: DeserializeOwned>(
634        &self,
635        path: &str,
636        extra_headers: &[(&str, &str)],
637        query: &[(&str, String)],
638    ) -> Result<T, NifiError> {
639        self.with_retry(|| async {
640            let req = self
641                .build_request(
642                    &Method::GET,
643                    path,
644                    self.http.get(self.api_url(path)).query(query),
645                )
646                .await;
647            let req = apply_extra_headers(req, extra_headers);
648            let resp = req.send().await.context(HttpSnafu)?;
649            Self::deserialize(&Method::GET, path, resp).await
650        })
651        .await
652    }
653
654    /// GET returning raw text (`text/plain`).
655    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
656    pub(crate) async fn get_text(
657        &self,
658        path: &str,
659        extra_headers: &[(&str, &str)],
660    ) -> Result<String, NifiError> {
661        self.with_retry(|| async {
662            let req = self
663                .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
664                .await;
665            let req = apply_extra_headers(req, extra_headers);
666            let resp = req.send().await.context(HttpSnafu)?;
667            Self::text(&Method::GET, path, resp).await
668        })
669        .await
670    }
671
672    /// GET returning raw bytes (`application/octet-stream` or `*/*`).
673    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
674    pub(crate) async fn get_bytes(
675        &self,
676        path: &str,
677        extra_headers: &[(&str, &str)],
678    ) -> Result<Vec<u8>, NifiError> {
679        self.with_retry(|| async {
680            let req = self
681                .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
682                .await;
683            let req = apply_extra_headers(req, extra_headers);
684            let resp = req.send().await.context(HttpSnafu)?;
685            Self::bytes(&Method::GET, path, resp).await
686        })
687        .await
688    }
689
690    /// GET with query parameters returning raw bytes.
691    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
692    pub(crate) async fn get_bytes_with_query(
693        &self,
694        path: &str,
695        extra_headers: &[(&str, &str)],
696        query: &[(&str, String)],
697    ) -> Result<Vec<u8>, NifiError> {
698        self.with_retry(|| async {
699            let req = self
700                .build_request(
701                    &Method::GET,
702                    path,
703                    self.http.get(self.api_url(path)).query(query),
704                )
705                .await;
706            let req = apply_extra_headers(req, extra_headers);
707            let resp = req.send().await.context(HttpSnafu)?;
708            Self::bytes(&Method::GET, path, resp).await
709        })
710        .await
711    }
712
713    /// GET returning a stream of body chunks.
714    ///
715    /// The initial request is subject to the configured `AuthProvider`
716    /// (401→re-auth-once) and `RetryPolicy`. Once the response has been
717    /// accepted (status-line read, 2xx/206), the stream takes over and
718    /// transport errors during body delivery propagate directly to the
719    /// caller — they are not retried.
720    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
721    pub(crate) async fn get_bytes_stream(
722        &self,
723        path: &str,
724        extra_headers: &[(&str, &str)],
725    ) -> Result<crate::BytesStream, NifiError> {
726        self.with_retry(|| async {
727            let req = self
728                .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
729                .await;
730            let req = apply_extra_headers(req, extra_headers);
731            let resp = req.send().await.context(HttpSnafu)?;
732            Self::bytes_stream(&Method::GET, path, resp).await
733        })
734        .await
735    }
736
737    /// GET with query parameters returning a stream of body chunks.
738    /// See [`Self::get_bytes_stream`] for retry/streaming semantics.
739    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
740    pub(crate) async fn get_bytes_stream_with_query(
741        &self,
742        path: &str,
743        extra_headers: &[(&str, &str)],
744        query: &[(&str, String)],
745    ) -> Result<crate::BytesStream, NifiError> {
746        self.with_retry(|| async {
747            let req = self
748                .build_request(
749                    &Method::GET,
750                    path,
751                    self.http.get(self.api_url(path)).query(query),
752                )
753                .await;
754            let req = apply_extra_headers(req, extra_headers);
755            let resp = req.send().await.context(HttpSnafu)?;
756            Self::bytes_stream(&Method::GET, path, resp).await
757        })
758        .await
759    }
760
761    /// POST with `application/octet-stream` body; ignores the response body.
762    ///
763    /// Kept available for forward compatibility — the emitter dispatch table at
764    /// `emit::method` references this helper for the `(POST, OctetStream body, Empty response)`
765    /// combination, but no current NiFi 2.x spec triggers that path.
766    #[allow(dead_code)]
767    #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
768    pub(crate) async fn post_void_octet_stream(
769        &self,
770        path: &str,
771        extra_headers: &[(&str, &str)],
772        data: bytes::Bytes,
773    ) -> Result<(), NifiError> {
774        self.with_retry(|| async {
775            let req = self
776                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
777                .await
778                .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
779                .body(data.clone());
780            let req = apply_extra_headers(req, extra_headers);
781            let resp = req.send().await.context(HttpSnafu)?;
782            Self::check_void(&Method::POST, path, resp).await
783        })
784        .await
785    }
786
787    /// POST with `multipart/form-data` body; ignores the response body.
788    ///
789    /// Kept available for forward compatibility — the emitter dispatch table at
790    /// `emit::method` references this helper for the `(POST, Multipart body, Empty response)`
791    /// combination, but no current NiFi 2.x spec triggers that path.
792    #[allow(dead_code)]
793    #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
794    pub(crate) async fn post_void_multipart(
795        &self,
796        path: &str,
797        extra_headers: &[(&str, &str)],
798        filename: &str,
799        data: bytes::Bytes,
800    ) -> Result<(), NifiError> {
801        self.with_retry(|| async {
802            let req = self
803                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
804                .await;
805            let req = apply_extra_headers(req, extra_headers);
806            let len = data.len() as u64;
807            let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
808                .file_name(filename.to_string());
809            let form = reqwest::multipart::Form::new().part("file", part);
810            let resp = req.multipart(form).send().await.context(HttpSnafu)?;
811            Self::check_void(&Method::POST, path, resp).await
812        })
813        .await
814    }
815
816    /// POST a JSON body and return the `text/plain` response body.
817    ///
818    /// Called from generated static-emitter code; clippy cannot see those
819    /// call sites, and dynamic-only builds skip per-version emission so the
820    /// helper appears unused there.
821    #[allow(dead_code)]
822    #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
823    pub(crate) async fn post_returning_text<B: serde::Serialize>(
824        &self,
825        path: &str,
826        extra_headers: &[(&str, &str)],
827        body: &B,
828    ) -> Result<String, NifiError> {
829        self.with_retry(|| async {
830            let req = self
831                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
832                .await;
833            let req = apply_extra_headers(req, extra_headers);
834            let resp = req.json(body).send().await.context(HttpSnafu)?;
835            Self::text(&Method::POST, path, resp).await
836        })
837        .await
838    }
839
840    /// POST an `application/octet-stream` body and return the `text/plain` response body.
841    ///
842    /// Called from generated static-emitter code; clippy cannot see those
843    /// call sites, and dynamic-only builds skip per-version emission so the
844    /// helper appears unused there.
845    #[allow(dead_code)]
846    #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
847    pub(crate) async fn post_octet_stream_returning_text(
848        &self,
849        path: &str,
850        extra_headers: &[(&str, &str)],
851        data: bytes::Bytes,
852    ) -> Result<String, NifiError> {
853        self.with_retry(|| async {
854            let req = self
855                .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
856                .await
857                .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
858                .body(data.clone());
859            let req = apply_extra_headers(req, extra_headers);
860            let resp = req.send().await.context(HttpSnafu)?;
861            Self::text(&Method::POST, path, resp).await
862        })
863        .await
864    }
865
866    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
867    pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
868        &self,
869        path: &str,
870        extra_headers: &[(&str, &str)],
871        query: &[(&str, String)],
872    ) -> Result<T, NifiError> {
873        self.with_retry(|| async {
874            let req = self
875                .build_request(
876                    &Method::DELETE,
877                    path,
878                    self.http.delete(self.api_url(path)).query(query),
879                )
880                .await;
881            let req = apply_extra_headers(req, extra_headers);
882            let resp = req.send().await.context(HttpSnafu)?;
883            Self::deserialize(&Method::DELETE, path, resp).await
884        })
885        .await
886    }
887
888    #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
889    pub(crate) async fn delete_with_query(
890        &self,
891        path: &str,
892        extra_headers: &[(&str, &str)],
893        query: &[(&str, String)],
894    ) -> Result<(), NifiError> {
895        self.with_retry(|| async {
896            let req = self
897                .build_request(
898                    &Method::DELETE,
899                    path,
900                    self.http.delete(self.api_url(path)).query(query),
901                )
902                .await;
903            let req = apply_extra_headers(req, extra_headers);
904            let resp = req.send().await.context(HttpSnafu)?;
905            Self::check_void(&Method::DELETE, path, resp).await
906        })
907        .await
908    }
909
910    /// POST with a JSON body and query parameters; ignores the response body.
911    ///
912    /// Kept available for forward compatibility — the emitter dispatch table at
913    /// `emit::method` references this helper for the `(POST, any body, Empty response, query params)`
914    /// combination, but no current NiFi 2.x spec triggers that path.
915    #[allow(dead_code)]
916    #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
917    pub(crate) async fn post_void_with_query<B: serde::Serialize>(
918        &self,
919        path: &str,
920        extra_headers: &[(&str, &str)],
921        body: &B,
922        query: &[(&str, String)],
923    ) -> Result<(), NifiError> {
924        self.with_retry(|| async {
925            let req = self
926                .build_request(
927                    &Method::POST,
928                    path,
929                    self.http.post(self.api_url(path)).query(query),
930                )
931                .await;
932            let req = apply_extra_headers(req, extra_headers);
933            let resp = req.json(body).send().await.context(HttpSnafu)?;
934            Self::check_void(&Method::POST, path, resp).await
935        })
936        .await
937    }
938
939    #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
940    pub(crate) async fn post_with_query<B, T>(
941        &self,
942        path: &str,
943        extra_headers: &[(&str, &str)],
944        body: &B,
945        query: &[(&str, String)],
946    ) -> Result<T, NifiError>
947    where
948        B: serde::Serialize,
949        T: DeserializeOwned,
950    {
951        self.with_retry(|| async {
952            let req = self
953                .build_request(
954                    &Method::POST,
955                    path,
956                    self.http.post(self.api_url(path)).query(query),
957                )
958                .await;
959            let req = apply_extra_headers(req, extra_headers);
960            let resp = req.json(body).send().await.context(HttpSnafu)?;
961            Self::deserialize(&Method::POST, path, resp).await
962        })
963        .await
964    }
965
966    /// PUT with a JSON body and query parameters; deserializes the JSON response.
967    ///
968    /// Kept available for forward compatibility — the emitter dispatch table at
969    /// `emit::method` references this helper for the `(PUT, Json body, Non-empty response, query params)`
970    /// combination, but no current NiFi 2.x spec triggers that path.
971    #[allow(dead_code)]
972    #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
973    pub(crate) async fn put_with_query<B, T>(
974        &self,
975        path: &str,
976        extra_headers: &[(&str, &str)],
977        body: &B,
978        query: &[(&str, String)],
979    ) -> Result<T, NifiError>
980    where
981        B: serde::Serialize,
982        T: DeserializeOwned,
983    {
984        self.with_retry(|| async {
985            let req = self
986                .build_request(
987                    &Method::PUT,
988                    path,
989                    self.http.put(self.api_url(path)).query(query),
990                )
991                .await;
992            let req = apply_extra_headers(req, extra_headers);
993            let resp = req.json(body).send().await.context(HttpSnafu)?;
994            Self::deserialize(&Method::PUT, path, resp).await
995        })
996        .await
997    }
998
999    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
1000    pub(crate) async fn delete_returning<T: DeserializeOwned>(
1001        &self,
1002        path: &str,
1003        extra_headers: &[(&str, &str)],
1004    ) -> Result<T, NifiError> {
1005        self.with_retry(|| async {
1006            let req = self
1007                .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1008                .await;
1009            let req = apply_extra_headers(req, extra_headers);
1010            let resp = req.send().await.context(HttpSnafu)?;
1011            Self::deserialize(&Method::DELETE, path, resp).await
1012        })
1013        .await
1014    }
1015
1016    #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
1017    pub(crate) async fn delete(
1018        &self,
1019        path: &str,
1020        extra_headers: &[(&str, &str)],
1021    ) -> Result<(), NifiError> {
1022        self.with_retry(|| async {
1023            let req = self
1024                .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1025                .await;
1026            let req = apply_extra_headers(req, extra_headers);
1027            let resp = req.send().await.context(HttpSnafu)?;
1028            Self::check_void(&Method::DELETE, path, resp).await
1029        })
1030        .await
1031    }
1032
1033    /// Inner delete without auth retry, used by `logout` to avoid retrying
1034    /// the logout call itself.
1035    async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
1036        let resp = self
1037            .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1038            .await
1039            .send()
1040            .await
1041            .context(HttpSnafu)?;
1042        Self::check_void(&Method::DELETE, path, resp).await
1043    }
1044
1045    /// Attach a fresh UUIDv4 to the request as the configured request-id
1046    /// header AND record it on the current tracing span. No-op if the client
1047    /// has not been configured with `request_id_header`.
1048    ///
1049    /// Called from `build_request` (which covers every HTTP helper including
1050    /// `delete_inner`) and directly from `login`, which bypasses
1051    /// `build_request` because it runs pre-authentication with a form-encoded
1052    /// body.
1053    fn apply_request_id(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
1054        let Some(header) = self.request_id_header.as_deref() else {
1055            return req;
1056        };
1057        let id = uuid::Uuid::new_v4().to_string();
1058        tracing::Span::current().record("request_id", id.as_str());
1059        req.header(header, id)
1060    }
1061
1062    /// Apply the auth header, proxied-entities chain, and per-request debug
1063    /// log to a `RequestBuilder`. All HTTP helpers route through this method so
1064    /// the plumbing lives in exactly one place.
1065    async fn build_request(
1066        &self,
1067        method: &Method,
1068        path: &str,
1069        req: reqwest::RequestBuilder,
1070    ) -> reqwest::RequestBuilder {
1071        let req = self.apply_request_id(req);
1072        tracing::debug!(method = %method, path, "NiFi API request");
1073
1074        let guard = self.token.read().await;
1075        let mut req = match guard.as_deref() {
1076            Some(token) => req.bearer_auth(token.as_str()),
1077            None => {
1078                tracing::warn!(
1079                    "sending NiFi API request without a bearer token — call login() first"
1080                );
1081                req
1082            }
1083        };
1084        if let Some(chain) = &self.proxied_entities_chain {
1085            req = req.header(PROXIED_ENTITIES_CHAIN, chain);
1086        }
1087        req
1088    }
1089
1090    async fn deserialize<T: DeserializeOwned>(
1091        method: &Method,
1092        path: &str,
1093        resp: reqwest::Response,
1094    ) -> Result<T, NifiError> {
1095        let resp = handle_response_status(method, path, resp).await?;
1096        resp.json::<T>().await.context(HttpSnafu)
1097    }
1098
1099    /// Check a void response (no JSON body expected). Returns `Ok(())` on success,
1100    /// or the appropriate error.
1101    async fn check_void(
1102        method: &Method,
1103        path: &str,
1104        resp: reqwest::Response,
1105    ) -> Result<(), NifiError> {
1106        handle_response_status(method, path, resp).await?;
1107        Ok(())
1108    }
1109
1110    /// Read a raw `text/plain` (or equivalent) response body as a `String`.
1111    async fn text(
1112        method: &Method,
1113        path: &str,
1114        resp: reqwest::Response,
1115    ) -> Result<String, NifiError> {
1116        let resp = handle_response_status(method, path, resp).await?;
1117        resp.text().await.context(HttpSnafu)
1118    }
1119
1120    /// Read a raw `application/octet-stream` (or equivalent) response body as bytes.
1121    async fn bytes(
1122        method: &Method,
1123        path: &str,
1124        resp: reqwest::Response,
1125    ) -> Result<Vec<u8>, NifiError> {
1126        let resp = handle_response_status(method, path, resp).await?;
1127        let b = resp.bytes().await.context(HttpSnafu)?;
1128        Ok(b.to_vec())
1129    }
1130
1131    /// Turn a successful `application/octet-stream` (or `*/*`) response into
1132    /// a [`crate::BytesStream`]. Non-2xx status codes are converted into the
1133    /// same `NifiError` shape that [`Self::bytes`] produces.
1134    async fn bytes_stream(
1135        method: &Method,
1136        path: &str,
1137        resp: reqwest::Response,
1138    ) -> Result<crate::BytesStream, NifiError> {
1139        use futures_util::TryStreamExt;
1140        let resp = handle_response_status(method, path, resp).await?;
1141        let s = resp
1142            .bytes_stream()
1143            .map_err(|source| NifiError::Http { source });
1144        Ok(Box::pin(s))
1145    }
1146
1147    /// Like `check_void`, but also treats 302 as success.
1148    ///
1149    /// Does NOT delegate to [`handle_response_status`] because its
1150    /// success predicate also admits `StatusCode::FOUND` (302). Keeping
1151    /// the redirect semantics out of the shared helper means
1152    /// [`handle_response_status`] stays a plain "2xx-or-error" gate.
1153    async fn check_void_with_redirect(
1154        method: &Method,
1155        path: &str,
1156        resp: reqwest::Response,
1157    ) -> Result<(), NifiError> {
1158        let status = resp.status();
1159        tracing::debug!(method = %method, path, status = status.as_u16(), "NiFi API response");
1160        if status.is_success() || status == StatusCode::FOUND {
1161            return Ok(());
1162        }
1163        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
1164        tracing::debug!(method = %method, path, status = status.as_u16(), %body, "NiFi API raw error body");
1165        let message = extract_error_message(&body);
1166        tracing::warn!(method = %method, path, status = status.as_u16(), %message, "NiFi API error");
1167        Err(crate::error::api_error(status.as_u16(), message))
1168    }
1169
1170    pub(crate) fn api_url(&self, path: &str) -> Url {
1171        let mut url = self.base_url.clone();
1172        url.set_path(&format!("/nifi-api{path}"));
1173        url
1174    }
1175}
1176
1177/// Shared preamble for response helpers.
1178///
1179/// Emits the single `tracing::debug!` response line every helper used
1180/// to emit inline, and on non-2xx statuses consumes the body, logs the
1181/// raw body at `debug!` plus the extracted message at `warn!`, and
1182/// returns `Err(NifiError::Api)`. On 2xx the response is handed back to
1183/// the caller so it can read the body however it needs (json / text /
1184/// bytes / stream).
1185///
1186/// [`NifiClient::check_void_with_redirect`] deliberately does NOT
1187/// delegate here — its success predicate admits `StatusCode::FOUND`
1188/// (302) in addition to 2xx, and folding that branch into the shared
1189/// helper would leak redirect semantics into every caller.
1190async fn handle_response_status(
1191    method: &Method,
1192    path: &str,
1193    resp: reqwest::Response,
1194) -> Result<reqwest::Response, NifiError> {
1195    let status = resp.status();
1196    tracing::debug!(method = %method, path, status = status.as_u16(), "NiFi API response");
1197    if status.is_success() {
1198        return Ok(resp);
1199    }
1200    let body = resp.text().await.unwrap_or_else(|_| status.to_string());
1201    tracing::debug!(method = %method, path, status = status.as_u16(), %body, "NiFi API raw error body");
1202    let message = extract_error_message(&body);
1203    tracing::warn!(method = %method, path, status = status.as_u16(), %message, "NiFi API error");
1204    Err(crate::error::api_error(status.as_u16(), message))
1205}
1206
1207/// Apply a fold of `(name, value)` header pairs to a `RequestBuilder`.
1208/// Kept as a free helper so HTTP methods stay one-liners after the refactor.
1209fn apply_extra_headers(
1210    mut req: reqwest::RequestBuilder,
1211    extra: &[(&str, &str)],
1212) -> reqwest::RequestBuilder {
1213    for (name, value) in extra {
1214        req = req.header(*name, *value);
1215    }
1216    req
1217}
1218
1219/// Extract a human-readable message from a NiFi error response body.
1220///
1221/// NiFi returns either a JSON object with a `"message"` field or plain text.
1222/// Logs the raw body at `debug` level before extracting.
1223pub fn extract_error_message(body: &str) -> String {
1224    serde_json::from_str::<serde_json::Value>(body)
1225        .ok()
1226        .and_then(|v| v["message"].as_str().map(str::to_owned))
1227        .unwrap_or_else(|| body.to_owned())
1228}
1229
#[cfg(test)]
mod tests {
    /// Documents the property the upload helpers lean on: cloning a
    /// `bytes::Bytes` handle shares the underlying buffer instead of copying
    /// it. The retry closures in `post_octet_stream`,
    /// `post_void_octet_stream`, `post_multipart`, `post_void_multipart`,
    /// `post_multipart_with_fields`, and `post_octet_stream_returning_text`
    /// call `data.clone()` on every attempt, so with a `Vec<u8>` parameter a
    /// multi-GB NAR upload would reallocate the entire buffer on each retry.
    ///
    /// The check is that each clone reports the same length and the same data
    /// pointer as the original. NOTE(review): this exercises `Bytes` only and
    /// does not inspect the helpers' signatures, so a parameter-type change
    /// still has to be caught in review.
    #[test]
    fn bytes_clone_is_refcount_only() {
        use bytes::Bytes;
        let original = Bytes::from(vec![0u8; 1024]);
        let expected_len = original.len();
        for copy in [original.clone(), original.clone()] {
            assert_eq!(copy.len(), expected_len);
            assert_eq!(
                original.as_ptr(),
                copy.as_ptr(),
                "Bytes::clone should share buffer"
            );
        }
    }
}