1use std::sync::Arc;
2
3use reqwest::Client;
4use serde::de::DeserializeOwned;
5use snafu::ResultExt as _;
6use tokio::sync::RwLock;
7use url::Url;
8
9use crate::NifiError;
10use crate::config::credentials::CredentialProvider;
11use crate::error::{AuthSnafu, HttpSnafu};
12
13pub struct NifiClient {
15 base_url: Url,
16 http: Client,
17 token: Arc<RwLock<Option<String>>>,
18 credentials: Option<Arc<dyn CredentialProvider>>,
19 #[allow(dead_code)]
20 retry_policy: Option<crate::config::retry::RetryPolicy>,
21}
22
23impl Clone for NifiClient {
24 fn clone(&self) -> Self {
25 Self {
26 base_url: self.base_url.clone(),
27 http: self.http.clone(),
28 token: Arc::clone(&self.token),
29 credentials: self.credentials.clone(),
30 retry_policy: self.retry_policy.clone(),
31 }
32 }
33}
34
35impl std::fmt::Debug for NifiClient {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("NifiClient")
38 .field("base_url", &self.base_url)
39 .field(
40 "credentials",
41 &self.credentials.as_ref().map(|c| format!("{c:?}")),
42 )
43 .field("retry_policy", &self.retry_policy)
44 .finish_non_exhaustive()
45 }
46}
47
48impl NifiClient {
49 pub(crate) fn from_parts(
51 base_url: Url,
52 http: Client,
53 credentials: Option<Arc<dyn CredentialProvider>>,
54 retry_policy: Option<crate::config::retry::RetryPolicy>,
55 ) -> Self {
56 Self {
57 base_url,
58 http,
59 token: Arc::new(RwLock::new(None)),
60 credentials,
61 retry_policy,
62 }
63 }
64
65 pub async fn token(&self) -> Option<String> {
73 self.token.read().await.clone()
74 }
75
76 pub async fn set_token(&self, token: String) {
83 *self.token.write().await = Some(token);
84 }
85
86 pub async fn logout(&self) -> Result<(), NifiError> {
96 let result = self.delete_inner("/access/logout").await;
97 *self.token.write().await = None;
98 if result.is_ok() {
99 tracing::info!("NiFi logout successful");
100 }
101 result
102 }
103
104 pub async fn login(&self, username: &str, password: &str) -> Result<(), NifiError> {
117 tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
118 let url = self.api_url("/access/token");
119 let resp = self
120 .http
121 .post(url)
122 .form(&[("username", username), ("password", password)])
123 .send()
124 .await
125 .context(HttpSnafu)?;
126
127 let status = resp.status();
128 tracing::debug!(
129 method = "POST",
130 path = "/access/token",
131 status = status.as_u16(),
132 "NiFi API response"
133 );
134 if !status.is_success() {
135 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
136 tracing::debug!(
137 method = "POST",
138 path = "/access/token",
139 status = status.as_u16(),
140 %body,
141 "NiFi API raw error body"
142 );
143 let message = extract_error_message(&body);
144 tracing::warn!(
145 method = "POST",
146 path = "/access/token",
147 status = status.as_u16(),
148 %message,
149 "NiFi API error"
150 );
151 return AuthSnafu { message }.fail();
152 }
153
154 let token = resp.text().await.context(HttpSnafu)?;
155 *self.token.write().await = Some(token);
156 tracing::info!("NiFi login successful for {username}");
157 Ok(())
158 }
159
160 pub async fn login_with_provider(&self) -> Result<(), NifiError> {
164 let creds = self.credentials.as_ref().ok_or_else(|| NifiError::Auth {
165 message: "no credential provider configured".to_string(),
166 })?;
167 let (username, password) = creds.credentials().await?;
168 self.login(&username, &password).await
169 }
170
171 async fn with_auth_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
176 where
177 F: Fn() -> Fut,
178 Fut: std::future::Future<Output = Result<T, NifiError>>,
179 {
180 match f().await {
181 Err(NifiError::Unauthorized { .. }) if self.credentials.is_some() => {
182 tracing::info!("received 401, refreshing token via credential provider");
183 self.login_with_provider().await?;
184 f().await
185 }
186 other => other,
187 }
188 }
189
190 async fn with_retry<T, F, Fut>(&self, f: F) -> Result<T, NifiError>
198 where
199 F: Fn() -> Fut,
200 Fut: std::future::Future<Output = Result<T, NifiError>>,
201 {
202 let Some(policy) = &self.retry_policy else {
203 return self.with_auth_retry(&f).await;
204 };
205
206 let mut last_err: Option<NifiError> = None;
207 for attempt in 0..=policy.max_retries {
208 if attempt > 0 {
209 let backoff = policy.backoff_for(attempt - 1);
210 tracing::info!(
211 attempt,
212 backoff_ms = backoff.as_millis() as u64,
213 "retrying after transient error"
214 );
215 tokio::time::sleep(backoff).await;
216 }
217 match self.with_auth_retry(&f).await {
218 Ok(v) => return Ok(v),
219 Err(e) if e.is_retryable() => {
220 tracing::warn!(attempt, error = %e, "transient error, will retry");
221 last_err = Some(e);
222 }
223 Err(e) => return Err(e),
224 }
225 }
226 match last_err {
229 Some(e) => Err(e),
230 None => self.with_auth_retry(&f).await,
232 }
233 }
234
235 pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
238 self.with_retry(|| async {
239 tracing::debug!(method = "GET", path, "NiFi API request");
240 let url = self.api_url(path);
241 let resp = self
242 .authenticated(self.http.get(url))
243 .await
244 .send()
245 .await
246 .context(HttpSnafu)?;
247 Self::deserialize("GET", path, resp).await
248 })
249 .await
250 }
251
252 pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
253 where
254 B: serde::Serialize,
255 T: DeserializeOwned,
256 {
257 self.with_retry(|| async {
258 tracing::debug!(method = "POST", path, "NiFi API request");
259 let url = self.api_url(path);
260 let resp = self
261 .authenticated(self.http.post(url))
262 .await
263 .json(body)
264 .send()
265 .await
266 .context(HttpSnafu)?;
267 Self::deserialize("POST", path, resp).await
268 })
269 .await
270 }
271
272 pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
273 where
274 B: serde::Serialize,
275 T: DeserializeOwned,
276 {
277 self.with_retry(|| async {
278 tracing::debug!(method = "PUT", path, "NiFi API request");
279 let url = self.api_url(path);
280 let resp = self
281 .authenticated(self.http.put(url))
282 .await
283 .json(body)
284 .send()
285 .await
286 .context(HttpSnafu)?;
287 Self::deserialize("PUT", path, resp).await
288 })
289 .await
290 }
291
292 pub(crate) async fn post_void<B: serde::Serialize>(
294 &self,
295 path: &str,
296 body: &B,
297 ) -> Result<(), NifiError> {
298 self.with_retry(|| async {
299 tracing::debug!(method = "POST", path, "NiFi API request");
300 let url = self.api_url(path);
301 let resp = self
302 .authenticated(self.http.post(url))
303 .await
304 .json(body)
305 .send()
306 .await
307 .context(HttpSnafu)?;
308 Self::check_void("POST", path, resp).await
309 })
310 .await
311 }
312
313 #[allow(dead_code)]
315 pub(crate) async fn put_void<B: serde::Serialize>(
316 &self,
317 path: &str,
318 body: &B,
319 ) -> Result<(), NifiError> {
320 self.with_retry(|| async {
321 tracing::debug!(method = "PUT", path, "NiFi API request");
322 let url = self.api_url(path);
323 let resp = self
324 .authenticated(self.http.put(url))
325 .await
326 .json(body)
327 .send()
328 .await
329 .context(HttpSnafu)?;
330 Self::check_void("PUT", path, resp).await
331 })
332 .await
333 }
334
335 pub(crate) async fn post_no_body<T: DeserializeOwned>(
337 &self,
338 path: &str,
339 ) -> Result<T, NifiError> {
340 self.with_retry(|| async {
341 tracing::debug!(method = "POST", path, "NiFi API request");
342 let url = self.api_url(path);
343 let resp = self
344 .authenticated(self.http.post(url))
345 .await
346 .send()
347 .await
348 .context(HttpSnafu)?;
349 Self::deserialize("POST", path, resp).await
350 })
351 .await
352 }
353
354 #[allow(dead_code)]
358 pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
359 self.with_retry(|| async {
360 tracing::debug!(method = "POST", path, "NiFi API request");
361 let url = self.api_url(path);
362 let resp = self
363 .authenticated(self.http.post(url))
364 .await
365 .send()
366 .await
367 .context(HttpSnafu)?;
368 Self::check_void("POST", path, resp).await
369 })
370 .await
371 }
372
373 pub(crate) async fn put_no_body<T: DeserializeOwned>(
375 &self,
376 path: &str,
377 ) -> Result<T, NifiError> {
378 self.with_retry(|| async {
379 tracing::debug!(method = "PUT", path, "NiFi API request");
380 let url = self.api_url(path);
381 let resp = self
382 .authenticated(self.http.put(url))
383 .await
384 .send()
385 .await
386 .context(HttpSnafu)?;
387 Self::deserialize("PUT", path, resp).await
388 })
389 .await
390 }
391
392 #[allow(dead_code)]
394 pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
395 self.with_retry(|| async {
396 tracing::debug!(method = "PUT", path, "NiFi API request");
397 let url = self.api_url(path);
398 let resp = self
399 .authenticated(self.http.put(url))
400 .await
401 .send()
402 .await
403 .context(HttpSnafu)?;
404 Self::check_void("PUT", path, resp).await
405 })
406 .await
407 }
408
409 pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
414 &self,
415 path: &str,
416 filename: Option<&str>,
417 data: Vec<u8>,
418 ) -> Result<T, NifiError> {
419 self.with_retry(|| async {
420 tracing::debug!(method = "POST", path, "NiFi API request");
421 let url = self.api_url(path);
422 let builder = self
423 .authenticated(self.http.post(url))
424 .await
425 .header("Content-Type", "application/octet-stream")
426 .body(data.clone());
427 let builder = if let Some(name) = filename {
428 builder.header("Filename", name)
429 } else {
430 builder
431 };
432 let resp = builder.send().await.context(HttpSnafu)?;
433 Self::deserialize("POST", path, resp).await
434 })
435 .await
436 }
437
438 pub(crate) async fn post_void_octet_stream(
443 &self,
444 path: &str,
445 filename: Option<&str>,
446 data: Vec<u8>,
447 ) -> Result<(), NifiError> {
448 self.with_retry(|| async {
449 tracing::debug!(method = "POST", path, "NiFi API request");
450 let url = self.api_url(path);
451 let builder = self
452 .authenticated(self.http.post(url))
453 .await
454 .header("Content-Type", "application/octet-stream")
455 .body(data.clone());
456 let builder = if let Some(name) = filename {
457 builder.header("Filename", name)
458 } else {
459 builder
460 };
461 let resp = builder.send().await.context(HttpSnafu)?;
462 Self::check_void("POST", path, resp).await
463 })
464 .await
465 }
466
467 #[allow(dead_code)]
471 pub(crate) async fn post_void_with_query<B: serde::Serialize>(
472 &self,
473 path: &str,
474 body: &B,
475 query: &[(&str, String)],
476 ) -> Result<(), NifiError> {
477 self.with_retry(|| async {
478 tracing::debug!(method = "POST", path, "NiFi API request");
479 let url = self.api_url(path);
480 let resp = self
481 .authenticated(self.http.post(url).query(query))
482 .await
483 .json(body)
484 .send()
485 .await
486 .context(HttpSnafu)?;
487 Self::check_void("POST", path, resp).await
488 })
489 .await
490 }
491
492 pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
497 self.with_retry(|| async {
498 tracing::debug!(method = "GET", path, "NiFi API request");
499 let url = self.api_url(path);
500 let resp = self
501 .authenticated(self.http.get(url))
502 .await
503 .send()
504 .await
505 .context(HttpSnafu)?;
506 Self::check_void_with_redirect("GET", path, resp).await
507 })
508 .await
509 }
510
511 pub(crate) async fn get_with_query<T: DeserializeOwned>(
512 &self,
513 path: &str,
514 query: &[(&str, String)],
515 ) -> Result<T, NifiError> {
516 self.with_retry(|| async {
517 tracing::debug!(method = "GET", path, "NiFi API request");
518 let url = self.api_url(path);
519 let resp = self
520 .authenticated(self.http.get(url).query(query))
521 .await
522 .send()
523 .await
524 .context(HttpSnafu)?;
525 Self::deserialize("GET", path, resp).await
526 })
527 .await
528 }
529
530 pub(crate) async fn get_void_with_query(
531 &self,
532 path: &str,
533 query: &[(&str, String)],
534 ) -> Result<(), NifiError> {
535 self.with_retry(|| async {
536 tracing::debug!(method = "GET", path, "NiFi API request");
537 let url = self.api_url(path);
538 let resp = self
539 .authenticated(self.http.get(url).query(query))
540 .await
541 .send()
542 .await
543 .context(HttpSnafu)?;
544 Self::check_void_with_redirect("GET", path, resp).await
545 })
546 .await
547 }
548
549 pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
550 &self,
551 path: &str,
552 query: &[(&str, String)],
553 ) -> Result<T, NifiError> {
554 self.with_retry(|| async {
555 tracing::debug!(method = "DELETE", path, "NiFi API request");
556 let url = self.api_url(path);
557 let resp = self
558 .authenticated(self.http.delete(url).query(query))
559 .await
560 .send()
561 .await
562 .context(HttpSnafu)?;
563 Self::deserialize("DELETE", path, resp).await
564 })
565 .await
566 }
567
568 pub(crate) async fn delete_with_query(
569 &self,
570 path: &str,
571 query: &[(&str, String)],
572 ) -> Result<(), NifiError> {
573 self.with_retry(|| async {
574 tracing::debug!(method = "DELETE", path, "NiFi API request");
575 let url = self.api_url(path);
576 let resp = self
577 .authenticated(self.http.delete(url).query(query))
578 .await
579 .send()
580 .await
581 .context(HttpSnafu)?;
582 Self::check_void("DELETE", path, resp).await
583 })
584 .await
585 }
586
587 pub(crate) async fn post_with_query<B, T>(
588 &self,
589 path: &str,
590 body: &B,
591 query: &[(&str, String)],
592 ) -> Result<T, NifiError>
593 where
594 B: serde::Serialize,
595 T: DeserializeOwned,
596 {
597 self.with_retry(|| async {
598 tracing::debug!(method = "POST", path, "NiFi API request");
599 let url = self.api_url(path);
600 let resp = self
601 .authenticated(self.http.post(url).query(query))
602 .await
603 .json(body)
604 .send()
605 .await
606 .context(HttpSnafu)?;
607 Self::deserialize("POST", path, resp).await
608 })
609 .await
610 }
611
612 pub(crate) async fn delete_returning<T: DeserializeOwned>(
613 &self,
614 path: &str,
615 ) -> Result<T, NifiError> {
616 self.with_retry(|| async {
617 tracing::debug!(method = "DELETE", path, "NiFi API request");
618 let url = self.api_url(path);
619 let resp = self
620 .authenticated(self.http.delete(url))
621 .await
622 .send()
623 .await
624 .context(HttpSnafu)?;
625 Self::deserialize("DELETE", path, resp).await
626 })
627 .await
628 }
629
630 pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
631 self.with_retry(|| async {
632 tracing::debug!(method = "DELETE", path, "NiFi API request");
633 let url = self.api_url(path);
634 let resp = self
635 .authenticated(self.http.delete(url))
636 .await
637 .send()
638 .await
639 .context(HttpSnafu)?;
640 Self::check_void("DELETE", path, resp).await
641 })
642 .await
643 }
644
645 async fn delete_inner(&self, path: &str) -> Result<(), NifiError> {
648 tracing::debug!(method = "DELETE", path, "NiFi API request");
649 let url = self.api_url(path);
650 let resp = self
651 .authenticated(self.http.delete(url))
652 .await
653 .send()
654 .await
655 .context(HttpSnafu)?;
656 Self::check_void("DELETE", path, resp).await
657 }
658
659 async fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
660 let guard = self.token.read().await;
661 match guard.as_deref() {
662 Some(token) => req.bearer_auth(token),
663 None => {
664 tracing::warn!(
665 "sending NiFi API request without a bearer token — call login() first"
666 );
667 req
668 }
669 }
670 }
671
672 async fn deserialize<T: DeserializeOwned>(
673 method: &str,
674 path: &str,
675 resp: reqwest::Response,
676 ) -> Result<T, NifiError> {
677 let status = resp.status();
678 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
679 if status.is_success() {
680 return resp.json::<T>().await.context(HttpSnafu);
681 }
682 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
683 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
684 let message = extract_error_message(&body);
685 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
686 Err(crate::error::api_error(status.as_u16(), message))
687 }
688
689 async fn check_void(
692 method: &str,
693 path: &str,
694 resp: reqwest::Response,
695 ) -> Result<(), NifiError> {
696 let status = resp.status();
697 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
698 if status.is_success() {
699 return Ok(());
700 }
701 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
702 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
703 let message = extract_error_message(&body);
704 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
705 Err(crate::error::api_error(status.as_u16(), message))
706 }
707
708 async fn check_void_with_redirect(
710 method: &str,
711 path: &str,
712 resp: reqwest::Response,
713 ) -> Result<(), NifiError> {
714 let status = resp.status();
715 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
716 if status.is_success() || status.as_u16() == 302 {
717 return Ok(());
718 }
719 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
720 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
721 let message = extract_error_message(&body);
722 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
723 Err(crate::error::api_error(status.as_u16(), message))
724 }
725
726 pub(crate) fn api_url(&self, path: &str) -> Url {
727 let mut url = self.base_url.clone();
728 url.set_path(&format!("/nifi-api{path}"));
729 url
730 }
731}
732
733pub fn extract_error_message(body: &str) -> String {
738 serde_json::from_str::<serde_json::Value>(body)
739 .ok()
740 .and_then(|v| v["message"].as_str().map(str::to_owned))
741 .unwrap_or_else(|| body.to_owned())
742}