Skip to main content

nifi_rust_client/
client.rs

1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use snafu::ResultExt as _;
4use url::Url;
5
6use crate::NifiError;
7use crate::error::{ApiSnafu, AuthSnafu, HttpSnafu};
8
9/// Client for the Apache NiFi REST API.
10#[derive(Debug, Clone)]
11pub struct NifiClient {
12    base_url: Url,
13    http: Client,
14    token: Option<String>,
15}
16
17impl NifiClient {
18    /// Construct a client from pre-built parts. Used by [`crate::NifiClientBuilder`].
19    pub(crate) fn from_parts(base_url: Url, http: Client) -> Self {
20        Self {
21            base_url,
22            http,
23            token: None,
24        }
25    }
26
27    /// Return the current bearer token, if one has been set.
28    ///
29    /// The token is a NiFi-issued JWT. You can persist it between process restarts
30    /// and restore it with [`set_token`][Self::set_token] to avoid re-authenticating.
31    /// The token will eventually expire (NiFi default: 12 hours); when it does, any
32    /// API call returns [`NifiError::Api`] with `status == 401`. Re-call
33    /// [`login`][Self::login] to obtain a fresh token.
34    pub fn token(&self) -> Option<&str> {
35        self.token.as_deref()
36    }
37
38    /// Restore a previously obtained bearer token.
39    ///
40    /// Useful for CLI tools that persist the token in a file between sessions.
41    /// If the token has expired, the next API call will return
42    /// [`NifiError::Api`] with `status == 401`; re-call [`login`][Self::login]
43    /// to obtain a fresh one.
44    pub fn set_token(&mut self, token: String) {
45        self.token = Some(token);
46    }
47
48    /// Invalidate the current bearer token and clear it from the client.
49    ///
50    /// Sends `DELETE /nifi-api/access/logout` to invalidate the token server-side,
51    /// then clears the local token unconditionally so that subsequent requests are
52    /// not sent with a stale credential.
53    ///
54    /// If the server returns an error (e.g. `401` because the token had already
55    /// expired) the local token is still cleared and the error is returned to the
56    /// caller.
57    pub async fn logout(&mut self) -> Result<(), NifiError> {
58        let result = self.delete("/access/logout").await;
59        self.token = None;
60        if result.is_ok() {
61            tracing::info!("NiFi logout successful");
62        }
63        result
64    }
65
66    /// Authenticate with NiFi using single-user credentials.
67    ///
68    /// Obtains a JWT token from `/nifi-api/access/token` and stores it on the
69    /// client for all subsequent requests.
70    ///
71    /// # Token lifetime and expiry
72    ///
73    /// NiFi JWTs expire after 12 hours by default (configurable server-side via
74    /// `nifi.security.user.login.identity.provider.expiration`). Once expired,
75    /// any API call returns [`NifiError::Api`] with `status == 401`. The
76    /// recommended pattern:
77    ///
78    /// ```no_run
79    /// # #[cfg(not(feature = "dynamic"))]
80    /// # async fn example(client: &mut nifi_rust_client::NifiClient) -> Result<(), nifi_rust_client::NifiError> {
81    /// use nifi_rust_client::NifiError;
82    /// match client.flow_api().get_about_info().await {
83    ///     Err(NifiError::Api { status: 401, .. }) => {
84    ///         client.login("admin", "password").await?;
85    ///         // retry the call
86    ///     }
87    ///     other => { other?; }
88    /// }
89    /// # Ok(())
90    /// # }
91    /// ```
92    ///
93    /// Automatic re-login is not built in: storing credentials on the client
94    /// would keep plaintext passwords in memory for the lifetime of the process,
95    /// which is undesirable in most contexts.
96    pub async fn login(&mut self, username: &str, password: &str) -> Result<(), NifiError> {
97        tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
98        let url = self.api_url("/access/token");
99        let resp = self
100            .http
101            .post(url)
102            .form(&[("username", username), ("password", password)])
103            .send()
104            .await
105            .context(HttpSnafu)?;
106
107        let status = resp.status();
108        tracing::debug!(
109            method = "POST",
110            path = "/access/token",
111            status = status.as_u16(),
112            "NiFi API response"
113        );
114        if !status.is_success() {
115            let body = resp.text().await.unwrap_or_else(|_| status.to_string());
116            tracing::debug!(
117                method = "POST",
118                path = "/access/token",
119                status = status.as_u16(),
120                %body,
121                "NiFi API raw error body"
122            );
123            let message = extract_error_message(&body);
124            tracing::warn!(
125                method = "POST",
126                path = "/access/token",
127                status = status.as_u16(),
128                %message,
129                "NiFi API error"
130            );
131            return AuthSnafu { message }.fail();
132        }
133
134        let token = resp.text().await.context(HttpSnafu)?;
135        self.token = Some(token);
136        tracing::info!("NiFi login successful for {username}");
137        Ok(())
138    }
139
140    // ── Private helpers ───────────────────────────────────────────────────────
141
142    pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
143        tracing::debug!(method = "GET", path, "NiFi API request");
144        let url = self.api_url(path);
145        let resp = self
146            .authenticated(self.http.get(url))
147            .send()
148            .await
149            .context(HttpSnafu)?;
150        Self::deserialize("GET", path, resp).await
151    }
152
153    pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
154    where
155        B: serde::Serialize,
156        T: DeserializeOwned,
157    {
158        tracing::debug!(method = "POST", path, "NiFi API request");
159        let url = self.api_url(path);
160        let resp = self
161            .authenticated(self.http.post(url))
162            .json(body)
163            .send()
164            .await
165            .context(HttpSnafu)?;
166        Self::deserialize("POST", path, resp).await
167    }
168
169    pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
170    where
171        B: serde::Serialize,
172        T: DeserializeOwned,
173    {
174        tracing::debug!(method = "PUT", path, "NiFi API request");
175        let url = self.api_url(path);
176        let resp = self
177            .authenticated(self.http.put(url))
178            .json(body)
179            .send()
180            .await
181            .context(HttpSnafu)?;
182        Self::deserialize("PUT", path, resp).await
183    }
184
185    /// POST that ignores the response body (for endpoints with no JSON response).
186    pub(crate) async fn post_void<B: serde::Serialize>(
187        &self,
188        path: &str,
189        body: &B,
190    ) -> Result<(), NifiError> {
191        tracing::debug!(method = "POST", path, "NiFi API request");
192        let url = self.api_url(path);
193        let resp = self
194            .authenticated(self.http.post(url))
195            .json(body)
196            .send()
197            .await
198            .context(HttpSnafu)?;
199        let status = resp.status();
200        tracing::debug!(
201            method = "POST",
202            path,
203            status = status.as_u16(),
204            "NiFi API response"
205        );
206        if status.is_success() {
207            return Ok(());
208        }
209        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
210        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
211        let message = extract_error_message(&body);
212        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
213        ApiSnafu {
214            status: status.as_u16(),
215            message,
216        }
217        .fail()
218    }
219
220    /// PUT that ignores the response body (for endpoints with no JSON response).
221    #[allow(dead_code)]
222    pub(crate) async fn put_void<B: serde::Serialize>(
223        &self,
224        path: &str,
225        body: &B,
226    ) -> Result<(), NifiError> {
227        tracing::debug!(method = "PUT", path, "NiFi API request");
228        let url = self.api_url(path);
229        let resp = self
230            .authenticated(self.http.put(url))
231            .json(body)
232            .send()
233            .await
234            .context(HttpSnafu)?;
235        let status = resp.status();
236        tracing::debug!(
237            method = "PUT",
238            path,
239            status = status.as_u16(),
240            "NiFi API response"
241        );
242        if status.is_success() {
243            return Ok(());
244        }
245        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
246        tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
247        let message = extract_error_message(&body);
248        tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
249        ApiSnafu {
250            status: status.as_u16(),
251            message,
252        }
253        .fail()
254    }
255
256    /// POST with no request body; deserializes the JSON response.
257    pub(crate) async fn post_no_body<T: DeserializeOwned>(
258        &self,
259        path: &str,
260    ) -> Result<T, NifiError> {
261        tracing::debug!(method = "POST", path, "NiFi API request");
262        let url = self.api_url(path);
263        let resp = self
264            .authenticated(self.http.post(url))
265            .send()
266            .await
267            .context(HttpSnafu)?;
268        Self::deserialize("POST", path, resp).await
269    }
270
271    /// POST with no request body; ignores the response body.
272    // Used by the code generator for void no-body POST endpoints without query params.
273    // No current NiFi 2.x endpoint triggers this path, but keep it for forward compatibility.
274    #[allow(dead_code)]
275    pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
276        tracing::debug!(method = "POST", path, "NiFi API request");
277        let url = self.api_url(path);
278        let resp = self
279            .authenticated(self.http.post(url))
280            .send()
281            .await
282            .context(HttpSnafu)?;
283        let status = resp.status();
284        tracing::debug!(
285            method = "POST",
286            path,
287            status = status.as_u16(),
288            "NiFi API response"
289        );
290        if status.is_success() {
291            return Ok(());
292        }
293        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
294        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
295        let message = extract_error_message(&body);
296        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
297        ApiSnafu {
298            status: status.as_u16(),
299            message,
300        }
301        .fail()
302    }
303
304    /// PUT with no request body; deserializes the JSON response.
305    pub(crate) async fn put_no_body<T: DeserializeOwned>(
306        &self,
307        path: &str,
308    ) -> Result<T, NifiError> {
309        tracing::debug!(method = "PUT", path, "NiFi API request");
310        let url = self.api_url(path);
311        let resp = self
312            .authenticated(self.http.put(url))
313            .send()
314            .await
315            .context(HttpSnafu)?;
316        Self::deserialize("PUT", path, resp).await
317    }
318
319    /// PUT with no request body; ignores the response body.
320    #[allow(dead_code)]
321    pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
322        tracing::debug!(method = "PUT", path, "NiFi API request");
323        let url = self.api_url(path);
324        let resp = self
325            .authenticated(self.http.put(url))
326            .send()
327            .await
328            .context(HttpSnafu)?;
329        let status = resp.status();
330        tracing::debug!(
331            method = "PUT",
332            path,
333            status = status.as_u16(),
334            "NiFi API response"
335        );
336        if status.is_success() {
337            return Ok(());
338        }
339        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
340        tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
341        let message = extract_error_message(&body);
342        tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
343        ApiSnafu {
344            status: status.as_u16(),
345            message,
346        }
347        .fail()
348    }
349
350    /// POST with `application/octet-stream` body.
351    ///
352    /// Used for binary upload endpoints (e.g. NAR upload).
353    /// `filename` is sent as the `Filename` request header when provided.
354    pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
355        &self,
356        path: &str,
357        filename: Option<&str>,
358        data: Vec<u8>,
359    ) -> Result<T, NifiError> {
360        tracing::debug!(method = "POST", path, "NiFi API request");
361        let url = self.api_url(path);
362        let builder = self
363            .authenticated(self.http.post(url))
364            .header("Content-Type", "application/octet-stream")
365            .body(data);
366        let builder = if let Some(name) = filename {
367            builder.header("Filename", name)
368        } else {
369            builder
370        };
371        let resp = builder.send().await.context(HttpSnafu)?;
372        Self::deserialize("POST", path, resp).await
373    }
374
375    /// POST with `application/octet-stream` body; ignores the response body.
376    ///
377    /// Used for binary upload endpoints that return no JSON response.
378    /// `filename` is sent as the `Filename` request header when provided.
379    pub(crate) async fn post_void_octet_stream(
380        &self,
381        path: &str,
382        filename: Option<&str>,
383        data: Vec<u8>,
384    ) -> Result<(), NifiError> {
385        tracing::debug!(method = "POST", path, "NiFi API request");
386        let url = self.api_url(path);
387        let builder = self
388            .authenticated(self.http.post(url))
389            .header("Content-Type", "application/octet-stream")
390            .body(data);
391        let builder = if let Some(name) = filename {
392            builder.header("Filename", name)
393        } else {
394            builder
395        };
396        let resp = builder.send().await.context(HttpSnafu)?;
397        let status = resp.status();
398        tracing::debug!(
399            method = "POST",
400            path,
401            status = status.as_u16(),
402            "NiFi API response"
403        );
404        if status.is_success() {
405            return Ok(());
406        }
407        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
408        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
409        let message = extract_error_message(&body);
410        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
411        ApiSnafu {
412            status: status.as_u16(),
413            message,
414        }
415        .fail()
416    }
417
418    /// POST with query parameters; ignores the response body.
419    ///
420    /// Used for endpoints that accept query parameters and have no JSON response body.
421    #[allow(dead_code)]
422    pub(crate) async fn post_void_with_query<B: serde::Serialize>(
423        &self,
424        path: &str,
425        body: &B,
426        query: &[(&str, String)],
427    ) -> Result<(), NifiError> {
428        tracing::debug!(method = "POST", path, "NiFi API request");
429        let url = self.api_url(path);
430        let resp = self
431            .authenticated(self.http.post(url).query(query))
432            .json(body)
433            .send()
434            .await
435            .context(HttpSnafu)?;
436        let status = resp.status();
437        tracing::debug!(
438            method = "POST",
439            path,
440            status = status.as_u16(),
441            "NiFi API response"
442        );
443        if status.is_success() {
444            return Ok(());
445        }
446        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
447        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
448        let message = extract_error_message(&body);
449        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
450        ApiSnafu {
451            status: status.as_u16(),
452            message,
453        }
454        .fail()
455    }
456
457    /// GET that ignores the response body (for endpoints with no JSON response).
458    ///
459    /// Treats 302 as success in addition to 2xx: NiFi's `GET /access/logout/complete`
460    /// responds with a redirect once the logout is complete.
461    pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
462        tracing::debug!(method = "GET", path, "NiFi API request");
463        let url = self.api_url(path);
464        let resp = self
465            .authenticated(self.http.get(url))
466            .send()
467            .await
468            .context(HttpSnafu)?;
469        let status = resp.status();
470        tracing::debug!(
471            method = "GET",
472            path,
473            status = status.as_u16(),
474            "NiFi API response"
475        );
476        if status.is_success() || status.as_u16() == 302 {
477            return Ok(());
478        }
479        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
480        tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
481        let message = extract_error_message(&body);
482        tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
483        ApiSnafu {
484            status: status.as_u16(),
485            message,
486        }
487        .fail()
488    }
489
490    pub(crate) async fn get_with_query<T: DeserializeOwned>(
491        &self,
492        path: &str,
493        query: &[(&str, String)],
494    ) -> Result<T, NifiError> {
495        tracing::debug!(method = "GET", path, "NiFi API request");
496        let url = self.api_url(path);
497        let resp = self
498            .authenticated(self.http.get(url).query(query))
499            .send()
500            .await
501            .context(HttpSnafu)?;
502        Self::deserialize("GET", path, resp).await
503    }
504
505    pub(crate) async fn get_void_with_query(
506        &self,
507        path: &str,
508        query: &[(&str, String)],
509    ) -> Result<(), NifiError> {
510        tracing::debug!(method = "GET", path, "NiFi API request");
511        let url = self.api_url(path);
512        let resp = self
513            .authenticated(self.http.get(url).query(query))
514            .send()
515            .await
516            .context(HttpSnafu)?;
517        let status = resp.status();
518        tracing::debug!(
519            method = "GET",
520            path,
521            status = status.as_u16(),
522            "NiFi API response"
523        );
524        if status.is_success() || status.as_u16() == 302 {
525            return Ok(());
526        }
527        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
528        tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
529        let message = extract_error_message(&body);
530        tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
531        ApiSnafu {
532            status: status.as_u16(),
533            message,
534        }
535        .fail()
536    }
537
538    pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
539        &self,
540        path: &str,
541        query: &[(&str, String)],
542    ) -> Result<T, NifiError> {
543        tracing::debug!(method = "DELETE", path, "NiFi API request");
544        let url = self.api_url(path);
545        let resp = self
546            .authenticated(self.http.delete(url).query(query))
547            .send()
548            .await
549            .context(HttpSnafu)?;
550        Self::deserialize("DELETE", path, resp).await
551    }
552
553    pub(crate) async fn delete_with_query(
554        &self,
555        path: &str,
556        query: &[(&str, String)],
557    ) -> Result<(), NifiError> {
558        tracing::debug!(method = "DELETE", path, "NiFi API request");
559        let url = self.api_url(path);
560        let resp = self
561            .authenticated(self.http.delete(url).query(query))
562            .send()
563            .await
564            .context(HttpSnafu)?;
565        let status = resp.status();
566        tracing::debug!(
567            method = "DELETE",
568            path,
569            status = status.as_u16(),
570            "NiFi API response"
571        );
572        if status.is_success() {
573            return Ok(());
574        }
575        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
576        tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
577        let message = extract_error_message(&body);
578        tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
579        ApiSnafu {
580            status: status.as_u16(),
581            message,
582        }
583        .fail()
584    }
585
586    pub(crate) async fn post_with_query<B, T>(
587        &self,
588        path: &str,
589        body: &B,
590        query: &[(&str, String)],
591    ) -> Result<T, NifiError>
592    where
593        B: serde::Serialize,
594        T: DeserializeOwned,
595    {
596        tracing::debug!(method = "POST", path, "NiFi API request");
597        let url = self.api_url(path);
598        let resp = self
599            .authenticated(self.http.post(url).query(query))
600            .json(body)
601            .send()
602            .await
603            .context(HttpSnafu)?;
604        Self::deserialize("POST", path, resp).await
605    }
606
607    pub(crate) async fn delete_returning<T: DeserializeOwned>(
608        &self,
609        path: &str,
610    ) -> Result<T, NifiError> {
611        tracing::debug!(method = "DELETE", path, "NiFi API request");
612        let url = self.api_url(path);
613        let resp = self
614            .authenticated(self.http.delete(url))
615            .send()
616            .await
617            .context(HttpSnafu)?;
618        Self::deserialize("DELETE", path, resp).await
619    }
620
621    pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
622        tracing::debug!(method = "DELETE", path, "NiFi API request");
623        let url = self.api_url(path);
624        let resp = self
625            .authenticated(self.http.delete(url))
626            .send()
627            .await
628            .context(HttpSnafu)?;
629        let status = resp.status();
630        tracing::debug!(
631            method = "DELETE",
632            path,
633            status = status.as_u16(),
634            "NiFi API response"
635        );
636        if status.is_success() {
637            return Ok(());
638        }
639        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
640        tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
641        let message = extract_error_message(&body);
642        tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
643        ApiSnafu {
644            status: status.as_u16(),
645            message,
646        }
647        .fail()
648    }
649
650    fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
651        match &self.token {
652            Some(token) => req.bearer_auth(token),
653            None => {
654                tracing::warn!(
655                    "sending NiFi API request without a bearer token — call login() first"
656                );
657                req
658            }
659        }
660    }
661
662    async fn deserialize<T: DeserializeOwned>(
663        method: &str,
664        path: &str,
665        resp: reqwest::Response,
666    ) -> Result<T, NifiError> {
667        let status = resp.status();
668        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
669        if status.is_success() {
670            return resp.json::<T>().await.context(HttpSnafu);
671        }
672        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
673        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
674        let message = extract_error_message(&body);
675        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
676        ApiSnafu {
677            status: status.as_u16(),
678            message,
679        }
680        .fail()
681    }
682
683    pub(crate) fn api_url(&self, path: &str) -> Url {
684        let mut url = self.base_url.clone();
685        url.set_path(&format!("/nifi-api{path}"));
686        url
687    }
688}
689
690/// Extract a human-readable message from a NiFi error response body.
691///
692/// NiFi returns either a JSON object with a `"message"` field or plain text.
693/// Logs the raw body at `debug` level before extracting.
694pub fn extract_error_message(body: &str) -> String {
695    serde_json::from_str::<serde_json::Value>(body)
696        .ok()
697        .and_then(|v| v["message"].as_str().map(str::to_owned))
698        .unwrap_or_else(|| body.to_owned())
699}