use crate::{
Document, EmbeddingService, MemoryService, Pipeline, RetrievalService, RragAgent, RragError,
RragResult, SearchResult, StorageService,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RragSystemConfig {
pub name: String,
pub version: String,
pub environment: String,
pub components: ComponentConfigs,
pub performance: PerformanceConfig,
pub monitoring: MonitoringConfig,
pub features: FeatureFlags,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentConfigs {
pub embedding: EmbeddingConfig,
pub storage: StorageConfig,
pub retrieval: RetrievalConfig,
pub memory: MemoryConfig,
pub agent: AgentConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmbeddingConfig {
pub provider: String,
pub model: String,
pub batch_size: usize,
pub timeout_seconds: u64,
pub max_retries: usize,
pub api_key_env: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
pub backend: String,
pub connection_string: Option<String>,
pub max_connections: Option<usize>,
pub timeout_seconds: u64,
pub enable_compression: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetrievalConfig {
pub index_type: String,
pub similarity_threshold: f32,
pub max_results: usize,
pub enable_reranking: bool,
pub cache_results: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryConfig {
pub memory_type: String,
pub max_messages: usize,
pub max_tokens: Option<usize>,
pub enable_summarization: bool,
pub persistence_enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentConfig {
pub model_provider: String,
pub model_name: String,
pub temperature: f32,
pub max_tokens: usize,
pub max_tool_calls: usize,
pub enable_streaming: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceConfig {
pub max_concurrency: usize,
pub request_timeout_seconds: u64,
pub connection_pool_size: usize,
pub cache_size: usize,
pub cache_ttl_seconds: u64,
pub rate_limit_per_second: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
pub enable_metrics: bool,
pub enable_tracing: bool,
pub log_level: String,
pub health_check_interval_seconds: u64,
pub metrics_endpoint: Option<String>,
pub tracing_endpoint: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeatureFlags {
pub enable_experimental: bool,
pub enable_async_processing: bool,
pub enable_auto_retry: bool,
pub enable_validation: bool,
pub enable_caching: bool,
}
impl Default for RragSystemConfig {
fn default() -> Self {
Self {
name: "RRAG System".to_string(),
version: "1.0.0".to_string(),
environment: "development".to_string(),
components: ComponentConfigs::default(),
performance: PerformanceConfig::default(),
monitoring: MonitoringConfig::default(),
features: FeatureFlags::default(),
}
}
}
impl Default for ComponentConfigs {
fn default() -> Self {
Self {
embedding: EmbeddingConfig::default(),
storage: StorageConfig::default(),
retrieval: RetrievalConfig::default(),
memory: MemoryConfig::default(),
agent: AgentConfig::default(),
}
}
}
impl Default for EmbeddingConfig {
fn default() -> Self {
Self {
provider: "openai".to_string(),
model: "text-embedding-ada-002".to_string(),
batch_size: 100,
timeout_seconds: 30,
max_retries: 3,
api_key_env: "OPENAI_API_KEY".to_string(),
}
}
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
backend: "in_memory".to_string(),
connection_string: None,
max_connections: Some(10),
timeout_seconds: 30,
enable_compression: false,
}
}
}
impl Default for RetrievalConfig {
fn default() -> Self {
Self {
index_type: "in_memory".to_string(),
similarity_threshold: 0.7,
max_results: 10,
enable_reranking: true,
cache_results: false,
}
}
}
impl Default for MemoryConfig {
fn default() -> Self {
Self {
memory_type: "buffer".to_string(),
max_messages: 100,
max_tokens: Some(4000),
enable_summarization: false,
persistence_enabled: false,
}
}
}
impl Default for AgentConfig {
fn default() -> Self {
Self {
model_provider: "openai".to_string(),
model_name: "gpt-3.5-turbo".to_string(),
temperature: 0.7,
max_tokens: 2048,
max_tool_calls: 10,
enable_streaming: true,
}
}
}
impl Default for PerformanceConfig {
fn default() -> Self {
Self {
max_concurrency: 10,
request_timeout_seconds: 300,
connection_pool_size: 10,
cache_size: 1000,
cache_ttl_seconds: 3600,
rate_limit_per_second: None,
}
}
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enable_metrics: true,
enable_tracing: false,
log_level: "info".to_string(),
health_check_interval_seconds: 30,
metrics_endpoint: None,
tracing_endpoint: None,
}
}
}
impl Default for FeatureFlags {
fn default() -> Self {
Self {
enable_experimental: false,
enable_async_processing: true,
enable_auto_retry: true,
enable_validation: true,
enable_caching: true,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
pub uptime_seconds: u64,
pub request_counts: RequestCounts,
pub performance: PerformanceMetrics,
pub component_health: HashMap<String, HealthStatus>,
pub resource_usage: ResourceUsage,
pub last_updated: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestCounts {
pub total_requests: u64,
pub successful_requests: u64,
pub failed_requests: u64,
pub embedding_requests: u64,
pub retrieval_requests: u64,
pub agent_requests: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
pub average_response_time_ms: f64,
pub p95_response_time_ms: f64,
pub p99_response_time_ms: f64,
pub requests_per_second: f64,
pub error_rate: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
pub memory_usage_mb: f64,
pub cpu_usage_percent: f64,
pub storage_usage_mb: f64,
pub network_bytes_sent: u64,
pub network_bytes_received: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum HealthStatus {
Healthy,
Degraded,
Unhealthy,
Unknown,
}
pub struct RragSystem {
config: RragSystemConfig,
services: SystemServices,
metrics: Arc<RwLock<SystemMetrics>>,
start_time: Instant,
health_checkers: HashMap<String, Box<dyn HealthChecker>>,
}
pub struct SystemServices {
pub embedding: Option<Arc<EmbeddingService>>,
pub storage: Option<Arc<StorageService>>,
pub retrieval: Option<Arc<RetrievalService>>,
pub memory: Option<Arc<MemoryService>>,
pub agents: HashMap<String, Arc<RragAgent>>,
pub pipelines: HashMap<String, Arc<Pipeline>>,
}
trait HealthChecker: Send + Sync {
fn check_health(
&self,
) -> Box<dyn std::future::Future<Output = RragResult<HealthStatus>> + Send + '_>;
}
impl RragSystem {
pub async fn new(config: RragSystemConfig) -> RragResult<Self> {
let services = SystemServices {
embedding: None,
storage: None,
retrieval: None,
memory: None,
agents: HashMap::new(),
pipelines: HashMap::new(),
};
let metrics = Arc::new(RwLock::new(SystemMetrics {
uptime_seconds: 0,
request_counts: RequestCounts {
total_requests: 0,
successful_requests: 0,
failed_requests: 0,
embedding_requests: 0,
retrieval_requests: 0,
agent_requests: 0,
},
performance: PerformanceMetrics {
average_response_time_ms: 0.0,
p95_response_time_ms: 0.0,
p99_response_time_ms: 0.0,
requests_per_second: 0.0,
error_rate: 0.0,
},
component_health: HashMap::new(),
resource_usage: ResourceUsage {
memory_usage_mb: 0.0,
cpu_usage_percent: 0.0,
storage_usage_mb: 0.0,
network_bytes_sent: 0,
network_bytes_received: 0,
},
last_updated: chrono::Utc::now(),
}));
Ok(Self {
config,
services,
metrics,
start_time: Instant::now(),
health_checkers: HashMap::new(),
})
}
pub async fn initialize(&mut self) -> RragResult<()> {
println!("Initializing RRAG System: {}", self.config.name);
println!("Environment: {}", self.config.environment);
println!("Version: {}", self.config.version);
let mut metrics = self.metrics.write().await;
metrics
.component_health
.insert("system".to_string(), HealthStatus::Healthy);
metrics.last_updated = chrono::Utc::now();
Ok(())
}
pub async fn process_document(&self, document: Document) -> RragResult<ProcessingResult> {
let start_time = Instant::now();
let mut result = ProcessingResult::new();
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.total_requests += 1;
}
result.processing_time_ms = start_time.elapsed().as_millis() as u64;
result.success = true;
result.metadata.insert(
"document_id".to_string(),
serde_json::Value::String(document.id.clone()),
);
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.successful_requests += 1;
}
Ok(result)
}
pub async fn search(&self, query: String, _limit: Option<usize>) -> RragResult<SearchResponse> {
let start_time = Instant::now();
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.total_requests += 1;
metrics.request_counts.retrieval_requests += 1;
}
let response = SearchResponse {
query: query.clone(),
results: Vec::new(), processing_time_ms: start_time.elapsed().as_millis() as u64,
total_results: 0,
metadata: HashMap::new(),
};
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.successful_requests += 1;
}
Ok(response)
}
pub async fn chat(
&self,
agent_id: &str,
message: String,
conversation_id: Option<String>,
) -> RragResult<ChatResponse> {
let start_time = Instant::now();
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.total_requests += 1;
metrics.request_counts.agent_requests += 1;
}
let response = ChatResponse {
agent_id: agent_id.to_string(),
response: format!("Echo: {}", message), conversation_id: conversation_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
processing_time_ms: start_time.elapsed().as_millis() as u64,
tool_calls: Vec::new(),
metadata: HashMap::new(),
};
{
let mut metrics = self.metrics.write().await;
metrics.request_counts.successful_requests += 1;
}
Ok(response)
}
pub async fn get_metrics(&self) -> SystemMetrics {
let mut metrics = self.metrics.read().await.clone();
metrics.uptime_seconds = self.start_time.elapsed().as_secs();
metrics.last_updated = chrono::Utc::now();
metrics
}
pub async fn health_check(&self) -> RragResult<HealthCheckResult> {
let mut result = HealthCheckResult {
overall_status: HealthStatus::Healthy,
component_status: HashMap::new(),
check_time: chrono::Utc::now(),
uptime_seconds: self.start_time.elapsed().as_secs(),
version: self.config.version.clone(),
};
result
.component_status
.insert("system".to_string(), HealthStatus::Healthy);
if let Some(_embedding_service) = &self.services.embedding {
result
.component_status
.insert("embedding".to_string(), HealthStatus::Healthy);
}
if let Some(_storage_service) = &self.services.storage {
result
.component_status
.insert("storage".to_string(), HealthStatus::Healthy);
}
if let Some(_retrieval_service) = &self.services.retrieval {
result
.component_status
.insert("retrieval".to_string(), HealthStatus::Healthy);
}
let has_unhealthy = result
.component_status
.values()
.any(|status| *status == HealthStatus::Unhealthy);
let has_degraded = result
.component_status
.values()
.any(|status| *status == HealthStatus::Degraded);
result.overall_status = if has_unhealthy {
HealthStatus::Unhealthy
} else if has_degraded {
HealthStatus::Degraded
} else {
HealthStatus::Healthy
};
Ok(result)
}
pub async fn shutdown(&self) -> RragResult<()> {
println!("Shutting down RRAG System gracefully...");
println!("RRAG System shutdown complete");
Ok(())
}
pub fn get_config(&self) -> &RragSystemConfig {
&self.config
}
pub async fn update_config(&mut self, new_config: RragSystemConfig) -> RragResult<()> {
self.validate_config(&new_config)?;
self.config = new_config;
println!("System configuration updated");
Ok(())
}
fn validate_config(&self, config: &RragSystemConfig) -> RragResult<()> {
if config.name.is_empty() {
return Err(RragError::validation("name", "non-empty", "empty"));
}
if config.version.is_empty() {
return Err(RragError::validation("version", "non-empty", "empty"));
}
if config.performance.max_concurrency == 0 {
return Err(RragError::validation("max_concurrency", "> 0", "0"));
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingResult {
pub success: bool,
pub processing_time_ms: u64,
pub items_processed: usize,
pub errors: Vec<String>,
pub metadata: HashMap<String, serde_json::Value>,
}
impl ProcessingResult {
pub fn new() -> Self {
Self {
success: false,
processing_time_ms: 0,
items_processed: 0,
errors: Vec::new(),
metadata: HashMap::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchResponse {
pub query: String,
pub results: Vec<SearchResult>,
pub processing_time_ms: u64,
pub total_results: usize,
pub metadata: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatResponse {
pub agent_id: String,
pub response: String,
pub conversation_id: String,
pub processing_time_ms: u64,
pub tool_calls: Vec<String>, pub metadata: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
pub overall_status: HealthStatus,
pub component_status: HashMap<String, HealthStatus>,
pub check_time: chrono::DateTime<chrono::Utc>,
pub uptime_seconds: u64,
pub version: String,
}
pub struct RragSystemBuilder {
config: RragSystemConfig,
}
impl RragSystemBuilder {
pub fn new() -> Self {
Self {
config: RragSystemConfig::default(),
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.config.name = name.into();
self
}
pub fn with_environment(mut self, environment: impl Into<String>) -> Self {
self.config.environment = environment.into();
self
}
pub fn with_embedding_config(mut self, config: EmbeddingConfig) -> Self {
self.config.components.embedding = config;
self
}
pub fn with_storage_config(mut self, config: StorageConfig) -> Self {
self.config.components.storage = config;
self
}
pub fn with_performance_config(mut self, config: PerformanceConfig) -> Self {
self.config.performance = config;
self
}
pub fn enable_feature(mut self, feature: &str, enabled: bool) -> Self {
match feature {
"experimental" => self.config.features.enable_experimental = enabled,
"async_processing" => self.config.features.enable_async_processing = enabled,
"auto_retry" => self.config.features.enable_auto_retry = enabled,
"validation" => self.config.features.enable_validation = enabled,
"caching" => self.config.features.enable_caching = enabled,
_ => {} }
self
}
pub async fn build(self) -> RragResult<RragSystem> {
let mut system = RragSystem::new(self.config).await?;
system.initialize().await?;
Ok(system)
}
}
impl Default for RragSystemBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_system_creation() {
let config = RragSystemConfig::default();
let system = RragSystem::new(config).await.unwrap();
assert_eq!(system.config.name, "RRAG System");
assert_eq!(system.config.environment, "development");
}
#[tokio::test]
async fn test_system_builder() {
let system = RragSystemBuilder::new()
.with_name("Test System")
.with_environment("test")
.enable_feature("experimental", true)
.build()
.await
.unwrap();
assert_eq!(system.config.name, "Test System");
assert_eq!(system.config.environment, "test");
assert!(system.config.features.enable_experimental);
}
#[tokio::test]
async fn test_health_check() {
let system = RragSystemBuilder::new().build().await.unwrap();
let health = system.health_check().await.unwrap();
assert_eq!(health.overall_status, HealthStatus::Healthy);
assert!(health.component_status.contains_key("system"));
assert!(health.uptime_seconds >= 0);
}
#[tokio::test]
async fn test_metrics() {
let system = RragSystemBuilder::new().build().await.unwrap();
let metrics = system.get_metrics().await;
assert_eq!(metrics.request_counts.total_requests, 0);
assert!(metrics.uptime_seconds >= 0);
}
#[tokio::test]
async fn test_document_processing() {
let system = RragSystemBuilder::new().build().await.unwrap();
let doc = Document::new("Test document");
let result = system.process_document(doc).await.unwrap();
assert!(result.success);
assert!(result.processing_time_ms > 0);
}
#[tokio::test]
async fn test_search() {
let system = RragSystemBuilder::new().build().await.unwrap();
let response = system
.search("test query".to_string(), Some(5))
.await
.unwrap();
assert_eq!(response.query, "test query");
assert!(response.processing_time_ms > 0);
}
#[tokio::test]
async fn test_chat() {
let system = RragSystemBuilder::new().build().await.unwrap();
let response = system
.chat("test_agent", "Hello".to_string(), None)
.await
.unwrap();
assert_eq!(response.agent_id, "test_agent");
assert!(response.response.contains("Hello"));
assert!(response.processing_time_ms > 0);
}
#[test]
fn test_config_validation() {
let system = RragSystemBuilder::new().build();
let mut invalid_config = RragSystemConfig::default();
invalid_config.name = "".to_string();
assert!(invalid_config.name.is_empty());
}
#[test]
fn test_feature_flags() {
let mut config = RragSystemConfig::default();
config.features.enable_experimental = true;
config.features.enable_caching = false;
assert!(config.features.enable_experimental);
assert!(!config.features.enable_caching);
}
}