Skip to main content

nifi_rust_client/
client.rs

1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use snafu::ResultExt as _;
4use url::Url;
5
6use crate::NifiError;
7use crate::error::{ApiSnafu, AuthSnafu, HttpSnafu};
8
9/// Client for the Apache NiFi REST API.
10#[derive(Debug, Clone)]
11pub struct NifiClient {
12    base_url: Url,
13    http: Client,
14    token: Option<String>,
15}
16
17impl NifiClient {
18    /// Construct a client from pre-built parts. Used by [`crate::NifiClientBuilder`].
19    pub(crate) fn from_parts(base_url: Url, http: Client) -> Self {
20        Self {
21            base_url,
22            http,
23            token: None,
24        }
25    }
26
27    /// Return the current bearer token, if one has been set.
28    ///
29    /// The token is a NiFi-issued JWT. You can persist it between process restarts
30    /// and restore it with [`set_token`][Self::set_token] to avoid re-authenticating.
31    /// The token will eventually expire (NiFi default: 12 hours); when it does, any
32    /// API call returns [`NifiError::Api`] with `status == 401`. Re-call
33    /// [`login`][Self::login] to obtain a fresh token.
34    pub fn token(&self) -> Option<&str> {
35        self.token.as_deref()
36    }
37
38    /// Restore a previously obtained bearer token.
39    ///
40    /// Useful for CLI tools that persist the token in a file between sessions.
41    /// If the token has expired, the next API call will return
42    /// [`NifiError::Api`] with `status == 401`; re-call [`login`][Self::login]
43    /// to obtain a fresh one.
44    pub fn set_token(&mut self, token: String) {
45        self.token = Some(token);
46    }
47
48    /// Invalidate the current bearer token and clear it from the client.
49    ///
50    /// Sends `DELETE /nifi-api/access/logout` to invalidate the token server-side,
51    /// then clears the local token unconditionally so that subsequent requests are
52    /// not sent with a stale credential.
53    ///
54    /// If the server returns an error (e.g. `401` because the token had already
55    /// expired) the local token is still cleared and the error is returned to the
56    /// caller.
57    pub async fn logout(&mut self) -> Result<(), NifiError> {
58        let result = self.delete("/access/logout").await;
59        self.token = None;
60        if result.is_ok() {
61            tracing::info!("NiFi logout successful");
62        }
63        result
64    }
65
66    /// Authenticate with NiFi using single-user credentials.
67    ///
68    /// Obtains a JWT token from `/nifi-api/access/token` and stores it on the
69    /// client for all subsequent requests.
70    ///
71    /// # Token lifetime and expiry
72    ///
73    /// NiFi JWTs expire after 12 hours by default (configurable server-side via
74    /// `nifi.security.user.login.identity.provider.expiration`). Once expired,
75    /// any API call returns [`NifiError::Api`] with `status == 401`. The
76    /// recommended pattern:
77    ///
78    /// ```no_run
79    /// # #[cfg(not(feature = "dynamic"))]
80    /// # async fn example(client: &mut nifi_rust_client::NifiClient) -> Result<(), nifi_rust_client::NifiError> {
81    /// use nifi_rust_client::NifiError;
82    /// match client.flow_api().get_about_info().await {
83    ///     Err(NifiError::Api { status: 401, .. }) => {
84    ///         client.login("admin", "password").await?;
85    ///         // retry the call
86    ///     }
87    ///     other => { other?; }
88    /// }
89    /// # Ok(())
90    /// # }
91    /// ```
92    ///
93    /// Automatic re-login is not built in: storing credentials on the client
94    /// would keep plaintext passwords in memory for the lifetime of the process,
95    /// which is undesirable in most contexts.
96    pub async fn login(&mut self, username: &str, password: &str) -> Result<(), NifiError> {
97        tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
98        let url = self.api_url("/access/token");
99        let resp = self
100            .http
101            .post(url)
102            .form(&[("username", username), ("password", password)])
103            .send()
104            .await
105            .context(HttpSnafu)?;
106
107        let status = resp.status();
108        tracing::debug!(
109            method = "POST",
110            path = "/access/token",
111            status = status.as_u16(),
112            "NiFi API response"
113        );
114        if !status.is_success() {
115            let body = resp.text().await.unwrap_or_else(|_| status.to_string());
116            tracing::debug!(
117                method = "POST",
118                path = "/access/token",
119                status = status.as_u16(),
120                %body,
121                "NiFi API raw error body"
122            );
123            let message = extract_error_message(&body);
124            tracing::warn!(
125                method = "POST",
126                path = "/access/token",
127                status = status.as_u16(),
128                %message,
129                "NiFi API error"
130            );
131            return AuthSnafu { message }.fail();
132        }
133
134        let token = resp.text().await.context(HttpSnafu)?;
135        self.token = Some(token);
136        tracing::info!("NiFi login successful for {username}");
137        Ok(())
138    }
139
140    // ── Private helpers ───────────────────────────────────────────────────────
141
142    pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
143        tracing::debug!(method = "GET", path, "NiFi API request");
144        let url = self.api_url(path);
145        let resp = self
146            .authenticated(self.http.get(url))
147            .send()
148            .await
149            .context(HttpSnafu)?;
150        Self::deserialize("GET", path, resp).await
151    }
152
153    pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
154    where
155        B: serde::Serialize,
156        T: DeserializeOwned,
157    {
158        tracing::debug!(method = "POST", path, "NiFi API request");
159        let url = self.api_url(path);
160        let resp = self
161            .authenticated(self.http.post(url))
162            .json(body)
163            .send()
164            .await
165            .context(HttpSnafu)?;
166        Self::deserialize("POST", path, resp).await
167    }
168
169    pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
170    where
171        B: serde::Serialize,
172        T: DeserializeOwned,
173    {
174        tracing::debug!(method = "PUT", path, "NiFi API request");
175        let url = self.api_url(path);
176        let resp = self
177            .authenticated(self.http.put(url))
178            .json(body)
179            .send()
180            .await
181            .context(HttpSnafu)?;
182        Self::deserialize("PUT", path, resp).await
183    }
184
185    /// POST that ignores the response body (for endpoints with no JSON response).
186    pub(crate) async fn post_void<B: serde::Serialize>(
187        &self,
188        path: &str,
189        body: &B,
190    ) -> Result<(), NifiError> {
191        tracing::debug!(method = "POST", path, "NiFi API request");
192        let url = self.api_url(path);
193        let resp = self
194            .authenticated(self.http.post(url))
195            .json(body)
196            .send()
197            .await
198            .context(HttpSnafu)?;
199        let status = resp.status();
200        tracing::debug!(
201            method = "POST",
202            path,
203            status = status.as_u16(),
204            "NiFi API response"
205        );
206        if status.is_success() {
207            return Ok(());
208        }
209        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
210        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
211        let message = extract_error_message(&body);
212        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
213        ApiSnafu {
214            status: status.as_u16(),
215            message,
216        }
217        .fail()
218    }
219
220    /// PUT that ignores the response body (for endpoints with no JSON response).
221    #[allow(dead_code)]
222    pub(crate) async fn put_void<B: serde::Serialize>(
223        &self,
224        path: &str,
225        body: &B,
226    ) -> Result<(), NifiError> {
227        tracing::debug!(method = "PUT", path, "NiFi API request");
228        let url = self.api_url(path);
229        let resp = self
230            .authenticated(self.http.put(url))
231            .json(body)
232            .send()
233            .await
234            .context(HttpSnafu)?;
235        let status = resp.status();
236        tracing::debug!(
237            method = "PUT",
238            path,
239            status = status.as_u16(),
240            "NiFi API response"
241        );
242        if status.is_success() {
243            return Ok(());
244        }
245        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
246        tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
247        let message = extract_error_message(&body);
248        tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
249        ApiSnafu {
250            status: status.as_u16(),
251            message,
252        }
253        .fail()
254    }
255
256    /// POST with no request body; deserializes the JSON response.
257    pub(crate) async fn post_no_body<T: DeserializeOwned>(
258        &self,
259        path: &str,
260    ) -> Result<T, NifiError> {
261        tracing::debug!(method = "POST", path, "NiFi API request");
262        let url = self.api_url(path);
263        let resp = self
264            .authenticated(self.http.post(url))
265            .send()
266            .await
267            .context(HttpSnafu)?;
268        Self::deserialize("POST", path, resp).await
269    }
270
271    /// POST with no request body; ignores the response body.
272    pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
273        tracing::debug!(method = "POST", path, "NiFi API request");
274        let url = self.api_url(path);
275        let resp = self
276            .authenticated(self.http.post(url))
277            .send()
278            .await
279            .context(HttpSnafu)?;
280        let status = resp.status();
281        tracing::debug!(
282            method = "POST",
283            path,
284            status = status.as_u16(),
285            "NiFi API response"
286        );
287        if status.is_success() {
288            return Ok(());
289        }
290        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
291        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
292        let message = extract_error_message(&body);
293        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
294        ApiSnafu {
295            status: status.as_u16(),
296            message,
297        }
298        .fail()
299    }
300
301    /// PUT with no request body; deserializes the JSON response.
302    pub(crate) async fn put_no_body<T: DeserializeOwned>(
303        &self,
304        path: &str,
305    ) -> Result<T, NifiError> {
306        tracing::debug!(method = "PUT", path, "NiFi API request");
307        let url = self.api_url(path);
308        let resp = self
309            .authenticated(self.http.put(url))
310            .send()
311            .await
312            .context(HttpSnafu)?;
313        Self::deserialize("PUT", path, resp).await
314    }
315
316    /// PUT with no request body; ignores the response body.
317    #[allow(dead_code)]
318    pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
319        tracing::debug!(method = "PUT", path, "NiFi API request");
320        let url = self.api_url(path);
321        let resp = self
322            .authenticated(self.http.put(url))
323            .send()
324            .await
325            .context(HttpSnafu)?;
326        let status = resp.status();
327        tracing::debug!(
328            method = "PUT",
329            path,
330            status = status.as_u16(),
331            "NiFi API response"
332        );
333        if status.is_success() {
334            return Ok(());
335        }
336        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
337        tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
338        let message = extract_error_message(&body);
339        tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
340        ApiSnafu {
341            status: status.as_u16(),
342            message,
343        }
344        .fail()
345    }
346
347    /// POST with `application/octet-stream` body.
348    ///
349    /// Used for binary upload endpoints (e.g. NAR upload).
350    /// `filename` is sent as the `Filename` request header when provided.
351    pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
352        &self,
353        path: &str,
354        filename: Option<&str>,
355        data: Vec<u8>,
356    ) -> Result<T, NifiError> {
357        tracing::debug!(method = "POST", path, "NiFi API request");
358        let url = self.api_url(path);
359        let builder = self
360            .authenticated(self.http.post(url))
361            .header("Content-Type", "application/octet-stream")
362            .body(data);
363        let builder = if let Some(name) = filename {
364            builder.header("Filename", name)
365        } else {
366            builder
367        };
368        let resp = builder.send().await.context(HttpSnafu)?;
369        Self::deserialize("POST", path, resp).await
370    }
371
372    /// POST with `application/octet-stream` body; ignores the response body.
373    ///
374    /// Used for binary upload endpoints that return no JSON response.
375    /// `filename` is sent as the `Filename` request header when provided.
376    pub(crate) async fn post_void_octet_stream(
377        &self,
378        path: &str,
379        filename: Option<&str>,
380        data: Vec<u8>,
381    ) -> Result<(), NifiError> {
382        tracing::debug!(method = "POST", path, "NiFi API request");
383        let url = self.api_url(path);
384        let builder = self
385            .authenticated(self.http.post(url))
386            .header("Content-Type", "application/octet-stream")
387            .body(data);
388        let builder = if let Some(name) = filename {
389            builder.header("Filename", name)
390        } else {
391            builder
392        };
393        let resp = builder.send().await.context(HttpSnafu)?;
394        let status = resp.status();
395        tracing::debug!(
396            method = "POST",
397            path,
398            status = status.as_u16(),
399            "NiFi API response"
400        );
401        if status.is_success() {
402            return Ok(());
403        }
404        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
405        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
406        let message = extract_error_message(&body);
407        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
408        ApiSnafu {
409            status: status.as_u16(),
410            message,
411        }
412        .fail()
413    }
414
415    /// POST with query parameters; ignores the response body.
416    ///
417    /// Used for endpoints that accept query parameters and have no JSON response body.
418    #[allow(dead_code)]
419    pub(crate) async fn post_void_with_query<B: serde::Serialize>(
420        &self,
421        path: &str,
422        body: &B,
423        query: &[(&str, String)],
424    ) -> Result<(), NifiError> {
425        tracing::debug!(method = "POST", path, "NiFi API request");
426        let url = self.api_url(path);
427        let resp = self
428            .authenticated(self.http.post(url).query(query))
429            .json(body)
430            .send()
431            .await
432            .context(HttpSnafu)?;
433        let status = resp.status();
434        tracing::debug!(
435            method = "POST",
436            path,
437            status = status.as_u16(),
438            "NiFi API response"
439        );
440        if status.is_success() {
441            return Ok(());
442        }
443        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
444        tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
445        let message = extract_error_message(&body);
446        tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
447        ApiSnafu {
448            status: status.as_u16(),
449            message,
450        }
451        .fail()
452    }
453
454    /// GET that ignores the response body (for endpoints with no JSON response).
455    ///
456    /// Treats 302 as success in addition to 2xx: NiFi's `GET /access/logout/complete`
457    /// responds with a redirect once the logout is complete.
458    pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
459        tracing::debug!(method = "GET", path, "NiFi API request");
460        let url = self.api_url(path);
461        let resp = self
462            .authenticated(self.http.get(url))
463            .send()
464            .await
465            .context(HttpSnafu)?;
466        let status = resp.status();
467        tracing::debug!(
468            method = "GET",
469            path,
470            status = status.as_u16(),
471            "NiFi API response"
472        );
473        if status.is_success() || status.as_u16() == 302 {
474            return Ok(());
475        }
476        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
477        tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
478        let message = extract_error_message(&body);
479        tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
480        ApiSnafu {
481            status: status.as_u16(),
482            message,
483        }
484        .fail()
485    }
486
487    pub(crate) async fn get_with_query<T: DeserializeOwned>(
488        &self,
489        path: &str,
490        query: &[(&str, String)],
491    ) -> Result<T, NifiError> {
492        tracing::debug!(method = "GET", path, "NiFi API request");
493        let url = self.api_url(path);
494        let resp = self
495            .authenticated(self.http.get(url).query(query))
496            .send()
497            .await
498            .context(HttpSnafu)?;
499        Self::deserialize("GET", path, resp).await
500    }
501
502    pub(crate) async fn get_void_with_query(
503        &self,
504        path: &str,
505        query: &[(&str, String)],
506    ) -> Result<(), NifiError> {
507        tracing::debug!(method = "GET", path, "NiFi API request");
508        let url = self.api_url(path);
509        let resp = self
510            .authenticated(self.http.get(url).query(query))
511            .send()
512            .await
513            .context(HttpSnafu)?;
514        let status = resp.status();
515        tracing::debug!(
516            method = "GET",
517            path,
518            status = status.as_u16(),
519            "NiFi API response"
520        );
521        if status.is_success() || status.as_u16() == 302 {
522            return Ok(());
523        }
524        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
525        tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
526        let message = extract_error_message(&body);
527        tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
528        ApiSnafu {
529            status: status.as_u16(),
530            message,
531        }
532        .fail()
533    }
534
535    pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
536        &self,
537        path: &str,
538        query: &[(&str, String)],
539    ) -> Result<T, NifiError> {
540        tracing::debug!(method = "DELETE", path, "NiFi API request");
541        let url = self.api_url(path);
542        let resp = self
543            .authenticated(self.http.delete(url).query(query))
544            .send()
545            .await
546            .context(HttpSnafu)?;
547        Self::deserialize("DELETE", path, resp).await
548    }
549
550    pub(crate) async fn delete_with_query(
551        &self,
552        path: &str,
553        query: &[(&str, String)],
554    ) -> Result<(), NifiError> {
555        tracing::debug!(method = "DELETE", path, "NiFi API request");
556        let url = self.api_url(path);
557        let resp = self
558            .authenticated(self.http.delete(url).query(query))
559            .send()
560            .await
561            .context(HttpSnafu)?;
562        let status = resp.status();
563        tracing::debug!(
564            method = "DELETE",
565            path,
566            status = status.as_u16(),
567            "NiFi API response"
568        );
569        if status.is_success() {
570            return Ok(());
571        }
572        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
573        tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
574        let message = extract_error_message(&body);
575        tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
576        ApiSnafu {
577            status: status.as_u16(),
578            message,
579        }
580        .fail()
581    }
582
583    pub(crate) async fn post_with_query<B, T>(
584        &self,
585        path: &str,
586        body: &B,
587        query: &[(&str, String)],
588    ) -> Result<T, NifiError>
589    where
590        B: serde::Serialize,
591        T: DeserializeOwned,
592    {
593        tracing::debug!(method = "POST", path, "NiFi API request");
594        let url = self.api_url(path);
595        let resp = self
596            .authenticated(self.http.post(url).query(query))
597            .json(body)
598            .send()
599            .await
600            .context(HttpSnafu)?;
601        Self::deserialize("POST", path, resp).await
602    }
603
604    pub(crate) async fn delete_returning<T: DeserializeOwned>(
605        &self,
606        path: &str,
607    ) -> Result<T, NifiError> {
608        tracing::debug!(method = "DELETE", path, "NiFi API request");
609        let url = self.api_url(path);
610        let resp = self
611            .authenticated(self.http.delete(url))
612            .send()
613            .await
614            .context(HttpSnafu)?;
615        Self::deserialize("DELETE", path, resp).await
616    }
617
618    pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
619        tracing::debug!(method = "DELETE", path, "NiFi API request");
620        let url = self.api_url(path);
621        let resp = self
622            .authenticated(self.http.delete(url))
623            .send()
624            .await
625            .context(HttpSnafu)?;
626        let status = resp.status();
627        tracing::debug!(
628            method = "DELETE",
629            path,
630            status = status.as_u16(),
631            "NiFi API response"
632        );
633        if status.is_success() {
634            return Ok(());
635        }
636        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
637        tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
638        let message = extract_error_message(&body);
639        tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
640        ApiSnafu {
641            status: status.as_u16(),
642            message,
643        }
644        .fail()
645    }
646
647    fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
648        match &self.token {
649            Some(token) => req.bearer_auth(token),
650            None => {
651                tracing::warn!(
652                    "sending NiFi API request without a bearer token — call login() first"
653                );
654                req
655            }
656        }
657    }
658
659    async fn deserialize<T: DeserializeOwned>(
660        method: &str,
661        path: &str,
662        resp: reqwest::Response,
663    ) -> Result<T, NifiError> {
664        let status = resp.status();
665        tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
666        if status.is_success() {
667            return resp.json::<T>().await.context(HttpSnafu);
668        }
669        let body = resp.text().await.unwrap_or_else(|_| status.to_string());
670        tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
671        let message = extract_error_message(&body);
672        tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
673        ApiSnafu {
674            status: status.as_u16(),
675            message,
676        }
677        .fail()
678    }
679
680    pub(crate) fn api_url(&self, path: &str) -> Url {
681        let mut url = self.base_url.clone();
682        url.set_path(&format!("/nifi-api{path}"));
683        url
684    }
685}
686
687/// Extract a human-readable message from a NiFi error response body.
688///
689/// NiFi returns either a JSON object with a `"message"` field or plain text.
690/// Logs the raw body at `debug` level before extracting.
691pub fn extract_error_message(body: &str) -> String {
692    serde_json::from_str::<serde_json::Value>(body)
693        .ok()
694        .and_then(|v| v["message"].as_str().map(str::to_owned))
695        .unwrap_or_else(|| body.to_owned())
696}