use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::intent::{IntentRequest, IntentResponse};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum HealthStatus {
Healthy,
Degraded { reason: String },
Unhealthy { reason: String },
Starting,
Stopping,
}
impl HealthStatus {
pub fn is_healthy(&self) -> bool {
matches!(self, HealthStatus::Healthy | HealthStatus::Degraded { .. })
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AdapterStats {
pub requests_received: u64,
pub requests_in_flight: u64,
pub requests_succeeded: u64,
pub requests_failed: u64,
pub avg_latency_ms: f64,
pub p95_latency_ms: f64,
pub p99_latency_ms: f64,
pub bytes_received: u64,
pub bytes_sent: u64,
pub active_connections: u64,
pub custom_metrics: Vec<(String, f64)>,
}
#[derive(Debug, Clone)]
pub struct RequestContext {
pub request_id: String,
pub source_adapter: String,
pub client_id: String,
pub received_at: chrono::DateTime<chrono::Utc>,
pub reply_to: Option<String>,
pub supports_streaming: bool,
pub metadata: Vec<(String, String)>,
}
impl Default for RequestContext {
fn default() -> Self {
Self {
request_id: uuid::Uuid::new_v4().to_string(),
source_adapter: String::new(),
client_id: String::new(),
received_at: chrono::Utc::now(),
reply_to: None,
supports_streaming: false,
metadata: vec![],
}
}
}
#[async_trait]
pub trait IntentHandler: Send + Sync {
async fn handle(&self, request: IntentRequest, ctx: RequestContext) -> Result<IntentResponse>;
async fn handle_streaming(
&self,
request: IntentRequest,
ctx: RequestContext,
output_tx: tokio::sync::mpsc::Sender<OutputChunk>,
) -> Result<IntentResponse>;
async fn supports_capability(&self, capability: &str) -> bool;
async fn list_capabilities(&self) -> Vec<CapabilityInfo>;
async fn health(&self) -> HealthStatus;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OutputChunk {
Stdout(Vec<u8>),
Stderr(Vec<u8>),
Progress { percent: f32, message: String },
Log { level: String, message: String },
Done,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapabilityInfo {
pub name: String,
pub description: String,
pub version: u32,
pub param_schema: Option<serde_json::Value>,
pub requires_elevated: bool,
pub supports_streaming: bool,
pub tags: Vec<String>,
}
#[async_trait]
pub trait IngestAdapter: Send + Sync {
fn name(&self) -> &str;
async fn start(&self, handler: Arc<dyn IntentHandler>) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn health(&self) -> HealthStatus;
async fn stats(&self) -> AdapterStats;
fn is_running(&self) -> bool;
fn config_info(&self) -> AdapterConfigInfo;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterConfigInfo {
pub adapter_type: String,
pub listen_address: Option<String>,
pub remote_address: Option<String>,
pub tls_enabled: bool,
pub auth_methods: Vec<String>,
pub max_concurrent: Option<u32>,
pub extra: Vec<(String, String)>,
}
pub struct AdapterBuilder {
adapter_type: String,
listen_address: Option<String>,
tls_config: Option<TlsConfig>,
auth_config: Option<AuthConfig>,
max_concurrent: Option<u32>,
}
#[derive(Debug, Clone)]
pub struct TlsConfig {
pub cert_path: String,
pub key_path: String,
pub ca_path: Option<String>,
pub require_client_cert: bool,
}
#[derive(Debug, Clone)]
pub struct AuthConfig {
pub methods: Vec<String>,
pub jwt_config: Option<JwtConfig>,
pub api_key_config: Option<ApiKeyConfig>,
}
#[derive(Debug, Clone)]
pub struct JwtConfig {
pub issuer: Option<String>,
pub audience: Option<String>,
pub verification_key: String,
}
#[derive(Debug, Clone)]
pub struct ApiKeyConfig {
pub header_name: String,
pub prefix: Option<String>,
}
impl AdapterBuilder {
pub fn new(adapter_type: &str) -> Self {
Self {
adapter_type: adapter_type.to_string(),
listen_address: None,
tls_config: None,
auth_config: None,
max_concurrent: None,
}
}
pub fn listen(mut self, address: &str) -> Self {
self.listen_address = Some(address.to_string());
self
}
pub fn with_tls(mut self, config: TlsConfig) -> Self {
self.tls_config = Some(config);
self
}
pub fn with_auth(mut self, config: AuthConfig) -> Self {
self.auth_config = Some(config);
self
}
pub fn max_concurrent(mut self, max: u32) -> Self {
self.max_concurrent = Some(max);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_health_status_is_healthy() {
assert!(HealthStatus::Healthy.is_healthy());
assert!(HealthStatus::Degraded {
reason: "test".to_string()
}
.is_healthy());
assert!(!HealthStatus::Unhealthy {
reason: "test".to_string()
}
.is_healthy());
assert!(!HealthStatus::Starting.is_healthy());
assert!(!HealthStatus::Stopping.is_healthy());
}
#[test]
fn test_health_status_equality() {
assert_eq!(HealthStatus::Healthy, HealthStatus::Healthy);
assert_eq!(HealthStatus::Starting, HealthStatus::Starting);
assert_ne!(HealthStatus::Healthy, HealthStatus::Starting);
}
#[test]
fn test_adapter_stats_default() {
let stats = AdapterStats::default();
assert_eq!(stats.requests_received, 0);
assert_eq!(stats.requests_in_flight, 0);
assert_eq!(stats.requests_succeeded, 0);
assert_eq!(stats.requests_failed, 0);
assert_eq!(stats.avg_latency_ms, 0.0);
assert_eq!(stats.p95_latency_ms, 0.0);
assert_eq!(stats.p99_latency_ms, 0.0);
assert_eq!(stats.bytes_received, 0);
assert_eq!(stats.bytes_sent, 0);
assert_eq!(stats.active_connections, 0);
assert!(stats.custom_metrics.is_empty());
}
#[test]
fn test_request_context_default() {
let ctx = RequestContext::default();
assert!(!ctx.request_id.is_empty());
assert!(ctx.source_adapter.is_empty());
assert!(ctx.client_id.is_empty());
assert!(ctx.reply_to.is_none());
assert!(!ctx.supports_streaming);
assert!(ctx.metadata.is_empty());
}
#[test]
fn test_output_chunk_variants() {
let stdout = OutputChunk::Stdout(vec![1, 2, 3]);
let stderr = OutputChunk::Stderr(vec![4, 5, 6]);
let progress = OutputChunk::Progress {
percent: 50.0,
message: "halfway".to_string(),
};
let log = OutputChunk::Log {
level: "info".to_string(),
message: "test".to_string(),
};
let done = OutputChunk::Done;
let _ = stdout.clone();
let _ = stderr.clone();
let _ = progress.clone();
let _ = log.clone();
let _ = done.clone();
}
#[test]
fn test_capability_info_creation() {
let info = CapabilityInfo {
name: "fs.read.v1".to_string(),
description: "Read files".to_string(),
version: 1,
param_schema: None,
requires_elevated: false,
supports_streaming: false,
tags: vec!["filesystem".to_string()],
};
assert_eq!(info.name, "fs.read.v1");
assert_eq!(info.version, 1);
}
#[test]
fn test_adapter_config_info_creation() {
let info = AdapterConfigInfo {
adapter_type: "grpc".to_string(),
listen_address: Some("0.0.0.0:9500".to_string()),
remote_address: None,
tls_enabled: true,
auth_methods: vec!["jwt".to_string()],
max_concurrent: Some(100),
extra: vec![],
};
assert_eq!(info.adapter_type, "grpc");
assert!(info.tls_enabled);
}
#[test]
fn test_adapter_builder() {
let builder = AdapterBuilder::new("grpc")
.listen("0.0.0.0:9500")
.max_concurrent(100);
assert_eq!(builder.adapter_type, "grpc");
assert_eq!(builder.listen_address, Some("0.0.0.0:9500".to_string()));
assert_eq!(builder.max_concurrent, Some(100));
}
#[test]
fn test_adapter_builder_with_tls() {
let tls_config = TlsConfig {
cert_path: "/path/to/cert".to_string(),
key_path: "/path/to/key".to_string(),
ca_path: Some("/path/to/ca".to_string()),
require_client_cert: true,
};
let builder = AdapterBuilder::new("grpc").with_tls(tls_config);
assert!(builder.tls_config.is_some());
let tls = builder.tls_config.unwrap();
assert_eq!(tls.cert_path, "/path/to/cert");
assert!(tls.require_client_cert);
}
#[test]
fn test_adapter_builder_with_auth() {
let auth_config = AuthConfig {
methods: vec!["jwt".to_string()],
jwt_config: Some(JwtConfig {
issuer: Some("test-issuer".to_string()),
audience: Some("test-audience".to_string()),
verification_key: "test-key".to_string(),
}),
api_key_config: None,
};
let builder = AdapterBuilder::new("http").with_auth(auth_config);
assert!(builder.auth_config.is_some());
}
#[test]
fn test_tls_config_creation() {
let config = TlsConfig {
cert_path: "cert.pem".to_string(),
key_path: "key.pem".to_string(),
ca_path: None,
require_client_cert: false,
};
assert_eq!(config.cert_path, "cert.pem");
assert!(!config.require_client_cert);
}
#[test]
fn test_api_key_config_creation() {
let config = ApiKeyConfig {
header_name: "X-API-Key".to_string(),
prefix: Some("Bearer ".to_string()),
};
assert_eq!(config.header_name, "X-API-Key");
assert_eq!(config.prefix, Some("Bearer ".to_string()));
}
}