use futures::future::BoxFuture;
use metrics_exporter_prometheus::PrometheusHandle;
use crate::core::engine::{FrozenDiContainer, HttpMethod};
use crate::core::plugins::{ArclyPlugin, ArclyPluginContext, PluginError};
/// Plugin that wires structured logging, Prometheus metrics, OpenTelemetry
/// tracing and health probes (`/metrics`, `/healthz`, `/readyz`) into an
/// Arcly application.
///
/// All configuration is `'static` string data supplied at construction time.
#[derive(Debug, Clone)]
pub struct ArclyObservabilityPlugin {
    /// Service name passed to the tracer configuration and included in the
    /// initialisation log line.
    pub service_name: &'static str,
    /// Service version passed to the tracer configuration and included in the
    /// initialisation log line.
    pub service_version: &'static str,
    /// OTLP endpoint passed to the tracer configuration and included in the
    /// initialisation log line.
    pub otlp_endpoint: &'static str,
}
impl ArclyPlugin for ArclyObservabilityPlugin {
    fn name(&self) -> &'static str {
        "ArclyObservabilityPlugin"
    }

    /// Installs logging, metrics, tracing and health probes, then documents
    /// the observability routes in the OpenAPI spec.
    ///
    /// Steps, in order:
    /// 1. Try to install a JSON `tracing_subscriber` whose filter comes from the
    ///    default env var, falling back to `info,opentelemetry_sdk=off`. If that
    ///    fails (e.g. a global subscriber already exists) the failure is logged
    ///    at debug level and initialisation continues.
    /// 2. Initialise metrics, provide the [`PrometheusHandle`] to the plugin
    ///    context, and mount `GET /metrics`.
    /// 3. Initialise the OpenTelemetry tracer from this plugin's fields.
    /// 4. Touch the global health registry and mount `GET /healthz` and
    ///    `GET /readyz`.
    /// 5. Add OpenAPI path items for the three routes.
    /// 6. Log an `info` line with the service configuration.
    ///
    /// # Errors
    ///
    /// The returned future always resolves to `Ok(())`; every step above is
    /// either infallible here or best-effort.
    fn on_init<'a>(
        &'a mut self,
        ctx: &'a mut ArclyPluginContext,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        Box::pin(async move {
            use tracing_subscriber::{fmt, EnvFilter};

            // Best-effort: the host application may already own the global
            // subscriber; keep it rather than failing plugin initialisation.
            let filter = EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| EnvFilter::new("info,opentelemetry_sdk=off"));
            if let Err(err) = fmt().json().with_env_filter(filter).try_init() {
                tracing::debug!(
                    error = %err,
                    "could not install the JSON tracing subscriber; keeping the existing global one"
                );
            }

            // One handle goes into the context for other components, the other
            // backs the scrape endpoint.
            let handle: PrometheusHandle = crate::observability::metrics::init_metrics();
            ctx.provide::<PrometheusHandle>(handle.clone());
            let metrics_handler = crate::observability::metrics::metrics_route_handler(handle);
            ctx.add_route(HttpMethod::GET, "/metrics", metrics_handler);

            crate::observability::otel::init_tracer(&crate::observability::otel::OtelConfig {
                service_name: self.service_name,
                service_version: self.service_version,
                otlp_endpoint: self.otlp_endpoint,
            });

            // Force the global health registry into existence before the probe
            // routes are mounted (presumably this also starts its uptime clock —
            // NOTE(review): confirm).
            let _ = crate::observability::health::global();
            ctx.add_route(
                HttpMethod::GET,
                "/healthz",
                crate::observability::health::healthz_handler(),
            );
            ctx.add_route(
                HttpMethod::GET,
                "/readyz",
                crate::observability::health::readyz_handler(),
            );

            ctx.modify_openapi(|spec| document_observability_routes(spec));

            tracing::info!(
                service = self.service_name,
                version = self.service_version,
                otlp = self.otlp_endpoint,
                "ArclyObservabilityPlugin initialised"
            );
            Ok(())
        })
    }

    /// Shuts down the global OpenTelemetry tracer provider.
    ///
    /// The DI container is not used. The returned future always resolves to
    /// `Ok(())`.
    fn on_shutdown<'a>(
        &'a self,
        _container: &'static FrozenDiContainer,
    ) -> BoxFuture<'a, Result<(), PluginError>> {
        Box::pin(async move {
            // NOTE(review): depending on the opentelemetry version and exporter,
            // this call may block while pending spans are flushed — confirm it is
            // acceptable to run directly on the async runtime.
            opentelemetry::global::shutdown_tracer_provider();
            Ok(())
        })
    }
}

/// JSON Schema for the health-report body advertised for the probe routes:
/// an overall `status`, a per-subsystem `checks` map and `uptime_secs`.
fn health_report_schema() -> serde_json::Value {
    serde_json::json!({
        "type": "object",
        "required": ["status", "checks", "uptime_secs"],
        "properties": {
            "status": {
                "type": "string",
                "enum": ["healthy", "degraded", "unhealthy"],
                "description": "Overall health of the service."
            },
            "checks": {
                "type": "object",
                "description": "Per-subsystem health results.",
                "additionalProperties": {
                    "oneOf": [
                        { "type": "string", "enum": ["healthy"] },
                        { "type": "object", "required": ["degraded"], "properties": { "degraded": { "type": "string" } } },
                        { "type": "object", "required": ["unhealthy"], "properties": { "unhealthy": { "type": "string" } } }
                    ]
                }
            },
            "uptime_secs": {
                "type": "integer",
                "format": "int64",
                "description": "Process uptime in seconds."
            }
        }
    })
}

/// Adds OpenAPI path items for `/healthz`, `/readyz` and `/metrics` to `spec`,
/// replacing any existing entries for those paths.
///
/// If `spec` is not a JSON object it is left untouched. A missing `paths`
/// member is created as an empty object; a `paths` member that exists but is
/// not an object is left untouched and nothing is added.
fn document_observability_routes(spec: &mut serde_json::Value) {
    let Some(root) = spec.as_object_mut() else {
        return;
    };
    // Create `paths` when absent so the observability routes are documented even
    // if no other route has been registered in the spec yet.
    let Some(paths) = root
        .entry("paths")
        .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()))
        .as_object_mut()
    else {
        return;
    };

    let health_schema = health_report_schema();

    paths.insert(
        "/healthz".to_string(),
        serde_json::json!({
            "get": {
                "summary": "Liveness probe",
                "description": "Returns 200 when the service is healthy or degraded, \
                                503 when any registered health check is Unhealthy.",
                "operationId": "healthz",
                "tags": ["observability"],
                "responses": {
                    "200": {
                        "description": "Healthy or degraded — service is alive.",
                        "content": { "application/json": { "schema": health_schema } }
                    },
                    "503": {
                        "description": "Unhealthy — one or more subsystems have failed."
                    }
                }
            }
        }),
    );

    // `/readyz` is documented as identical to `/healthz`, so advertise the same
    // response body schema for its 200 response.
    paths.insert(
        "/readyz".to_string(),
        serde_json::json!({
            "get": {
                "summary": "Readiness probe",
                "description": "Identical to /healthz. Use for Kubernetes `readinessProbe`; \
                                load-balancers should stop sending traffic when this returns 503.",
                "operationId": "readyz",
                "tags": ["observability"],
                "responses": {
                    "200": {
                        "description": "Ready to serve traffic.",
                        "content": { "application/json": { "schema": health_schema } }
                    },
                    "503": { "description": "Not ready — remove from load-balancer rotation." }
                }
            }
        }),
    );

    paths.insert(
        "/metrics".to_string(),
        serde_json::json!({
            "get": {
                "summary": "Prometheus metrics scrape",
                "description": "Returns Prometheus text-format (version 0.0.4) metrics. \
                                Counters: `http_requests_total`. \
                                Histogram: `http_request_duration_seconds`. \
                                Gauge: `http_requests_in_flight`.",
                "operationId": "metrics",
                "tags": ["observability"],
                "responses": {
                    "200": {
                        "description": "Prometheus text-format metric lines.",
                        "content": {
                            "text/plain": {
                                "schema": { "type": "string" }
                            }
                        }
                    }
                }
            }
        }),
    );
}