1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use snafu::ResultExt as _;
4use url::Url;
5
6use crate::NifiError;
7use crate::error::{ApiSnafu, AuthSnafu, HttpSnafu};
8
9#[derive(Debug, Clone)]
11pub struct NifiClient {
12 base_url: Url,
13 http: Client,
14 token: Option<String>,
15}
16
17impl NifiClient {
18 pub(crate) fn from_parts(base_url: Url, http: Client) -> Self {
20 Self {
21 base_url,
22 http,
23 token: None,
24 }
25 }
26
27 pub fn token(&self) -> Option<&str> {
35 self.token.as_deref()
36 }
37
38 pub fn set_token(&mut self, token: String) {
45 self.token = Some(token);
46 }
47
48 pub async fn logout(&mut self) -> Result<(), NifiError> {
58 let result = self.delete("/access/logout").await;
59 self.token = None;
60 if result.is_ok() {
61 tracing::info!("NiFi logout successful");
62 }
63 result
64 }
65
66 pub async fn login(&mut self, username: &str, password: &str) -> Result<(), NifiError> {
97 tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
98 let url = self.api_url("/access/token");
99 let resp = self
100 .http
101 .post(url)
102 .form(&[("username", username), ("password", password)])
103 .send()
104 .await
105 .context(HttpSnafu)?;
106
107 let status = resp.status();
108 tracing::debug!(
109 method = "POST",
110 path = "/access/token",
111 status = status.as_u16(),
112 "NiFi API response"
113 );
114 if !status.is_success() {
115 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
116 tracing::debug!(
117 method = "POST",
118 path = "/access/token",
119 status = status.as_u16(),
120 %body,
121 "NiFi API raw error body"
122 );
123 let message = extract_error_message(&body);
124 tracing::warn!(
125 method = "POST",
126 path = "/access/token",
127 status = status.as_u16(),
128 %message,
129 "NiFi API error"
130 );
131 return AuthSnafu { message }.fail();
132 }
133
134 let token = resp.text().await.context(HttpSnafu)?;
135 self.token = Some(token);
136 tracing::info!("NiFi login successful for {username}");
137 Ok(())
138 }
139
140 pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
143 tracing::debug!(method = "GET", path, "NiFi API request");
144 let url = self.api_url(path);
145 let resp = self
146 .authenticated(self.http.get(url))
147 .send()
148 .await
149 .context(HttpSnafu)?;
150 Self::deserialize("GET", path, resp).await
151 }
152
153 pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
154 where
155 B: serde::Serialize,
156 T: DeserializeOwned,
157 {
158 tracing::debug!(method = "POST", path, "NiFi API request");
159 let url = self.api_url(path);
160 let resp = self
161 .authenticated(self.http.post(url))
162 .json(body)
163 .send()
164 .await
165 .context(HttpSnafu)?;
166 Self::deserialize("POST", path, resp).await
167 }
168
169 pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
170 where
171 B: serde::Serialize,
172 T: DeserializeOwned,
173 {
174 tracing::debug!(method = "PUT", path, "NiFi API request");
175 let url = self.api_url(path);
176 let resp = self
177 .authenticated(self.http.put(url))
178 .json(body)
179 .send()
180 .await
181 .context(HttpSnafu)?;
182 Self::deserialize("PUT", path, resp).await
183 }
184
185 pub(crate) async fn post_void<B: serde::Serialize>(
187 &self,
188 path: &str,
189 body: &B,
190 ) -> Result<(), NifiError> {
191 tracing::debug!(method = "POST", path, "NiFi API request");
192 let url = self.api_url(path);
193 let resp = self
194 .authenticated(self.http.post(url))
195 .json(body)
196 .send()
197 .await
198 .context(HttpSnafu)?;
199 let status = resp.status();
200 tracing::debug!(
201 method = "POST",
202 path,
203 status = status.as_u16(),
204 "NiFi API response"
205 );
206 if status.is_success() {
207 return Ok(());
208 }
209 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
210 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
211 let message = extract_error_message(&body);
212 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
213 ApiSnafu {
214 status: status.as_u16(),
215 message,
216 }
217 .fail()
218 }
219
220 #[allow(dead_code)]
222 pub(crate) async fn put_void<B: serde::Serialize>(
223 &self,
224 path: &str,
225 body: &B,
226 ) -> Result<(), NifiError> {
227 tracing::debug!(method = "PUT", path, "NiFi API request");
228 let url = self.api_url(path);
229 let resp = self
230 .authenticated(self.http.put(url))
231 .json(body)
232 .send()
233 .await
234 .context(HttpSnafu)?;
235 let status = resp.status();
236 tracing::debug!(
237 method = "PUT",
238 path,
239 status = status.as_u16(),
240 "NiFi API response"
241 );
242 if status.is_success() {
243 return Ok(());
244 }
245 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
246 tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
247 let message = extract_error_message(&body);
248 tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
249 ApiSnafu {
250 status: status.as_u16(),
251 message,
252 }
253 .fail()
254 }
255
256 pub(crate) async fn post_no_body<T: DeserializeOwned>(
258 &self,
259 path: &str,
260 ) -> Result<T, NifiError> {
261 tracing::debug!(method = "POST", path, "NiFi API request");
262 let url = self.api_url(path);
263 let resp = self
264 .authenticated(self.http.post(url))
265 .send()
266 .await
267 .context(HttpSnafu)?;
268 Self::deserialize("POST", path, resp).await
269 }
270
271 #[allow(dead_code)]
275 pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
276 tracing::debug!(method = "POST", path, "NiFi API request");
277 let url = self.api_url(path);
278 let resp = self
279 .authenticated(self.http.post(url))
280 .send()
281 .await
282 .context(HttpSnafu)?;
283 let status = resp.status();
284 tracing::debug!(
285 method = "POST",
286 path,
287 status = status.as_u16(),
288 "NiFi API response"
289 );
290 if status.is_success() {
291 return Ok(());
292 }
293 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
294 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
295 let message = extract_error_message(&body);
296 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
297 ApiSnafu {
298 status: status.as_u16(),
299 message,
300 }
301 .fail()
302 }
303
304 pub(crate) async fn put_no_body<T: DeserializeOwned>(
306 &self,
307 path: &str,
308 ) -> Result<T, NifiError> {
309 tracing::debug!(method = "PUT", path, "NiFi API request");
310 let url = self.api_url(path);
311 let resp = self
312 .authenticated(self.http.put(url))
313 .send()
314 .await
315 .context(HttpSnafu)?;
316 Self::deserialize("PUT", path, resp).await
317 }
318
319 #[allow(dead_code)]
321 pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
322 tracing::debug!(method = "PUT", path, "NiFi API request");
323 let url = self.api_url(path);
324 let resp = self
325 .authenticated(self.http.put(url))
326 .send()
327 .await
328 .context(HttpSnafu)?;
329 let status = resp.status();
330 tracing::debug!(
331 method = "PUT",
332 path,
333 status = status.as_u16(),
334 "NiFi API response"
335 );
336 if status.is_success() {
337 return Ok(());
338 }
339 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
340 tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
341 let message = extract_error_message(&body);
342 tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
343 ApiSnafu {
344 status: status.as_u16(),
345 message,
346 }
347 .fail()
348 }
349
350 pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
355 &self,
356 path: &str,
357 filename: Option<&str>,
358 data: Vec<u8>,
359 ) -> Result<T, NifiError> {
360 tracing::debug!(method = "POST", path, "NiFi API request");
361 let url = self.api_url(path);
362 let builder = self
363 .authenticated(self.http.post(url))
364 .header("Content-Type", "application/octet-stream")
365 .body(data);
366 let builder = if let Some(name) = filename {
367 builder.header("Filename", name)
368 } else {
369 builder
370 };
371 let resp = builder.send().await.context(HttpSnafu)?;
372 Self::deserialize("POST", path, resp).await
373 }
374
375 pub(crate) async fn post_void_octet_stream(
380 &self,
381 path: &str,
382 filename: Option<&str>,
383 data: Vec<u8>,
384 ) -> Result<(), NifiError> {
385 tracing::debug!(method = "POST", path, "NiFi API request");
386 let url = self.api_url(path);
387 let builder = self
388 .authenticated(self.http.post(url))
389 .header("Content-Type", "application/octet-stream")
390 .body(data);
391 let builder = if let Some(name) = filename {
392 builder.header("Filename", name)
393 } else {
394 builder
395 };
396 let resp = builder.send().await.context(HttpSnafu)?;
397 let status = resp.status();
398 tracing::debug!(
399 method = "POST",
400 path,
401 status = status.as_u16(),
402 "NiFi API response"
403 );
404 if status.is_success() {
405 return Ok(());
406 }
407 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
408 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
409 let message = extract_error_message(&body);
410 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
411 ApiSnafu {
412 status: status.as_u16(),
413 message,
414 }
415 .fail()
416 }
417
418 #[allow(dead_code)]
422 pub(crate) async fn post_void_with_query<B: serde::Serialize>(
423 &self,
424 path: &str,
425 body: &B,
426 query: &[(&str, String)],
427 ) -> Result<(), NifiError> {
428 tracing::debug!(method = "POST", path, "NiFi API request");
429 let url = self.api_url(path);
430 let resp = self
431 .authenticated(self.http.post(url).query(query))
432 .json(body)
433 .send()
434 .await
435 .context(HttpSnafu)?;
436 let status = resp.status();
437 tracing::debug!(
438 method = "POST",
439 path,
440 status = status.as_u16(),
441 "NiFi API response"
442 );
443 if status.is_success() {
444 return Ok(());
445 }
446 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
447 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
448 let message = extract_error_message(&body);
449 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
450 ApiSnafu {
451 status: status.as_u16(),
452 message,
453 }
454 .fail()
455 }
456
457 pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
462 tracing::debug!(method = "GET", path, "NiFi API request");
463 let url = self.api_url(path);
464 let resp = self
465 .authenticated(self.http.get(url))
466 .send()
467 .await
468 .context(HttpSnafu)?;
469 let status = resp.status();
470 tracing::debug!(
471 method = "GET",
472 path,
473 status = status.as_u16(),
474 "NiFi API response"
475 );
476 if status.is_success() || status.as_u16() == 302 {
477 return Ok(());
478 }
479 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
480 tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
481 let message = extract_error_message(&body);
482 tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
483 ApiSnafu {
484 status: status.as_u16(),
485 message,
486 }
487 .fail()
488 }
489
490 pub(crate) async fn get_with_query<T: DeserializeOwned>(
491 &self,
492 path: &str,
493 query: &[(&str, String)],
494 ) -> Result<T, NifiError> {
495 tracing::debug!(method = "GET", path, "NiFi API request");
496 let url = self.api_url(path);
497 let resp = self
498 .authenticated(self.http.get(url).query(query))
499 .send()
500 .await
501 .context(HttpSnafu)?;
502 Self::deserialize("GET", path, resp).await
503 }
504
505 pub(crate) async fn get_void_with_query(
506 &self,
507 path: &str,
508 query: &[(&str, String)],
509 ) -> Result<(), NifiError> {
510 tracing::debug!(method = "GET", path, "NiFi API request");
511 let url = self.api_url(path);
512 let resp = self
513 .authenticated(self.http.get(url).query(query))
514 .send()
515 .await
516 .context(HttpSnafu)?;
517 let status = resp.status();
518 tracing::debug!(
519 method = "GET",
520 path,
521 status = status.as_u16(),
522 "NiFi API response"
523 );
524 if status.is_success() || status.as_u16() == 302 {
525 return Ok(());
526 }
527 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
528 tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
529 let message = extract_error_message(&body);
530 tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
531 ApiSnafu {
532 status: status.as_u16(),
533 message,
534 }
535 .fail()
536 }
537
538 pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
539 &self,
540 path: &str,
541 query: &[(&str, String)],
542 ) -> Result<T, NifiError> {
543 tracing::debug!(method = "DELETE", path, "NiFi API request");
544 let url = self.api_url(path);
545 let resp = self
546 .authenticated(self.http.delete(url).query(query))
547 .send()
548 .await
549 .context(HttpSnafu)?;
550 Self::deserialize("DELETE", path, resp).await
551 }
552
553 pub(crate) async fn delete_with_query(
554 &self,
555 path: &str,
556 query: &[(&str, String)],
557 ) -> Result<(), NifiError> {
558 tracing::debug!(method = "DELETE", path, "NiFi API request");
559 let url = self.api_url(path);
560 let resp = self
561 .authenticated(self.http.delete(url).query(query))
562 .send()
563 .await
564 .context(HttpSnafu)?;
565 let status = resp.status();
566 tracing::debug!(
567 method = "DELETE",
568 path,
569 status = status.as_u16(),
570 "NiFi API response"
571 );
572 if status.is_success() {
573 return Ok(());
574 }
575 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
576 tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
577 let message = extract_error_message(&body);
578 tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
579 ApiSnafu {
580 status: status.as_u16(),
581 message,
582 }
583 .fail()
584 }
585
586 pub(crate) async fn post_with_query<B, T>(
587 &self,
588 path: &str,
589 body: &B,
590 query: &[(&str, String)],
591 ) -> Result<T, NifiError>
592 where
593 B: serde::Serialize,
594 T: DeserializeOwned,
595 {
596 tracing::debug!(method = "POST", path, "NiFi API request");
597 let url = self.api_url(path);
598 let resp = self
599 .authenticated(self.http.post(url).query(query))
600 .json(body)
601 .send()
602 .await
603 .context(HttpSnafu)?;
604 Self::deserialize("POST", path, resp).await
605 }
606
607 pub(crate) async fn delete_returning<T: DeserializeOwned>(
608 &self,
609 path: &str,
610 ) -> Result<T, NifiError> {
611 tracing::debug!(method = "DELETE", path, "NiFi API request");
612 let url = self.api_url(path);
613 let resp = self
614 .authenticated(self.http.delete(url))
615 .send()
616 .await
617 .context(HttpSnafu)?;
618 Self::deserialize("DELETE", path, resp).await
619 }
620
621 pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
622 tracing::debug!(method = "DELETE", path, "NiFi API request");
623 let url = self.api_url(path);
624 let resp = self
625 .authenticated(self.http.delete(url))
626 .send()
627 .await
628 .context(HttpSnafu)?;
629 let status = resp.status();
630 tracing::debug!(
631 method = "DELETE",
632 path,
633 status = status.as_u16(),
634 "NiFi API response"
635 );
636 if status.is_success() {
637 return Ok(());
638 }
639 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
640 tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
641 let message = extract_error_message(&body);
642 tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
643 ApiSnafu {
644 status: status.as_u16(),
645 message,
646 }
647 .fail()
648 }
649
650 fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
651 match &self.token {
652 Some(token) => req.bearer_auth(token),
653 None => {
654 tracing::warn!(
655 "sending NiFi API request without a bearer token — call login() first"
656 );
657 req
658 }
659 }
660 }
661
662 async fn deserialize<T: DeserializeOwned>(
663 method: &str,
664 path: &str,
665 resp: reqwest::Response,
666 ) -> Result<T, NifiError> {
667 let status = resp.status();
668 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
669 if status.is_success() {
670 return resp.json::<T>().await.context(HttpSnafu);
671 }
672 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
673 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
674 let message = extract_error_message(&body);
675 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
676 ApiSnafu {
677 status: status.as_u16(),
678 message,
679 }
680 .fail()
681 }
682
683 pub(crate) fn api_url(&self, path: &str) -> Url {
684 let mut url = self.base_url.clone();
685 url.set_path(&format!("/nifi-api{path}"));
686 url
687 }
688}
689
690pub fn extract_error_message(body: &str) -> String {
695 serde_json::from_str::<serde_json::Value>(body)
696 .ok()
697 .and_then(|v| v["message"].as_str().map(str::to_owned))
698 .unwrap_or_else(|| body.to_owned())
699}