use reqwest::Client;
use serde::de::DeserializeOwned;
use snafu::ResultExt as _;
use url::Url;
use crate::NifiError;
use crate::error::{ApiSnafu, AuthSnafu, HttpSnafu};
/// Handle for talking to a NiFi server's REST API.
///
/// Bundles the server base URL, the HTTP client used for every request, and the
/// bearer token (if any) that authenticated requests attach.
///
/// NOTE(review): `Debug` is derived, so `{:?}` output includes the `token` field —
/// confirm that is acceptable wherever this value is logged.
#[derive(Debug, Clone)]
pub struct NifiClient {
/// Server base URL; `api_url` clones it and overwrites its path with `/nifi-api…`.
base_url: Url,
/// `reqwest` client through which all requests are sent.
http: Client,
/// Bearer token stored by `login` / `set_token`; `None` initially and after `logout`.
token: Option<String>,
}
impl NifiClient {
pub(crate) fn from_parts(base_url: Url, http: Client) -> Self {
Self {
base_url,
http,
token: None,
}
}
pub fn token(&self) -> Option<&str> {
self.token.as_deref()
}
pub fn set_token(&mut self, token: String) {
self.token = Some(token);
}
pub async fn logout(&mut self) -> Result<(), NifiError> {
let result = self.delete("/access/logout").await;
self.token = None;
if result.is_ok() {
tracing::info!("NiFi logout successful");
}
result
}
pub async fn login(&mut self, username: &str, password: &str) -> Result<(), NifiError> {
tracing::debug!(method = "POST", path = "/access/token", "NiFi API request");
let url = self.api_url("/access/token");
let resp = self
.http
.post(url)
.form(&[("username", username), ("password", password)])
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "POST",
path = "/access/token",
status = status.as_u16(),
"NiFi API response"
);
if !status.is_success() {
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(
method = "POST",
path = "/access/token",
status = status.as_u16(),
%body,
"NiFi API raw error body"
);
let message = extract_error_message(&body);
tracing::warn!(
method = "POST",
path = "/access/token",
status = status.as_u16(),
%message,
"NiFi API error"
);
return AuthSnafu { message }.fail();
}
let token = resp.text().await.context(HttpSnafu)?;
self.token = Some(token);
tracing::info!("NiFi login successful for {username}");
Ok(())
}
pub(crate) async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T, NifiError> {
tracing::debug!(method = "GET", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.get(url))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("GET", path, resp).await
}
pub(crate) async fn post<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
where
B: serde::Serialize,
T: DeserializeOwned,
{
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url))
.json(body)
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("POST", path, resp).await
}
pub(crate) async fn put<B, T>(&self, path: &str, body: &B) -> Result<T, NifiError>
where
B: serde::Serialize,
T: DeserializeOwned,
{
tracing::debug!(method = "PUT", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.put(url))
.json(body)
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("PUT", path, resp).await
}
pub(crate) async fn post_void<B: serde::Serialize>(
&self,
path: &str,
body: &B,
) -> Result<(), NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url))
.json(body)
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "POST",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
#[allow(dead_code)]
pub(crate) async fn put_void<B: serde::Serialize>(
&self,
path: &str,
body: &B,
) -> Result<(), NifiError> {
tracing::debug!(method = "PUT", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.put(url))
.json(body)
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "PUT",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn post_no_body<T: DeserializeOwned>(
&self,
path: &str,
) -> Result<T, NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("POST", path, resp).await
}
pub(crate) async fn post_void_no_body(&self, path: &str) -> Result<(), NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "POST",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn put_no_body<T: DeserializeOwned>(
&self,
path: &str,
) -> Result<T, NifiError> {
tracing::debug!(method = "PUT", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.put(url))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("PUT", path, resp).await
}
#[allow(dead_code)]
pub(crate) async fn put_void_no_body(&self, path: &str) -> Result<(), NifiError> {
tracing::debug!(method = "PUT", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.put(url))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "PUT",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "PUT", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "PUT", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn post_octet_stream<T: DeserializeOwned>(
&self,
path: &str,
filename: Option<&str>,
data: Vec<u8>,
) -> Result<T, NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let builder = self
.authenticated(self.http.post(url))
.header("Content-Type", "application/octet-stream")
.body(data);
let builder = if let Some(name) = filename {
builder.header("Filename", name)
} else {
builder
};
let resp = builder.send().await.context(HttpSnafu)?;
Self::deserialize("POST", path, resp).await
}
pub(crate) async fn post_void_octet_stream(
&self,
path: &str,
filename: Option<&str>,
data: Vec<u8>,
) -> Result<(), NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let builder = self
.authenticated(self.http.post(url))
.header("Content-Type", "application/octet-stream")
.body(data);
let builder = if let Some(name) = filename {
builder.header("Filename", name)
} else {
builder
};
let resp = builder.send().await.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "POST",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
#[allow(dead_code)]
pub(crate) async fn post_void_with_query<B: serde::Serialize>(
&self,
path: &str,
body: &B,
query: &[(&str, String)],
) -> Result<(), NifiError> {
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url).query(query))
.json(body)
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "POST",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "POST", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "POST", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn get_void(&self, path: &str) -> Result<(), NifiError> {
tracing::debug!(method = "GET", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.get(url))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "GET",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() || status.as_u16() == 302 {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn get_with_query<T: DeserializeOwned>(
&self,
path: &str,
query: &[(&str, String)],
) -> Result<T, NifiError> {
tracing::debug!(method = "GET", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.get(url).query(query))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("GET", path, resp).await
}
pub(crate) async fn get_void_with_query(
&self,
path: &str,
query: &[(&str, String)],
) -> Result<(), NifiError> {
tracing::debug!(method = "GET", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.get(url).query(query))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "GET",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() || status.as_u16() == 302 {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "GET", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "GET", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn delete_returning_with_query<T: DeserializeOwned>(
&self,
path: &str,
query: &[(&str, String)],
) -> Result<T, NifiError> {
tracing::debug!(method = "DELETE", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.delete(url).query(query))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("DELETE", path, resp).await
}
pub(crate) async fn delete_with_query(
&self,
path: &str,
query: &[(&str, String)],
) -> Result<(), NifiError> {
tracing::debug!(method = "DELETE", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.delete(url).query(query))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "DELETE",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) async fn post_with_query<B, T>(
&self,
path: &str,
body: &B,
query: &[(&str, String)],
) -> Result<T, NifiError>
where
B: serde::Serialize,
T: DeserializeOwned,
{
tracing::debug!(method = "POST", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.post(url).query(query))
.json(body)
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("POST", path, resp).await
}
pub(crate) async fn delete_returning<T: DeserializeOwned>(
&self,
path: &str,
) -> Result<T, NifiError> {
tracing::debug!(method = "DELETE", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.delete(url))
.send()
.await
.context(HttpSnafu)?;
Self::deserialize("DELETE", path, resp).await
}
pub(crate) async fn delete(&self, path: &str) -> Result<(), NifiError> {
tracing::debug!(method = "DELETE", path, "NiFi API request");
let url = self.api_url(path);
let resp = self
.authenticated(self.http.delete(url))
.send()
.await
.context(HttpSnafu)?;
let status = resp.status();
tracing::debug!(
method = "DELETE",
path,
status = status.as_u16(),
"NiFi API response"
);
if status.is_success() {
return Ok(());
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method = "DELETE", path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method = "DELETE", path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
fn authenticated(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
match &self.token {
Some(token) => req.bearer_auth(token),
None => {
tracing::warn!(
"sending NiFi API request without a bearer token — call login() first"
);
req
}
}
}
async fn deserialize<T: DeserializeOwned>(
method: &str,
path: &str,
resp: reqwest::Response,
) -> Result<T, NifiError> {
let status = resp.status();
tracing::debug!(method, path, status = status.as_u16(), "NiFi API response");
if status.is_success() {
return resp.json::<T>().await.context(HttpSnafu);
}
let body = resp.text().await.unwrap_or_else(|_| status.to_string());
tracing::debug!(method, path, status = status.as_u16(), %body, "NiFi API raw error body");
let message = extract_error_message(&body);
tracing::warn!(method, path, status = status.as_u16(), %message, "NiFi API error");
ApiSnafu {
status: status.as_u16(),
message,
}
.fail()
}
pub(crate) fn api_url(&self, path: &str) -> Url {
let mut url = self.base_url.clone();
url.set_path(&format!("/nifi-api{path}"));
url
}
}
/// Pulls a human-readable message out of an error response body.
///
/// When `body` parses as JSON and carries a string-valued `message` entry, that
/// string is returned. In every other case (not JSON, no `message` entry, or a
/// non-string `message`) the body itself is returned unchanged.
pub fn extract_error_message(body: &str) -> String {
    if let Ok(json) = serde_json::from_str::<serde_json::Value>(body) {
        if let Some(text) = json.get("message").and_then(serde_json::Value::as_str) {
            return text.to_owned();
        }
    }
    // Fall back to the raw body so the caller always gets something to report.
    body.to_owned()
}