1#![deny(missing_docs)]
2use std::sync::Arc;
3
4use reqwest::header::{CONTENT_TYPE, HeaderName};
5use reqwest::{Client, Method, StatusCode};
6use serde::de::DeserializeOwned;
7use snafu::ResultExt as _;
8use tokio::sync::RwLock;
9use url::Url;
10
11use crate::NifiError;
12use crate::config::auth::AuthProvider;
13use crate::error::{AuthSnafu, HttpSnafu};
14
/// MIME type sent as `Content-Type` by the raw-bytes (`octet_stream`) upload helpers.
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";

/// Name of the header that carries the configured proxied-entities chain on outgoing requests.
const PROXIED_ENTITIES_CHAIN: HeaderName = HeaderName::from_static("x-proxiedentitieschain");
22
/// Asynchronous client for the NiFi REST API.
///
/// The bearer token and the re-authentication lock live behind `Arc`s, so clones of a
/// client share one token and coordinate token refreshes with each other.
pub struct NifiClient {
    /// Base URL of the NiFi instance; `api_url` replaces its path with `/nifi-api{path}`.
    base_url: Url,
    /// Underlying `reqwest` client used to send every request.
    http: Client,
    /// Current bearer token, if any. Wrapped in `Zeroizing` and shared between clones.
    token: Arc<RwLock<Option<zeroize::Zeroizing<String>>>>,
    /// Optional provider used by `authenticate` and by the 401 recovery in `with_auth_retry`.
    auth_provider: Option<Arc<dyn AuthProvider>>,
    /// Optional value sent in the `x-proxiedentitieschain` header by `build_request`.
    proxied_entities_chain: Option<String>,
    /// Optional retry policy; when `None`, `with_retry` performs a single attempt.
    retry_policy: Option<crate::config::retry::RetryPolicy>,
    /// Name of the header that receives a fresh UUID per request; `None` disables it.
    request_id_header: Option<String>,
    /// Serializes token refreshes so concurrent unauthorized responses do not each re-authenticate.
    auth_lock: Arc<tokio::sync::Mutex<()>>,
}
34
35impl Clone for NifiClient {
36 fn clone(&self) -> Self {
37 Self {
38 base_url: self.base_url.clone(),
39 http: self.http.clone(),
40 token: Arc::clone(&self.token),
41 auth_provider: self.auth_provider.clone(),
42 proxied_entities_chain: self.proxied_entities_chain.clone(),
43 retry_policy: self.retry_policy.clone(),
44 request_id_header: self.request_id_header.clone(),
45 auth_lock: Arc::clone(&self.auth_lock),
46 }
47 }
48}
49
50impl std::fmt::Debug for NifiClient {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.debug_struct("NifiClient")
53 .field("base_url", &self.base_url)
54 .field(
55 "auth_provider",
56 &self.auth_provider.as_ref().map(|c| format!("{c:?}")),
57 )
58 .field("proxied_entities_chain", &self.proxied_entities_chain)
59 .field("retry_policy", &self.retry_policy)
60 .field("request_id_header", &self.request_id_header)
61 .finish_non_exhaustive()
62 }
63}
64
65impl NifiClient {
66 pub(crate) fn from_parts(
68 base_url: Url,
69 http: Client,
70 auth_provider: Option<Arc<dyn AuthProvider>>,
71 proxied_entities_chain: Option<String>,
72 retry_policy: Option<crate::config::retry::RetryPolicy>,
73 request_id_header: Option<String>,
74 ) -> Self {
75 Self {
76 base_url,
77 http,
78 token: Arc::new(RwLock::new(None)),
79 auth_provider,
80 proxied_entities_chain,
81 retry_policy,
82 request_id_header,
83 auth_lock: Arc::new(tokio::sync::Mutex::new(())),
84 }
85 }
86
87 pub async fn token(&self) -> Option<String> {
94 self.token.read().await.as_ref().map(|t| (**t).clone())
95 }
96
97 pub async fn set_token(&self, token: String) {
104 *self.token.write().await = Some(zeroize::Zeroizing::new(token));
105 }
106
107 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
117 pub async fn logout(&self) -> Result<(), NifiError> {
118 let result = self.delete_inner("/access/logout").await;
119 *self.token.write().await = None;
120 if result.is_ok() {
121 tracing::info!("NiFi logout successful");
122 }
123 result
124 }
125
126 #[tracing::instrument(skip(self, username, password), fields(request_id = tracing::field::Empty))]
139 pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
140 let method = Method::POST;
141 tracing::debug!(method = %method, path = "/access/token", "NiFi API request");
142 let url = self.api_url("/access/token");
143 let req = self.apply_request_id(self.http.post(url));
144 let resp = req
145 .form(&[("username", username), ("password", password)])
146 .send()
147 .await
148 .context(HttpSnafu)?;
149
150 let status = resp.status();
151 tracing::debug!(
152 method = %method,
153 path = "/access/token",
154 status = status.as_u16(),
155 "NiFi API response"
156 );
157 if !status.is_success() {
158 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
159 tracing::debug!(
160 method = %method,
161 path = "/access/token",
162 status = status.as_u16(),
163 %body,
164 "NiFi API raw error body"
165 );
166 let message = extract_error_message(&body);
167 tracing::warn!(
168 method = %method,
169 path = "/access/token",
170 status = status.as_u16(),
171 %message,
172 "NiFi API error"
173 );
174 return AuthSnafu { message }.fail();
175 }
176
177 let token = resp.text().await.context(HttpSnafu)?;
178 *self.token.write().await = Some(zeroize::Zeroizing::new(token));
179 tracing::info!("NiFi login successful for {username}");
180 Ok(())
181 }
182
183 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
187 pub async fn authenticate(&self) -> Result<(), NifiError> {
188 let provider = self.auth_provider.as_ref().ok_or_else(|| NifiError::Auth {
189 message: "no auth provider configured".to_string(),
190 })?;
191 provider.authenticate(self).await
192 }
193
194 #[tracing::instrument(skip_all)]
204 async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
205 where
206 F: Fn() -> Fut,
207 Fut: std::future::Future<Output = Result<T, NifiError>>,
208 {
209 let token_before = self.token.read().await.as_ref().map(|t| (**t).clone());
212
213 match f().await {
214 Err(NifiError::Unauthorized { .. }) if self.auth_provider.is_some() => {
215 let _guard = self.auth_lock.lock().await;
216 let token_now = self.token.read().await.as_ref().map(|t| (**t).clone());
217 if token_now == token_before {
218 tracing::info!("received 401, refreshing token via auth provider");
219 self.authenticate().await?;
220 } else {
221 tracing::debug!("token already refreshed by concurrent task, skipping re-auth");
222 }
223 drop(_guard);
228 f().await
229 }
230 other => other,
231 }
232 }
233
234 #[tracing::instrument(skip_all)]
242 async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
243 where
244 F: Fn() -> Fut,
245 Fut: std::future::Future<Output = Result<T, NifiError>>,
246 {
247 let Some(policy) = &self.retry_policy else {
248 return self.with_auth_retry(&f).await;
249 };
250
251 let mut last_err: Option<NifiError> = None;
252 for attempt in 0..=policy.max_retries {
253 if attempt > 0 {
254 let backoff = policy.backoff_for(attempt - 1);
255 tracing::info!(
256 attempt,
257 backoff_ms = backoff.as_millis() as u64,
258 "retrying after transient error"
259 );
260 tokio::time::sleep(backoff).await;
261 }
262 match self.with_auth_retry(&f).await {
263 Ok(v) => return Ok(v),
264 Err(e) if e.is_retryable() => {
265 tracing::warn!(attempt, error = %e, "transient error, will retry");
266 last_err = Some(e);
267 }
268 Err(e) => return Err(e),
269 }
270 }
271 match last_err {
274 Some(e) => Err(e),
275 None => self.with_auth_retry(&f).await,
277 }
278 }
279
280 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
283 pub(crate) async fn get<T: DeserializeOwned>(
284 &self,
285 path: &str,
286 extra_headers: &[(&str, &str)],
287 ) -> Result<T, NifiError> {
288 self.with_retry(|| async {
289 let req = self
290 .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
291 .await;
292 let req = apply_extra_headers(req, extra_headers);
293 let resp = req.send().await.context(HttpSnafu)?;
294 Self::deserialize(&Method::GET, path, resp).await
295 })
296 .await
297 }
298
299 #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
300 pub(crate) async fn post<B, T>(
301 &self,
302 path: &str,
303 extra_headers: &[(&str, &str)],
304 body: &B,
305 ) -> Result<T, NifiError>
306 where
307 B: serde::Serialize,
308 T: DeserializeOwned,
309 {
310 self.with_retry(|| async {
311 let req = self
312 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
313 .await;
314 let req = apply_extra_headers(req, extra_headers);
315 let resp = req.json(body).send().await.context(HttpSnafu)?;
316 Self::deserialize(&Method::POST, path, resp).await
317 })
318 .await
319 }
320
321 #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
322 pub(crate) async fn put<B, T>(
323 &self,
324 path: &str,
325 extra_headers: &[(&str, &str)],
326 body: &B,
327 ) -> Result<T, NifiError>
328 where
329 B: serde::Serialize,
330 T: DeserializeOwned,
331 {
332 self.with_retry(|| async {
333 let req = self
334 .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
335 .await;
336 let req = apply_extra_headers(req, extra_headers);
337 let resp = req.json(body).send().await.context(HttpSnafu)?;
338 Self::deserialize(&Method::PUT, path, resp).await
339 })
340 .await
341 }
342
343 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
345 pub(crate) async fn post_no_body<T: DeserializeOwned>(
346 &self,
347 path: &str,
348 extra_headers: &[(&str, &str)],
349 ) -> Result<T, NifiError> {
350 self.with_retry(|| async {
351 let req = self
352 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
353 .await;
354 let req = apply_extra_headers(req, extra_headers);
355 let resp = req.send().await.context(HttpSnafu)?;
356 Self::deserialize(&Method::POST, path, resp).await
357 })
358 .await
359 }
360
361 #[allow(dead_code)]
370 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
371 pub(crate) async fn post_void_no_body(
372 &self,
373 path: &str,
374 extra_headers: &[(&str, &str)],
375 ) -> Result<(), NifiError> {
376 self.with_retry(|| async {
377 let req = self
378 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
379 .await;
380 let req = apply_extra_headers(req, extra_headers);
381 let resp = req.send().await.context(HttpSnafu)?;
382 Self::check_void(&Method::POST, path, resp).await
383 })
384 .await
385 }
386
387 #[allow(dead_code)]
393 #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
394 pub(crate) async fn post_void<B: serde::Serialize>(
395 &self,
396 path: &str,
397 extra_headers: &[(&str, &str)],
398 body: &B,
399 ) -> Result<(), NifiError> {
400 self.with_retry(|| async {
401 let req = self
402 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
403 .await;
404 let req = apply_extra_headers(req, extra_headers);
405 let resp = req.json(body).send().await.context(HttpSnafu)?;
406 Self::check_void(&Method::POST, path, resp).await
407 })
408 .await
409 }
410
411 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
413 pub(crate) async fn put_no_body<T: DeserializeOwned>(
414 &self,
415 path: &str,
416 extra_headers: &[(&str, &str)],
417 ) -> Result<T, NifiError> {
418 self.with_retry(|| async {
419 let req = self
420 .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
421 .await;
422 let req = apply_extra_headers(req, extra_headers);
423 let resp = req.send().await.context(HttpSnafu)?;
424 Self::deserialize(&Method::PUT, path, resp).await
425 })
426 .await
427 }
428
429 #[allow(dead_code)]
435 #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
436 pub(crate) async fn put_void<B: serde::Serialize>(
437 &self,
438 path: &str,
439 extra_headers: &[(&str, &str)],
440 body: &B,
441 ) -> Result<(), NifiError> {
442 self.with_retry(|| async {
443 let req = self
444 .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
445 .await;
446 let req = apply_extra_headers(req, extra_headers);
447 let resp = req.json(body).send().await.context(HttpSnafu)?;
448 Self::check_void(&Method::PUT, path, resp).await
449 })
450 .await
451 }
452
453 #[allow(dead_code)]
459 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
460 pub(crate) async fn put_void_no_body(
461 &self,
462 path: &str,
463 extra_headers: &[(&str, &str)],
464 ) -> Result<(), NifiError> {
465 self.with_retry(|| async {
466 let req = self
467 .build_request(&Method::PUT, path, self.http.put(self.api_url(path)))
468 .await;
469 let req = apply_extra_headers(req, extra_headers);
470 let resp = req.send().await.context(HttpSnafu)?;
471 Self::check_void(&Method::PUT, path, resp).await
472 })
473 .await
474 }
475
476 #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
482 pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
483 &self,
484 path: &str,
485 extra_headers: &[(&str, &str)],
486 data: bytes::Bytes,
487 ) -> Result<T, NifiError> {
488 self.with_retry(|| async {
489 let req = self
490 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
491 .await
492 .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
493 .body(data.clone());
494 let req = apply_extra_headers(req, extra_headers);
495 let resp = req.send().await.context(HttpSnafu)?;
496 Self::deserialize(&Method::POST, path, resp).await
497 })
498 .await
499 }
500
501 #[allow(dead_code)]
509 #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
510 pub(crate) async fn post_multipart<T: DeserializeOwned>(
511 &self,
512 path: &str,
513 extra_headers: &[(&str, &str)],
514 filename: &str,
515 data: bytes::Bytes,
516 ) -> Result<T, NifiError> {
517 self.with_retry(|| async {
518 let req = self
519 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
520 .await;
521 let req = apply_extra_headers(req, extra_headers);
522 let len = data.len() as u64;
523 let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
524 .file_name(filename.to_string());
525 let form = reqwest::multipart::Form::new().part("file", part);
526 let resp = req.multipart(form).send().await.context(HttpSnafu)?;
527 Self::deserialize(&Method::POST, path, resp).await
528 })
529 .await
530 }
531
532 #[tracing::instrument(
545 skip(self, text_fields, data),
546 fields(request_id = tracing::field::Empty)
547 )]
548 pub(crate) async fn post_multipart_with_fields<T: DeserializeOwned>(
549 &self,
550 path: &str,
551 extra_headers: &[(&str, &str)],
552 text_fields: &[(&str, String)],
553 filename: &str,
554 data: bytes::Bytes,
555 ) -> Result<T, NifiError> {
556 self.with_retry(|| async {
557 let req = self
558 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
559 .await;
560 let req = apply_extra_headers(req, extra_headers);
561 let mut form = reqwest::multipart::Form::new();
562 for (name, value) in text_fields {
563 form = form.text((*name).to_string(), value.clone());
564 }
565 let len = data.len() as u64;
566 let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
567 .file_name(filename.to_string());
568 form = form.part("file", part);
569 let resp = req.multipart(form).send().await.context(HttpSnafu)?;
570 Self::deserialize(&Method::POST, path, resp).await
571 })
572 .await
573 }
574
575 #[allow(dead_code)]
584 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
585 pub(crate) async fn get_void(
586 &self,
587 path: &str,
588 extra_headers: &[(&str, &str)],
589 ) -> Result<(), NifiError> {
590 self.with_retry(|| async {
591 let req = self
592 .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
593 .await;
594 let req = apply_extra_headers(req, extra_headers);
595 let resp = req.send().await.context(HttpSnafu)?;
596 Self::check_void_with_redirect(&Method::GET, path, resp).await
597 })
598 .await
599 }
600
601 #[allow(dead_code)]
610 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
611 pub(crate) async fn get_void_with_query(
612 &self,
613 path: &str,
614 extra_headers: &[(&str, &str)],
615 query: &[(&str, String)],
616 ) -> Result<(), NifiError> {
617 self.with_retry(|| async {
618 let req = self
619 .build_request(
620 &Method::GET,
621 path,
622 self.http.get(self.api_url(path)).query(query),
623 )
624 .await;
625 let req = apply_extra_headers(req, extra_headers);
626 let resp = req.send().await.context(HttpSnafu)?;
627 Self::check_void_with_redirect(&Method::GET, path, resp).await
628 })
629 .await
630 }
631
632 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
633 pub(crate) async fn get_with_query<T: DeserializeOwned>(
634 &self,
635 path: &str,
636 extra_headers: &[(&str, &str)],
637 query: &[(&str, String)],
638 ) -> Result<T, NifiError> {
639 self.with_retry(|| async {
640 let req = self
641 .build_request(
642 &Method::GET,
643 path,
644 self.http.get(self.api_url(path)).query(query),
645 )
646 .await;
647 let req = apply_extra_headers(req, extra_headers);
648 let resp = req.send().await.context(HttpSnafu)?;
649 Self::deserialize(&Method::GET, path, resp).await
650 })
651 .await
652 }
653
654 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
656 pub(crate) async fn get_text(
657 &self,
658 path: &str,
659 extra_headers: &[(&str, &str)],
660 ) -> Result<String, NifiError> {
661 self.with_retry(|| async {
662 let req = self
663 .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
664 .await;
665 let req = apply_extra_headers(req, extra_headers);
666 let resp = req.send().await.context(HttpSnafu)?;
667 Self::text(&Method::GET, path, resp).await
668 })
669 .await
670 }
671
672 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
674 pub(crate) async fn get_bytes(
675 &self,
676 path: &str,
677 extra_headers: &[(&str, &str)],
678 ) -> Result<Vec<u8>, NifiError> {
679 self.with_retry(|| async {
680 let req = self
681 .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
682 .await;
683 let req = apply_extra_headers(req, extra_headers);
684 let resp = req.send().await.context(HttpSnafu)?;
685 Self::bytes(&Method::GET, path, resp).await
686 })
687 .await
688 }
689
690 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
692 pub(crate) async fn get_bytes_with_query(
693 &self,
694 path: &str,
695 extra_headers: &[(&str, &str)],
696 query: &[(&str, String)],
697 ) -> Result<Vec<u8>, NifiError> {
698 self.with_retry(|| async {
699 let req = self
700 .build_request(
701 &Method::GET,
702 path,
703 self.http.get(self.api_url(path)).query(query),
704 )
705 .await;
706 let req = apply_extra_headers(req, extra_headers);
707 let resp = req.send().await.context(HttpSnafu)?;
708 Self::bytes(&Method::GET, path, resp).await
709 })
710 .await
711 }
712
713 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
721 pub(crate) async fn get_bytes_stream(
722 &self,
723 path: &str,
724 extra_headers: &[(&str, &str)],
725 ) -> Result<crate::BytesStream, NifiError> {
726 self.with_retry(|| async {
727 let req = self
728 .build_request(&Method::GET, path, self.http.get(self.api_url(path)))
729 .await;
730 let req = apply_extra_headers(req, extra_headers);
731 let resp = req.send().await.context(HttpSnafu)?;
732 Self::bytes_stream(&Method::GET, path, resp).await
733 })
734 .await
735 }
736
737 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
740 pub(crate) async fn get_bytes_stream_with_query(
741 &self,
742 path: &str,
743 extra_headers: &[(&str, &str)],
744 query: &[(&str, String)],
745 ) -> Result<crate::BytesStream, NifiError> {
746 self.with_retry(|| async {
747 let req = self
748 .build_request(
749 &Method::GET,
750 path,
751 self.http.get(self.api_url(path)).query(query),
752 )
753 .await;
754 let req = apply_extra_headers(req, extra_headers);
755 let resp = req.send().await.context(HttpSnafu)?;
756 Self::bytes_stream(&Method::GET, path, resp).await
757 })
758 .await
759 }
760
761 #[allow(dead_code)]
767 #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
768 pub(crate) async fn post_void_octet_stream(
769 &self,
770 path: &str,
771 extra_headers: &[(&str, &str)],
772 data: bytes::Bytes,
773 ) -> Result<(), NifiError> {
774 self.with_retry(|| async {
775 let req = self
776 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
777 .await
778 .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
779 .body(data.clone());
780 let req = apply_extra_headers(req, extra_headers);
781 let resp = req.send().await.context(HttpSnafu)?;
782 Self::check_void(&Method::POST, path, resp).await
783 })
784 .await
785 }
786
787 #[allow(dead_code)]
793 #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
794 pub(crate) async fn post_void_multipart(
795 &self,
796 path: &str,
797 extra_headers: &[(&str, &str)],
798 filename: &str,
799 data: bytes::Bytes,
800 ) -> Result<(), NifiError> {
801 self.with_retry(|| async {
802 let req = self
803 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
804 .await;
805 let req = apply_extra_headers(req, extra_headers);
806 let len = data.len() as u64;
807 let part = reqwest::multipart::Part::stream_with_length(data.clone(), len)
808 .file_name(filename.to_string());
809 let form = reqwest::multipart::Form::new().part("file", part);
810 let resp = req.multipart(form).send().await.context(HttpSnafu)?;
811 Self::check_void(&Method::POST, path, resp).await
812 })
813 .await
814 }
815
816 #[allow(dead_code)]
822 #[tracing::instrument(skip(self, body), fields(request_id = tracing::field::Empty))]
823 pub(crate) async fn post_returning_text<B: serde::Serialize>(
824 &self,
825 path: &str,
826 extra_headers: &[(&str, &str)],
827 body: &B,
828 ) -> Result<String, NifiError> {
829 self.with_retry(|| async {
830 let req = self
831 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
832 .await;
833 let req = apply_extra_headers(req, extra_headers);
834 let resp = req.json(body).send().await.context(HttpSnafu)?;
835 Self::text(&Method::POST, path, resp).await
836 })
837 .await
838 }
839
840 #[allow(dead_code)]
846 #[tracing::instrument(skip(self, data), fields(request_id = tracing::field::Empty))]
847 pub(crate) async fn post_octet_stream_returning_text(
848 &self,
849 path: &str,
850 extra_headers: &[(&str, &str)],
851 data: bytes::Bytes,
852 ) -> Result<String, NifiError> {
853 self.with_retry(|| async {
854 let req = self
855 .build_request(&Method::POST, path, self.http.post(self.api_url(path)))
856 .await
857 .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
858 .body(data.clone());
859 let req = apply_extra_headers(req, extra_headers);
860 let resp = req.send().await.context(HttpSnafu)?;
861 Self::text(&Method::POST, path, resp).await
862 })
863 .await
864 }
865
866 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
867 pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
868 &self,
869 path: &str,
870 extra_headers: &[(&str, &str)],
871 query: &[(&str, String)],
872 ) -> Result<T, NifiError> {
873 self.with_retry(|| async {
874 let req = self
875 .build_request(
876 &Method::DELETE,
877 path,
878 self.http.delete(self.api_url(path)).query(query),
879 )
880 .await;
881 let req = apply_extra_headers(req, extra_headers);
882 let resp = req.send().await.context(HttpSnafu)?;
883 Self::deserialize(&Method::DELETE, path, resp).await
884 })
885 .await
886 }
887
888 #[tracing::instrument(skip(self, query), fields(request_id = tracing::field::Empty))]
889 pub(crate) async fn delete_with_query(
890 &self,
891 path: &str,
892 extra_headers: &[(&str, &str)],
893 query: &[(&str, String)],
894 ) -> Result<(), NifiError> {
895 self.with_retry(|| async {
896 let req = self
897 .build_request(
898 &Method::DELETE,
899 path,
900 self.http.delete(self.api_url(path)).query(query),
901 )
902 .await;
903 let req = apply_extra_headers(req, extra_headers);
904 let resp = req.send().await.context(HttpSnafu)?;
905 Self::check_void(&Method::DELETE, path, resp).await
906 })
907 .await
908 }
909
910 #[allow(dead_code)]
916 #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
917 pub(crate) async fn post_void_with_query<B: serde::Serialize>(
918 &self,
919 path: &str,
920 extra_headers: &[(&str, &str)],
921 body: &B,
922 query: &[(&str, String)],
923 ) -> Result<(), NifiError> {
924 self.with_retry(|| async {
925 let req = self
926 .build_request(
927 &Method::POST,
928 path,
929 self.http.post(self.api_url(path)).query(query),
930 )
931 .await;
932 let req = apply_extra_headers(req, extra_headers);
933 let resp = req.json(body).send().await.context(HttpSnafu)?;
934 Self::check_void(&Method::POST, path, resp).await
935 })
936 .await
937 }
938
939 #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
940 pub(crate) async fn post_with_query<B, T>(
941 &self,
942 path: &str,
943 extra_headers: &[(&str, &str)],
944 body: &B,
945 query: &[(&str, String)],
946 ) -> Result<T, NifiError>
947 where
948 B: serde::Serialize,
949 T: DeserializeOwned,
950 {
951 self.with_retry(|| async {
952 let req = self
953 .build_request(
954 &Method::POST,
955 path,
956 self.http.post(self.api_url(path)).query(query),
957 )
958 .await;
959 let req = apply_extra_headers(req, extra_headers);
960 let resp = req.json(body).send().await.context(HttpSnafu)?;
961 Self::deserialize(&Method::POST, path, resp).await
962 })
963 .await
964 }
965
966 #[allow(dead_code)]
972 #[tracing::instrument(skip(self, body, query), fields(request_id = tracing::field::Empty))]
973 pub(crate) async fn put_with_query<B, T>(
974 &self,
975 path: &str,
976 extra_headers: &[(&str, &str)],
977 body: &B,
978 query: &[(&str, String)],
979 ) -> Result<T, NifiError>
980 where
981 B: serde::Serialize,
982 T: DeserializeOwned,
983 {
984 self.with_retry(|| async {
985 let req = self
986 .build_request(
987 &Method::PUT,
988 path,
989 self.http.put(self.api_url(path)).query(query),
990 )
991 .await;
992 let req = apply_extra_headers(req, extra_headers);
993 let resp = req.json(body).send().await.context(HttpSnafu)?;
994 Self::deserialize(&Method::PUT, path, resp).await
995 })
996 .await
997 }
998
999 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
1000 pub(crate) async fn delete_returning<T: DeserializeOwned>(
1001 &self,
1002 path: &str,
1003 extra_headers: &[(&str, &str)],
1004 ) -> Result<T, NifiError> {
1005 self.with_retry(|| async {
1006 let req = self
1007 .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1008 .await;
1009 let req = apply_extra_headers(req, extra_headers);
1010 let resp = req.send().await.context(HttpSnafu)?;
1011 Self::deserialize(&Method::DELETE, path, resp).await
1012 })
1013 .await
1014 }
1015
1016 #[tracing::instrument(skip(self), fields(request_id = tracing::field::Empty))]
1017 pub(crate) async fn delete(
1018 &self,
1019 path: &str,
1020 extra_headers: &[(&str, &str)],
1021 ) -> Result<(), NifiError> {
1022 self.with_retry(|| async {
1023 let req = self
1024 .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1025 .await;
1026 let req = apply_extra_headers(req, extra_headers);
1027 let resp = req.send().await.context(HttpSnafu)?;
1028 Self::check_void(&Method::DELETE, path, resp).await
1029 })
1030 .await
1031 }
1032
1033 async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
1036 let resp = self
1037 .build_request(&Method::DELETE, path, self.http.delete(self.api_url(path)))
1038 .await
1039 .send()
1040 .await
1041 .context(HttpSnafu)?;
1042 Self::check_void(&Method::DELETE, path, resp).await
1043 }
1044
1045 fn apply_request_id(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
1054 let Some(header) = self.request_id_header.as_deref() else {
1055 return req;
1056 };
1057 let id = uuid::Uuid::new_v4().to_string();
1058 tracing::Span::current().record("request_id", id.as_str());
1059 req.header(header, id)
1060 }
1061
1062 async fn build_request(
1066 &self,
1067 method: &Method,
1068 path: &str,
1069 req: reqwest::RequestBuilder,
1070 ) -> reqwest::RequestBuilder {
1071 let req = self.apply_request_id(req);
1072 tracing::debug!(method = %method, path, "NiFi API request");
1073
1074 let guard = self.token.read().await;
1075 let mut req = match guard.as_deref() {
1076 Some(token) => req.bearer_auth(token.as_str()),
1077 None => {
1078 tracing::warn!(
1079 "sending NiFi API request without a bearer token — call login() first"
1080 );
1081 req
1082 }
1083 };
1084 if let Some(chain) = &self.proxied_entities_chain {
1085 req = req.header(PROXIED_ENTITIES_CHAIN, chain);
1086 }
1087 req
1088 }
1089
1090 async fn deserialize<T: DeserializeOwned>(
1091 method: &Method,
1092 path: &str,
1093 resp: reqwest::Response,
1094 ) -> Result<T, NifiError> {
1095 let resp = handle_response_status(method, path, resp).await?;
1096 resp.json::<T>().await.context(HttpSnafu)
1097 }
1098
1099 async fn check_void(
1102 method: &Method,
1103 path: &str,
1104 resp: reqwest::Response,
1105 ) -> Result<(), NifiError> {
1106 handle_response_status(method, path, resp).await?;
1107 Ok(())
1108 }
1109
1110 async fn text(
1112 method: &Method,
1113 path: &str,
1114 resp: reqwest::Response,
1115 ) -> Result<String, NifiError> {
1116 let resp = handle_response_status(method, path, resp).await?;
1117 resp.text().await.context(HttpSnafu)
1118 }
1119
1120 async fn bytes(
1122 method: &Method,
1123 path: &str,
1124 resp: reqwest::Response,
1125 ) -> Result<Vec<u8>, NifiError> {
1126 let resp = handle_response_status(method, path, resp).await?;
1127 let b = resp.bytes().await.context(HttpSnafu)?;
1128 Ok(b.to_vec())
1129 }
1130
1131 async fn bytes_stream(
1135 method: &Method,
1136 path: &str,
1137 resp: reqwest::Response,
1138 ) -> Result<crate::BytesStream, NifiError> {
1139 use futures_util::TryStreamExt;
1140 let resp = handle_response_status(method, path, resp).await?;
1141 let s = resp
1142 .bytes_stream()
1143 .map_err(|source| NifiError::Http { source });
1144 Ok(Box::pin(s))
1145 }
1146
1147 async fn check_void_with_redirect(
1154 method: &Method,
1155 path: &str,
1156 resp: reqwest::Response,
1157 ) -> Result<(), NifiError> {
1158 let status = resp.status();
1159 tracing::debug!(method = %method, path, status = status.as_u16(), "NiFi API response");
1160 if status.is_success() || status == StatusCode::FOUND {
1161 return Ok(());
1162 }
1163 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
1164 tracing::debug!(method = %method, path, status = status.as_u16(), %body, "NiFi API raw error body");
1165 let message = extract_error_message(&body);
1166 tracing::warn!(method = %method, path, status = status.as_u16(), %message, "NiFi API error");
1167 Err(crate::error::api_error(status.as_u16(), message))
1168 }
1169
1170 pub(crate) fn api_url(&self, path: &str) -> Url {
1171 let mut url = self.base_url.clone();
1172 url.set_path(&format!("/nifi-api{path}"));
1173 url
1174 }
1175}
1176
1177async fn handle_response_status(
1191 method: &Method,
1192 path: &str,
1193 resp: reqwest::Response,
1194) -> Result<reqwest::Response, NifiError> {
1195 let status = resp.status();
1196 tracing::debug!(method = %method, path, status = status.as_u16(), "NiFi API response");
1197 if status.is_success() {
1198 return Ok(resp);
1199 }
1200 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
1201 tracing::debug!(method = %method, path, status = status.as_u16(), %body, "NiFi API raw error body");
1202 let message = extract_error_message(&body);
1203 tracing::warn!(method = %method, path, status = status.as_u16(), %message, "NiFi API error");
1204 Err(crate::error::api_error(status.as_u16(), message))
1205}
1206
1207fn apply_extra_headers(
1210 mut req: reqwest::RequestBuilder,
1211 extra: &[(&str, &str)],
1212) -> reqwest::RequestBuilder {
1213 for (name, value) in extra {
1214 req = req.header(*name, *value);
1215 }
1216 req
1217}
1218
1219pub fn extract_error_message(body: &str) -> String {
1224 serde_json::from_str::<serde_json::Value>(body)
1225 .ok()
1226 .and_then(|v| v["message"].as_str().map(str::to_owned))
1227 .unwrap_or_else(|| body.to_owned())
1228}
1229
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    /// The upload helpers call `data.clone()` once per attempt; this checks that such
    /// clones point at the same buffer as the original instead of copying it.
    #[test]
    fn bytes_clone_is_refcount_only() {
        let data = Bytes::from(vec![0u8; 1024]);
        let expected_len = data.len();

        for copy in [data.clone(), data.clone()] {
            assert_eq!(copy.len(), expected_len);
            assert_eq!(
                data.as_ptr(),
                copy.as_ptr(),
                "Bytes::clone should share buffer"
            );
        }
    }
}