use std::borrow::Cow;
use diesel::connection::{Instrumentation, InstrumentationEvent};
use opentelemetry::{
global,
trace::{Span, SpanKind, Status, Tracer},
KeyValue,
};
pub struct OtelInstrumentation {
current_span: Option<opentelemetry::global::BoxedSpan>,
include_query_text: bool,
server_address: Option<String>,
db_namespace: Option<String>,
}
impl OtelInstrumentation {
pub fn new() -> Self {
Self {
current_span: None,
include_query_text: true,
server_address: None,
db_namespace: None,
}
}
pub fn with_query_text(mut self, enabled: bool) -> Self {
self.include_query_text = enabled;
self
}
}
impl Default for OtelInstrumentation {
fn default() -> Self {
Self::new()
}
}
/// Best-effort extraction of a table name from a SQL statement.
///
/// The statement is scanned as whitespace-separated tokens, and keywords are
/// compared ASCII case-insensitively. The lookup order is:
///
/// 1. the token following the first `FROM`,
/// 2. otherwise the token following the first `INTO`,
/// 3. otherwise, if the statement starts with `UPDATE`, the token after it.
///
/// Surrounding backticks are stripped from the returned token. The result
/// borrows from `sql`; `None` is returned when no rule matches.
///
/// Working on tokens of the original string (instead of slicing `sql` with
/// byte offsets computed on an upper-cased, trimmed copy) keeps the result
/// correct for statements with leading whitespace, for keywords followed by
/// newlines or tabs, and for non-ASCII text whose upper-case form has a
/// different byte length.
fn extract_table_name(sql: &str) -> Option<&str> {
    /// Strips the backtick quoting used around identifiers.
    fn unquote(token: &str) -> &str {
        token.trim_matches('`')
    }

    /// Returns the token that follows the first token equal to `keyword`.
    fn token_after<'a>(sql: &'a str, keyword: &str) -> Option<&'a str> {
        let mut tokens = sql.split_whitespace();
        tokens.find(|token| token.eq_ignore_ascii_case(keyword))?;
        tokens.next().map(unquote)
    }

    /// Returns the second token when the first token is `UPDATE`.
    fn update_target(sql: &str) -> Option<&str> {
        let mut tokens = sql.split_whitespace();
        let first = tokens.next()?;
        if first.eq_ignore_ascii_case("UPDATE") {
            tokens.next().map(unquote)
        } else {
            None
        }
    }

    token_after(sql, "FROM")
        .or_else(|| token_after(sql, "INTO"))
        .or_else(|| update_target(sql))
}
/// Builds the short query summary: `"<op> <table>"` when a table is known,
/// otherwise just the operation name.
fn build_query_summary(op_name: &str, table: Option<&str>) -> String {
    table.map_or_else(
        || op_name.to_owned(),
        |name| format!("{} {}", op_name, name),
    )
}
/// Returns `url` with the `authToken` query value replaced by `REDACTED`.
///
/// Everything from the first `authToken=` onward is replaced, so any text
/// following the token (including later parameters) is dropped as well.
/// URLs without `authToken=` are returned unchanged.
fn redact_url(url: &str) -> String {
    match url.split_once("authToken=") {
        Some((prefix, _secret)) => format!("{}authToken=REDACTED", prefix),
        None => url.to_owned(),
    }
}
/// Derives a namespace label from a connection URL.
///
/// * `":memory:"` maps to itself.
/// * For `libsql://`, `http://` and `https://` URLs, the result is the text
///   after the scheme separator up to the first `?`, `/` or `:`.
/// * Anything else is treated as a path, and the part after the last `/`
///   is returned (the whole string when it contains no `/`).
fn extract_namespace(url: &str) -> Option<String> {
    const REMOTE_SCHEMES: [&str; 3] = ["libsql://", "http://", "https://"];

    if url == ":memory:" {
        return Some(String::from(":memory:"));
    }

    let is_remote = REMOTE_SCHEMES
        .iter()
        .any(|scheme| url.starts_with(*scheme));

    let segment = if is_remote {
        // A later "://" starts with ':', which is itself a delimiter, so
        // cutting at the first separator is enough.
        let (_scheme, rest) = url.split_once("://")?;
        rest.split(['?', '/', ':']).next()
    } else {
        url.rsplit('/').next()
    };

    segment.map(str::to_owned)
}
impl Instrumentation for OtelInstrumentation {
fn on_connection_event(&mut self, event: InstrumentationEvent<'_>) {
let tracer = global::tracer("diesel-libsql");
match event {
InstrumentationEvent::StartQuery { query, .. } => {
let query_text = format!("{}", query);
let op_name = query_text
.split_whitespace()
.next()
.unwrap_or("SQL")
.to_uppercase();
let table = extract_table_name(&query_text);
let summary = build_query_summary(&op_name, table);
let span_name = if let Some(t) = table {
format!("{} {}", op_name, t)
} else {
format!("{} libsql", op_name)
};
let mut attrs = vec![
KeyValue::new("db.system.name", "sqlite"),
KeyValue::new("db.operation.name", op_name),
KeyValue::new("db.query.summary", summary),
];
if let Some(t) = table {
attrs.push(KeyValue::new("db.collection.name", t.to_string()));
}
if self.include_query_text {
attrs.push(KeyValue::new("db.query.text", query_text));
}
if let Some(ref addr) = self.server_address {
attrs.push(KeyValue::new("server.address", addr.clone()));
}
if let Some(ref ns) = self.db_namespace {
attrs.push(KeyValue::new("db.namespace", ns.clone()));
}
let span = tracer
.span_builder(span_name)
.with_kind(SpanKind::Client)
.with_attributes(attrs)
.start(&tracer);
self.current_span = Some(span);
}
InstrumentationEvent::FinishQuery { error, .. } => {
if let Some(ref mut span) = self.current_span.take() {
if let Some(err) = error {
span.set_status(Status::Error {
description: Cow::Owned(err.to_string()),
});
span.set_attribute(KeyValue::new("error.type", err.to_string()));
} else {
span.set_status(Status::Ok);
}
span.end();
}
}
InstrumentationEvent::StartEstablishConnection { url, .. } => {
let safe_url = redact_url(url);
let namespace = extract_namespace(url);
self.server_address = Some(safe_url.clone());
self.db_namespace = namespace.clone();
let mut attrs = vec![
KeyValue::new("db.system.name", "sqlite"),
KeyValue::new("server.address", safe_url),
];
if let Some(ns) = namespace {
attrs.push(KeyValue::new("db.namespace", ns));
}
let span = tracer
.span_builder("db.connect")
.with_kind(SpanKind::Client)
.with_attributes(attrs)
.start(&tracer);
self.current_span = Some(span);
}
InstrumentationEvent::FinishEstablishConnection { error, .. } => {
if let Some(ref mut span) = self.current_span.take() {
if let Some(err) = error {
span.set_status(Status::Error {
description: Cow::Owned(err.to_string()),
});
} else {
span.set_status(Status::Ok);
}
span.end();
}
}
InstrumentationEvent::BeginTransaction { depth, .. } => {
let mut attrs = vec![
KeyValue::new("db.system.name", "sqlite"),
KeyValue::new("db.operation.name", "BEGIN"),
KeyValue::new("db.transaction.depth", depth.get() as i64),
];
if let Some(ref addr) = self.server_address {
attrs.push(KeyValue::new("server.address", addr.clone()));
}
if let Some(ref ns) = self.db_namespace {
attrs.push(KeyValue::new("db.namespace", ns.clone()));
}
let span = tracer
.span_builder("db.transaction")
.with_kind(SpanKind::Client)
.with_attributes(attrs)
.start(&tracer);
self.current_span = Some(span);
}
InstrumentationEvent::CommitTransaction { .. } => {
if let Some(ref mut span) = self.current_span.take() {
span.set_attribute(KeyValue::new("db.operation.name", "COMMIT"));
span.end();
}
}
InstrumentationEvent::RollbackTransaction { .. } => {
if let Some(ref mut span) = self.current_span.take() {
span.set_attribute(KeyValue::new("db.operation.name", "ROLLBACK"));
span.end();
}
}
_ => {} }
}
}