use reqwest::Method;
use serde_json::{json, Value};
use crate::client::PulseClient;
use crate::error::PulseError;
pub struct AuthResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl AuthResource<'_> {
pub async fn login(&self, username: &str, password: &str) -> Result<Value, PulseError> {
let body = json!({ "username": username, "password": password });
let response = self
.client
.request(Method::POST, "/api/auth/login", Some(&body), false)
.await?;
cache_token(self.client, &response);
Ok(response)
}
pub async fn refresh(&self, refresh_token: &str) -> Result<Value, PulseError> {
let body = json!({ "refreshToken": refresh_token });
let response = self
.client
.request(Method::POST, "/api/auth/refresh", Some(&body), false)
.await?;
cache_token(self.client, &response);
Ok(response)
}
pub async fn organizations(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/auth/organizations", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "organizations"))
}
pub async fn switch_org(&self, org_id: &str) -> Result<Value, PulseError> {
let body = json!({ "orgId": org_id });
let response = self
.client
.request(Method::POST, "/api/auth/switch-org", Some(&body), true)
.await?;
cache_token(self.client, &response);
Ok(response)
}
}
fn cache_token(client: &PulseClient, response: &Value) {
if let Some(token) = response.get("token").and_then(Value::as_str) {
if !token.is_empty() {
client.set_token(token);
}
}
}
pub struct PipelinesResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl PipelinesResource<'_> {
pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/pulse/pipelines", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "pipelines"))
}
pub async fn get(&self, pipeline_id: &str) -> Result<Value, PulseError> {
let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
self.client
.request(Method::GET, &path, None::<&()>, true)
.await
}
pub async fn create(&self, definition: &Value) -> Result<Value, PulseError> {
self.client
.request(Method::POST, "/api/pulse/pipelines", Some(definition), true)
.await
}
pub async fn delete(&self, pipeline_id: &str) -> Result<(), PulseError> {
let path = format!("/api/pulse/pipelines/{}", encode_path(pipeline_id));
self.client
.request(Method::DELETE, &path, None::<&()>, true)
.await?;
Ok(())
}
}
pub struct AgentsResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl AgentsResource<'_> {
pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/pulse/agents", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "agents"))
}
pub async fn get(&self, agent_id: &str) -> Result<Value, PulseError> {
let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
self.client
.request(Method::GET, &path, None::<&()>, true)
.await
}
pub async fn update(&self, agent_id: &str, config: &Value) -> Result<Value, PulseError> {
let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
self.client
.request(Method::PUT, &path, Some(config), true)
.await
}
pub async fn delete(&self, agent_id: &str) -> Result<(), PulseError> {
let path = format!("/api/pulse/agents/{}", encode_path(agent_id));
self.client
.request::<()>(Method::DELETE, &path, None, true)
.await?;
Ok(())
}
}
pub struct TemplatesResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl TemplatesResource<'_> {
pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/pulse/templates", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "templates"))
}
}
pub struct ModelsResource<'c> {
pub(crate) client: &'c PulseClient,
}
#[derive(Debug, Clone, Default)]
pub struct ModelUpload {
pub name: String,
pub path: Option<String>,
pub data: Option<Vec<u8>>,
pub runtime: Option<String>,
pub input_schema: Option<std::collections::BTreeMap<String, String>>,
pub output_schema: Option<std::collections::BTreeMap<String, String>>,
}
impl ModelUpload {
pub fn from_path(name: impl Into<String>, path: impl Into<String>) -> Self {
Self {
name: name.into(),
path: Some(path.into()),
..Self::default()
}
}
pub fn from_bytes(name: impl Into<String>, data: Vec<u8>) -> Self {
Self {
name: name.into(),
data: Some(data),
..Self::default()
}
}
pub fn runtime(mut self, runtime: impl Into<String>) -> Self {
self.runtime = Some(runtime.into());
self
}
pub fn input_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
self.input_schema = Some(schema);
self
}
pub fn output_schema(mut self, schema: std::collections::BTreeMap<String, String>) -> Self {
self.output_schema = Some(schema);
self
}
}
impl ModelsResource<'_> {
pub async fn upload(&self, upload: ModelUpload) -> Result<Value, PulseError> {
if upload.name.trim().is_empty() {
return Err(PulseError::InvalidConfig(
"model name must be a non-empty string".to_string(),
));
}
if upload.path.is_some() == upload.data.is_some() {
return Err(PulseError::InvalidConfig(
"provide exactly one of 'path' or 'data'".to_string(),
));
}
let (blob, filename) = match (&upload.path, upload.data) {
(Some(path), None) => {
let bytes = std::fs::read(path)
.map_err(|e| PulseError::InvalidConfig(format!("read {path}: {e}")))?;
let filename = path
.rsplit(['/', '\\'])
.next()
.filter(|s| !s.is_empty())
.unwrap_or("model.onnx")
.to_string();
(bytes, filename)
}
(None, Some(data)) => (data, format!("{}.onnx", upload.name)),
_ => unreachable!("exactly one of path/data enforced above"),
};
if blob.is_empty() {
return Err(PulseError::InvalidConfig(
"model bytes are empty".to_string(),
));
}
let runtime = upload.runtime.unwrap_or_else(|| "onnx".to_string());
let model_part = reqwest::multipart::Part::bytes(blob)
.file_name(filename)
.mime_str("application/octet-stream")
.map_err(PulseError::Transport)?;
let mut form = reqwest::multipart::Form::new()
.text("name", upload.name)
.text("runtime", runtime)
.part("model", model_part);
if let Some(schema) = upload.input_schema {
form = form.text("inputSchema", serde_json::to_string(&schema)?);
}
if let Some(schema) = upload.output_schema {
form = form.text("outputSchema", serde_json::to_string(&schema)?);
}
self.client
.request_multipart("/api/pulse/ml-models", form)
.await
}
pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/pulse/ml-models", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "models"))
}
pub async fn get(&self, name: &str) -> Result<Value, PulseError> {
let path = format!("/api/pulse/ml-models/{}", encode_path(name));
self.client
.request(Method::GET, &path, None::<&()>, true)
.await
}
pub async fn delete(&self, name: &str) -> Result<(), PulseError> {
let path = format!("/api/pulse/ml-models/{}", encode_path(name));
self.client
.request::<()>(Method::DELETE, &path, None, true)
.await?;
Ok(())
}
}
pub struct UsersResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl UsersResource<'_> {
pub async fn list(&self) -> Result<Vec<Value>, PulseError> {
let result = self
.client
.request(Method::GET, "/api/pulse/users", None::<&()>, true)
.await?;
Ok(unwrap_list(&result, "users"))
}
}
fn unwrap_list(result: &Value, key: &str) -> Vec<Value> {
result
.get(key)
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
}
fn encode_path(segment: &str) -> String {
let mut out = String::with_capacity(segment.len());
for b in segment.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
out.push(b as char)
}
_ => out.push_str(&format!("%{b:02X}")),
}
}
out
}
pub struct ConnectorsResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl ConnectorsResource<'_> {
pub async fn list(&self) -> Result<Value, PulseError> {
self.client
.request(Method::GET, "/api/pulse/connectors", None::<&()>, true)
.await
}
pub async fn sinks(&self) -> Result<Vec<Value>, PulseError> {
Ok(unwrap_list(&self.list().await?, "sinks"))
}
pub async fn sources(&self) -> Result<Vec<Value>, PulseError> {
Ok(unwrap_list(&self.list().await?, "sources"))
}
}