1use reqwest::Client;
2use serde::de::DeserializeOwned;
3use snafu::ResultExt as _;
4use url::Url;
5
6use crate::NifiError;
7use crate::error::{ApiSnafu, AuthSnafu, HttpSnafu};
8
9#[derive(Debug, Clone)]
11pub struct NifiClient {
12 base_url: Url,
13 http: Client,
14 token: Option<String>,
15}
16
17impl NifiClient {
18 pub(crate) fn from_parts(base_url: Url, http: Client) -> Self {
20 Self {
21 base_url,
22 http,
23 token: None,
24 }
25 }
26
27 pub fn token(&self) -> Option<&str> {
35 self.token.as_deref()
36 }
37
38 pub fn set_token(&mut self, token: String) {
45 self.token = Some(token);
46 }
47
48 pub async fn logout(&mut self) -> Result<(), NifiError> {
58 let result = self.delete("/access/logout").await;
59 self.token = None;
60 if result.is_ok() {
61 tracing::info!("NiFi logout successful");
62 }
63 result
64 }
65
66 pub async fn login(&mut self, username: &str, password: &str) -> Result<(), NifiError> {
97 tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
98 let url = self.api_url("/access/token");
99 let resp = self
100 .http
101 .post(url)
102 .form(&[("username", username), ("password", password)])
103 .send()
104 .await
105 .context(HttpSnafu)?;
106
107 let status = resp.status();
108 tracing::debug!(
109 method = "POST",
110 path = "/access/token",
111 status = status.as_u16(),
112 "NiFi API response"
113 );
114 if !status.is_success() {
115 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
116 tracing::debug!(
117 method = "POST",
118 path = "/access/token",
119 status = status.as_u16(),
120 %body,
121 "NiFi API raw error body"
122 );
123 let message = extract_error_message(&body);
124 tracing::warn!(
125 method = "POST",
126 path = "/access/token",
127 status = status.as_u16(),
128 %message,
129 "NiFi API error"
130 );
131 return AuthSnafu { message }.fail();
132 }
133
134 let token = resp.text().await.context(HttpSnafu)?;
135 self.token = Some(token);
136 tracing::info!("NiFi login successful for {username}");
137 Ok(())
138 }
139
140 pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
143 tracing::debug!(method = "GET", path, "NiFi API request");
144 let url = self.api_url(path);
145 let resp = self
146 .authenticated(self.http.get(url))
147 .send()
148 .await
149 .context(HttpSnafu)?;
150 Self::deserialize("GET", path, resp).await
151 }
152
153 pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
154 where
155 B: serde::Serialize,
156 T: DeserializeOwned,
157 {
158 tracing::debug!(method = "POST", path, "NiFi API request");
159 let url = self.api_url(path);
160 let resp = self
161 .authenticated(self.http.post(url))
162 .json(body)
163 .send()
164 .await
165 .context(HttpSnafu)?;
166 Self::deserialize("POST", path, resp).await
167 }
168
169 pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
170 where
171 B: serde::Serialize,
172 T: DeserializeOwned,
173 {
174 tracing::debug!(method = "PUT", path, "NiFi API request");
175 let url = self.api_url(path);
176 let resp = self
177 .authenticated(self.http.put(url))
178 .json(body)
179 .send()
180 .await
181 .context(HttpSnafu)?;
182 Self::deserialize("PUT", path, resp).await
183 }
184
185 pub(crate) async fn post_void<B: serde::Serialize>(
187 &self,
188 path: &str,
189 body: &B,
190 ) -> Result<(), NifiError> {
191 tracing::debug!(method = "POST", path, "NiFi API request");
192 let url = self.api_url(path);
193 let resp = self
194 .authenticated(self.http.post(url))
195 .json(body)
196 .send()
197 .await
198 .context(HttpSnafu)?;
199 let status = resp.status();
200 tracing::debug!(
201 method = "POST",
202 path,
203 status = status.as_u16(),
204 "NiFi API response"
205 );
206 if status.is_success() {
207 return Ok(());
208 }
209 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
210 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
211 let message = extract_error_message(&body);
212 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
213 ApiSnafu {
214 status: status.as_u16(),
215 message,
216 }
217 .fail()
218 }
219
220 #[allow(dead_code)]
222 pub(crate) async fn put_void<B: serde::Serialize>(
223 &self,
224 path: &str,
225 body: &B,
226 ) -> Result<(), NifiError> {
227 tracing::debug!(method = "PUT", path, "NiFi API request");
228 let url = self.api_url(path);
229 let resp = self
230 .authenticated(self.http.put(url))
231 .json(body)
232 .send()
233 .await
234 .context(HttpSnafu)?;
235 let status = resp.status();
236 tracing::debug!(
237 method = "PUT",
238 path,
239 status = status.as_u16(),
240 "NiFi API response"
241 );
242 if status.is_success() {
243 return Ok(());
244 }
245 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
246 tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
247 let message = extract_error_message(&body);
248 tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
249 ApiSnafu {
250 status: status.as_u16(),
251 message,
252 }
253 .fail()
254 }
255
256 pub(crate) async fn post_no_body<T: DeserializeOwned>(
258 &self,
259 path: &str,
260 ) -> Result<T, NifiError> {
261 tracing::debug!(method = "POST", path, "NiFi API request");
262 let url = self.api_url(path);
263 let resp = self
264 .authenticated(self.http.post(url))
265 .send()
266 .await
267 .context(HttpSnafu)?;
268 Self::deserialize("POST", path, resp).await
269 }
270
271 pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
273 tracing::debug!(method = "POST", path, "NiFi API request");
274 let url = self.api_url(path);
275 let resp = self
276 .authenticated(self.http.post(url))
277 .send()
278 .await
279 .context(HttpSnafu)?;
280 let status = resp.status();
281 tracing::debug!(
282 method = "POST",
283 path,
284 status = status.as_u16(),
285 "NiFi API response"
286 );
287 if status.is_success() {
288 return Ok(());
289 }
290 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
291 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
292 let message = extract_error_message(&body);
293 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
294 ApiSnafu {
295 status: status.as_u16(),
296 message,
297 }
298 .fail()
299 }
300
301 pub(crate) async fn put_no_body<T: DeserializeOwned>(
303 &self,
304 path: &str,
305 ) -> Result<T, NifiError> {
306 tracing::debug!(method = "PUT", path, "NiFi API request");
307 let url = self.api_url(path);
308 let resp = self
309 .authenticated(self.http.put(url))
310 .send()
311 .await
312 .context(HttpSnafu)?;
313 Self::deserialize("PUT", path, resp).await
314 }
315
316 #[allow(dead_code)]
318 pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
319 tracing::debug!(method = "PUT", path, "NiFi API request");
320 let url = self.api_url(path);
321 let resp = self
322 .authenticated(self.http.put(url))
323 .send()
324 .await
325 .context(HttpSnafu)?;
326 let status = resp.status();
327 tracing::debug!(
328 method = "PUT",
329 path,
330 status = status.as_u16(),
331 "NiFi API response"
332 );
333 if status.is_success() {
334 return Ok(());
335 }
336 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
337 tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
338 let message = extract_error_message(&body);
339 tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
340 ApiSnafu {
341 status: status.as_u16(),
342 message,
343 }
344 .fail()
345 }
346
347 pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
352 &self,
353 path: &str,
354 filename: Option<&str>,
355 data: Vec<u8>,
356 ) -> Result<T, NifiError> {
357 tracing::debug!(method = "POST", path, "NiFi API request");
358 let url = self.api_url(path);
359 let builder = self
360 .authenticated(self.http.post(url))
361 .header("Content-Type", "application/octet-stream")
362 .body(data);
363 let builder = if let Some(name) = filename {
364 builder.header("Filename", name)
365 } else {
366 builder
367 };
368 let resp = builder.send().await.context(HttpSnafu)?;
369 Self::deserialize("POST", path, resp).await
370 }
371
372 pub(crate) async fn post_void_octet_stream(
377 &self,
378 path: &str,
379 filename: Option<&str>,
380 data: Vec<u8>,
381 ) -> Result<(), NifiError> {
382 tracing::debug!(method = "POST", path, "NiFi API request");
383 let url = self.api_url(path);
384 let builder = self
385 .authenticated(self.http.post(url))
386 .header("Content-Type", "application/octet-stream")
387 .body(data);
388 let builder = if let Some(name) = filename {
389 builder.header("Filename", name)
390 } else {
391 builder
392 };
393 let resp = builder.send().await.context(HttpSnafu)?;
394 let status = resp.status();
395 tracing::debug!(
396 method = "POST",
397 path,
398 status = status.as_u16(),
399 "NiFi API response"
400 );
401 if status.is_success() {
402 return Ok(());
403 }
404 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
405 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
406 let message = extract_error_message(&body);
407 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
408 ApiSnafu {
409 status: status.as_u16(),
410 message,
411 }
412 .fail()
413 }
414
415 #[allow(dead_code)]
419 pub(crate) async fn post_void_with_query<B: serde::Serialize>(
420 &self,
421 path: &str,
422 body: &B,
423 query: &[(&str, String)],
424 ) -> Result<(), NifiError> {
425 tracing::debug!(method = "POST", path, "NiFi API request");
426 let url = self.api_url(path);
427 let resp = self
428 .authenticated(self.http.post(url).query(query))
429 .json(body)
430 .send()
431 .await
432 .context(HttpSnafu)?;
433 let status = resp.status();
434 tracing::debug!(
435 method = "POST",
436 path,
437 status = status.as_u16(),
438 "NiFi API response"
439 );
440 if status.is_success() {
441 return Ok(());
442 }
443 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
444 tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
445 let message = extract_error_message(&body);
446 tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
447 ApiSnafu {
448 status: status.as_u16(),
449 message,
450 }
451 .fail()
452 }
453
454 pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
459 tracing::debug!(method = "GET", path, "NiFi API request");
460 let url = self.api_url(path);
461 let resp = self
462 .authenticated(self.http.get(url))
463 .send()
464 .await
465 .context(HttpSnafu)?;
466 let status = resp.status();
467 tracing::debug!(
468 method = "GET",
469 path,
470 status = status.as_u16(),
471 "NiFi API response"
472 );
473 if status.is_success() || status.as_u16() == 302 {
474 return Ok(());
475 }
476 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
477 tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
478 let message = extract_error_message(&body);
479 tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
480 ApiSnafu {
481 status: status.as_u16(),
482 message,
483 }
484 .fail()
485 }
486
487 pub(crate) async fn get_with_query<T: DeserializeOwned>(
488 &self,
489 path: &str,
490 query: &[(&str, String)],
491 ) -> Result<T, NifiError> {
492 tracing::debug!(method = "GET", path, "NiFi API request");
493 let url = self.api_url(path);
494 let resp = self
495 .authenticated(self.http.get(url).query(query))
496 .send()
497 .await
498 .context(HttpSnafu)?;
499 Self::deserialize("GET", path, resp).await
500 }
501
502 pub(crate) async fn get_void_with_query(
503 &self,
504 path: &str,
505 query: &[(&str, String)],
506 ) -> Result<(), NifiError> {
507 tracing::debug!(method = "GET", path, "NiFi API request");
508 let url = self.api_url(path);
509 let resp = self
510 .authenticated(self.http.get(url).query(query))
511 .send()
512 .await
513 .context(HttpSnafu)?;
514 let status = resp.status();
515 tracing::debug!(
516 method = "GET",
517 path,
518 status = status.as_u16(),
519 "NiFi API response"
520 );
521 if status.is_success() || status.as_u16() == 302 {
522 return Ok(());
523 }
524 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
525 tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
526 let message = extract_error_message(&body);
527 tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
528 ApiSnafu {
529 status: status.as_u16(),
530 message,
531 }
532 .fail()
533 }
534
535 pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
536 &self,
537 path: &str,
538 query: &[(&str, String)],
539 ) -> Result<T, NifiError> {
540 tracing::debug!(method = "DELETE", path, "NiFi API request");
541 let url = self.api_url(path);
542 let resp = self
543 .authenticated(self.http.delete(url).query(query))
544 .send()
545 .await
546 .context(HttpSnafu)?;
547 Self::deserialize("DELETE", path, resp).await
548 }
549
550 pub(crate) async fn delete_with_query(
551 &self,
552 path: &str,
553 query: &[(&str, String)],
554 ) -> Result<(), NifiError> {
555 tracing::debug!(method = "DELETE", path, "NiFi API request");
556 let url = self.api_url(path);
557 let resp = self
558 .authenticated(self.http.delete(url).query(query))
559 .send()
560 .await
561 .context(HttpSnafu)?;
562 let status = resp.status();
563 tracing::debug!(
564 method = "DELETE",
565 path,
566 status = status.as_u16(),
567 "NiFi API response"
568 );
569 if status.is_success() {
570 return Ok(());
571 }
572 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
573 tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
574 let message = extract_error_message(&body);
575 tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
576 ApiSnafu {
577 status: status.as_u16(),
578 message,
579 }
580 .fail()
581 }
582
583 pub(crate) async fn post_with_query<B, T>(
584 &self,
585 path: &str,
586 body: &B,
587 query: &[(&str, String)],
588 ) -> Result<T, NifiError>
589 where
590 B: serde::Serialize,
591 T: DeserializeOwned,
592 {
593 tracing::debug!(method = "POST", path, "NiFi API request");
594 let url = self.api_url(path);
595 let resp = self
596 .authenticated(self.http.post(url).query(query))
597 .json(body)
598 .send()
599 .await
600 .context(HttpSnafu)?;
601 Self::deserialize("POST", path, resp).await
602 }
603
604 pub(crate) async fn delete_returning<T: DeserializeOwned>(
605 &self,
606 path: &str,
607 ) -> Result<T, NifiError> {
608 tracing::debug!(method = "DELETE", path, "NiFi API request");
609 let url = self.api_url(path);
610 let resp = self
611 .authenticated(self.http.delete(url))
612 .send()
613 .await
614 .context(HttpSnafu)?;
615 Self::deserialize("DELETE", path, resp).await
616 }
617
618 pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
619 tracing::debug!(method = "DELETE", path, "NiFi API request");
620 let url = self.api_url(path);
621 let resp = self
622 .authenticated(self.http.delete(url))
623 .send()
624 .await
625 .context(HttpSnafu)?;
626 let status = resp.status();
627 tracing::debug!(
628 method = "DELETE",
629 path,
630 status = status.as_u16(),
631 "NiFi API response"
632 );
633 if status.is_success() {
634 return Ok(());
635 }
636 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
637 tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
638 let message = extract_error_message(&body);
639 tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
640 ApiSnafu {
641 status: status.as_u16(),
642 message,
643 }
644 .fail()
645 }
646
647 fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
648 match &self.token {
649 Some(token) => req.bearer_auth(token),
650 None => {
651 tracing::warn!(
652 "sending NiFi API request without a bearer token — call login() first"
653 );
654 req
655 }
656 }
657 }
658
659 async fn deserialize<T: DeserializeOwned>(
660 method: &str,
661 path: &str,
662 resp: reqwest::Response,
663 ) -> Result<T, NifiError> {
664 let status = resp.status();
665 tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
666 if status.is_success() {
667 return resp.json::<T>().await.context(HttpSnafu);
668 }
669 let body = resp.text().await.unwrap_or_else(|_| status.to_string());
670 tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
671 let message = extract_error_message(&body);
672 tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
673 ApiSnafu {
674 status: status.as_u16(),
675 message,
676 }
677 .fail()
678 }
679
680 pub(crate) fn api_url(&self, path: &str) -> Url {
681 let mut url = self.base_url.clone();
682 url.set_path(&format!("/nifi-api{path}"));
683 url
684 }
685}
686
687pub fn extract_error_message(body: &str) -> String {
692 serde_json::from_str::<serde_json::Value>(body)
693 .ok()
694 .and_then(|v| v["message"].as_str().map(str::to_owned))
695 .unwrap_or_else(|| body.to_owned())
696}