Skip to main content

nifi_rust_client/
client.rs

1use std::sync::Arc;
2
3use reqwest::Client;
4use serde::de::DeserializeOwned;
5use snafu::ResultExt as _;
6use tokio::sync::RwLock;
7use url::Url;
8
9use crate::NifiError;
10use crate::config::credentials::CredentialProvider;
11use crate::error::{AuthSnafu, HttpSnafu};
12
13/// Client for the Apache NiFi REST API.
14pub struct NifiClient {
15    base_url: Url,
16    http: Client,
17    token: Arc<RwLock<Option<String>>>,
18    credentials: Option<Arc<dyn CredentialProvider>>,
19    #[allow(dead_code)]
20    retry_policy: Option<crate::config::retry::RetryPolicy>,
21}
22
23impl Clone for NifiClient {
24    fn clone(&self) -> Self {
25        Self {
26            base_url: self.base_url.clone(),
27            http: self.http.clone(),
28            token: Arc::clone(&self.token),
29            credentials: self.credentials.clone(),
30            retry_policy: self.retry_policy.clone(),
31        }
32    }
33}
34
35impl std::fmt::Debug for NifiClient {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("NifiClient")
38            .field("base_url", &self.base_url)
39            .field(
40                "credentials",
41                &self.credentials.as_ref().map(|c| format!("{c:?}")),
42            )
43            .field("retry_policy", &self.retry_policy)
44            .finish_non_exhaustive()
45    }
46}
47
48impl NifiClient {
49    /// Construct a client from pre-built parts. Used by [`crate::NifiClientBuilder`].
50    pub(crate) fn from_parts(
51        base_url: Url,
52        http: Client,
53        credentials: Option<Arc<dyn CredentialProvider>>,
54        retry_policy: Option<crate::config::retry::RetryPolicy>,
55    ) -> Self {
56        Self {
57            base_url,
58            http,
59            token: Arc::new(RwLock::new(None)),
60            credentials,
61            retry_policy,
62        }
63    }
64
65    /// Return the current bearer token, if one has been set.
66    ///
67    /// The token is a NiFi-issued JWT. You can persist it between process restarts
68    /// and restore it with [`set_token`][Self::set_token] to avoid re-authenticating.
69    /// The token will eventually expire (NiFi default: 12 hours); when it does, any
70    /// API call returns [`NifiError::Unauthorized`]. Re-call
71    /// [`login`][Self::login] to obtain a fresh token.
72    pub async fn token(&self) -> Option<String> {
73        self.token.read().await.clone()
74    }
75
76    /// Restore a previously obtained bearer token.
77    ///
78    /// Useful for CLI tools that persist the token in a file between sessions.
79    /// If the token has expired, the next API call will return
80    /// [`NifiError::Unauthorized`]; re-call [`login`][Self::login]
81    /// to obtain a fresh one.
82    pub async fn set_token(&self, token: String) {
83        *self.token.write().await = Some(token);
84    }
85
86    /// Invalidate the current bearer token and clear it from the client.
87    ///
88    /// Sends `DELETE /nifi-api/access/logout` to invalidate the token server-side,
89    /// then clears the local token unconditionally so that subsequent requests are
90    /// not sent with a stale credential.
91    ///
92    /// If the server returns an error (e.g. `401` because the token had already
93    /// expired) the local token is still cleared and the error is returned to the
94    /// caller.
95    pub async fn logout(&self) -> Result<(), NifiError> {
96        let result = self.delete_inner("/access/logout").await;
97        *self.token.write().await = None;
98        if result.is_ok() {
99            tracing::info!("NiFi logout successful");
100        }
101        result
102    }
103
104    /// Authenticate with NiFi using single-user credentials.
105    ///
106    /// Obtains a JWT token from `/nifi-api/access/token` and stores it on the
107    /// client for all subsequent requests.
108    ///
109    /// # Token lifetime and expiry
110    ///
111    /// NiFi JWTs expire after 12 hours by default (configurable server-side via
112    /// `nifi.security.user.login.identity.provider.expiration`). Once expired,
113    /// any API call returns [`NifiError::Unauthorized`]. Configure a
114    /// [`CredentialProvider`] on the builder to enable
115    /// automatic token refresh on 401 responses.
116    pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
117        tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
118        let url = self.api_url("/access/token");
119        let resp = self
120            .http
121            .post(url)
122            .form(&[("username", username), ("password", password)])
123            .send()
124            .await
125            .context(HttpSnafu)?;
126
127        let status = resp.status();
128        tracing::debug!(
129            method = "POST",
130            path = "/access/token",
131            status = status.as_u16(),
132            "NiFi API response"
133        );
134        if !status.is_success() {
135            let body = resp.text().await.unwrap_or_else(|_| status.to_string());
136            tracing::debug!(
137                method = "POST",
138                path = "/access/token",
139                status = status.as_u16(),
140                %body,
141                "NiFi API raw error body"
142            );
143            let message = extract_error_message(&body);
144            tracing::warn!(
145                method = "POST",
146                path = "/access/token",
147                status = status.as_u16(),
148                %message,
149                "NiFi API error"
150            );
151            return AuthSnafu { message }.fail();
152        }
153
154        let token = resp.text().await.context(HttpSnafu)?;
155        *self.token.write().await = Some(token);
156        tracing::info!("NiFi login successful for {username}");
157        Ok(())
158    }
159
160    /// Authenticate using the configured [`CredentialProvider`].
161    ///
162    /// Returns [`NifiError::Auth`] if no credential provider has been configured.
163    pub async fn login_with_provider(&self) -> Result<(), NifiError> {
164        let creds = self.credentials.as_ref().ok_or_else(|| NifiError::Auth {
165            message: "no credential provider configured".to_string(),
166        })?;
167        let (username, password) = creds.credentials().await?;
168        self.login(&username, &password).await
169    }
170
171    // ── Auth-retry wrapper ────────────────────────────────────────────────────
172
173    /// Execute `f`, and if it returns `NifiError::Unauthorized` and a credential
174    /// provider is configured, refresh the token and retry once.
175    async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
176    where
177        F: Fn() -> Fut,
178        Fut: std::future::Future<Output = Result<T, NifiError>>,
179    {
180        match f().await {
181            Err(NifiError::Unauthorized { .. }) if self.credentials.is_some() => {
182                tracing::info!("received 401, refreshing token via credential provider");
183                self.login_with_provider().await?;
184                f().await
185            }
186            other => other,
187        }
188    }
189
190    // ── Transient-error retry wrapper ──────────────────────────────────────────
191
192    /// Execute `f` with optional transient-error retry using exponential backoff.
193    ///
194    /// When a [`RetryPolicy`](crate::config::retry::RetryPolicy) is configured, retries
195    /// [retryable](NifiError::is_retryable) errors up to `max_retries` times.
196    /// Each attempt goes through [`with_auth_retry`] so 401 handling still works.
197    async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
198    where
199        F: Fn() -> Fut,
200        Fut: std::future::Future<Output = Result<T, NifiError>>,
201    {
202        let Some(policy) = &self.retry_policy else {
203            return self.with_auth_retry(&f).await;
204        };
205
206        let mut last_err: Option<NifiError> = None;
207        for attempt in 0..=policy.max_retries {
208            if attempt > 0 {
209                let backoff = policy.backoff_for(attempt - 1);
210                tracing::info!(
211                    attempt,
212                    backoff_ms = backoff.as_millis() as u64,
213                    "retrying after transient error"
214                );
215                tokio::time::sleep(backoff).await;
216            }
217            match self.with_auth_retry(&f).await {
218                Ok(v) => return Ok(v),
219                Err(e) if e.is_retryable() => {
220                    tracing::warn!(attempt, error = %e, "transient error, will retry");
221                    last_err = Some(e);
222                }
223                Err(e) => return Err(e),
224            }
225        }
226        // Safety: the loop always executes at least once (attempt 0..=max_retries),
227        // and every iteration that reaches here sets `last_err`.
228        match last_err {
229            Some(e) => Err(e),
230            // unreachable: loop runs at least once and non-retryable errors return early
231            None => self.with_auth_retry(&f).await,
232        }
233    }
234
235    // ── Private helpers ───────────────────────────────────────────────────────
236
237    pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
238        self.with_retry(|| async {
239            tracing::debug!(method = "GET", path, "NiFi API request");
240            let url = self.api_url(path);
241            let resp = self
242                .authenticated(self.http.get(url))
243                .await
244                .send()
245                .await
246                .context(HttpSnafu)?;
247            Self::deserialize("GET", path, resp).await
248        })
249        .await
250    }
251
252    pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
253    where
254        B: serde::Serialize,
255        T: DeserializeOwned,
256    {
257        self.with_retry(|| async {
258            tracing::debug!(method = "POST", path, "NiFi API request");
259            let url = self.api_url(path);
260            let resp = self
261                .authenticated(self.http.post(url))
262                .await
263                .json(body)
264                .send()
265                .await
266                .context(HttpSnafu)?;
267            Self::deserialize("POST", path, resp).await
268        })
269        .await
270    }
271
272    pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
273    where
274        B: serde::Serialize,
275        T: DeserializeOwned,
276    {
277        self.with_retry(|| async {
278            tracing::debug!(method = "PUT", path, "NiFi API request");
279            let url = self.api_url(path);
280            let resp = self
281                .authenticated(self.http.put(url))
282                .await
283                .json(body)
284                .send()
285                .await
286                .context(HttpSnafu)?;
287            Self::deserialize("PUT", path, resp).await
288        })
289        .await
290    }
291
292    /// POST that ignores the response body (for endpoints with no JSON response).
293    pub(crate) async fn post_void<B: serde::Serialize>(
294        &self,
295        path: &str,
296        body: &B,
297    ) -> Result<(), NifiError> {
298        self.with_retry(|| async {
299            tracing::debug!(method = "POST", path, "NiFi API request");
300            let url = self.api_url(path);
301            let resp = self
302                .authenticated(self.http.post(url))
303                .await
304                .json(body)
305                .send()
306                .await
307                .context(HttpSnafu)?;
308            Self::check_void("POST", path, resp).await
309        })
310        .await
311    }
312
313    /// PUT that ignores the response body (for endpoints with no JSON response).
314    #[allow(dead_code)]
315    pub(crate) async fn put_void<B: serde::Serialize>(
316        &self,
317        path: &str,
318        body: &B,
319    ) -> Result<(), NifiError> {
320        self.with_retry(|| async {
321            tracing::debug!(method = "PUT", path, "NiFi API request");
322            let url = self.api_url(path);
323            let resp = self
324                .authenticated(self.http.put(url))
325                .await
326                .json(body)
327                .send()
328                .await
329                .context(HttpSnafu)?;
330            Self::check_void("PUT", path, resp).await
331        })
332        .await
333    }
334
335    /// POST with no request body; deserializes the JSON response.
336    pub(crate) async fn post_no_body<T: DeserializeOwned>(
337        &self,
338        path: &str,
339    ) -> Result<T, NifiError> {
340        self.with_retry(|| async {
341            tracing::debug!(method = "POST", path, "NiFi API request");
342            let url = self.api_url(path);
343            let resp = self
344                .authenticated(self.http.post(url))
345                .await
346                .send()
347                .await
348                .context(HttpSnafu)?;
349            Self::deserialize("POST", path, resp).await
350        })
351        .await
352    }
353
354    /// POST with no request body; ignores the response body.
355    // Used by the code generator for void no-body POST endpoints without query params.
356    // No current NiFi 2.x endpoint triggers this path, but keep it for forward compatibility.
357    #[allow(dead_code)]
358    pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
359        self.with_retry(|| async {
360            tracing::debug!(method = "POST", path, "NiFi API request");
361            let url = self.api_url(path);
362            let resp = self
363                .authenticated(self.http.post(url))
364                .await
365                .send()
366                .await
367                .context(HttpSnafu)?;
368            Self::check_void("POST", path, resp).await
369        })
370        .await
371    }
372
373    /// PUT with no request body; deserializes the JSON response.
374    pub(crate) async fn put_no_body<T: DeserializeOwned>(
375        &self,
376        path: &str,
377    ) -> Result<T, NifiError> {
378        self.with_retry(|| async {
379            tracing::debug!(method = "PUT", path, "NiFi API request");
380            let url = self.api_url(path);
381            let resp = self
382                .authenticated(self.http.put(url))
383                .await
384                .send()
385                .await
386                .context(HttpSnafu)?;
387            Self::deserialize("PUT", path, resp).await
388        })
389        .await
390    }
391
392    /// PUT with no request body; ignores the response body.
393    #[allow(dead_code)]
394    pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
395        self.with_retry(|| async {
396            tracing::debug!(method = "PUT", path, "NiFi API request");
397            let url = self.api_url(path);
398            let resp = self
399                .authenticated(self.http.put(url))
400                .await
401                .send()
402                .await
403                .context(HttpSnafu)?;
404            Self::check_void("PUT", path, resp).await
405        })
406        .await
407    }
408
409    /// POST with `application/octet-stream` body.
410    ///
411    /// Used for binary upload endpoints (e.g. NAR upload).
412    /// `filename` is sent as the `Filename` request header when provided.
413    pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
414        &self,
415        path: &str,
416        filename: Option<&str>,
417        data: Vec<u8>,
418    ) -> Result<T, NifiError> {
419        self.with_retry(|| async {
420            tracing::debug!(method = "POST", path, "NiFi API request");
421            let url = self.api_url(path);
422            let builder = self
423                .authenticated(self.http.post(url))
424                .await
425                .header("Content-Type", "application/octet-stream")
426                .body(data.clone());
427            let builder = if let Some(name) = filename {
428                builder.header("Filename", name)
429            } else {
430                builder
431            };
432            let resp = builder.send().await.context(HttpSnafu)?;
433            Self::deserialize("POST", path, resp).await
434        })
435        .await
436    }
437
438    /// POST with `application/octet-stream` body; ignores the response body.
439    ///
440    /// Used for binary upload endpoints that return no JSON response.
441    /// `filename` is sent as the `Filename` request header when provided.
442    pub(crate) async fn post_void_octet_stream(
443        &self,
444        path: &str,
445        filename: Option<&str>,
446        data: Vec<u8>,
447    ) -> Result<(), NifiError> {
448        self.with_retry(|| async {
449            tracing::debug!(method = "POST", path, "NiFi API request");
450            let url = self.api_url(path);
451            let builder = self
452                .authenticated(self.http.post(url))
453                .await
454                .header("Content-Type", "application/octet-stream")
455                .body(data.clone());
456            let builder = if let Some(name) = filename {
457                builder.header("Filename", name)
458            } else {
459                builder
460            };
461            let resp = builder.send().await.context(HttpSnafu)?;
462            Self::check_void("POST", path, resp).await
463        })
464        .await
465    }
466
467    /// POST with query parameters; ignores the response body.
468    ///
469    /// Used for endpoints that accept query parameters and have no JSON response body.
470    #[allow(dead_code)]
471    pub(crate) async fn post_void_with_query<B: serde::Serialize>(
472        &self,
473        path: &str,
474        body: &B,
475        query: &[(&str, String)],
476    ) -> Result<(), NifiError> {
477        self.with_retry(|| async {
478            tracing::debug!(method = "POST", path, "NiFi API request");
479            let url = self.api_url(path);
480            let resp = self
481                .authenticated(self.http.post(url).query(query))
482                .await
483                .json(body)
484                .send()
485                .await
486                .context(HttpSnafu)?;
487            Self::check_void("POST", path, resp).await
488        })
489        .await
490    }
491
492    /// GET that ignores the response body (for endpoints with no JSON response).
493    ///
494    /// Treats 302 as success in addition to 2xx: NiFi's `GET /access/logout/complete`
495    /// responds with a redirect once the logout is complete.
496    pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
497        self.with_retry(|| async {
498            tracing::debug!(method = "GET", path, "NiFi API request");
499            let url = self.api_url(path);
500            let resp = self
501                .authenticated(self.http.get(url))
502                .await
503                .send()
504                .await
505                .context(HttpSnafu)?;
506            Self::check_void_with_redirect("GET", path, resp).await
507        })
508        .await
509    }
510
511    pub(crate) async fn get_with_query<T: DeserializeOwned>(
512        &self,
513        path: &str,
514        query: &[(&str, String)],
515    ) -> Result<T, NifiError> {
516        self.with_retry(|| async {
517            tracing::debug!(method = "GET", path, "NiFi API request");
518            let url = self.api_url(path);
519            let resp = self
520                .authenticated(self.http.get(url).query(query))
521                .await
522                .send()
523                .await
524                .context(HttpSnafu)?;
525            Self::deserialize("GET", path, resp).await
526        })
527        .await
528    }
529
530    pub(crate) async fn get_void_with_query(
531        &self,
532        path: &str,
533        query: &[(&str, String)],
534    ) -> Result<(), NifiError> {
535        self.with_retry(|| async {
536            tracing::debug!(method = "GET", path, "NiFi API request");
537            let url = self.api_url(path);
538            let resp = self
539                .authenticated(self.http.get(url).query(query))
540                .await
541                .send()
542                .await
543                .context(HttpSnafu)?;
544            Self::check_void_with_redirect("GET", path, resp).await
545        })
546        .await
547    }
548
549    pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
550        &self,
551        path: &str,
552        query: &[(&str, String)],
553    ) -> Result<T, NifiError> {
554        self.with_retry(|| async {
555            tracing::debug!(method = "DELETE", path, "NiFi API request");
556            let url = self.api_url(path);
557            let resp = self
558                .authenticated(self.http.delete(url).query(query))
559                .await
560                .send()
561                .await
562                .context(HttpSnafu)?;
563            Self::deserialize("DELETE", path, resp).await
564        })
565        .await
566    }
567
568    pub(crate) async fn delete_with_query(
569        &self,
570        path: &str,
571        query: &[(&str, String)],
572    ) -> Result<(), NifiError> {
573        self.with_retry(|| async {
574            tracing::debug!(method = "DELETE", path, "NiFi API request");
575            let url = self.api_url(path);
576            let resp = self
577                .authenticated(self.http.delete(url).query(query))
578                .await
579                .send()
580                .await
581                .context(HttpSnafu)?;
582            Self::check_void("DELETE", path, resp).await
583        })
584        .await
585    }
586
587    pub(crate) async fn post_with_query<B, T>(
588        &self,
589        path: &str,
590        body: &B,
591        query: &[(&str, String)],
592    ) -> Result<T, NifiError>
593    where
594        B: serde::Serialize,
595        T: DeserializeOwned,
596    {
597        self.with_retry(|| async {
598            tracing::debug!(method = "POST", path, "NiFi API request");
599            let url = self.api_url(path);
600            let resp = self
601                .authenticated(self.http.post(url).query(query))
602                .await
603                .json(body)
604                .send()
605                .await
606                .context(HttpSnafu)?;
607            Self::deserialize("POST", path, resp).await
608        })
609        .await
610    }
611
612    pub(crate) async fn delete_returning<T: DeserializeOwned>(
613        &self,
614        path: &str,
615    ) -> Result<T, NifiError> {
616        self.with_retry(|| async {
617            tracing::debug!(method = "DELETE", path, "NiFi API request");
618            let url = self.api_url(path);
619            let resp = self
620                .authenticated(self.http.delete(url))
621                .await
622                .send()
623                .await
624                .context(HttpSnafu)?;
625            Self::deserialize("DELETE", path, resp).await
626        })
627        .await
628    }
629
630    pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
631        self.with_retry(|| async {
632            tracing::debug!(method = "DELETE", path, "NiFi API request");
633            let url = self.api_url(path);
634            let resp = self
635                .authenticated(self.http.delete(url))
636                .await
637                .send()
638                .await
639                .context(HttpSnafu)?;
640            Self::check_void("DELETE", path, resp).await
641        })
642        .await
643    }
644
645    /// Inner delete without auth retry, used by `logout` to avoid retrying
646    /// the logout call itself.
647    async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
648        tracing::debug!(method = "DELETE", path, "NiFi API request");
649        let url = self.api_url(path);
650        let resp = self
651            .authenticated(self.http.delete(url))
652            .await
653            .send()
654            .await
655            .context(HttpSnafu)?;
656        Self::check_void("DELETE", path, resp).await
657    }
658
659    async fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
660        let guard = self.token.read().await;
661        match guard.as_deref() {
662            Some(token) => req.bearer_auth(token),
663            None => {
664                tracing::warn!(
665                    "sending NiFi API request without a bearer token — call login() first"
666                );
667                req
668            }
669        }
670    }
671
672    async fn deserialize<T: DeserializeOwned>(
673        method: &str,
674        path: &str,
675        resp: reqwest::Response,
676    ) -> Result<T, NifiError> {
677        let status = resp.status();
678        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
679        if status.is_success() {
680            return resp.json::<T>().await.context(HttpSnafu);
681        }
682        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
683        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
684        let message = extract_error_message(&body);
685        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
686        Err(crate::error::api_error(status.as_u16(), message))
687    }
688
689    /// Check a void response (no JSON body expected). Returns `Ok(())` on success,
690    /// or the appropriate error.
691    async fn check_void(
692        method: &str,
693        path: &str,
694        resp: reqwest::Response,
695    ) -> Result<(), NifiError> {
696        let status = resp.status();
697        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
698        if status.is_success() {
699            return Ok(());
700        }
701        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
702        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
703        let message = extract_error_message(&body);
704        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
705        Err(crate::error::api_error(status.as_u16(), message))
706    }
707
708    /// Like `check_void`, but also treats 302 as success.
709    async fn check_void_with_redirect(
710        method: &str,
711        path: &str,
712        resp: reqwest::Response,
713    ) -> Result<(), NifiError> {
714        let status = resp.status();
715        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
716        if status.is_success() || status.as_u16() == 302 {
717            return Ok(());
718        }
719        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
720        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
721        let message = extract_error_message(&body);
722        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
723        Err(crate::error::api_error(status.as_u16(), message))
724    }
725
726    pub(crate) fn api_url(&self, path: &str) -> Url {
727        let mut url = self.base_url.clone();
728        url.set_path(&format!("/nifi-api{path}"));
729        url
730    }
731}
732
733/// Extract a human-readable message from a NiFi error response body.
734///
735/// NiFi returns either a JSON object with a `"message"` field or plain text.
736/// Logs the raw body at `debug` level before extracting.
737pub fn extract_error_message(body: &str) -> String {
738    serde_json::from_str::<serde_json::Value>(body)
739        .ok()
740        .and_then(|v| v["message"].as_str().map(str::to_owned))
741        .unwrap_or_else(|| body.to_owned())
742}