1#![deny(missing_docs)]
2use std::sync::Arc;
3
4use reqwest::Client;
5use serde::de::DeserializeOwned;
6use snafu::ResultExt as _;
7use tokio::sync::RwLock;
8use url::Url;
9
10use crate::NifiError;
11use crate::config::credentials::CredentialProvider;
12use crate::error::{AuthSnafu, HttpSnafu};
13
14pub struct NifiClient {
16 base_url: Url,
17 http: Client,
18 token: Arc<RwLock<Option<String>>>,
19 credentials: Option<Arc<dyn CredentialProvider>>,
20 #[allow(dead_code)]
21 retry_policy: Option<crate::config::retry::RetryPolicy>,
22}
23
24impl Clone for NifiClient {
25 fn clone(&self) -> Self {
26 Self {
27 base_url: self.base_url.clone(),
28 http: self.http.clone(),
29 token: Arc::clone(&self.token),
30 credentials: self.credentials.clone(),
31 retry_policy: self.retry_policy.clone(),
32 }
33 }
34}
35
36impl std::fmt::Debug for NifiClient {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("NifiClient")
39 .field("base_url", &self.base_url)
40 .field(
41 "credentials",
42 &self.credentials.as_ref().map(|c| format!("{c:?}")),
43 )
44 .field("retry_policy", &self.retry_policy)
45 .finish_non_exhaustive()
46 }
47}
48
49impl NifiClient {
50 pub(crate) fn from_parts(
52 base_url: Url,
53 http: Client,
54 credentials: Option<Arc<dyn CredentialProvider>>,
55 retry_policy: Option<crate::config::retry::RetryPolicy>,
56 ) -> Self {
57 Self {
58 base_url,
59 http,
60 token: Arc::new(RwLock::new(None)),
61 credentials,
62 retry_policy,
63 }
64 }
65
66 pub async fn token(&self) -> Option<String> {
74 self.token.read().await.clone()
75 }
76
77 pub async fn set_token(&self, token: String) {
84 *self.token.write().await = Some(token);
85 }
86
87 pub async fn logout(&self) -> Result<(), NifiError> {
97 let result = self.delete_inner("/access/logout").await;
98 *self.token.write().await = None;
99 if result.is_ok() {
100 tracing::info!("NiFi logout successful");
101 }
102 result
103 }
104
105 pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
118 tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
119 let url = self.api_url("/access/token");
120 let resp = self
121 .http
122 .post(url)
123 .form(&[("username", username), ("password", password)])
124 .send()
125 .await
126 .context(HttpSnafu)?;
127
128 let status = resp.status();
129 tracing::debug!(
130 method = "POST",
131 path = "/access/token",
132 status = status.as_u16(),
133 "NiFi API response"
134 );
135 if !status.is_success() {
136 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
137 tracing::debug!(
138 method = "POST",
139 path = "/access/token",
140 status = status.as_u16(),
141 %body,
142 "NiFi API raw error body"
143 );
144 let message = extract_error_message(&body);
145 tracing::warn!(
146 method = "POST",
147 path = "/access/token",
148 status = status.as_u16(),
149 %message,
150 "NiFi API error"
151 );
152 return AuthSnafu { message }.fail();
153 }
154
155 let token = resp.text().await.context(HttpSnafu)?;
156 *self.token.write().await = Some(token);
157 tracing::info!("NiFi login successful for {username}");
158 Ok(())
159 }
160
161 pub async fn login_with_provider(&self) -> Result<(), NifiError> {
165 let creds = self.credentials.as_ref().ok_or_else(|| NifiError::Auth {
166 message: "no credential provider configured".to_string(),
167 })?;
168 let (username, password) = creds.credentials().await?;
169 self.login(&username, &password).await
170 }
171
172 #[tracing::instrument(skip_all)]
177 async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
178 where
179 F: Fn() -> Fut,
180 Fut: std::future::Future<Output = Result<T, NifiError>>,
181 {
182 match f().await {
183 Err(NifiError::Unauthorized { .. }) if self.credentials.is_some() => {
184 tracing::info!("received 401, refreshing token via credential provider");
185 self.login_with_provider().await?;
186 f().await
187 }
188 other => other,
189 }
190 }
191
192 #[tracing::instrument(skip_all)]
200 async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
201 where
202 F: Fn() -> Fut,
203 Fut: std::future::Future<Output = Result<T, NifiError>>,
204 {
205 let Some(policy) = &self.retry_policy else {
206 return self.with_auth_retry(&f).await;
207 };
208
209 let mut last_err: Option<NifiError> = None;
210 for attempt in 0..=policy.max_retries {
211 if attempt > 0 {
212 let backoff = policy.backoff_for(attempt - 1);
213 tracing::info!(
214 attempt,
215 backoff_ms = backoff.as_millis() as u64,
216 "retrying after transient error"
217 );
218 tokio::time::sleep(backoff).await;
219 }
220 match self.with_auth_retry(&f).await {
221 Ok(v) => return Ok(v),
222 Err(e) if e.is_retryable() => {
223 tracing::warn!(attempt, error = %e, "transient error, will retry");
224 last_err = Some(e);
225 }
226 Err(e) => return Err(e),
227 }
228 }
229 match last_err {
232 Some(e) => Err(e),
233 None => self.with_auth_retry(&f).await,
235 }
236 }
237
238 pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
241 self.with_retry(|| async {
242 tracing::debug!(method = "GET", path, "NiFi API request");
243 let url = self.api_url(path);
244 let resp = self
245 .authenticated(self.http.get(url))
246 .await
247 .send()
248 .await
249 .context(HttpSnafu)?;
250 Self::deserialize("GET", path, resp).await
251 })
252 .await
253 }
254
255 pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
256 where
257 B: serde::Serialize,
258 T: DeserializeOwned,
259 {
260 self.with_retry(|| async {
261 tracing::debug!(method = "POST", path, "NiFi API request");
262 let url = self.api_url(path);
263 let resp = self
264 .authenticated(self.http.post(url))
265 .await
266 .json(body)
267 .send()
268 .await
269 .context(HttpSnafu)?;
270 Self::deserialize("POST", path, resp).await
271 })
272 .await
273 }
274
275 pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
276 where
277 B: serde::Serialize,
278 T: DeserializeOwned,
279 {
280 self.with_retry(|| async {
281 tracing::debug!(method = "PUT", path, "NiFi API request");
282 let url = self.api_url(path);
283 let resp = self
284 .authenticated(self.http.put(url))
285 .await
286 .json(body)
287 .send()
288 .await
289 .context(HttpSnafu)?;
290 Self::deserialize("PUT", path, resp).await
291 })
292 .await
293 }
294
295 #[allow(dead_code)]
299 pub(crate) async fn post_void<B: serde::Serialize>(
300 &self,
301 path: &str,
302 body: &B,
303 ) -> Result<(), NifiError> {
304 self.with_retry(|| async {
305 tracing::debug!(method = "POST", path, "NiFi API request");
306 let url = self.api_url(path);
307 let resp = self
308 .authenticated(self.http.post(url))
309 .await
310 .json(body)
311 .send()
312 .await
313 .context(HttpSnafu)?;
314 Self::check_void("POST", path, resp).await
315 })
316 .await
317 }
318
319 #[allow(dead_code)]
321 pub(crate) async fn put_void<B: serde::Serialize>(
322 &self,
323 path: &str,
324 body: &B,
325 ) -> Result<(), NifiError> {
326 self.with_retry(|| async {
327 tracing::debug!(method = "PUT", path, "NiFi API request");
328 let url = self.api_url(path);
329 let resp = self
330 .authenticated(self.http.put(url))
331 .await
332 .json(body)
333 .send()
334 .await
335 .context(HttpSnafu)?;
336 Self::check_void("PUT", path, resp).await
337 })
338 .await
339 }
340
341 pub(crate) async fn post_no_body<T: DeserializeOwned>(
343 &self,
344 path: &str,
345 ) -> Result<T, NifiError> {
346 self.with_retry(|| async {
347 tracing::debug!(method = "POST", path, "NiFi API request");
348 let url = self.api_url(path);
349 let resp = self
350 .authenticated(self.http.post(url))
351 .await
352 .send()
353 .await
354 .context(HttpSnafu)?;
355 Self::deserialize("POST", path, resp).await
356 })
357 .await
358 }
359
360 #[allow(dead_code)]
364 pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
365 self.with_retry(|| async {
366 tracing::debug!(method = "POST", path, "NiFi API request");
367 let url = self.api_url(path);
368 let resp = self
369 .authenticated(self.http.post(url))
370 .await
371 .send()
372 .await
373 .context(HttpSnafu)?;
374 Self::check_void("POST", path, resp).await
375 })
376 .await
377 }
378
379 pub(crate) async fn put_no_body<T: DeserializeOwned>(
381 &self,
382 path: &str,
383 ) -> Result<T, NifiError> {
384 self.with_retry(|| async {
385 tracing::debug!(method = "PUT", path, "NiFi API request");
386 let url = self.api_url(path);
387 let resp = self
388 .authenticated(self.http.put(url))
389 .await
390 .send()
391 .await
392 .context(HttpSnafu)?;
393 Self::deserialize("PUT", path, resp).await
394 })
395 .await
396 }
397
398 #[allow(dead_code)]
400 pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
401 self.with_retry(|| async {
402 tracing::debug!(method = "PUT", path, "NiFi API request");
403 let url = self.api_url(path);
404 let resp = self
405 .authenticated(self.http.put(url))
406 .await
407 .send()
408 .await
409 .context(HttpSnafu)?;
410 Self::check_void("PUT", path, resp).await
411 })
412 .await
413 }
414
415 pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
420 &self,
421 path: &str,
422 filename: Option<&str>,
423 data: Vec<u8>,
424 ) -> Result<T, NifiError> {
425 self.with_retry(|| async {
426 tracing::debug!(method = "POST", path, "NiFi API request");
427 let url = self.api_url(path);
428 let builder = self
429 .authenticated(self.http.post(url))
430 .await
431 .header("Content-Type", "application/octet-stream")
432 .body(data.clone());
433 let builder = if let Some(name) = filename {
434 builder.header("Filename", name)
435 } else {
436 builder
437 };
438 let resp = builder.send().await.context(HttpSnafu)?;
439 Self::deserialize("POST", path, resp).await
440 })
441 .await
442 }
443
444 #[allow(dead_code)]
451 pub(crate) async fn post_void_octet_stream(
452 &self,
453 path: &str,
454 filename: Option<&str>,
455 data: Vec<u8>,
456 ) -> Result<(), NifiError> {
457 self.with_retry(|| async {
458 tracing::debug!(method = "POST", path, "NiFi API request");
459 let url = self.api_url(path);
460 let builder = self
461 .authenticated(self.http.post(url))
462 .await
463 .header("Content-Type", "application/octet-stream")
464 .body(data.clone());
465 let builder = if let Some(name) = filename {
466 builder.header("Filename", name)
467 } else {
468 builder
469 };
470 let resp = builder.send().await.context(HttpSnafu)?;
471 Self::check_void("POST", path, resp).await
472 })
473 .await
474 }
475
476 pub(crate) async fn post_multipart<T: DeserializeOwned>(
484 &self,
485 path: &str,
486 filename: &str,
487 data: Vec<u8>,
488 ) -> Result<T, NifiError> {
489 self.with_retry(|| async {
490 tracing::debug!(method = "POST", path, "NiFi API request");
491 let url = self.api_url(path);
492 let part =
493 reqwest::multipart::Part::bytes(data.clone()).file_name(filename.to_string());
494 let form = reqwest::multipart::Form::new().part("file", part);
495 let resp = self
496 .authenticated(self.http.post(url))
497 .await
498 .multipart(form)
499 .send()
500 .await
501 .context(HttpSnafu)?;
502 Self::deserialize("POST", path, resp).await
503 })
504 .await
505 }
506
507 #[allow(dead_code)]
512 pub(crate) async fn post_void_multipart(
513 &self,
514 path: &str,
515 filename: &str,
516 data: Vec<u8>,
517 ) -> Result<(), NifiError> {
518 self.with_retry(|| async {
519 tracing::debug!(method = "POST", path, "NiFi API request");
520 let url = self.api_url(path);
521 let part =
522 reqwest::multipart::Part::bytes(data.clone()).file_name(filename.to_string());
523 let form = reqwest::multipart::Form::new().part("file", part);
524 let resp = self
525 .authenticated(self.http.post(url))
526 .await
527 .multipart(form)
528 .send()
529 .await
530 .context(HttpSnafu)?;
531 Self::check_void("POST", path, resp).await
532 })
533 .await
534 }
535
536 #[allow(dead_code)]
540 pub(crate) async fn post_void_with_query<B: serde::Serialize>(
541 &self,
542 path: &str,
543 body: &B,
544 query: &[(&str, String)],
545 ) -> Result<(), NifiError> {
546 self.with_retry(|| async {
547 tracing::debug!(method = "POST", path, "NiFi API request");
548 let url = self.api_url(path);
549 let resp = self
550 .authenticated(self.http.post(url).query(query))
551 .await
552 .json(body)
553 .send()
554 .await
555 .context(HttpSnafu)?;
556 Self::check_void("POST", path, resp).await
557 })
558 .await
559 }
560
561 pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
566 self.with_retry(|| async {
567 tracing::debug!(method = "GET", path, "NiFi API request");
568 let url = self.api_url(path);
569 let resp = self
570 .authenticated(self.http.get(url))
571 .await
572 .send()
573 .await
574 .context(HttpSnafu)?;
575 Self::check_void_with_redirect("GET", path, resp).await
576 })
577 .await
578 }
579
580 pub(crate) async fn get_with_query<T: DeserializeOwned>(
581 &self,
582 path: &str,
583 query: &[(&str, String)],
584 ) -> Result<T, NifiError> {
585 self.with_retry(|| async {
586 tracing::debug!(method = "GET", path, "NiFi API request");
587 let url = self.api_url(path);
588 let resp = self
589 .authenticated(self.http.get(url).query(query))
590 .await
591 .send()
592 .await
593 .context(HttpSnafu)?;
594 Self::deserialize("GET", path, resp).await
595 })
596 .await
597 }
598
599 pub(crate) async fn get_text(&self, path: &str) -> Result<String, NifiError> {
601 self.with_retry(|| async {
602 tracing::debug!(method = "GET", path, "NiFi API request");
603 let url = self.api_url(path);
604 let resp = self
605 .authenticated(self.http.get(url))
606 .await
607 .send()
608 .await
609 .context(HttpSnafu)?;
610 Self::text("GET", path, resp).await
611 })
612 .await
613 }
614
615 pub(crate) async fn get_bytes(&self, path: &str) -> Result<Vec<u8>, NifiError> {
617 self.with_retry(|| async {
618 tracing::debug!(method = "GET", path, "NiFi API request");
619 let url = self.api_url(path);
620 let resp = self
621 .authenticated(self.http.get(url))
622 .await
623 .send()
624 .await
625 .context(HttpSnafu)?;
626 Self::bytes("GET", path, resp).await
627 })
628 .await
629 }
630
631 pub(crate) async fn get_bytes_with_query(
633 &self,
634 path: &str,
635 query: &[(&str, String)],
636 ) -> Result<Vec<u8>, NifiError> {
637 self.with_retry(|| async {
638 tracing::debug!(method = "GET", path, "NiFi API request");
639 let url = self.api_url(path);
640 let resp = self
641 .authenticated(self.http.get(url).query(query))
642 .await
643 .send()
644 .await
645 .context(HttpSnafu)?;
646 Self::bytes("GET", path, resp).await
647 })
648 .await
649 }
650
651 pub(crate) async fn post_returning_text<B: serde::Serialize>(
653 &self,
654 path: &str,
655 body: &B,
656 ) -> Result<String, NifiError> {
657 self.with_retry(|| async {
658 tracing::debug!(method = "POST", path, "NiFi API request");
659 let url = self.api_url(path);
660 let resp = self
661 .authenticated(self.http.post(url))
662 .await
663 .json(body)
664 .send()
665 .await
666 .context(HttpSnafu)?;
667 Self::text("POST", path, resp).await
668 })
669 .await
670 }
671
672 pub(crate) async fn post_octet_stream_returning_text(
674 &self,
675 path: &str,
676 data: Vec<u8>,
677 ) -> Result<String, NifiError> {
678 self.with_retry(|| async {
679 tracing::debug!(method = "POST", path, "NiFi API request");
680 let url = self.api_url(path);
681 let builder = self
682 .authenticated(self.http.post(url))
683 .await
684 .header("Content-Type", "application/octet-stream")
685 .body(data.clone());
686 let resp = builder.send().await.context(HttpSnafu)?;
687 Self::text("POST", path, resp).await
688 })
689 .await
690 }
691
692 #[allow(dead_code)]
695 pub(crate) async fn get_void_with_query(
696 &self,
697 path: &str,
698 query: &[(&str, String)],
699 ) -> Result<(), NifiError> {
700 self.with_retry(|| async {
701 tracing::debug!(method = "GET", path, "NiFi API request");
702 let url = self.api_url(path);
703 let resp = self
704 .authenticated(self.http.get(url).query(query))
705 .await
706 .send()
707 .await
708 .context(HttpSnafu)?;
709 Self::check_void_with_redirect("GET", path, resp).await
710 })
711 .await
712 }
713
714 pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
715 &self,
716 path: &str,
717 query: &[(&str, String)],
718 ) -> Result<T, NifiError> {
719 self.with_retry(|| async {
720 tracing::debug!(method = "DELETE", path, "NiFi API request");
721 let url = self.api_url(path);
722 let resp = self
723 .authenticated(self.http.delete(url).query(query))
724 .await
725 .send()
726 .await
727 .context(HttpSnafu)?;
728 Self::deserialize("DELETE", path, resp).await
729 })
730 .await
731 }
732
733 pub(crate) async fn delete_with_query(
734 &self,
735 path: &str,
736 query: &[(&str, String)],
737 ) -> Result<(), NifiError> {
738 self.with_retry(|| async {
739 tracing::debug!(method = "DELETE", path, "NiFi API request");
740 let url = self.api_url(path);
741 let resp = self
742 .authenticated(self.http.delete(url).query(query))
743 .await
744 .send()
745 .await
746 .context(HttpSnafu)?;
747 Self::check_void("DELETE", path, resp).await
748 })
749 .await
750 }
751
752 pub(crate) async fn post_with_query<B, T>(
753 &self,
754 path: &str,
755 body: &B,
756 query: &[(&str, String)],
757 ) -> Result<T, NifiError>
758 where
759 B: serde::Serialize,
760 T: DeserializeOwned,
761 {
762 self.with_retry(|| async {
763 tracing::debug!(method = "POST", path, "NiFi API request");
764 let url = self.api_url(path);
765 let resp = self
766 .authenticated(self.http.post(url).query(query))
767 .await
768 .json(body)
769 .send()
770 .await
771 .context(HttpSnafu)?;
772 Self::deserialize("POST", path, resp).await
773 })
774 .await
775 }
776
777 pub(crate) async fn delete_returning<T: DeserializeOwned>(
778 &self,
779 path: &str,
780 ) -> Result<T, NifiError> {
781 self.with_retry(|| async {
782 tracing::debug!(method = "DELETE", path, "NiFi API request");
783 let url = self.api_url(path);
784 let resp = self
785 .authenticated(self.http.delete(url))
786 .await
787 .send()
788 .await
789 .context(HttpSnafu)?;
790 Self::deserialize("DELETE", path, resp).await
791 })
792 .await
793 }
794
795 pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
796 self.with_retry(|| async {
797 tracing::debug!(method = "DELETE", path, "NiFi API request");
798 let url = self.api_url(path);
799 let resp = self
800 .authenticated(self.http.delete(url))
801 .await
802 .send()
803 .await
804 .context(HttpSnafu)?;
805 Self::check_void("DELETE", path, resp).await
806 })
807 .await
808 }
809
810 async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
813 tracing::debug!(method = "DELETE", path, "NiFi API request");
814 let url = self.api_url(path);
815 let resp = self
816 .authenticated(self.http.delete(url))
817 .await
818 .send()
819 .await
820 .context(HttpSnafu)?;
821 Self::check_void("DELETE", path, resp).await
822 }
823
824 async fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
825 let guard = self.token.read().await;
826 match guard.as_deref() {
827 Some(token) => req.bearer_auth(token),
828 None => {
829 tracing::warn!(
830 "sending NiFi API request without a bearer token — call login() first"
831 );
832 req
833 }
834 }
835 }
836
837 async fn deserialize<T: DeserializeOwned>(
838 method: &str,
839 path: &str,
840 resp: reqwest::Response,
841 ) -> Result<T, NifiError> {
842 let status = resp.status();
843 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
844 if status.is_success() {
845 return resp.json::<T>().await.context(HttpSnafu);
846 }
847 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
848 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
849 let message = extract_error_message(&body);
850 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
851 Err(crate::error::api_error(status.as_u16(), message))
852 }
853
854 async fn check_void(
857 method: &str,
858 path: &str,
859 resp: reqwest::Response,
860 ) -> Result<(), NifiError> {
861 let status = resp.status();
862 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
863 if status.is_success() {
864 return Ok(());
865 }
866 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
867 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
868 let message = extract_error_message(&body);
869 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
870 Err(crate::error::api_error(status.as_u16(), message))
871 }
872
873 async fn text(method: &str, path: &str, resp: reqwest::Response) -> Result<String, NifiError> {
875 let status = resp.status();
876 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
877 if status.is_success() {
878 return resp.text().await.context(HttpSnafu);
879 }
880 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
881 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
882 let message = extract_error_message(&body);
883 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
884 Err(crate::error::api_error(status.as_u16(), message))
885 }
886
887 async fn bytes(
889 method: &str,
890 path: &str,
891 resp: reqwest::Response,
892 ) -> Result<Vec<u8>, NifiError> {
893 let status = resp.status();
894 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
895 if status.is_success() {
896 let b = resp.bytes().await.context(HttpSnafu)?;
897 return Ok(b.to_vec());
898 }
899 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
900 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
901 let message = extract_error_message(&body);
902 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
903 Err(crate::error::api_error(status.as_u16(), message))
904 }
905
906 async fn check_void_with_redirect(
908 method: &str,
909 path: &str,
910 resp: reqwest::Response,
911 ) -> Result<(), NifiError> {
912 let status = resp.status();
913 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
914 if status.is_success() || status.as_u16() == 302 {
915 return Ok(());
916 }
917 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
918 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
919 let message = extract_error_message(&body);
920 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
921 Err(crate::error::api_error(status.as_u16(), message))
922 }
923
924 pub(crate) fn api_url(&self, path: &str) -> Url {
925 let mut url = self.base_url.clone();
926 url.set_path(&format!("/nifi-api{path}"));
927 url
928 }
929}
930
931pub fn extract_error_message(body: &str) -> String {
936 serde_json::from_str::<serde_json::Value>(body)
937 .ok()
938 .and_then(|v| v["message"].as_str().map(str::to_owned))
939 .unwrap_or_else(|| body.to_owned())
940}