use std::time::Duration;
#[cfg(not(test))]
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{metrics::SdkMeterProvider, trace::SdkTracerProvider};
#[cfg(not(test))]
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
/// Service name used when `GIT_PRISM_SERVICE_NAME` is unset or empty.
const DEFAULT_SERVICE_NAME: &str = "git-prism";
/// Timeout handed to both the span and the metric OTLP exporter builders.
const EXPORT_TIMEOUT: Duration = Duration::from_secs(5);
/// Longest that dropping a `TelemetryGuard` waits for its shutdown thread to report back.
const DROP_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(500);
/// Environment variable holding the OTLP base endpoint; telemetry stays off when it is
/// unset or empty.
const ENV_OTLP_ENDPOINT: &str = "GIT_PRISM_OTLP_ENDPOINT";
/// Environment variable that overrides the reported service name.
const ENV_SERVICE_NAME: &str = "GIT_PRISM_SERVICE_NAME";
/// Environment variable that overrides the reported service version.
const ENV_SERVICE_VERSION: &str = "GIT_PRISM_SERVICE_VERSION";
/// Derives the per-signal OTLP/HTTP URLs from a collector base URL.
///
/// All trailing `/` characters are removed from `base` before the signal paths are
/// appended, so `http://host:4318` and `http://host:4318/` give the same result.
///
/// Returns `(traces_url, metrics_url)`.
fn signal_endpoints(base: &str) -> (String, String) {
    let root = base.trim_end_matches('/');
    let [traces, metrics] = ["traces", "metrics"].map(|signal| format!("{root}/v1/{signal}"));
    (traces, metrics)
}
/// Owns the OpenTelemetry providers created during initialization.
///
/// Dropping the guard shuts both providers down, so it has to be kept alive for as long
/// as telemetry should be exported. A guard whose fields are both `None` is a no-op:
/// flushing and dropping it do nothing.
#[must_use = "dropping the guard shuts telemetry down; keep it bound while telemetry should run"]
pub struct TelemetryGuard {
    /// Tracer provider, or `None` when telemetry is disabled.
    tracer_provider: Option<SdkTracerProvider>,
    /// Meter provider, or `None` when telemetry is disabled.
    meter_provider: Option<SdkMeterProvider>,
}
impl TelemetryGuard {
pub fn is_active(&self) -> bool {
self.tracer_provider.is_some()
}
pub fn force_flush_bounded(&mut self, timeout: Duration) {
if self.meter_provider.is_none() && self.tracer_provider.is_none() {
return;
}
let mp = self.meter_provider.clone();
let tp = self.tracer_provider.clone();
let (tx, rx) = std::sync::mpsc::channel::<()>();
std::thread::spawn(move || {
if let Some(mp) = mp
&& let Err(e) = mp.force_flush()
{
eprintln!("git-prism: failed to force-flush metrics: {e}");
}
if let Some(tp) = tp
&& let Err(e) = tp.force_flush()
{
eprintln!("git-prism: failed to force-flush traces: {e}");
}
let _ = tx.send(());
});
let _ = rx.recv_timeout(timeout);
}
}
impl Drop for TelemetryGuard {
    /// Shuts the tracer provider and then the meter provider down on a background thread,
    /// waiting at most `DROP_SHUTDOWN_TIMEOUT` for it to finish.
    ///
    /// Shutdown errors are written to stderr. A guard without providers does nothing.
    fn drop(&mut self) {
        let tracers = self.tracer_provider.take();
        let meters = self.meter_provider.take();

        if tracers.is_some() || meters.is_some() {
            let (done_tx, done_rx) = std::sync::mpsc::channel::<()>();
            std::thread::spawn(move || {
                if let Some(Err(e)) = tracers.map(|provider| provider.shutdown()) {
                    eprintln!("git-prism: failed to flush traces on shutdown: {e}");
                }
                if let Some(Err(e)) = meters.map(|provider| provider.shutdown()) {
                    eprintln!("git-prism: failed to flush metrics on shutdown: {e}");
                }
                // Nobody may be listening any more once the timeout has passed.
                let _ = done_tx.send(());
            });
            // Stop waiting after the timeout; the thread is left to finish on its own.
            let _ = done_rx.recv_timeout(DROP_SHUTDOWN_TIMEOUT);
        }
    }
}
/// Installs a `tracing` subscriber whose OpenTelemetry layer is fed by a tracer obtained
/// from `tracer_provider`.
///
/// Returns the stringified error when `try_init` on the subscriber fails.
#[cfg(not(test))]
fn attach_tracing_subscriber_default(tracer_provider: &SdkTracerProvider) -> Result<(), String> {
    let telemetry_layer =
        tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("git-prism"));
    let subscriber = Registry::default().with(telemetry_layer);
    subscriber.try_init().map_err(|err| err.to_string())
}
/// Test-build stand-in for the subscriber attacher: it ignores the provider, installs
/// nothing, and always reports success.
#[cfg(test)]
fn attach_tracing_subscriber_default(_tracer_provider: &SdkTracerProvider) -> Result<(), String> {
Ok(())
}
/// Initializes telemetry with the default subscriber attacher and asks for the success
/// banner to be printed to stderr once initialization succeeds.
///
/// The returned guard is a no-op when `GIT_PRISM_OTLP_ENDPOINT` is unset or empty, or when
/// any initialization step fails.
pub fn init() -> TelemetryGuard {
init_with_attacher(attach_tracing_subscriber_default, true)
}
/// Same as `init`, except that the success banner is not printed.
///
/// Only the banner is suppressed: providers are still created when an endpoint is
/// configured, and initialization failures are still reported on stderr.
pub fn init_quiet() -> TelemetryGuard {
init_with_attacher(attach_tracing_subscriber_default, false)
}
/// Picks the service name to report: the value of `GIT_PRISM_SERVICE_NAME` when it is set
/// to a non-empty string, otherwise `DEFAULT_SERVICE_NAME`.
fn resolve_service_name() -> String {
    std::env::var(ENV_SERVICE_NAME)
        .ok()
        .filter(|name| !name.is_empty())
        .unwrap_or_else(|| DEFAULT_SERVICE_NAME.to_string())
}
/// Picks the service version to report: the value of `GIT_PRISM_SERVICE_VERSION` when it is
/// set to a non-empty string, otherwise the crate version baked in at compile time.
fn resolve_service_version() -> String {
    std::env::var(ENV_SERVICE_VERSION)
        .ok()
        .filter(|version| !version.is_empty())
        .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string())
}
/// Builds the OTLP/HTTP trace and metric pipelines described by the environment and
/// returns the guard that owns them.
///
/// * `attach_subscriber` receives the freshly built tracer provider and is expected to
///   install the `tracing` subscriber for it. It is injected so tests can simulate an
///   attach failure.
/// * `emit_success_banner` controls whether a one-line confirmation is written to stderr
///   after a successful initialization.
///
/// A no-op guard (both providers `None`) is returned when `GIT_PRISM_OTLP_ENDPOINT` is
/// unset or empty, when either exporter fails to build, or when `attach_subscriber`
/// returns `Err`. The failure cases are reported on stderr.
///
/// The process-wide meter provider is registered only after every fallible step has
/// succeeded, so a no-op guard never leaves a live global provider behind.
fn init_with_attacher<F>(attach_subscriber: F, emit_success_banner: bool) -> TelemetryGuard
where
    F: FnOnce(&SdkTracerProvider) -> Result<(), String>,
{
    // Every bail-out path hands back the same inert guard.
    let disabled = || TelemetryGuard {
        tracer_provider: None,
        meter_provider: None,
    };

    let endpoint = match std::env::var(ENV_OTLP_ENDPOINT) {
        Ok(ep) if !ep.is_empty() => ep,
        _ => return disabled(),
    };
    // Kept as its own binding because the banner prints the normalized base URL.
    let base = endpoint.trim_end_matches('/');
    let (traces_endpoint, metrics_endpoint) = signal_endpoints(base);
    let service_name = resolve_service_name();
    let service_version = resolve_service_version();

    let trace_exporter = match opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_endpoint(&traces_endpoint)
        .with_timeout(EXPORT_TIMEOUT)
        .build()
    {
        Ok(exp) => exp,
        Err(e) => {
            eprintln!("git-prism: failed to initialize trace exporter: {e}");
            return disabled();
        }
    };
    let metrics_exporter = match opentelemetry_otlp::MetricExporter::builder()
        .with_http()
        .with_endpoint(&metrics_endpoint)
        .with_timeout(EXPORT_TIMEOUT)
        .build()
    {
        Ok(exp) => exp,
        Err(e) => {
            eprintln!("git-prism: failed to initialize metrics exporter: {e}");
            return disabled();
        }
    };

    // Both signals share one resource so they carry the same service identity.
    let resource = opentelemetry_sdk::Resource::builder()
        .with_service_name(service_name)
        .with_attribute(opentelemetry::KeyValue::new(
            "service.version",
            service_version,
        ))
        .build();
    let tracer_provider = SdkTracerProvider::builder()
        .with_batch_exporter(trace_exporter)
        .with_resource(resource.clone())
        .build();
    let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metrics_exporter).build();
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(reader)
        .with_resource(resource)
        .build();

    if let Err(e) = attach_subscriber(&tracer_provider) {
        eprintln!("git-prism: failed to initialize tracing subscriber: {e}");
        // Retire the unused pipelines through the guard's `Drop`, which waits at most
        // `DROP_SHUTDOWN_TIMEOUT`, instead of shutting them down inline on this thread.
        drop(TelemetryGuard {
            tracer_provider: Some(tracer_provider),
            meter_provider: Some(meter_provider),
        });
        return disabled();
    }

    // Registered last: doing this before the attach step would leave a global meter
    // provider installed even though the caller received a no-op guard.
    #[cfg(not(test))]
    opentelemetry::global::set_meter_provider(meter_provider.clone());

    if emit_success_banner {
        eprintln!("git-prism: telemetry initialized (HTTP/protobuf, endpoint={base})");
    }
    TelemetryGuard {
        tracer_provider: Some(tracer_provider),
        meter_provider: Some(meter_provider),
    }
}
// Test-only constructor, visible to the rest of the crate.
#[cfg(test)]
impl TelemetryGuard {
/// Builds a guard with no providers; `is_active` is `false` and both flushing and
/// dropping it do nothing.
pub(crate) fn noop() -> Self {
Self {
tracer_provider: None,
meter_provider: None,
}
}
}
#[cfg(test)]
mod tests {
    // Tests for environment resolution, provider initialization and bounded teardown.
    // Anything that reads or writes the process environment goes through `EnvScope`.
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Collector address for tests that only need the providers to be constructed.
    const LOCAL_COLLECTOR: &str = "http://localhost:4318";
    /// Address from the TEST-NET-1 documentation range (RFC 5737); stands in for a
    /// collector that is expected never to answer.
    const BLACK_HOLE_COLLECTOR: &str = "http://192.0.2.1:4318";

    /// Serializes the tests in this module that touch the process environment.
    static ENV_MUTEX: Mutex<()> = Mutex::new(());

    /// Exclusive, self-cleaning access to the telemetry environment variables.
    ///
    /// Creating a scope takes `ENV_MUTEX` and clears the three telemetry variables;
    /// dropping it clears them again, including while unwinding from a failed assertion.
    struct EnvScope {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvScope {
        /// Takes the lock and starts from an environment without telemetry variables.
        ///
        /// A poisoned lock is recovered rather than unwrapped: the mutex protects no data,
        /// and panicking here would turn one failing test into a failure of every test
        /// that runs after it.
        fn clean() -> Self {
            let lock = ENV_MUTEX
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let scope = Self { _lock: lock };
            scope.clear();
            scope
        }

        /// Like `clean`, with `key` set to `value` afterwards.
        fn with_var(key: &str, value: &str) -> Self {
            let scope = Self::clean();
            scope.set(key, value);
            scope
        }

        /// Sets one environment variable for the lifetime of the scope.
        fn set(&self, key: &str, value: &str) {
            // SAFETY: `self` holds `ENV_MUTEX`, which every environment-touching test in
            // this module takes first, so these tests never mutate the environment
            // concurrently with each other.
            unsafe { std::env::set_var(key, value) }
        }

        /// Removes all telemetry variables.
        fn clear(&self) {
            for key in [ENV_OTLP_ENDPOINT, ENV_SERVICE_NAME, ENV_SERVICE_VERSION] {
                // SAFETY: same reasoning as in `set`; the lock is held through `self`.
                unsafe { std::env::remove_var(key) }
            }
        }
    }

    impl Drop for EnvScope {
        fn drop(&mut self) {
            self.clear();
        }
    }

    #[test]
    fn test_init_without_env_returns_noop() {
        let _env = EnvScope::clean();
        let guard = init();
        assert!(
            !guard.is_active(),
            "guard should be no-op when no endpoint is set"
        );
    }

    #[test]
    fn test_init_with_empty_endpoint_returns_noop() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, "");
        let guard = init();
        assert!(
            !guard.is_active(),
            "guard should be no-op when endpoint is empty"
        );
    }

    #[test]
    fn test_init_quiet_without_env_returns_noop() {
        let _env = EnvScope::clean();
        let guard = init_quiet();
        assert!(
            !guard.is_active(),
            "init_quiet must produce a no-op guard when no endpoint is set, same as init"
        );
    }

    #[tokio::test]
    async fn test_init_quiet_with_endpoint_creates_providers() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let guard = init_quiet();
        assert!(
            guard.is_active(),
            "init_quiet must still create active providers when an endpoint is set; \
             suppressing the banner must not suppress telemetry itself"
        );
        drop(guard);
    }

    #[tokio::test]
    async fn test_init_with_endpoint_creates_providers() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let guard = init();
        assert!(
            guard.is_active(),
            "guard should be active when endpoint is set"
        );
        drop(guard);
    }

    #[tokio::test]
    async fn test_guard_drop_does_not_panic() {
        let env = EnvScope::clean();
        let noop_guard = TelemetryGuard {
            tracer_provider: None,
            meter_provider: None,
        };
        drop(noop_guard);

        env.set(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let active_guard = init();
        drop(active_guard);
    }

    #[test]
    fn it_trims_trailing_slash_when_computing_signal_paths() {
        let (traces, metrics) = signal_endpoints("http://localhost:4318/");
        assert_eq!(traces, "http://localhost:4318/v1/traces");
        assert_eq!(metrics, "http://localhost:4318/v1/metrics");
    }

    #[test]
    fn it_appends_signal_paths_to_a_bare_base() {
        let (traces, metrics) = signal_endpoints("http://localhost:4318");
        assert_eq!(traces, "http://localhost:4318/v1/traces");
        assert_eq!(metrics, "http://localhost:4318/v1/metrics");
    }

    #[tokio::test]
    async fn test_init_with_custom_service_name_succeeds() {
        let env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        env.set(ENV_SERVICE_NAME, "custom-prism");
        let guard = init();
        assert!(guard.is_active());
        drop(guard);
    }

    #[test]
    fn it_uses_crate_version_when_service_version_env_is_empty_string() {
        let _env = EnvScope::with_var(ENV_SERVICE_VERSION, "");
        let version = resolve_service_version();
        assert_eq!(
            version,
            env!("CARGO_PKG_VERSION"),
            "empty GIT_PRISM_SERVICE_VERSION must fall back to crate version"
        );
    }

    #[test]
    fn it_uses_custom_service_name_when_service_name_env_is_set() {
        let _env = EnvScope::with_var(ENV_SERVICE_NAME, "custom-prism");
        let name = resolve_service_name();
        assert_eq!(
            name, "custom-prism",
            "a non-empty GIT_PRISM_SERVICE_NAME must be used verbatim, not replaced by the default"
        );
        assert_ne!(
            name, DEFAULT_SERVICE_NAME,
            "the custom name must not collapse to the default"
        );
    }

    #[test]
    fn it_uses_custom_service_version_when_service_version_env_is_set() {
        let _env = EnvScope::with_var(ENV_SERVICE_VERSION, "9.9.9-custom");
        let version = resolve_service_version();
        assert_eq!(
            version, "9.9.9-custom",
            "a non-empty GIT_PRISM_SERVICE_VERSION must be used verbatim"
        );
        assert_ne!(
            version,
            env!("CARGO_PKG_VERSION"),
            "the custom version must not collapse to the crate-version fallback"
        );
    }

    #[test]
    fn it_uses_default_service_name_when_service_name_env_is_empty_string() {
        let _env = EnvScope::with_var(ENV_SERVICE_NAME, "");
        let name = resolve_service_name();
        assert_eq!(
            name, DEFAULT_SERVICE_NAME,
            "empty GIT_PRISM_SERVICE_NAME must fall back to default 'git-prism'"
        );
    }

    #[test]
    fn it_uses_crate_version_when_service_version_env_is_unset() {
        let _env = EnvScope::clean();
        let version = resolve_service_version();
        assert_eq!(
            version,
            env!("CARGO_PKG_VERSION"),
            "unset GIT_PRISM_SERVICE_VERSION must fall back to the crate version"
        );
        assert!(
            !version.is_empty(),
            "the crate-version fallback must never be empty"
        );
    }

    #[tokio::test]
    async fn it_quiet_init_still_activates_providers_when_endpoint_is_set() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let guard = init_quiet();
        assert!(
            guard.is_active(),
            "init_quiet must still create providers when an endpoint is configured; \
             it only suppresses the stderr success banner"
        );
        drop(guard);
    }

    #[test]
    fn it_quiet_init_returns_noop_without_endpoint() {
        let _env = EnvScope::clean();
        let guard = init_quiet();
        assert!(
            !guard.is_active(),
            "init_quiet must be a no-op guard when no endpoint is set"
        );
    }

    #[test]
    fn it_force_flushes_bounded_noop_guard_without_panic() {
        let mut guard = TelemetryGuard::noop();
        guard.force_flush_bounded(Duration::from_millis(200));
    }

    #[test]
    fn it_force_flushes_noop_guard_without_panic() {
        let mut guard = TelemetryGuard {
            tracer_provider: None,
            meter_provider: None,
        };
        guard.force_flush_bounded(Duration::from_millis(500));
    }

    #[tokio::test]
    async fn it_force_flushes_active_guard_without_panic() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let mut guard = init();
        assert!(
            guard.is_active(),
            "guard must be active for this test to exercise flush"
        );
        guard.force_flush_bounded(Duration::from_millis(500));
        drop(guard);
    }

    #[tokio::test]
    async fn it_tolerates_double_force_flush_then_drop_on_active_guard() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let mut guard = init();
        assert!(
            guard.is_active(),
            "guard must be active for this test to exercise repeated flush"
        );
        guard.force_flush_bounded(Duration::from_millis(500));
        guard.force_flush_bounded(Duration::from_millis(500));
        assert!(
            guard.is_active(),
            "force_flush_bounded must not consume the providers; guard stays active"
        );
        drop(guard);
    }

    #[test]
    fn it_tolerates_double_force_flush_on_noop_guard() {
        let mut guard = TelemetryGuard::noop();
        guard.force_flush_bounded(Duration::from_millis(500));
        guard.force_flush_bounded(Duration::from_millis(500));
        assert!(
            !guard.is_active(),
            "a no-op guard must stay inactive across repeated force_flush_bounded calls"
        );
        drop(guard);
    }

    #[tokio::test]
    async fn it_force_flushes_bounded_active_guard_within_deadline() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, BLACK_HOLE_COLLECTOR);
        let mut guard = init();
        assert!(
            guard.is_active(),
            "guard must be active to exercise the bounded-flush wait path"
        );
        // Only the flush itself is timed; teardown is covered by the structured-path test.
        let start = std::time::Instant::now();
        guard.force_flush_bounded(Duration::from_millis(200));
        let elapsed = start.elapsed();
        drop(guard);
        assert!(
            elapsed < Duration::from_secs(1),
            "bounded flush must abandon a stalled exporter well before the 1s budget \
             (200ms cap × 5 margin); took {elapsed:?}"
        );
    }

    #[tokio::test]
    async fn qa_structured_path_total_teardown_is_bounded_on_black_hole_endpoint() {
        use crate::shim::STRUCTURED_FLUSH_TIMEOUT;

        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, BLACK_HOLE_COLLECTOR);
        let mut guard = init();
        assert!(
            guard.is_active(),
            "guard must be active to exercise the structured-path teardown"
        );
        // Time the bounded flush and the guard's `Drop` together.
        let start = std::time::Instant::now();
        guard.force_flush_bounded(STRUCTURED_FLUSH_TIMEOUT);
        drop(guard);
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "structured-path teardown (bounded flush + guard Drop) must stay bounded; \
             a black-hole collector must not stall the agent's git diff on teardown. \
             took {elapsed:?} (Drop::shutdown is unbounded — #361 stall reintroduced)"
        );
    }

    #[tokio::test]
    async fn it_returns_noop_guard_when_tracing_subscriber_init_fails() {
        let _env = EnvScope::with_var(ENV_OTLP_ENDPOINT, LOCAL_COLLECTOR);
        let guard = init_with_attacher(
            |_tp| Err("subscriber already registered (simulated)".to_string()),
            true,
        );
        assert!(
            !guard.is_active(),
            "guard must degrade to no-op when the tracing subscriber cannot be attached; \
             returning an active guard with no attached subscriber silently drops every span"
        );
    }
}