use std::sync::Mutex;
use crate::event::{Event, IngestEvent};
use crate::signing;
use crate::trace::Trace;
use crate::types::TraceData;
#[derive(Debug, Clone)]
pub struct BloopClientBuilder {
endpoint: Option<String>,
project_key: Option<String>,
environment: String,
release: String,
source: String,
max_buffer_size: usize,
}
impl BloopClientBuilder {
pub fn new() -> Self {
Self {
endpoint: None,
project_key: None,
environment: "production".into(),
release: String::new(),
source: "rust".into(),
max_buffer_size: 20,
}
}
pub fn endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
pub fn project_key(mut self, key: impl Into<String>) -> Self {
self.project_key = Some(key.into());
self
}
pub fn environment(mut self, env: impl Into<String>) -> Self {
self.environment = env.into();
self
}
pub fn release(mut self, release: impl Into<String>) -> Self {
self.release = release.into();
self
}
pub fn source(mut self, source: impl Into<String>) -> Self {
self.source = source.into();
self
}
pub fn max_buffer_size(mut self, size: usize) -> Self {
self.max_buffer_size = size;
self
}
pub fn build(self) -> Result<BloopClient, String> {
let endpoint = self.endpoint.ok_or("endpoint is required")?;
let project_key = self.project_key.ok_or("project_key is required")?;
Ok(BloopClient {
endpoint: endpoint.trim_end_matches('/').to_string(),
project_key,
environment: self.environment,
release: self.release,
source: self.source,
max_buffer_size: self.max_buffer_size,
error_buffer: Mutex::new(Vec::new()),
trace_buffer: Mutex::new(Vec::new()),
})
}
}
impl Default for BloopClientBuilder {
fn default() -> Self {
Self::new()
}
}
pub struct BloopClient {
endpoint: String,
project_key: String,
environment: String,
release: String,
source: String,
max_buffer_size: usize,
error_buffer: Mutex<Vec<IngestEvent>>,
trace_buffer: Mutex<Vec<TraceData>>,
}
impl BloopClient {
pub fn builder() -> BloopClientBuilder {
BloopClientBuilder::new()
}
pub fn capture(&self, event: Event) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let ingest = IngestEvent {
timestamp: now,
source: event.source.unwrap_or_else(|| self.source.clone()),
environment: self.environment.clone(),
release: self.release.clone(),
error_type: event.error_type,
message: event.message,
route_or_procedure: event.route_or_procedure,
screen: event.screen,
stack: event.stack,
http_status: event.http_status,
request_id: event.request_id,
user_id_hash: event.user_id_hash,
metadata: event.metadata,
};
let mut buf = self.error_buffer.lock().unwrap();
buf.push(ingest);
if buf.len() >= self.max_buffer_size {
let batch = std::mem::take(&mut *buf);
drop(buf);
self.send_error_batch(batch);
}
}
pub fn capture_error(&self, error_type: impl Into<String>, message: impl Into<String>) {
self.capture(Event {
error_type: error_type.into(),
message: message.into(),
..Default::default()
});
}
pub fn start_trace(&self, name: impl Into<String>) -> Trace {
Trace::new(name)
}
pub fn send_trace(&self, trace: Trace) {
let data = trace.to_data();
let mut buf = self.trace_buffer.lock().unwrap();
buf.push(data);
if buf.len() >= self.max_buffer_size {
let batch = std::mem::take(&mut *buf);
drop(buf);
self.send_trace_batch(batch);
}
}
pub fn flush(&self) {
let errors = {
let mut buf = self.error_buffer.lock().unwrap();
std::mem::take(&mut *buf)
};
if !errors.is_empty() {
self.send_error_batch(errors);
}
let traces = {
let mut buf = self.trace_buffer.lock().unwrap();
std::mem::take(&mut *buf)
};
if !traces.is_empty() {
self.send_trace_batch(traces);
}
}
pub fn close(&self) {
self.flush();
}
fn post(&self, path: &str, body: &[u8]) {
let url = format!("{}{}", self.endpoint, path);
let signature = signing::sign(&self.project_key, body);
#[cfg(feature = "blocking")]
{
let _ = ureq::post(&url)
.set("Content-Type", "application/json")
.set("X-Signature", &signature)
.set("X-Project-Key", &self.project_key)
.send_bytes(body);
}
#[cfg(all(feature = "async", not(feature = "blocking")))]
{
let _ = (url, signature, body);
}
#[cfg(not(any(feature = "blocking", feature = "async")))]
{
let _ = (url, signature, body);
}
}
fn send_error_batch(&self, events: Vec<IngestEvent>) {
if let Ok(body) = serde_json::to_vec(&serde_json::json!({ "events": events })) {
self.post("/v1/ingest/batch", &body);
}
}
fn send_trace_batch(&self, traces: Vec<TraceData>) {
if let Ok(body) = serde_json::to_vec(&serde_json::json!({ "traces": traces })) {
self.post("/v1/traces/batch", &body);
}
}
}
impl std::fmt::Debug for BloopClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BloopClient")
.field("endpoint", &self.endpoint)
.field("environment", &self.environment)
.field("source", &self.source)
.finish()
}
}