Skip to main content

nifi_rust_client/
client.rs

1#![deny(missing_docs)]
2use std::sync::Arc;
3
4use reqwest::Client;
5use serde::de::DeserializeOwned;
6use snafu::ResultExt as _;
7use tokio::sync::RwLock;
8use url::Url;
9
10use crate::NifiError;
11use crate::config::credentials::CredentialProvider;
12use crate::error::{AuthSnafu, HttpSnafu};
13
14/// Client for the Apache NiFi REST API.
15pub struct NifiClient {
16    base_url: Url,
17    http: Client,
18    token: Arc<RwLock<Option<String>>>,
19    credentials: Option<Arc<dyn CredentialProvider>>,
20    #[allow(dead_code)]
21    retry_policy: Option<crate::config::retry::RetryPolicy>,
22}
23
24impl Clone for NifiClient {
25    fn clone(&self) -> Self {
26        Self {
27            base_url: self.base_url.clone(),
28            http: self.http.clone(),
29            token: Arc::clone(&self.token),
30            credentials: self.credentials.clone(),
31            retry_policy: self.retry_policy.clone(),
32        }
33    }
34}
35
36impl std::fmt::Debug for NifiClient {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("NifiClient")
39            .field("base_url", &self.base_url)
40            .field(
41                "credentials",
42                &self.credentials.as_ref().map(|c| format!("{c:?}")),
43            )
44            .field("retry_policy", &self.retry_policy)
45            .finish_non_exhaustive()
46    }
47}
48
49impl NifiClient {
50    /// Construct a client from pre-built parts. Used by [`crate::NifiClientBuilder`].
51    pub(crate) fn from_parts(
52        base_url: Url,
53        http: Client,
54        credentials: Option<Arc<dyn CredentialProvider>>,
55        retry_policy: Option<crate::config::retry::RetryPolicy>,
56    ) -> Self {
57        Self {
58            base_url,
59            http,
60            token: Arc::new(RwLock::new(None)),
61            credentials,
62            retry_policy,
63        }
64    }
65
66    /// Return the current bearer token, if one has been set.
67    ///
68    /// The token is a NiFi-issued JWT. You can persist it between process restarts
69    /// and restore it with [`set_token`][Self::set_token] to avoid re-authenticating.
70    /// The token will eventually expire (NiFi default: 12 hours); when it does, any
71    /// API call returns [`NifiError::Unauthorized`]. Re-call
72    /// [`login`][Self::login] to obtain a fresh token.
73    pub async fn token(&self) -> Option<String> {
74        self.token.read().await.clone()
75    }
76
77    /// Restore a previously obtained bearer token.
78    ///
79    /// Useful for CLI tools that persist the token in a file between sessions.
80    /// If the token has expired, the next API call will return
81    /// [`NifiError::Unauthorized`]; re-call [`login`][Self::login]
82    /// to obtain a fresh one.
83    pub async fn set_token(&self, token: String) {
84        *self.token.write().await = Some(token);
85    }
86
87    /// Invalidate the current bearer token and clear it from the client.
88    ///
89    /// Sends `DELETE /nifi-api/access/logout` to invalidate the token server-side,
90    /// then clears the local token unconditionally so that subsequent requests are
91    /// not sent with a stale credential.
92    ///
93    /// If the server returns an error (e.g. `401` because the token had already
94    /// expired) the local token is still cleared and the error is returned to the
95    /// caller.
96    pub async fn logout(&self) -> Result<(), NifiError> {
97        let result = self.delete_inner("/access/logout").await;
98        *self.token.write().await = None;
99        if result.is_ok() {
100            tracing::info!("NiFi logout successful");
101        }
102        result
103    }
104
105    /// Authenticate with NiFi using single-user credentials.
106    ///
107    /// Obtains a JWT token from `/nifi-api/access/token` and stores it on the
108    /// client for all subsequent requests.
109    ///
110    /// # Token lifetime and expiry
111    ///
112    /// NiFi JWTs expire after 12 hours by default (configurable server-side via
113    /// `nifi.security.user.login.identity.provider.expiration`). Once expired,
114    /// any API call returns [`NifiError::Unauthorized`]. Configure a
115    /// [`CredentialProvider`] on the builder to enable
116    /// automatic token refresh on 401 responses.
117    pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
118        tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
119        let url = self.api_url("/access/token");
120        let resp = self
121            .http
122            .post(url)
123            .form(&[("username", username), ("password", password)])
124            .send()
125            .await
126            .context(HttpSnafu)?;
127
128        let status = resp.status();
129        tracing::debug!(
130            method = "POST",
131            path = "/access/token",
132            status = status.as_u16(),
133            "NiFi API response"
134        );
135        if !status.is_success() {
136            let body = resp.text().await.unwrap_or_else(|_| status.to_string());
137            tracing::debug!(
138                method = "POST",
139                path = "/access/token",
140                status = status.as_u16(),
141                %body,
142                "NiFi API raw error body"
143            );
144            let message = extract_error_message(&body);
145            tracing::warn!(
146                method = "POST",
147                path = "/access/token",
148                status = status.as_u16(),
149                %message,
150                "NiFi API error"
151            );
152            return AuthSnafu { message }.fail();
153        }
154
155        let token = resp.text().await.context(HttpSnafu)?;
156        *self.token.write().await = Some(token);
157        tracing::info!("NiFi login successful for {username}");
158        Ok(())
159    }
160
161    /// Authenticate using the configured [`CredentialProvider`].
162    ///
163    /// Returns [`NifiError::Auth`] if no credential provider has been configured.
164    pub async fn login_with_provider(&self) -> Result<(), NifiError> {
165        let creds = self.credentials.as_ref().ok_or_else(|| NifiError::Auth {
166            message: "no credential provider configured".to_string(),
167        })?;
168        let (username, password) = creds.credentials().await?;
169        self.login(&username, &password).await
170    }
171
172    // ── Auth-retry wrapper ────────────────────────────────────────────────────
173
174    /// Execute `f`, and if it returns `NifiError::Unauthorized` and a credential
175    /// provider is configured, refresh the token and retry once.
176    #[tracing::instrument(skip_all)]
177    async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
178    where
179        F: Fn() -> Fut,
180        Fut: std::future::Future<Output = Result<T, NifiError>>,
181    {
182        match f().await {
183            Err(NifiError::Unauthorized { .. }) if self.credentials.is_some() => {
184                tracing::info!("received 401, refreshing token via credential provider");
185                self.login_with_provider().await?;
186                f().await
187            }
188            other => other,
189        }
190    }
191
192    // ── Transient-error retry wrapper ──────────────────────────────────────────
193
194    /// Execute `f` with optional transient-error retry using exponential backoff.
195    ///
196    /// When a [`RetryPolicy`](crate::config::retry::RetryPolicy) is configured, retries
197    /// [retryable](NifiError::is_retryable) errors up to `max_retries` times.
198    /// Each attempt goes through [`with_auth_retry`] so 401 handling still works.
199    #[tracing::instrument(skip_all)]
200    async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
201    where
202        F: Fn() -> Fut,
203        Fut: std::future::Future<Output = Result<T, NifiError>>,
204    {
205        let Some(policy) = &self.retry_policy else {
206            return self.with_auth_retry(&f).await;
207        };
208
209        let mut last_err: Option<NifiError> = None;
210        for attempt in 0..=policy.max_retries {
211            if attempt > 0 {
212                let backoff = policy.backoff_for(attempt - 1);
213                tracing::info!(
214                    attempt,
215                    backoff_ms = backoff.as_millis() as u64,
216                    "retrying after transient error"
217                );
218                tokio::time::sleep(backoff).await;
219            }
220            match self.with_auth_retry(&f).await {
221                Ok(v) => return Ok(v),
222                Err(e) if e.is_retryable() => {
223                    tracing::warn!(attempt, error = %e, "transient error, will retry");
224                    last_err = Some(e);
225                }
226                Err(e) => return Err(e),
227            }
228        }
229        // Safety: the loop always executes at least once (attempt 0..=max_retries),
230        // and every iteration that reaches here sets `last_err`.
231        match last_err {
232            Some(e) => Err(e),
233            // unreachable: loop runs at least once and non-retryable errors return early
234            None => self.with_auth_retry(&f).await,
235        }
236    }
237
238    // ── Private helpers ───────────────────────────────────────────────────────
239
240    pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
241        self.with_retry(|| async {
242            tracing::debug!(method = "GET", path, "NiFi API request");
243            let url = self.api_url(path);
244            let resp = self
245                .authenticated(self.http.get(url))
246                .await
247                .send()
248                .await
249                .context(HttpSnafu)?;
250            Self::deserialize("GET", path, resp).await
251        })
252        .await
253    }
254
255    pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
256    where
257        B: serde::Serialize,
258        T: DeserializeOwned,
259    {
260        self.with_retry(|| async {
261            tracing::debug!(method = "POST", path, "NiFi API request");
262            let url = self.api_url(path);
263            let resp = self
264                .authenticated(self.http.post(url))
265                .await
266                .json(body)
267                .send()
268                .await
269                .context(HttpSnafu)?;
270            Self::deserialize("POST", path, resp).await
271        })
272        .await
273    }
274
275    pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
276    where
277        B: serde::Serialize,
278        T: DeserializeOwned,
279    {
280        self.with_retry(|| async {
281            tracing::debug!(method = "PUT", path, "NiFi API request");
282            let url = self.api_url(path);
283            let resp = self
284                .authenticated(self.http.put(url))
285                .await
286                .json(body)
287                .send()
288                .await
289                .context(HttpSnafu)?;
290            Self::deserialize("PUT", path, resp).await
291        })
292        .await
293    }
294
295    /// POST that ignores the response body (for endpoints with no JSON response).
296    // Currently unused: no NiFi 2.x endpoint has a JSON body POST with an empty
297    // response. Kept for forward compatibility with the emitter.
298    #[allow(dead_code)]
299    pub(crate) async fn post_void<B: serde::Serialize>(
300        &self,
301        path: &str,
302        body: &B,
303    ) -> Result<(), NifiError> {
304        self.with_retry(|| async {
305            tracing::debug!(method = "POST", path, "NiFi API request");
306            let url = self.api_url(path);
307            let resp = self
308                .authenticated(self.http.post(url))
309                .await
310                .json(body)
311                .send()
312                .await
313                .context(HttpSnafu)?;
314            Self::check_void("POST", path, resp).await
315        })
316        .await
317    }
318
319    /// PUT that ignores the response body (for endpoints with no JSON response).
320    #[allow(dead_code)]
321    pub(crate) async fn put_void<B: serde::Serialize>(
322        &self,
323        path: &str,
324        body: &B,
325    ) -> Result<(), NifiError> {
326        self.with_retry(|| async {
327            tracing::debug!(method = "PUT", path, "NiFi API request");
328            let url = self.api_url(path);
329            let resp = self
330                .authenticated(self.http.put(url))
331                .await
332                .json(body)
333                .send()
334                .await
335                .context(HttpSnafu)?;
336            Self::check_void("PUT", path, resp).await
337        })
338        .await
339    }
340
341    /// POST with no request body; deserializes the JSON response.
342    pub(crate) async fn post_no_body<T: DeserializeOwned>(
343        &self,
344        path: &str,
345    ) -> Result<T, NifiError> {
346        self.with_retry(|| async {
347            tracing::debug!(method = "POST", path, "NiFi API request");
348            let url = self.api_url(path);
349            let resp = self
350                .authenticated(self.http.post(url))
351                .await
352                .send()
353                .await
354                .context(HttpSnafu)?;
355            Self::deserialize("POST", path, resp).await
356        })
357        .await
358    }
359
360    /// POST with no request body; ignores the response body.
361    // Used by the code generator for void no-body POST endpoints without query params.
362    // No current NiFi 2.x endpoint triggers this path, but keep it for forward compatibility.
363    #[allow(dead_code)]
364    pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
365        self.with_retry(|| async {
366            tracing::debug!(method = "POST", path, "NiFi API request");
367            let url = self.api_url(path);
368            let resp = self
369                .authenticated(self.http.post(url))
370                .await
371                .send()
372                .await
373                .context(HttpSnafu)?;
374            Self::check_void("POST", path, resp).await
375        })
376        .await
377    }
378
379    /// PUT with no request body; deserializes the JSON response.
380    pub(crate) async fn put_no_body<T: DeserializeOwned>(
381        &self,
382        path: &str,
383    ) -> Result<T, NifiError> {
384        self.with_retry(|| async {
385            tracing::debug!(method = "PUT", path, "NiFi API request");
386            let url = self.api_url(path);
387            let resp = self
388                .authenticated(self.http.put(url))
389                .await
390                .send()
391                .await
392                .context(HttpSnafu)?;
393            Self::deserialize("PUT", path, resp).await
394        })
395        .await
396    }
397
398    /// PUT with no request body; ignores the response body.
399    #[allow(dead_code)]
400    pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
401        self.with_retry(|| async {
402            tracing::debug!(method = "PUT", path, "NiFi API request");
403            let url = self.api_url(path);
404            let resp = self
405                .authenticated(self.http.put(url))
406                .await
407                .send()
408                .await
409                .context(HttpSnafu)?;
410            Self::check_void("PUT", path, resp).await
411        })
412        .await
413    }
414
415    /// POST with `application/octet-stream` body.
416    ///
417    /// Used for binary upload endpoints (e.g. NAR upload).
418    /// `filename` is sent as the `Filename` request header when provided.
419    pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
420        &self,
421        path: &str,
422        filename: Option<&str>,
423        data: Vec<u8>,
424    ) -> Result<T, NifiError> {
425        self.with_retry(|| async {
426            tracing::debug!(method = "POST", path, "NiFi API request");
427            let url = self.api_url(path);
428            let builder = self
429                .authenticated(self.http.post(url))
430                .await
431                .header("Content-Type", "application/octet-stream")
432                .body(data.clone());
433            let builder = if let Some(name) = filename {
434                builder.header("Filename", name)
435            } else {
436                builder
437            };
438            let resp = builder.send().await.context(HttpSnafu)?;
439            Self::deserialize("POST", path, resp).await
440        })
441        .await
442    }
443
444    /// POST with `application/octet-stream` body; ignores the response body.
445    ///
446    /// Used for binary upload endpoints that return no JSON response.
447    /// `filename` is sent as the `Filename` request header when provided.
448    // Currently unused: no NiFi 2.x endpoint has an octet-stream request body
449    // with an empty response body. Kept for forward compatibility.
450    #[allow(dead_code)]
451    pub(crate) async fn post_void_octet_stream(
452        &self,
453        path: &str,
454        filename: Option<&str>,
455        data: Vec<u8>,
456    ) -> Result<(), NifiError> {
457        self.with_retry(|| async {
458            tracing::debug!(method = "POST", path, "NiFi API request");
459            let url = self.api_url(path);
460            let builder = self
461                .authenticated(self.http.post(url))
462                .await
463                .header("Content-Type", "application/octet-stream")
464                .body(data.clone());
465            let builder = if let Some(name) = filename {
466                builder.header("Filename", name)
467            } else {
468                builder
469            };
470            let resp = builder.send().await.context(HttpSnafu)?;
471            Self::check_void("POST", path, resp).await
472        })
473        .await
474    }
475
476    /// POST with `multipart/form-data` body.
477    ///
478    /// Used for file-upload endpoints such as
479    /// `POST /process-groups/{id}/process-groups/upload`. Sends a single form
480    /// part named `"file"` carrying the given filename and raw bytes. The
481    /// `Content-Type` header (including the generated boundary) is set by
482    /// reqwest when `.multipart(form)` is called.
483    pub(crate) async fn post_multipart<T: DeserializeOwned>(
484        &self,
485        path: &str,
486        filename: &str,
487        data: Vec<u8>,
488    ) -> Result<T, NifiError> {
489        self.with_retry(|| async {
490            tracing::debug!(method = "POST", path, "NiFi API request");
491            let url = self.api_url(path);
492            let part =
493                reqwest::multipart::Part::bytes(data.clone()).file_name(filename.to_string());
494            let form = reqwest::multipart::Form::new().part("file", part);
495            let resp = self
496                .authenticated(self.http.post(url))
497                .await
498                .multipart(form)
499                .send()
500                .await
501                .context(HttpSnafu)?;
502            Self::deserialize("POST", path, resp).await
503        })
504        .await
505    }
506
507    /// POST with `multipart/form-data` body; ignores the response body.
508    ///
509    /// Variant of [`Self::post_multipart`] for upload endpoints that do not
510    /// return a JSON payload.
511    #[allow(dead_code)]
512    pub(crate) async fn post_void_multipart(
513        &self,
514        path: &str,
515        filename: &str,
516        data: Vec<u8>,
517    ) -> Result<(), NifiError> {
518        self.with_retry(|| async {
519            tracing::debug!(method = "POST", path, "NiFi API request");
520            let url = self.api_url(path);
521            let part =
522                reqwest::multipart::Part::bytes(data.clone()).file_name(filename.to_string());
523            let form = reqwest::multipart::Form::new().part("file", part);
524            let resp = self
525                .authenticated(self.http.post(url))
526                .await
527                .multipart(form)
528                .send()
529                .await
530                .context(HttpSnafu)?;
531            Self::check_void("POST", path, resp).await
532        })
533        .await
534    }
535
536    /// POST with query parameters; ignores the response body.
537    ///
538    /// Used for endpoints that accept query parameters and have no JSON response body.
539    #[allow(dead_code)]
540    pub(crate) async fn post_void_with_query<B: serde::Serialize>(
541        &self,
542        path: &str,
543        body: &B,
544        query: &[(&str, String)],
545    ) -> Result<(), NifiError> {
546        self.with_retry(|| async {
547            tracing::debug!(method = "POST", path, "NiFi API request");
548            let url = self.api_url(path);
549            let resp = self
550                .authenticated(self.http.post(url).query(query))
551                .await
552                .json(body)
553                .send()
554                .await
555                .context(HttpSnafu)?;
556            Self::check_void("POST", path, resp).await
557        })
558        .await
559    }
560
561    /// GET that ignores the response body (for endpoints with no JSON response).
562    ///
563    /// Treats 302 as success in addition to 2xx: NiFi's `GET /access/logout/complete`
564    /// responds with a redirect once the logout is complete.
565    pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
566        self.with_retry(|| async {
567            tracing::debug!(method = "GET", path, "NiFi API request");
568            let url = self.api_url(path);
569            let resp = self
570                .authenticated(self.http.get(url))
571                .await
572                .send()
573                .await
574                .context(HttpSnafu)?;
575            Self::check_void_with_redirect("GET", path, resp).await
576        })
577        .await
578    }
579
580    pub(crate) async fn get_with_query<T: DeserializeOwned>(
581        &self,
582        path: &str,
583        query: &[(&str, String)],
584    ) -> Result<T, NifiError> {
585        self.with_retry(|| async {
586            tracing::debug!(method = "GET", path, "NiFi API request");
587            let url = self.api_url(path);
588            let resp = self
589                .authenticated(self.http.get(url).query(query))
590                .await
591                .send()
592                .await
593                .context(HttpSnafu)?;
594            Self::deserialize("GET", path, resp).await
595        })
596        .await
597    }
598
599    /// GET returning raw text (`text/plain`).
600    pub(crate) async fn get_text(&self, path: &str) -> Result<String, NifiError> {
601        self.with_retry(|| async {
602            tracing::debug!(method = "GET", path, "NiFi API request");
603            let url = self.api_url(path);
604            let resp = self
605                .authenticated(self.http.get(url))
606                .await
607                .send()
608                .await
609                .context(HttpSnafu)?;
610            Self::text("GET", path, resp).await
611        })
612        .await
613    }
614
615    /// GET returning raw bytes (`application/octet-stream` or `*/*`).
616    pub(crate) async fn get_bytes(&self, path: &str) -> Result<Vec<u8>, NifiError> {
617        self.with_retry(|| async {
618            tracing::debug!(method = "GET", path, "NiFi API request");
619            let url = self.api_url(path);
620            let resp = self
621                .authenticated(self.http.get(url))
622                .await
623                .send()
624                .await
625                .context(HttpSnafu)?;
626            Self::bytes("GET", path, resp).await
627        })
628        .await
629    }
630
631    /// GET with query parameters returning raw bytes.
632    pub(crate) async fn get_bytes_with_query(
633        &self,
634        path: &str,
635        query: &[(&str, String)],
636    ) -> Result<Vec<u8>, NifiError> {
637        self.with_retry(|| async {
638            tracing::debug!(method = "GET", path, "NiFi API request");
639            let url = self.api_url(path);
640            let resp = self
641                .authenticated(self.http.get(url).query(query))
642                .await
643                .send()
644                .await
645                .context(HttpSnafu)?;
646            Self::bytes("GET", path, resp).await
647        })
648        .await
649    }
650
651    /// POST a JSON body and return the `text/plain` response body.
652    pub(crate) async fn post_returning_text<B: serde::Serialize>(
653        &self,
654        path: &str,
655        body: &B,
656    ) -> Result<String, NifiError> {
657        self.with_retry(|| async {
658            tracing::debug!(method = "POST", path, "NiFi API request");
659            let url = self.api_url(path);
660            let resp = self
661                .authenticated(self.http.post(url))
662                .await
663                .json(body)
664                .send()
665                .await
666                .context(HttpSnafu)?;
667            Self::text("POST", path, resp).await
668        })
669        .await
670    }
671
672    /// POST an `application/octet-stream` body and return the `text/plain` response body.
673    pub(crate) async fn post_octet_stream_returning_text(
674        &self,
675        path: &str,
676        data: Vec<u8>,
677    ) -> Result<String, NifiError> {
678        self.with_retry(|| async {
679            tracing::debug!(method = "POST", path, "NiFi API request");
680            let url = self.api_url(path);
681            let builder = self
682                .authenticated(self.http.post(url))
683                .await
684                .header("Content-Type", "application/octet-stream")
685                .body(data.clone());
686            let resp = builder.send().await.context(HttpSnafu)?;
687            Self::text("POST", path, resp).await
688        })
689        .await
690    }
691
692    // No longer referenced by generated code: every void GET with query params
693    // in the current NiFi specs resolves through a different helper.
694    #[allow(dead_code)]
695    pub(crate) async fn get_void_with_query(
696        &self,
697        path: &str,
698        query: &[(&str, String)],
699    ) -> Result<(), NifiError> {
700        self.with_retry(|| async {
701            tracing::debug!(method = "GET", path, "NiFi API request");
702            let url = self.api_url(path);
703            let resp = self
704                .authenticated(self.http.get(url).query(query))
705                .await
706                .send()
707                .await
708                .context(HttpSnafu)?;
709            Self::check_void_with_redirect("GET", path, resp).await
710        })
711        .await
712    }
713
714    pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
715        &self,
716        path: &str,
717        query: &[(&str, String)],
718    ) -> Result<T, NifiError> {
719        self.with_retry(|| async {
720            tracing::debug!(method = "DELETE", path, "NiFi API request");
721            let url = self.api_url(path);
722            let resp = self
723                .authenticated(self.http.delete(url).query(query))
724                .await
725                .send()
726                .await
727                .context(HttpSnafu)?;
728            Self::deserialize("DELETE", path, resp).await
729        })
730        .await
731    }
732
733    pub(crate) async fn delete_with_query(
734        &self,
735        path: &str,
736        query: &[(&str, String)],
737    ) -> Result<(), NifiError> {
738        self.with_retry(|| async {
739            tracing::debug!(method = "DELETE", path, "NiFi API request");
740            let url = self.api_url(path);
741            let resp = self
742                .authenticated(self.http.delete(url).query(query))
743                .await
744                .send()
745                .await
746                .context(HttpSnafu)?;
747            Self::check_void("DELETE", path, resp).await
748        })
749        .await
750    }
751
752    pub(crate) async fn post_with_query<B, T>(
753        &self,
754        path: &str,
755        body: &B,
756        query: &[(&str, String)],
757    ) -> Result<T, NifiError>
758    where
759        B: serde::Serialize,
760        T: DeserializeOwned,
761    {
762        self.with_retry(|| async {
763            tracing::debug!(method = "POST", path, "NiFi API request");
764            let url = self.api_url(path);
765            let resp = self
766                .authenticated(self.http.post(url).query(query))
767                .await
768                .json(body)
769                .send()
770                .await
771                .context(HttpSnafu)?;
772            Self::deserialize("POST", path, resp).await
773        })
774        .await
775    }
776
777    pub(crate) async fn delete_returning<T: DeserializeOwned>(
778        &self,
779        path: &str,
780    ) -> Result<T, NifiError> {
781        self.with_retry(|| async {
782            tracing::debug!(method = "DELETE", path, "NiFi API request");
783            let url = self.api_url(path);
784            let resp = self
785                .authenticated(self.http.delete(url))
786                .await
787                .send()
788                .await
789                .context(HttpSnafu)?;
790            Self::deserialize("DELETE", path, resp).await
791        })
792        .await
793    }
794
795    pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
796        self.with_retry(|| async {
797            tracing::debug!(method = "DELETE", path, "NiFi API request");
798            let url = self.api_url(path);
799            let resp = self
800                .authenticated(self.http.delete(url))
801                .await
802                .send()
803                .await
804                .context(HttpSnafu)?;
805            Self::check_void("DELETE", path, resp).await
806        })
807        .await
808    }
809
810    /// Inner delete without auth retry, used by `logout` to avoid retrying
811    /// the logout call itself.
812    async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
813        tracing::debug!(method = "DELETE", path, "NiFi API request");
814        let url = self.api_url(path);
815        let resp = self
816            .authenticated(self.http.delete(url))
817            .await
818            .send()
819            .await
820            .context(HttpSnafu)?;
821        Self::check_void("DELETE", path, resp).await
822    }
823
824    async fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
825        let guard = self.token.read().await;
826        match guard.as_deref() {
827            Some(token) => req.bearer_auth(token),
828            None => {
829                tracing::warn!(
830                    "sending NiFi API request without a bearer token — call login() first"
831                );
832                req
833            }
834        }
835    }
836
837    async fn deserialize<T: DeserializeOwned>(
838        method: &str,
839        path: &str,
840        resp: reqwest::Response,
841    ) -> Result<T, NifiError> {
842        let status = resp.status();
843        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
844        if status.is_success() {
845            return resp.json::<T>().await.context(HttpSnafu);
846        }
847        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
848        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
849        let message = extract_error_message(&body);
850        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
851        Err(crate::error::api_error(status.as_u16(), message))
852    }
853
854    /// Check a void response (no JSON body expected). Returns `Ok(())` on success,
855    /// or the appropriate error.
856    async fn check_void(
857        method: &str,
858        path: &str,
859        resp: reqwest::Response,
860    ) -> Result<(), NifiError> {
861        let status = resp.status();
862        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
863        if status.is_success() {
864            return Ok(());
865        }
866        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
867        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
868        let message = extract_error_message(&body);
869        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
870        Err(crate::error::api_error(status.as_u16(), message))
871    }
872
873    /// Read a raw `text/plain` (or equivalent) response body as a `String`.
874    async fn text(method: &str, path: &str, resp: reqwest::Response) -> Result<String, NifiError> {
875        let status = resp.status();
876        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
877        if status.is_success() {
878            return resp.text().await.context(HttpSnafu);
879        }
880        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
881        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
882        let message = extract_error_message(&body);
883        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
884        Err(crate::error::api_error(status.as_u16(), message))
885    }
886
887    /// Read a raw `application/octet-stream` (or equivalent) response body as bytes.
888    async fn bytes(
889        method: &str,
890        path: &str,
891        resp: reqwest::Response,
892    ) -> Result<Vec<u8>, NifiError> {
893        let status = resp.status();
894        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
895        if status.is_success() {
896            let b = resp.bytes().await.context(HttpSnafu)?;
897            return Ok(b.to_vec());
898        }
899        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
900        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
901        let message = extract_error_message(&body);
902        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
903        Err(crate::error::api_error(status.as_u16(), message))
904    }
905
906    /// Like `check_void`, but also treats 302 as success.
907    async fn check_void_with_redirect(
908        method: &str,
909        path: &str,
910        resp: reqwest::Response,
911    ) -> Result<(), NifiError> {
912        let status = resp.status();
913        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
914        if status.is_success() || status.as_u16() == 302 {
915            return Ok(());
916        }
917        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
918        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
919        let message = extract_error_message(&body);
920        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
921        Err(crate::error::api_error(status.as_u16(), message))
922    }
923
924    pub(crate) fn api_url(&self, path: &str) -> Url {
925        let mut url = self.base_url.clone();
926        url.set_path(&format!("/nifi-api{path}"));
927        url
928    }
929}
930
931/// Extract a human-readable message from a NiFi error response body.
932///
933/// NiFi returns either a JSON object with a `"message"` field or plain text.
934/// Logs the raw body at `debug` level before extracting.
935pub fn extract_error_message(body: &str) -> String {
936    serde_json::from_str::<serde_json::Value>(body)
937        .ok()
938        .and_then(|v| v["message"].as_str().map(str::to_owned))
939        .unwrap_or_else(|| body.to_owned())
940}