kreuzberg 4.8.4

High-performance document intelligence library for Rust. Extract text, metadata, and structured data from PDFs, Office documents, images, and 91+ formats and 248 programming languages via tree-sitter code intelligence with async/sync APIs.
Documentation
//! Metrics layer for the extraction service.
//!
//! Records service-level counters, histograms, and gauges on every
//! extraction request using the kreuzberg OTel metric instruments.

use crate::types::ExtractionResult;
use crate::{KreuzbergError, Result};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

use crate::service::request::{ExtractionRequest, ExtractionSource};
use crate::telemetry::conventions;

// ---------------------------------------------------------------------------
// Layer
// ---------------------------------------------------------------------------

/// A [`tower::Layer`] that records service-level extraction metrics.
#[derive(Debug, Clone, Default)]
pub struct MetricsLayer;

impl MetricsLayer {
    pub fn new() -> Self {
        Self
    }
}

impl<S> Layer<S> for MetricsLayer {
    type Service = MetricsService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        MetricsService { inner }
    }
}

// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------

/// Middleware service that records extraction metrics.
#[derive(Debug, Clone)]
pub struct MetricsService<S> {
    inner: S,
}

impl<S> Service<ExtractionRequest> for MetricsService<S>
where
    S: Service<ExtractionRequest, Response = ExtractionResult, Error = KreuzbergError> + Clone + Send + 'static,
    S::Future: Send,
{
    type Response = ExtractionResult;
    type Error = KreuzbergError;
    type Future = Pin<Box<dyn Future<Output = Result<ExtractionResult>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: ExtractionRequest) -> Self::Future {
        let metrics = crate::telemetry::metrics::get_metrics();
        let mime_type = match &req.source {
            ExtractionSource::File { .. } => "unknown".to_owned(),
            ExtractionSource::Bytes { mime_type, .. } => mime_type.clone(),
        };

        metrics.concurrent_extractions.add(1, &[]);

        let mut inner = self.inner.clone();

        Box::pin(async move {
            let result = inner.call(req).await;

            let status = if result.is_ok() { "ok" } else { "error" };
            let attrs = [
                opentelemetry::KeyValue::new(conventions::DOCUMENT_MIME_TYPE, mime_type),
                opentelemetry::KeyValue::new("status", status),
            ];

            metrics.extraction_total.add(1, &attrs);
            metrics.concurrent_extractions.add(-1, &[]);

            result
        })
    }
}