use std::{collections::HashMap, time::Duration};
use reifydb_type::{
error::{Diagnostic, Error},
params::Params,
};
use reqwest::Client as ReqwestClient;
use serde::Deserialize;
use crate::{
AdminRequest, AdminResponse, AdminResult, ClientFrame, CommandRequest, CommandResponse, CommandResult,
ErrResponse, LoginResult, QueryRequest, QueryResponse, QueryResult, Response, ResponsePayload, params_to_wire,
session::{parse_admin_response, parse_command_response, parse_query_response},
};
#[derive(Debug, serde::Deserialize)]
struct HttpFrameResponse {
frames: Vec<ClientFrame>,
}
impl HttpFrameResponse {
fn into_admin(self) -> AdminResponse {
AdminResponse {
content_type: "application/vnd.reifydb.frames+json".to_string(),
body: serde_json::json!({ "frames": self.frames }),
}
}
fn into_command(self) -> CommandResponse {
CommandResponse {
content_type: "application/vnd.reifydb.frames+json".to_string(),
body: serde_json::json!({ "frames": self.frames }),
}
}
fn into_query(self) -> QueryResponse {
QueryResponse {
content_type: "application/vnd.reifydb.frames+json".to_string(),
body: serde_json::json!({ "frames": self.frames }),
}
}
}
#[derive(Debug, Deserialize)]
struct HttpErrorResponse {
code: String,
error: String,
#[serde(default)]
diagnostic: Option<Diagnostic>,
}
#[derive(Debug, Deserialize)]
struct HttpAuthenticateResponse {
status: String,
token: Option<String>,
identity: Option<String>,
reason: Option<String>,
}
#[derive(Clone)]
pub struct HttpClient {
inner: ReqwestClient,
base_url: String,
token: Option<String>,
}
impl HttpClient {
pub async fn connect(url: &str) -> Result<Self, Error> {
let inner = ReqwestClient::builder().timeout(Duration::from_secs(30)).build().unwrap();
let base_url = url.trim_end_matches('/').to_string();
Ok(Self {
inner,
base_url,
token: None,
})
}
pub fn with_client(client: ReqwestClient, url: &str) -> Self {
let base_url = url.trim_end_matches('/').to_string();
Self {
inner: client,
base_url,
token: None,
}
}
pub fn authenticate(&mut self, token: &str) {
self.token = Some(token.to_string());
}
pub async fn login_with_password(&mut self, identifier: &str, password: &str) -> Result<LoginResult, Error> {
let mut credentials = HashMap::new();
credentials.insert("identifier".to_string(), identifier.to_string());
credentials.insert("password".to_string(), password.to_string());
self.login("password", credentials).await
}
pub async fn login_with_token(&mut self, token: &str) -> Result<LoginResult, Error> {
let mut credentials = HashMap::new();
credentials.insert("token".to_string(), token.to_string());
self.login("token", credentials).await
}
pub async fn login(
&mut self,
method: &str,
credentials: HashMap<String, String>,
) -> Result<LoginResult, Error> {
let body = serde_json::json!({
"method": method,
"credentials": credentials
});
let url = format!("{}/v1/authenticate", self.base_url);
let response = self.inner.post(&url).json(&body).send().await.unwrap(); let response_body = response.text().await.unwrap();
let auth: HttpAuthenticateResponse = serde_json::from_str(&response_body).unwrap();
if auth.status == "authenticated" {
let token = auth.token.unwrap_or_default();
let identity = auth.identity.unwrap_or_default();
self.token = Some(token.clone());
Ok(LoginResult {
token,
identity,
})
} else {
let reason = auth.reason.unwrap_or_else(|| "Authentication failed".to_string());
panic!("Authentication failed: {}", reason) }
}
pub async fn logout(&mut self) -> Result<(), Error> {
let token = match self.token.as_ref() {
Some(t) => t.clone(),
None => return Ok(()),
};
let url = format!("{}/v1/logout", self.base_url);
let response = self.inner.post(&url).bearer_auth(&token).send().await.unwrap();
let status = response.status();
if status.is_success() {
self.token = None;
Ok(())
} else {
let body = response.text().await.unwrap(); Err(self.parse_error_response(&body))
}
}
pub async fn admin(&self, rql: &str, params: Option<Params>) -> Result<AdminResult, Error> {
let request = AdminRequest {
statements: vec![rql.to_string()],
params: params.and_then(params_to_wire),
};
let response = self.send_admin(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Admin(response),
};
parse_admin_response(ws_response)
}
pub async fn admin_batch(&self, statements: Vec<&str>, params: Option<Params>) -> Result<AdminResult, Error> {
let request = AdminRequest {
statements: statements.into_iter().map(String::from).collect(),
params: params.and_then(params_to_wire),
};
let response = self.send_admin(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Admin(response),
};
parse_admin_response(ws_response)
}
pub async fn command(&self, rql: &str, params: Option<Params>) -> Result<CommandResult, Error> {
let request = CommandRequest {
statements: vec![rql.to_string()],
params: params.and_then(params_to_wire),
};
let response = self.send_command(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Command(response),
};
parse_command_response(ws_response)
}
pub async fn query(&self, rql: &str, params: Option<Params>) -> Result<QueryResult, Error> {
let request = QueryRequest {
statements: vec![rql.to_string()],
params: params.and_then(params_to_wire),
};
let response = self.send_query(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Query(response),
};
parse_query_response(ws_response)
}
pub async fn command_batch(
&self,
statements: Vec<&str>,
params: Option<Params>,
) -> Result<CommandResult, Error> {
let request = CommandRequest {
statements: statements.into_iter().map(String::from).collect(),
params: params.and_then(params_to_wire),
};
let response = self.send_command(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Command(response),
};
parse_command_response(ws_response)
}
pub async fn query_batch(&self, statements: Vec<&str>, params: Option<Params>) -> Result<QueryResult, Error> {
let request = QueryRequest {
statements: statements.into_iter().map(String::from).collect(),
params: params.and_then(params_to_wire),
};
let response = self.send_query(&request).await?;
let ws_response = Response {
id: String::new(),
payload: ResponsePayload::Query(response),
};
parse_query_response(ws_response)
}
async fn send_admin(&self, request: &AdminRequest) -> Result<AdminResponse, Error> {
let url = format!("{}/v1/admin", self.base_url);
let response_body = self.send_request(&url, request).await?;
match serde_json::from_str::<HttpFrameResponse>(&response_body) {
Ok(response) => Ok(response.into_admin()),
Err(_) => Err(self.parse_error_response(&response_body)),
}
}
async fn send_command(&self, request: &CommandRequest) -> Result<CommandResponse, Error> {
let url = format!("{}/v1/command", self.base_url);
let response_body = self.send_request(&url, request).await?;
match serde_json::from_str::<HttpFrameResponse>(&response_body) {
Ok(response) => Ok(response.into_command()),
Err(_) => Err(self.parse_error_response(&response_body)),
}
}
async fn send_query(&self, request: &QueryRequest) -> Result<QueryResponse, Error> {
let url = format!("{}/v1/query", self.base_url);
let response_body = self.send_request(&url, request).await?;
match serde_json::from_str::<HttpFrameResponse>(&response_body) {
Ok(response) => Ok(response.into_query()),
Err(_) => Err(self.parse_error_response(&response_body)),
}
}
async fn send_request<T: serde::Serialize>(&self, url: &str, body: &T) -> Result<String, Error> {
let mut request = self.inner.post(url).json(body);
if let Some(ref token) = self.token {
request = request.bearer_auth(token);
}
let response = request.send().await.unwrap();
Ok(response.text().await.unwrap()) }
fn parse_error_response(&self, body: &str) -> Error {
if let Ok(http_err) = serde_json::from_str::<HttpErrorResponse>(body) {
let diag = http_err.diagnostic.unwrap_or_else(|| Diagnostic {
code: http_err.code,
message: http_err.error,
..Default::default()
});
return Error(Box::new(diag));
}
if let Ok(err_response) = serde_json::from_str::<ErrResponse>(body) {
return Error(Box::new(err_response.diagnostic));
}
panic!("Failed to parse response: {}", body) }
}