polaris_dashboard 0.1.3

Opinionated read-only dashboard for Polaris sessions.
Documentation
//! OpenTelemetry trace collector for the dashboard.
//!
//! [`OtelTracingPlugin`] turns the dashboard into a standalone OTLP viewer.
//! It mounts an OTLP/HTTP **JSON** ingest endpoint and serves the same
//! `/v1/tracing/*` read surface the SPA already consumes, backed by its own
//! [`TraceStore`]:
//!
//! | Method & path                | Role                                        |
//! |------------------------------|---------------------------------------------|
//! | `POST /v1/traces`            | OTLP/HTTP JSON ingest (any number of sources) |
//! | `GET  /v1/tracing/runs`      | Runs list (one run per trace id)            |
//! | `GET  /v1/tracing/runs/{id}` | Span tree for one run                       |
//!
//! Every emitter — an arbitrary microservice, or Polaris itself —
//! points its OTLP exporter at `POST /v1/traces`. The collector copies all
//! attributes into the generic run-label / span-field bags and hardcodes
//! nothing about any source; the dashboard derives the "Source" column from
//! the OTLP resource attributes client-side.
//!
//! ## Relationship to the upstream tracing plugin
//!
//! This plugin **replaces** [`TracingPlugin`](polaris_ai::plugins::TracingPlugin):
//! both mount `GET /v1/tracing/runs`, so registering both panics axum on the duplicate
//! route. Use this plugin to view arbitrary `OTel` sources; to also see native
//! Polaris runs, have the Polaris server export OTLP to `POST /v1/traces`.
//!
//! See [`OtelTracingPlugin`] for the full route table, lifecycle and security
//! notes, and a worked registration example.

mod model;
mod otlp;
mod store;

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use axum::Json;
use axum::Router;
use axum::extract::{DefaultBodyLimit, Path, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use polaris_ai::app::{AppPlugin, HttpRouter};
use polaris_ai::system::plugin::{Plugin, PluginId, Version};
use polaris_ai::system::server::Server;
use serde_json::json;

// The output contract (`/v1/tracing/*` response shapes) is re-exported so
// [`TraceStore`]'s public method signatures are nameable and navigable on
// docs.rs, not dead-end types in a private module. The OTLP ingest types stay
// internal — consumers feed a JSON payload via `TraceStore::ingest_otlp_json`,
// never hand-build the envelope.
use otlp::ExportTraceServiceRequest;

pub use model::{RunSummary, RunsResponse, SpanEvent, SpanNode, SpanTree};
pub use store::{DEFAULT_CAPACITY, TraceStore};

/// Upper bound, in bytes, on a single `POST /v1/traces` request body (4 MiB).
///
/// Capping the body keeps an oversized (or hostile) payload from exhausting
/// process memory before deserialization, while leaving ample room for batched
/// OTLP exports. The cap is installed as a [`DefaultBodyLimit`] layer on the
/// ingest route.
///
/// NOTE(review): hosts are expected to be able to override this by layering a
/// different [`DefaultBodyLimit`] upstream — confirm that an outer layer
/// actually takes precedence over this route-level one before relying on it.
const MAX_INGEST_BODY_BYTES: usize = 4 << 20;

/// Turns the dashboard into a standalone OTLP viewer.
///
/// Reach for this plugin when you want to view traces from **arbitrary `OTel`
/// sources** (a microservice, or Polaris exporting OTLP) rather than only
/// native Polaris runs. It mounts an OTLP/HTTP JSON ingest endpoint plus the
/// `/v1/tracing/*` read surface the SPA already consumes, backed by its own
/// in-memory [`TraceStore`].
///
/// The `S` type parameter is a typestate: [`Configuring`] while retention
/// knobs are being accumulated, [`Ready`] (the default) once the store exists.
/// Only `OtelTracingPlugin<Ready>` implements [`Plugin`].
///
/// **Conflicts with the upstream tracing surface:** this plugin and
/// [`TracingPlugin`](polaris_ai::plugins::TracingPlugin) both mount
/// `GET /v1/tracing/runs`, so registering both panics axum on the duplicate
/// route. Pick one — use this plugin for `OTel` sources, or point a Polaris
/// server's OTLP exporter at `POST /v1/traces` to see native runs here too.
///
/// # Routes Provided
///
/// | Method | Path | Description |
/// |--------|------|-------------|
/// | `POST` | `/v1/traces` | OTLP/HTTP JSON trace ingest (any number of sources). |
/// | `GET`  | `/v1/tracing/runs` | Runs list — one run per trace id. |
/// | `GET`  | `/v1/tracing/runs/{id}` | Span tree for one run. |
///
/// # Resources Provided
///
/// | Resource | Scope | Description |
/// |----------|-------|-------------|
/// | _none_   | —     | The collector's [`TraceStore`] is held as axum router state, not registered as a Polaris resource. |
///
/// # APIs Provided
///
/// None.
///
/// # Dependencies
///
/// - [`AppPlugin`] — provides the [`HttpRouter`] the routes are mounted on.
///
/// # Lifecycle
///
/// - **Requires `default-features = false`** on `polaris_dashboard`: the
///   default `native-tracing` feature mounts the upstream `TracingPlugin`'s
///   `/v1/tracing/runs`, which collides with this plugin's identical route and
///   panics axum on the duplicate. Disable it so this plugin owns the trace
///   surface.
/// - **[`Plugin::build`] panics** (the mount step, not the typestate builder's
///   [`build`](OtelTracingPlugin::build)) if no [`AppPlugin`] (the
///   [`HttpRouter`] provider) has been registered before this plugin.
///
/// # Security
///
/// `POST /v1/traces` is a state-mutating ingest endpoint that inherits whatever
/// `AuthProvider` the host installs on [`HttpRouter`] — the same (possibly
/// absent) auth as the read routes. If the host registers no provider, treat it
/// as an open write sink and keep it on a trusted network. Ingested attributes
/// are stored and re-served verbatim; the SPA is responsible for escaping them
/// on render.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
/// use polaris_ai::app::{AppConfig, AppPlugin};
/// use polaris_ai::sessions::{HttpPlugin, InMemoryStore, SessionsPlugin};
/// use polaris_ai::system::server::Server;
/// use polaris_dashboard::{DashboardPlugin, OtelTracingPlugin};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut server = Server::new();
/// server
///     .add_plugins(AppPlugin::new(AppConfig::new()))
///     .add_plugins(SessionsPlugin::new(Arc::new(InMemoryStore::new())))
///     .add_plugins(HttpPlugin::new())
///     .add_plugins(OtelTracingPlugin::new())
///     .add_plugins(DashboardPlugin::default());
/// server.run().await?;
/// # Ok(())
/// # }
/// ```
pub struct OtelTracingPlugin<S: StoreState = Ready> {
    /// State-dependent payload: a [`StoreConfig`] while [`Configuring`], the
    /// shared `Arc<TraceStore>` once [`Ready`].
    state: S::Data,
}

/// Retention configuration accumulated while a plugin is [`Configuring`],
/// applied to a fresh [`TraceStore`] by [`build`](OtelTracingPlugin::build).
///
/// Public only because it is the [`Configuring`] state's
/// [`StoreState::Data`]; it is opaque (no public fields or constructors) and
/// callers never name it — configure through the plugin's `with_*` methods.
///
/// Every field starts as `None` (via `Default`); a `None` knob is simply not
/// applied when the store is built.
#[doc(hidden)]
#[derive(Debug, Clone, Default)]
pub struct StoreConfig {
    // Trace-count bound; `build` falls back to `DEFAULT_CAPACITY` when unset.
    capacity: Option<usize>,
    // Idle time-to-live; `build` only calls `TraceStore::with_ttl` when set.
    ttl: Option<Duration>,
    // Persistence directory; `build` only calls `TraceStore::with_persistence`
    // when set.
    persist_dir: Option<PathBuf>,
}

// Sealed-trait support: `Sealed` is `pub` but lives in a private module, so
// code outside this module (and its children) cannot name it and therefore
// cannot implement `StoreState`, which requires it as a supertrait.
mod sealed {
    // Marker supertrait with no items; it exists only to gate `StoreState`.
    pub trait Sealed {}
}

/// Typestate of an [`OtelTracingPlugin`] — whether its [`TraceStore`] has been
/// built yet. Sealed: the only states are [`Configuring`] and [`Ready`].
pub trait StoreState: sealed::Sealed {
    /// Payload carried in each state: accumulated [`StoreConfig`] while
    /// configuring, or the finalised shared store once ready.
    ///
    /// The `Debug + Clone` bounds are what let [`OtelTracingPlugin`] implement
    /// `Debug` and `Clone` for every state without bounding the marker type
    /// itself.
    type Data: std::fmt::Debug + Clone + Send + Sync + 'static;
}

/// Typestate marker: the store is still being configured. Retention knobs
/// ([`with_capacity`](OtelTracingPlugin::with_capacity),
/// [`with_ttl`](OtelTracingPlugin::with_ttl),
/// [`with_persistence`](OtelTracingPlugin::with_persistence)) are available, but
/// the plugin is **not** a [`Plugin`] yet and cannot be mounted until
/// [`build`](OtelTracingPlugin::build) seals it into [`Ready`].
///
/// In this state the plugin carries a [`StoreConfig`] as its
/// [`StoreState::Data`]; no [`TraceStore`] exists yet.
#[derive(Debug, Clone)]
pub struct Configuring;

/// Typestate marker: the store is built and the plugin is ready to mount. This
/// is the default state — `OtelTracingPlugin` means `OtelTracingPlugin<Ready>`,
/// so every existing reference and `add_plugins` call is unaffected.
///
/// In this state the plugin carries an `Arc<TraceStore>` as its
/// [`StoreState::Data`].
#[derive(Debug, Clone)]
pub struct Ready;

// Seal both markers: these are the only `Sealed` impls in this file, which
// keeps the set of typestates closed.
impl sealed::Sealed for Configuring {}
impl sealed::Sealed for Ready {}
// While configuring, the plugin carries only the accumulated retention knobs.
impl StoreState for Configuring {
    type Data = StoreConfig;
}
// Once ready, the plugin carries the shared, already-built store.
impl StoreState for Ready {
    type Data = Arc<TraceStore>;
}

// Written by hand rather than derived: `#[derive(Debug)]`/`#[derive(Clone)]`
// would add `S: Debug`/`S: Clone` bounds on the marker type, whereas only
// `S::Data` matters — and `StoreState` already guarantees it is
// `Debug + Clone` in every state, so both impls hold unconditionally.
impl<S: StoreState> std::fmt::Debug for OtelTracingPlugin<S> {
    /// Formats as a struct named `OtelTracingPlugin` with a single `state`
    /// field, honouring the formatter's pretty-print flag.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OtelTracingPlugin");
        out.field("state", &self.state);
        out.finish()
    }
}

impl<S: StoreState> Clone for OtelTracingPlugin<S> {
    /// Clones the state payload. `StoreState::Data` is `Clone` for every
    /// state, so no bound on the marker type `S` itself is required.
    fn clone(&self) -> Self {
        let state = Clone::clone(&self.state);
        Self { state }
    }
}

impl OtelTracingPlugin<Ready> {
    /// Creates a mountable collector backed by a fresh [`TraceStore::new`]
    /// store.
    pub fn new() -> Self {
        Self::with_store(TraceStore::new())
    }

    /// Begins configuring the collector's store from the plugin surface.
    ///
    /// Chain any of [`with_capacity`](OtelTracingPlugin::with_capacity),
    /// [`with_ttl`](OtelTracingPlugin::with_ttl), and
    /// [`with_persistence`](OtelTracingPlugin::with_persistence) in any order,
    /// then call [`build`](OtelTracingPlugin::build) to seal it for mounting:
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use polaris_dashboard::OtelTracingPlugin;
    ///
    /// let plugin = OtelTracingPlugin::builder()
    ///     .with_capacity(500)
    ///     .with_ttl(Duration::from_secs(3600))
    ///     .with_persistence("var/traces")
    ///     .build();
    /// ```
    ///
    /// The knobs only accumulate configuration, so they compose independently;
    /// the store itself is constructed exactly once, in `build`. The returned
    /// `OtelTracingPlugin<Configuring>` is intentionally *not* a [`Plugin`]:
    /// forgetting `build` is a compile error, not a silently unconfigured
    /// collector.
    pub fn builder() -> OtelTracingPlugin<Configuring> {
        let blank = StoreConfig::default();
        OtelTracingPlugin { state: blank }
    }

    /// Wraps an already-built store, skipping the [`builder`](Self::builder).
    ///
    /// Anything convertible into an `Arc<TraceStore>` is accepted: hand over a
    /// freshly built [`TraceStore`] as-is (no `Arc::new` needed), or an
    /// existing `Arc` to share the store with another component or to
    /// pre-seed fixtures.
    pub fn with_store(store: impl Into<Arc<TraceStore>>) -> Self {
        let shared: Arc<TraceStore> = store.into();
        Self { state: shared }
    }

    /// Returns another handle to the underlying store (e.g. to pre-seed
    /// traces in tests). Only the `Arc` is cloned; the store is shared.
    pub fn store(&self) -> Arc<TraceStore> {
        Arc::clone(&self.state)
    }
}

impl OtelTracingPlugin<Configuring> {
    /// Caps the number of distinct traces kept in memory at `capacity`
    /// (oldest evicted first).
    #[must_use]
    pub fn with_capacity(self, capacity: usize) -> Self {
        let state = StoreConfig {
            capacity: Some(capacity),
            ..self.state
        };
        Self { state }
    }

    /// Ages out traces that have been idle for longer than `ttl` (no span
    /// ingested within the window), in addition to the capacity bound.
    ///
    /// Together with [`with_persistence`](Self::with_persistence) the TTL also
    /// bounds disk use: each trace's NDJSON file is removed as it ages out,
    /// and files stale beyond `ttl` are pruned on `build`'s reload (`build`
    /// applies the TTL before persistence, so the load-time prune always sees
    /// it).
    #[must_use]
    pub fn with_ttl(self, ttl: Duration) -> Self {
        let state = StoreConfig {
            ttl: Some(ttl),
            ..self.state
        };
        Self { state }
    }

    /// Persists traces under `dir` as NDJSON and reloads any existing history
    /// on `build`, so runs survive a restart.
    ///
    /// When [`with_ttl`](Self::with_ttl) is also set, persistence is
    /// TTL-bounded — expired files are removed rather than reloaded; without
    /// it, `dir` is an unbounded append-only archive.
    #[must_use]
    pub fn with_persistence(self, dir: impl Into<PathBuf>) -> Self {
        let state = StoreConfig {
            persist_dir: Some(dir.into()),
            ..self.state
        };
        Self { state }
    }

    /// Builds the configured store and seals the plugin into [`Ready`] so it
    /// can be mounted.
    ///
    /// This is the only place the [`TraceStore`] is constructed, which is why
    /// the accumulated knobs apply together regardless of the order they were
    /// set in. Capacity falls back to [`DEFAULT_CAPACITY`] when unset.
    pub fn build(self) -> OtelTracingPlugin<Ready> {
        let config = self.state;
        let sized = TraceStore::with_capacity(config.capacity.unwrap_or(DEFAULT_CAPACITY));
        // Order matters: the TTL must be in place before persistence is
        // enabled so the load-time prune sees it.
        let aged = match config.ttl {
            Some(window) => sized.with_ttl(window),
            None => sized,
        };
        let store = match config.persist_dir {
            Some(root) => aged.with_persistence(root),
            None => aged,
        };
        OtelTracingPlugin {
            state: Arc::new(store),
        }
    }
}

impl Default for OtelTracingPlugin<Ready> {
    fn default() -> Self {
        Self::new()
    }
}

impl Plugin for OtelTracingPlugin<Ready> {
    const ID: &'static str = "polaris::dashboard::otel";
    const VERSION: Version = Version::new(0, 1, 0);

    fn build(&self, server: &mut Server) {
        let router = Router::new()
            .route(
                "/v1/traces",
                post(ingest_traces).layer(DefaultBodyLimit::max(MAX_INGEST_BODY_BYTES)),
            )
            .route("/v1/tracing/runs", get(list_runs))
            .route("/v1/tracing/runs/{run_id}", get(get_run))
            .with_state(self.state.clone());

        server
            .api::<HttpRouter>()
            .expect("AppPlugin must be added before OtelTracingPlugin")
            .add_routes(router);
    }

    fn dependencies(&self) -> Vec<PluginId> {
        vec![PluginId::of::<AppPlugin>()]
    }
}

/// OTLP/HTTP JSON ingest. Returns the OTLP success envelope. Non-JSON bodies
/// (e.g. protobuf) are rejected by the `Json` extractor with a 4xx — this
/// collector speaks OTLP/JSON only.
async fn ingest_traces(
    State(store): State<Arc<TraceStore>>,
    Json(request): Json<ExportTraceServiceRequest>,
) -> Response {
    // `ingest` locks a std mutex and, when persistence is enabled, performs
    // synchronous filesystem I/O. Run it on the blocking pool so it never
    // stalls an async runtime thread. A panic in the closure (it shouldn't —
    // poison is handled internally) surfaces as a JoinError → zero accepted.
    let accepted = tokio::task::spawn_blocking(move || store.ingest(request))
        .await
        .unwrap_or(0);
    tracing::debug!(accepted, "otel: ingested spans");
    // `partialSuccess` empty == fully accepted, per the OTLP spec.
    (StatusCode::OK, Json(json!({ "partialSuccess": {} }))).into_response()
}

/// Handler for `GET /v1/tracing/runs`: wraps whatever
/// [`TraceStore::list_runs`] returns as the `items` of a [`RunsResponse`].
async fn list_runs(State(store): State<Arc<TraceStore>>) -> Json<RunsResponse> {
    let items = store.list_runs();
    Json(RunsResponse { items })
}

/// Handler for `GET /v1/tracing/runs/{run_id}`: responds with the run's span
/// tree as JSON, or `404 Not Found` with the body `run not found` when the
/// store has no tree for `run_id`.
async fn get_run(State(store): State<Arc<TraceStore>>, Path(run_id): Path<String>) -> Response {
    store.run_tree(&run_id).map_or_else(
        || (StatusCode::NOT_FOUND, "run not found").into_response(),
        |tree| Json(tree).into_response(),
    )
}

#[cfg(test)]
mod tests;