mod extraction;
pub mod layers;
pub mod request;
pub use extraction::ExtractionService;
pub use request::{ExtractionRequest, ExtractionSource};
use crate::KreuzbergError;
use crate::types::ExtractionResult;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tower::util::BoxCloneService;
use tower::{Service, ServiceBuilder, ServiceExt};
/// Builder that records which optional layers `build` should stack around the
/// core extraction service.
///
/// Every option starts out disabled (see `new`); each `with_*` method switches
/// one of them on.
#[derive(Debug, Clone)]
pub struct ExtractionServiceBuilder {
    /// Per-call time limit; `None` means `build` adds no timeout wrapper.
    timeout: Option<Duration>,
    /// Value handed to tower's `concurrency_limit` layer; `None` means no
    /// limit layer is added.
    concurrency_limit: Option<usize>,
    /// Whether `build` wraps the service in the tracing layer.
    tracing: bool,
    /// Whether `build` wraps the service in the metrics layer. The field only
    /// exists when the `otel` feature is compiled in.
    #[cfg(feature = "otel")]
    metrics: bool,
}
impl Default for ExtractionServiceBuilder {
fn default() -> Self {
Self::new()
}
}
impl ExtractionServiceBuilder {
pub fn new() -> Self {
Self {
timeout: None,
concurrency_limit: None,
tracing: false,
#[cfg(feature = "otel")]
metrics: false,
}
}
pub fn with_timeout(mut self, duration: Duration) -> Self {
self.timeout = Some(duration);
self
}
pub fn with_concurrency_limit(mut self, max: usize) -> Self {
self.concurrency_limit = Some(max);
self
}
pub fn with_tracing(mut self) -> Self {
self.tracing = true;
self
}
#[allow(unused_mut)]
pub fn with_metrics(mut self) -> Self {
#[cfg(feature = "otel")]
{
self.metrics = true;
}
self
}
pub fn build(self) -> BoxCloneService<ExtractionRequest, ExtractionResult, KreuzbergError> {
let svc = ExtractionService::new();
let svc = match self.concurrency_limit {
Some(limit) => ServiceBuilder::new()
.concurrency_limit(limit)
.service(svc)
.boxed_clone(),
None => svc.boxed_clone(),
};
let svc: BoxCloneService<ExtractionRequest, ExtractionResult, KreuzbergError> = match self.timeout {
Some(duration) => {
let timeout_svc = TimeoutService { inner: svc, duration };
timeout_svc.boxed_clone()
}
None => svc,
};
#[cfg(feature = "otel")]
let svc = if self.metrics {
ServiceBuilder::new()
.layer(layers::metrics::MetricsLayer::new())
.service(svc)
.boxed_clone()
} else {
svc
};
if self.tracing {
ServiceBuilder::new()
.layer(layers::tracing::TracingLayer::new())
.service(svc)
.boxed_clone()
} else {
svc
}
}
}
/// Middleware that bounds how long a single call to the wrapped extraction
/// service may take.
///
/// When the inner future does not finish within `duration`, the call resolves
/// to [`KreuzbergError::Timeout`] instead of the inner result.
#[derive(Clone)]
struct TimeoutService {
    /// The wrapped, type-erased extraction service.
    inner: BoxCloneService<ExtractionRequest, ExtractionResult, KreuzbergError>,
    /// Time limit applied to each call.
    duration: Duration,
}

impl Service<ExtractionRequest> for TimeoutService {
    type Response = ExtractionResult;
    type Error = KreuzbergError;
    type Future = Pin<Box<dyn Future<Output = crate::Result<ExtractionResult>> + Send>>;

    /// Readiness is delegated unchanged to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        self.inner.poll_ready(cx)
    }

    /// Dispatches `req` to the wrapped service and races it against the
    /// configured time limit.
    ///
    /// On expiry the error carries `limit_ms` (the configured limit) and
    /// `elapsed_ms`. The latter is measured from the moment `call` was
    /// invoked, not from the first poll of the returned future, so it can
    /// exceed `limit_ms`.
    fn call(&mut self, req: ExtractionRequest) -> Self::Future {
        let fut = self.inner.call(req);
        let duration = self.duration;
        let start = std::time::Instant::now();
        Box::pin(async move {
            match tokio::time::timeout(duration, fut).await {
                Ok(result) => result,
                Err(_elapsed) => Err(KreuzbergError::Timeout {
                    elapsed_ms: saturating_millis(start.elapsed()),
                    limit_ms: saturating_millis(duration),
                }),
            }
        })
    }
}

/// Converts `d` to whole milliseconds, clamping to `u64::MAX`.
///
/// `Duration::as_millis` returns a `u128`; a bare `as u64` cast would silently
/// wrap for very large durations and report a nonsensical value.
fn saturating_millis(d: Duration) -> u64 {
    <u64 as std::convert::TryFrom<u128>>::try_from(d.as_millis()).unwrap_or(u64::MAX)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::config::ExtractionConfig;

    /// Builds a `text/plain` byte request with the default extraction config.
    fn plain_text_request(payload: &'static [u8]) -> ExtractionRequest {
        ExtractionRequest::bytes(payload, "text/plain", ExtractionConfig::default())
    }

    /// A builder with nothing configured must still produce a service.
    #[test]
    fn builder_new_builds_service() {
        let builder = ExtractionServiceBuilder::new();
        let _service = builder.build();
    }

    /// Configuring a timeout must not break construction.
    #[test]
    fn builder_with_timeout_does_not_panic() {
        let builder = ExtractionServiceBuilder::new().with_timeout(Duration::from_secs(30));
        let _service = builder.build();
    }

    /// Configuring a concurrency limit must not break construction.
    #[test]
    fn builder_with_concurrency_limit_does_not_panic() {
        let builder = ExtractionServiceBuilder::new().with_concurrency_limit(4);
        let _service = builder.build();
    }

    /// The bare service extracts the text it was given.
    #[tokio::test]
    async fn builder_service_extracts_text() {
        let mut service = ExtractionServiceBuilder::new().build();
        let request = plain_text_request(b"hello from builder");
        let extracted = service
            .call(request)
            .await
            .expect("extraction should succeed");
        assert!(extracted.content.contains("hello from builder"));
    }

    /// A generous timeout does not interfere with a normal extraction.
    #[tokio::test]
    async fn builder_with_timeout_extracts_text() {
        let mut service = ExtractionServiceBuilder::new()
            .with_timeout(Duration::from_secs(10))
            .build();
        let request = plain_text_request(b"timeout test");
        let extracted = service
            .call(request)
            .await
            .expect("extraction should succeed within timeout");
        assert!(extracted.content.contains("timeout test"));
    }

    /// With a one-nanosecond limit the call may either complete or time out.
    /// Despite the test's name, both outcomes are accepted; only an unrelated
    /// error fails the test.
    #[tokio::test]
    async fn timeout_fires_on_zero_duration() {
        let mut service = ExtractionServiceBuilder::new()
            .with_timeout(Duration::from_nanos(1))
            .build();
        let request = plain_text_request(b"hello");
        match service.call(request).await {
            Err(KreuzbergError::Timeout { .. }) => {}
            Err(other) => panic!("expected Ok or Timeout, got: {:?}", other),
            Ok(extracted) => assert!(extracted.content.contains("hello")),
        }
    }
}