use std::fmt::{Debug, Formatter};
use std::process;
use std::sync::{Arc, Mutex};
use serde_json::{Value, json};
#[derive(Clone, Default)]
pub struct Telemetry {
inner: Option<Arc<Mutex<k2db::ratatouille::Logger<k2db::ratatouille::HttpSink>>>>,
}
impl Telemetry {
pub fn from_env(default_app: &str, default_instance: Option<String>, filter: &str) -> Self {
let url = std::env::var("RINGTAIL_URL")
.ok()
.map(|value| value.trim().to_owned())
.filter(|value| !value.is_empty());
let Some(url) = url else {
return Self::default();
};
let token = std::env::var("RINGTAIL_TOKEN")
.ok()
.map(|value| value.trim().to_owned())
.filter(|value| !value.is_empty());
let app = std::env::var("RATATOUILLE_APP")
.ok()
.map(|value| value.trim().to_owned())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| default_app.to_owned());
let where_value = std::env::var("RATATOUILLE_WHERE")
.ok()
.map(|value| value.trim().to_owned())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| "start".to_owned());
let instance = std::env::var("RATATOUILLE_INSTANCE")
.ok()
.map(|value| value.trim().to_owned())
.filter(|value| !value.is_empty())
.or(default_instance)
.unwrap_or_else(|| format!("pid:{}", process::id()));
let Ok(sink) = k2db::ratatouille::HttpSink::new(k2db::ratatouille::HttpSinkConfig {
url,
token,
user_agent: Some(format!("{default_app}/telemetry")),
}) else {
return Self::default();
};
Self {
inner: Some(Arc::new(Mutex::new(k2db::ratatouille::Logger::with_sink(
k2db::ratatouille::LoggerConfig {
filter: Some(filter.to_owned()),
format: k2db::ratatouille::Format::Ndjson,
source: k2db::ratatouille::SourceIdentity {
app: Some(app),
r#where: Some(where_value),
instance: Some(instance),
},
..k2db::ratatouille::LoggerConfig::default()
},
sink,
)))),
}
}
pub fn emit(&self, topic: &str, kind: &str, mut payload: Value) {
let Some(inner) = &self.inner else {
return;
};
if !payload.is_object() {
payload = json!({ "value": payload });
}
if let Value::Object(object) = &mut payload {
object.insert("kind".to_owned(), Value::String(kind.to_owned()));
object.insert("at".to_owned(), Value::from(epoch_ms_now()));
}
let Ok(line) = serde_json::to_string(&payload) else {
return;
};
let mut logger = inner.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
let _ = logger.log(topic, &line);
}
}
impl Debug for Telemetry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Telemetry")
.field("enabled", &self.inner.is_some())
.finish()
}
}
fn epoch_ms_now() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|duration| duration.as_millis() as u64)
.unwrap_or(0)
}