use uuid::Uuid;
use crate::error::Error;
use crate::models::*;
pub struct PipeStreamrClient {
base_url: String,
api_key: String,
http: reqwest::Client,
}
impl PipeStreamrClient {
pub fn new(base_url: &str, api_key: &str) -> Self {
Self {
base_url: base_url.trim_end_matches('/').to_string(),
api_key: api_key.to_string(),
http: reqwest::Client::new(),
}
}
async fn check_response(&self, resp: reqwest::Response) -> Result<reqwest::Response, Error> {
if resp.status().is_success() {
return Ok(resp);
}
let status = resp.status().as_u16();
let message = resp
.json::<serde_json::Value>()
.await
.ok()
.and_then(|v| v.get("error")?.as_str().map(String::from))
.unwrap_or_else(|| "Unknown error".into());
Err(Error::Api { status, message })
}
pub async fn list_events(&self, query: &EventQuery) -> Result<EventListResponse, Error> {
let mut params: Vec<(&str, String)> = Vec::new();
if let Some(ref t) = query.event_type {
params.push(("type", t.clone()));
}
if let Some(ref p) = query.platform {
params.push(("platform", p.clone()));
}
if let Some(sev) = query.severity {
params.push(("severity", sev.to_string()));
}
if let Some(ref svc) = query.service {
params.push(("service", svc.clone()));
}
if let Some(ref f) = query.from {
params.push(("from", f.clone()));
}
if let Some(ref s) = query.search {
params.push(("search", s.clone()));
}
if let Some(s) = query.since {
params.push(("since", s.to_rfc3339()));
}
if let Some(u) = query.until {
params.push(("until", u.to_rfc3339()));
}
if let Some(l) = query.limit {
params.push(("limit", l.to_string()));
}
if let Some(ref c) = query.cursor {
params.push(("cursor", c.clone()));
}
let resp = self
.http
.get(format!("{}/api/v1/messages", self.base_url))
.bearer_auth(&self.api_key)
.query(¶ms)
.send()
.await?;
let resp = self.check_response(resp).await?;
Ok(resp.json().await?)
}
pub async fn get_event(&self, id: Uuid) -> Result<EventResponse, Error> {
let resp = self
.http
.get(format!("{}/api/v1/messages/{}", self.base_url, id))
.bearer_auth(&self.api_key)
.send()
.await?;
let resp = self.check_response(resp).await?;
Ok(resp.json().await?)
}
pub async fn stats(&self) -> Result<StatsResponse, Error> {
let resp = self
.http
.get(format!("{}/api/v1/stats", self.base_url))
.bearer_auth(&self.api_key)
.send()
.await?;
let resp = self.check_response(resp).await?;
Ok(resp.json().await?)
}
pub async fn send_logs(
&self,
service_name: &str,
entries: &[LogEntry],
) -> Result<(), Error> {
let log_records: Vec<serde_json::Value> = entries
.iter()
.map(|e| {
let mut record = serde_json::json!({
"body": { "stringValue": &e.body },
});
if let Some(ref sev) = e.severity_text {
record["severityText"] = serde_json::json!(sev);
}
if let Some(ref attrs) = e.attributes {
if let Some(obj) = attrs.as_object() {
let kv: Vec<serde_json::Value> = obj
.iter()
.map(|(k, v)| {
serde_json::json!({
"key": k,
"value": { "stringValue": v.to_string() }
})
})
.collect();
record["attributes"] = serde_json::json!(kv);
}
}
record
})
.collect();
let payload = serde_json::json!({
"resourceLogs": [{
"resource": {
"attributes": [{
"key": "service.name",
"value": { "stringValue": service_name }
}]
},
"scopeLogs": [{
"logRecords": log_records
}]
}]
});
let resp = self
.http
.post(format!("{}/v1/logs", self.base_url))
.bearer_auth(&self.api_key)
.json(&payload)
.send()
.await?;
self.check_response(resp).await?;
Ok(())
}
pub async fn health(&self) -> Result<bool, Error> {
let resp = self
.http
.get(format!("{}/health", self.base_url))
.send()
.await?;
Ok(resp.status().is_success())
}
}