1#![allow(dead_code)]
10use anyhow::{Context, Result};
174use std::{collections::HashMap, sync::Arc, time::Duration};
175use tokio::sync::{Mutex, RwLock};
176use tracing::{debug, error, info, warn};
177
178pub mod analytics;
181pub mod cache;
182pub mod chat;
183pub mod chat_session;
184pub mod collaboration; pub mod collaboration_server; pub mod context;
187pub mod custom_prompts; pub mod custom_tools; pub mod dashboard; pub mod dashboard_server; pub mod enterprise_integration;
192pub mod error;
193pub mod explanation;
194pub mod exploration_guidance; pub mod external_services;
196pub mod graph_exploration;
198pub mod health_monitoring;
199pub mod i18n; pub mod knowledge_bases; pub mod llm;
202pub mod message_analytics;
203pub mod messages;
204pub mod nl2sparql;
205pub mod nlp; pub mod performance;
207pub mod persistence;
209pub mod query_refinement; pub mod rag;
211pub mod schema_introspection; pub mod export; pub mod plugins; pub mod rich_content;
216pub mod server;
217pub mod session;
218pub mod session_manager;
219pub mod sparql_optimizer;
220pub mod suggestions; pub mod types;
222pub mod utils; pub mod visualization; pub mod voice; pub mod webhooks; pub mod workflow;
227
228pub use chat_session::{ChatSession, SessionStatistics};
230pub use messages::{Message, MessageAttachment, MessageContent, MessageRole, RichContentElement};
231pub use session::*;
232pub use session_manager::{
233 ChatConfig, ContextWindow, SessionData, SessionMetrics, SessionState, TopicTracker,
234};
235pub use types::*;
236pub use types::{SessionStats, ThreadInfo};
237
238pub use rag::{AssembledContext, QueryContext, RAGConfig, RAGSystem};
240
241pub use schema_introspection::{
243 DiscoveredSchema, IntrospectionConfig, RdfClass, RdfProperty, SchemaIntrospector,
244};
245
/// Primary chat-manager alias: conversations are driven by the enhanced LLM manager.
pub type ChatManager = llm::manager::EnhancedLLMManager;
248
249pub use llm::{
251 CircuitBreakerConfig, CircuitBreakerState, CircuitBreakerStats, LLMConfig, LLMResponse,
252};
253
254pub use collaboration::{
256 AccessControl, CollaborationConfig, CollaborationManager, CollaborationStats,
257 CollaborationUpdate, CursorPosition, Participant, ParticipantRole, ParticipantStatus,
258 SharedSession, TextRange,
259};
260
261pub use voice::{
263 AudioFormat, SpeechToTextProvider, SttProviderType, SttResult, SttStreamResult,
264 TextToSpeechProvider, TtsProviderType, TtsResult, VoiceConfig, VoiceInterface, WordTimestamp,
265};
266
267pub use dashboard::{
269 ActivityDataPoint, DashboardAnalytics, DashboardConfig, DashboardOverview, ExportFormat,
270 HealthAnalytics, HealthDataPoint, QueryAnalytics, QueryRecord, QueryType, SystemHealthMetrics,
271 TimeRange, UserActivity, UserActivityTracker, UserAnalytics,
272};
273
/// Main chat system coordinating RAG retrieval, LLM generation, and
/// NL2SPARQL translation over a shared RDF store.
pub struct OxiRSChat {
    /// Chat/session behavior configuration.
    pub config: ChatConfig,
    /// Shared RDF store queried by RAG retrieval and SPARQL execution.
    pub store: Arc<dyn oxirs_core::Store>,
    /// Active sessions keyed by session id; each session carries its own
    /// lock so processing in one session does not block others.
    sessions: Arc<RwLock<HashMap<String, Arc<Mutex<ChatSession>>>>>,
    /// Idle duration after which `cleanup_expired_sessions` evicts a session.
    session_timeout: Duration,
    /// Retrieval-augmented-generation engine.
    rag_engine: Arc<Mutex<rag::RagEngine>>,
    /// LLM provider manager (routing, circuit breakers).
    llm_manager: Arc<Mutex<llm::LLMManager>>,
    /// Natural-language-to-SPARQL translation engine.
    nl2sparql_engine: Arc<Mutex<nl2sparql::NL2SPARQLSystem>>,
}
303
304impl OxiRSChat {
305 pub async fn new(config: ChatConfig, store: Arc<dyn oxirs_core::Store>) -> Result<Self> {
307 Self::new_with_llm_config(config, store, None).await
308 }
309
    /// Creates a chat system with an optional explicit LLM configuration
    /// (falls back to `LLMConfig::default()` when `None`).
    ///
    /// Initializes the RAG engine eagerly, then spawns a background task that
    /// runs NL2SPARQL schema discovery so construction does not block on it.
    ///
    /// # Errors
    /// Fails if RAG initialization, LLM manager construction, or NL2SPARQL
    /// system construction fails. Schema-discovery failures are only logged.
    pub async fn new_with_llm_config(
        config: ChatConfig,
        store: Arc<dyn oxirs_core::Store>,
        llm_config: Option<llm::LLMConfig>,
    ) -> Result<Self> {
        // Enable the advanced retrieval features by default; everything else
        // comes from the config defaults.
        let rag_config = rag::RagConfig {
            retrieval: rag::RetrievalConfig {
                enable_quantum_enhancement: true,
                enable_consciousness_integration: true,
                ..Default::default()
            },
            quantum: rag::QuantumConfig {
                enabled: true,
                ..Default::default()
            },
            consciousness: rag::consciousness::ConsciousnessConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let mut rag_engine =
            rag::RagEngine::new(rag_config, store.clone() as Arc<dyn oxirs_core::Store>);
        rag_engine
            .initialize()
            .await
            .context("Failed to initialize RAG engine")?;

        let llm_config = llm_config.unwrap_or_default();
        let llm_manager = llm::LLMManager::new(llm_config)?;

        let nl2sparql_config = nl2sparql::NL2SPARQLConfig::default();
        let nl2sparql_engine =
            nl2sparql::NL2SPARQLSystem::with_store(nl2sparql_config, None, store.clone())?;

        // The engine is shared: one handle is stored on `Self`, the clone is
        // moved into the background schema-discovery task below.
        let nl2sparql_for_schema = Arc::new(Mutex::new(nl2sparql_engine));
        let nl2sparql_clone = nl2sparql_for_schema.clone();

        // Fire-and-forget schema discovery; callers can also trigger it
        // explicitly via `discover_schema`.
        tokio::spawn(async move {
            let mut engine = nl2sparql_clone.lock().await;
            if let Err(e) = engine.discover_schema().await {
                warn!("Failed to discover schema for NL2SPARQL: {}", e);
            } else {
                info!("Schema discovery completed for NL2SPARQL enhancement");
            }
        });

        Ok(Self {
            config,
            store,
            sessions: Arc::new(RwLock::new(HashMap::new())),
            // Default idle timeout: one hour.
            session_timeout: Duration::from_secs(3600), rag_engine: Arc::new(Mutex::new(rag_engine)),
            llm_manager: Arc::new(Mutex::new(llm_manager)),
            nl2sparql_engine: nl2sparql_for_schema,
        })
    }
375
376 pub async fn discover_schema(&self) -> Result<()> {
378 let mut nl2sparql = self.nl2sparql_engine.lock().await;
379 nl2sparql.discover_schema().await
380 }
381
382 pub async fn get_discovered_schema(&self) -> Option<DiscoveredSchema> {
384 let nl2sparql = self.nl2sparql_engine.lock().await;
385 nl2sparql.get_schema().cloned()
386 }
387
388 pub async fn create_session(&self, session_id: String) -> Result<Arc<Mutex<ChatSession>>> {
390 let session = Arc::new(Mutex::new(ChatSession::new(
391 session_id.clone(),
392 self.store.clone(),
393 )));
394
395 let mut sessions = self.sessions.write().await;
396 sessions.insert(session_id, session.clone());
397
398 Ok(session)
399 }
400
401 pub async fn get_session(&self, session_id: &str) -> Option<Arc<Mutex<ChatSession>>> {
403 let sessions = self.sessions.read().await;
404 sessions.get(session_id).cloned()
405 }
406
407 pub async fn remove_session(&self, session_id: &str) -> bool {
409 let mut sessions = self.sessions.write().await;
410 sessions.remove(session_id).is_some()
411 }
412
413 pub async fn list_sessions(&self) -> Vec<String> {
415 let sessions = self.sessions.read().await;
416 sessions.keys().cloned().collect()
417 }
418
419 pub async fn cleanup_expired_sessions(&self) -> usize {
421 let mut sessions = self.sessions.write().await;
422 let mut expired_sessions = Vec::new();
423
424 for (session_id, session) in sessions.iter() {
425 if let Ok(session_guard) = session.try_lock() {
426 if session_guard.should_expire(
427 chrono::Duration::from_std(self.session_timeout)
428 .unwrap_or(chrono::Duration::seconds(3600)),
429 ) {
430 expired_sessions.push(session_id.clone());
431 }
432 }
433 }
434
435 for session_id in &expired_sessions {
436 sessions.remove(session_id);
437 }
438
439 expired_sessions.len()
440 }
441
442 pub async fn session_count(&self) -> usize {
444 let sessions = self.sessions.read().await;
445 sessions.len()
446 }
447
    /// Serializes every active session to `<persistence_path>/<session_id>.json`.
    ///
    /// Sessions whose lock is held are skipped with a warning; per-session
    /// serialization or write failures are logged and do not abort the rest.
    /// Returns how many sessions were actually written.
    ///
    /// # Errors
    /// Fails only if the persistence directory cannot be created.
    ///
    /// NOTE(review): this uses blocking `std::fs` from an async fn, which can
    /// stall a runtime worker thread — consider `tokio::fs` or
    /// `spawn_blocking`; confirm acceptable for current callers.
    pub async fn save_sessions<P: AsRef<std::path::Path>>(
        &self,
        persistence_path: P,
    ) -> Result<usize> {
        use std::fs;

        let sessions = self.sessions.read().await;
        let mut saved_count = 0;

        // Create the target directory on first use.
        let persistence_dir = persistence_path.as_ref();
        if !persistence_dir.exists() {
            fs::create_dir_all(persistence_dir)
                .context("Failed to create persistence directory")?;
        }

        info!(
            "Saving {} active sessions to {:?}",
            sessions.len(),
            persistence_dir
        );

        for (session_id, session_arc) in sessions.iter() {
            // try_lock: never block the save sweep on an in-use session.
            match session_arc.try_lock() {
                Ok(session) => {
                    let session_data = session.to_data();
                    let session_file = persistence_dir.join(format!("{session_id}.json"));

                    match serde_json::to_string_pretty(&session_data) {
                        Ok(json_data) => {
                            if let Err(e) = fs::write(&session_file, json_data) {
                                error!("Failed to save session {}: {}", session_id, e);
                            } else {
                                debug!("Saved session {} to {:?}", session_id, session_file);
                                saved_count += 1;
                            }
                        }
                        Err(e) => {
                            error!("Failed to serialize session {}: {}", session_id, e);
                        }
                    }
                }
                Err(_) => {
                    warn!("Session {} is locked, skipping save", session_id);
                }
            }
        }

        info!(
            "Successfully saved {} out of {} sessions",
            saved_count,
            sessions.len()
        );
        Ok(saved_count)
    }
504
    /// Loads all `*.json` session files from `persistence_path` into the
    /// active-session registry, keying each session by its file stem.
    ///
    /// Missing directory is not an error (returns 0). Per-file read or
    /// deserialize failures are logged and skipped. Returns how many
    /// sessions were loaded.
    ///
    /// NOTE(review): a file stem that cannot be read as UTF-8 falls back to
    /// the id "unknown" — multiple such files would overwrite each other;
    /// confirm this cannot occur in practice.
    pub async fn load_sessions<P: AsRef<std::path::Path>>(
        &self,
        persistence_path: P,
    ) -> Result<usize> {
        use crate::chat_session::ChatSession;
        use crate::session_manager::SessionData;
        use std::fs;

        let persistence_dir = persistence_path.as_ref();
        if !persistence_dir.exists() {
            info!(
                "Persistence directory {:?} does not exist, no sessions to load",
                persistence_dir
            );
            return Ok(0);
        }

        let mut loaded_count = 0;
        // Write lock held for the whole scan so loading is atomic w.r.t.
        // concurrent session operations.
        let mut sessions = self.sessions.write().await;

        info!("Loading sessions from {:?}", persistence_dir);

        for entry in fs::read_dir(persistence_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only consider files saved by `save_sessions` (".json").
            if path.extension().and_then(|s| s.to_str()) == Some("json") {
                let session_id = path
                    .file_stem()
                    .and_then(|s| s.to_str())
                    .unwrap_or("unknown");

                match fs::read_to_string(&path) {
                    Ok(json_data) => match serde_json::from_str::<SessionData>(&json_data) {
                        Ok(session_data) => {
                            let session = ChatSession::from_data(session_data, self.store.clone());
                            sessions.insert(session_id.to_string(), Arc::new(Mutex::new(session)));
                            loaded_count += 1;
                            debug!("Loaded session {} from {:?}", session_id, path);
                        }
                        Err(e) => {
                            error!("Failed to deserialize session from {:?}: {}", path, e);
                        }
                    },
                    Err(e) => {
                        error!("Failed to read session file {:?}: {}", path, e);
                    }
                }
            }
        }

        info!("Successfully loaded {} sessions", loaded_count);
        Ok(loaded_count)
    }
560
    /// Processes a user message end-to-end for one session: RAG context
    /// retrieval, optional NL2SPARQL translation + execution, LLM response
    /// generation, and rich-content assembly.
    ///
    /// Both the user message and the generated assistant message are appended
    /// to the session history. Returns the assistant `Message`.
    ///
    /// # Errors
    /// Fails if the session does not exist, the user message cannot be
    /// stored, RAG retrieval fails, or LLM generation fails. SPARQL
    /// translation/execution failures degrade gracefully (logged, not fatal).
    pub async fn process_message(&self, session_id: &str, user_message: String) -> Result<Message> {
        let processing_start = std::time::Instant::now();
        info!(
            "Processing message for session {}: {}",
            session_id,
            // Log only a preview of the message.
            user_message.chars().take(100).collect::<String>()
        );

        let session = self
            .get_session(session_id)
            .await
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        // The session lock is held for the whole pipeline, serializing
        // processing within a single session.
        let mut session = session.lock().await;

        let user_msg = Message {
            id: uuid::Uuid::new_v4().to_string(),
            role: MessageRole::User,
            content: MessageContent::from_text(user_message.clone()),
            timestamp: chrono::Utc::now(),
            metadata: None,
            thread_id: None,
            parent_message_id: None,
            // Rough token estimate: ~4 characters per token.
            token_count: Some(user_message.len() / 4), reactions: Vec::new(),
            attachments: Vec::new(),
            rich_elements: Vec::new(),
        };

        // Remember the id so the assistant reply can reference its parent.
        let user_msg_id = user_msg.id.clone();

        session.add_message(user_msg)?;

        debug!("Starting advanced RAG retrieval with quantum and consciousness capabilities");
        let assembled_context = {
            let mut rag_engine = self.rag_engine.lock().await;
            rag_engine
                .retrieve(&user_message)
                .await
                .context("Failed to perform advanced RAG retrieval")?
        };

        // If the message looks like a query, try NL2SPARQL translation and
        // execution; either step failing falls back to a plain LLM answer.
        let (sparql_query, sparql_results) = if self.is_sparql_query(&user_message) {
            debug!("Detected SPARQL query, performing NL2SPARQL translation");
            let mut nl2sparql = self.nl2sparql_engine.lock().await;
            let query_context = rag::QueryContext::new(session_id.to_string()).add_message(
                rag::ConversationMessage {
                    role: rag::MessageRole::User,
                    content: user_message.clone(),
                    timestamp: chrono::Utc::now(),
                },
            );
            match nl2sparql.generate_sparql(&query_context).await {
                Ok(sparql) => {
                    debug!("Generated SPARQL: {}", sparql.query);
                    match self.execute_sparql(&sparql.query).await {
                        Ok(results) => (Some(sparql), Some(results)),
                        Err(e) => {
                            // Keep the query for display even if execution failed.
                            warn!("SPARQL execution failed: {}", e);
                            (Some(sparql), None)
                        }
                    }
                }
                Err(e) => {
                    warn!("NL2SPARQL translation failed: {}", e);
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        debug!("Generating response using LLM with assembled context");
        let response_text = {
            let mut llm_manager = self.llm_manager.lock().await;
            self.generate_enhanced_response(
                &mut llm_manager,
                &user_message,
                &assembled_context,
                sparql_query.as_ref(),
                sparql_results.as_ref(),
            )
            .await
            .context("Failed to generate enhanced response")?
        };

        // Attach rich-content elements for whichever advanced subsystems
        // contributed to this answer.
        let mut rich_elements = Vec::new();

        if let Some(ref quantum_results) = assembled_context.quantum_results {
            if !quantum_results.is_empty() {
                rich_elements.push(RichContentElement::QuantumVisualization {
                    results: quantum_results.clone(),
                    entanglement_map: HashMap::new(),
                });
            }
        }

        if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
            if !consciousness_insights.is_empty() {
                rich_elements.push(RichContentElement::ConsciousnessInsights {
                    insights: consciousness_insights.clone(),
                    awareness_level: 0.8, });
            }
        }

        if let Some(ref reasoning_results) = assembled_context.reasoning_results {
            rich_elements.push(RichContentElement::ReasoningChain {
                reasoning_steps: reasoning_results.primary_chain.steps.clone(),
                confidence_score: reasoning_results.reasoning_quality.overall_quality,
            });
        }

        if let Some(ref results) = sparql_results {
            rich_elements.push(RichContentElement::SPARQLResults {
                query: sparql_query.map(|s| s.query).unwrap_or_default(),
                results: results.clone(),
                execution_time: processing_start.elapsed(),
            });
        }

        // Length captured before `response_text` is moved into the message.
        let response_text_len = response_text.len();
        let response = Message {
            id: uuid::Uuid::new_v4().to_string(),
            role: MessageRole::Assistant,
            content: MessageContent::from_text(response_text),
            timestamp: chrono::Utc::now(),
            metadata: Some(messages::MessageMetadata {
                source: Some("oxirs-chat".to_string()),
                confidence: Some(assembled_context.context_score as f64),
                processing_time_ms: Some(processing_start.elapsed().as_millis() as u64),
                model_used: Some("oxirs-chat-ai".to_string()),
                temperature: None,
                max_tokens: None,
                custom_fields: self
                    .create_response_metadata(&assembled_context, processing_start.elapsed())
                    .into_iter()
                    .map(|(k, v)| (k, serde_json::Value::String(v)))
                    .collect(),
            }),
            thread_id: None,
            parent_message_id: Some(user_msg_id),
            // Same ~4 chars/token estimate as for the user message.
            token_count: Some(response_text_len / 4), reactions: Vec::new(),
            attachments: Vec::new(),
            rich_elements,
        };

        session.add_message(response.clone())?;

        info!(
            "Advanced AI processing completed in {:?} with context score: {:.3}",
            processing_start.elapsed(),
            assembled_context.context_score
        );

        Ok(response)
    }
736
737 fn is_sparql_query(&self, message: &str) -> bool {
739 let sparql_keywords = [
740 "select",
741 "construct",
742 "ask",
743 "describe",
744 "insert",
745 "delete",
746 "where",
747 "prefix",
748 "base",
749 "distinct",
750 "reduced",
751 "from",
752 "named",
753 "graph",
754 "optional",
755 "union",
756 "minus",
757 "bind",
758 "values",
759 "filter",
760 "order by",
761 "group by",
762 "having",
763 "limit",
764 "offset",
765 ];
766
767 let lowercase_message = message.to_lowercase();
768 sparql_keywords
769 .iter()
770 .any(|&keyword| lowercase_message.contains(keyword))
771 || lowercase_message.contains("sparql")
772 || lowercase_message.contains("query")
773 || lowercase_message.contains("find all")
774 || lowercase_message.contains("show me")
775 || lowercase_message.contains("list")
776 }
777
778 async fn execute_sparql(&self, sparql: &str) -> Result<Vec<HashMap<String, String>>> {
780 debug!("Executing SPARQL query: {}", sparql);
781
782 let query = self
784 .store
785 .prepare_query(sparql)
786 .context("Failed to prepare SPARQL query")?;
787
788 let results = query.exec().context("Failed to execute SPARQL query")?;
790
791 let mut result_maps = Vec::new();
792
793 for solution in results {
796 let mut result_map = HashMap::new();
797 for (var, term) in solution.iter() {
798 result_map.insert(var.to_string(), term.to_string());
799 }
800 result_maps.push(result_map);
801 }
802
803 debug!("SPARQL query returned {} results", result_maps.len());
804 Ok(result_maps)
805 }
806
    /// Builds a context-rich prompt from the assembled RAG context plus any
    /// SPARQL query/results, then asks the LLM manager for a response.
    ///
    /// Prompt sections are added only when the corresponding context data is
    /// present: semantic facts, extracted entities, reasoning steps,
    /// consciousness insights, SPARQL query/results, and quantum summary.
    ///
    /// # Errors
    /// Fails if the LLM manager cannot produce a response.
    async fn generate_enhanced_response(
        &self,
        llm_manager: &mut llm::LLMManager,
        user_message: &str,
        assembled_context: &rag::AssembledContext,
        sparql_query: Option<&nl2sparql::SPARQLGenerationResult>,
        sparql_results: Option<&Vec<HashMap<String, String>>>,
    ) -> Result<String> {
        let mut prompt = String::new();

        // Persona / capability framing.
        prompt.push_str("You are an advanced AI assistant with access to a knowledge graph. ");
        prompt.push_str("You have quantum-enhanced retrieval, consciousness-aware processing, ");
        prompt.push_str("and advanced reasoning capabilities. ");
        prompt.push_str("Provide helpful, accurate, and insightful responses based on the available context.\n\n");

        prompt.push_str(&format!("User Query: {user_message}\n\n"));

        // Top 5 semantic retrieval hits, with relevance scores.
        if !assembled_context.semantic_results.is_empty() {
            prompt.push_str("Relevant Knowledge Graph Facts:\n");
            for (i, result) in assembled_context
                .semantic_results
                .iter()
                .take(5)
                .enumerate()
            {
                prompt.push_str(&format!(
                    "{}. {} (relevance: {:.2})\n",
                    i + 1,
                    result.triple,
                    result.score
                ));
            }
            prompt.push('\n');
        }

        // Up to 10 entities recognized in the user message.
        if !assembled_context.extracted_entities.is_empty() {
            prompt.push_str("Extracted Entities:\n");
            for entity in assembled_context.extracted_entities.iter().take(10) {
                prompt.push_str(&format!(
                    "- {} (type: {:?}, confidence: {:.2})\n",
                    entity.text, entity.entity_type, entity.confidence
                ));
            }
            prompt.push('\n');
        }

        // First 3 steps of the primary reasoning chain, if reasoning ran.
        if let Some(ref reasoning_results) = assembled_context.reasoning_results {
            prompt.push_str("Advanced Reasoning Analysis:\n");
            for step in reasoning_results.primary_chain.steps.iter().take(3) {
                prompt.push_str(&format!(
                    "- {:?}: {:?} (confidence: {:.2})\n",
                    step.reasoning_type, step.conclusion_triple, step.confidence
                ));
            }
            prompt.push('\n');
        }

        // Up to 3 consciousness-module insights, if present.
        if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
            if !consciousness_insights.is_empty() {
                prompt.push_str("Consciousness-Aware Insights:\n");
                for insight in consciousness_insights.iter().take(3) {
                    prompt.push_str(&format!(
                        "- {} (confidence: {:.2})\n",
                        insight.content, insight.confidence
                    ));
                }
                prompt.push('\n');
            }
        }

        // Include the generated SPARQL, plus up to 10 result rows if it ran.
        if let Some(sparql) = sparql_query {
            prompt.push_str(&format!("Generated SPARQL Query:\n{}\n\n", sparql.query));

            if let Some(results) = sparql_results {
                prompt.push_str("SPARQL Query Results:\n");
                for (i, result) in results.iter().take(10).enumerate() {
                    prompt.push_str(&format!("{}. ", i + 1));
                    for (key, value) in result {
                        prompt.push_str(&format!("{key}: {value} "));
                    }
                    prompt.push('\n');
                }
                prompt.push('\n');
            }
        }

        // Only a summary for quantum results (count, not contents).
        if let Some(ref quantum_results) = assembled_context.quantum_results {
            if !quantum_results.is_empty() {
                prompt.push_str("Quantum-Enhanced Retrieval Information:\n");
                prompt.push_str(&format!(
                    "Found {} quantum-optimized results with enhanced relevance scoring.\n\n",
                    quantum_results.len()
                ));
            }
        }

        // Closing instructions for the model.
        prompt.push_str(
            "Please provide a comprehensive, helpful response based on this information. ",
        );
        prompt.push_str(
            "If SPARQL results are available, incorporate them naturally into your answer. ",
        );
        prompt.push_str("Highlight any interesting patterns or insights you discover.");

        debug!(
            "Generating LLM response with context length: {} chars",
            prompt.len()
        );
        let llm_request = llm::LLMRequest {
            messages: vec![llm::ChatMessage {
                role: llm::ChatRole::User,
                content: prompt.clone(),
                metadata: None,
            }],
            system_prompt: Some(
                "You are an advanced AI assistant with access to a knowledge graph.".to_string(),
            ),
            temperature: 0.7,
            max_tokens: Some(1000),
            use_case: llm::UseCase::Conversation,
            priority: llm::Priority::Normal,
            timeout: None,
        };

        let response = llm_manager
            .generate_response(llm_request)
            .await
            .context("Failed to generate LLM response")?;

        Ok(response.content)
    }
949
950 fn create_response_metadata(
952 &self,
953 assembled_context: &rag::AssembledContext,
954 processing_time: Duration,
955 ) -> HashMap<String, String> {
956 let mut metadata = HashMap::new();
957
958 metadata.insert(
959 "context_score".to_string(),
960 assembled_context.context_score.to_string(),
961 );
962 metadata.insert(
963 "processing_time_ms".to_string(),
964 processing_time.as_millis().to_string(),
965 );
966 metadata.insert(
967 "semantic_results_count".to_string(),
968 assembled_context.semantic_results.len().to_string(),
969 );
970 metadata.insert(
971 "graph_results_count".to_string(),
972 assembled_context.graph_results.len().to_string(),
973 );
974 metadata.insert(
975 "extracted_entities_count".to_string(),
976 assembled_context.extracted_entities.len().to_string(),
977 );
978 metadata.insert(
979 "assembly_time_ms".to_string(),
980 assembled_context.assembly_time.as_millis().to_string(),
981 );
982
983 if let Some(ref quantum_results) = assembled_context.quantum_results {
985 metadata.insert(
986 "quantum_results_count".to_string(),
987 quantum_results.len().to_string(),
988 );
989 metadata.insert("quantum_enhanced".to_string(), "true".to_string());
990 }
991
992 if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
994 metadata.insert(
995 "consciousness_insights_count".to_string(),
996 consciousness_insights.len().to_string(),
997 );
998 metadata.insert("consciousness_enhanced".to_string(), "true".to_string());
999 }
1000
1001 if let Some(ref reasoning_results) = assembled_context.reasoning_results {
1003 metadata.insert(
1004 "reasoning_quality".to_string(),
1005 reasoning_results
1006 .reasoning_quality
1007 .overall_quality
1008 .to_string(),
1009 );
1010 metadata.insert("reasoning_enhanced".to_string(), "true".to_string());
1011 }
1012
1013 if let Some(ref extracted_knowledge) = assembled_context.extracted_knowledge {
1015 metadata.insert(
1016 "extracted_knowledge_score".to_string(),
1017 extracted_knowledge.confidence_score.to_string(),
1018 );
1019 metadata.insert(
1020 "knowledge_extraction_enhanced".to_string(),
1021 "true".to_string(),
1022 );
1023 }
1024
1025 metadata.insert("oxirs_chat_version".to_string(), VERSION.to_string());
1026 metadata.insert("advanced_ai_enabled".to_string(), "true".to_string());
1027
1028 metadata
1029 }
1030
1031 pub async fn get_session_statistics(&self, session_id: &str) -> Result<SessionStatistics> {
1033 let session = self
1034 .get_session(session_id)
1035 .await
1036 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
1037
1038 let session = session.lock().await;
1039 Ok(session.get_statistics())
1040 }
1041
1042 pub async fn export_session(&self, session_id: &str) -> Result<SessionData> {
1044 let session = self
1045 .get_session(session_id)
1046 .await
1047 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
1048
1049 let session = session.lock().await;
1050 Ok(session.export_data())
1051 }
1052
1053 pub async fn import_session(&self, session_data: SessionData) -> Result<()> {
1055 let session = Arc::new(Mutex::new(ChatSession::from_data(
1056 session_data.clone(),
1057 self.store.clone(),
1058 )));
1059
1060 let mut sessions = self.sessions.write().await;
1061 sessions.insert(session_data.id, session);
1062
1063 Ok(())
1064 }
1065
1066 pub async fn get_circuit_breaker_stats(
1068 &self,
1069 ) -> Result<HashMap<String, llm::CircuitBreakerStats>> {
1070 let llm_manager = self.llm_manager.lock().await;
1071 Ok(llm_manager.get_circuit_breaker_stats().await)
1072 }
1073
1074 pub async fn reset_circuit_breaker(&self, provider_name: &str) -> Result<()> {
1076 let llm_manager = self.llm_manager.lock().await;
1077 llm_manager.reset_circuit_breaker(provider_name).await
1078 }
1079
    /// Streaming variant of [`Self::process_message`]: spawns a background
    /// task that runs the pipeline and reports progress, context, content
    /// chunks, and errors over a channel of `StreamResponseChunk`s.
    ///
    /// The returned receiver yields `Status`, `Context`, `Content`, `Error`,
    /// and finally `Complete` chunks. Errors after spawn are delivered
    /// in-band as `Error` chunks rather than via the `Result`.
    ///
    /// # Errors
    /// Fails up-front only if the session does not exist.
    pub async fn process_message_stream(
        &self,
        session_id: &str,
        user_message: String,
    ) -> Result<tokio::sync::mpsc::Receiver<StreamResponseChunk>> {
        let processing_start = std::time::Instant::now();
        info!(
            "Processing streaming message for session {}: {}",
            session_id,
            // Log only a preview of the message.
            user_message.chars().take(100).collect::<String>()
        );

        // Bounded channel: producer backpressures if the consumer lags.
        let (tx, rx) = tokio::sync::mpsc::channel(100);

        let session = self
            .get_session(session_id)
            .await
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        // Clone the shared handles the background task needs ('static task).
        let rag_engine = self.rag_engine.clone();
        let llm_manager = self.llm_manager.clone();
        let nl2sparql_engine = self.nl2sparql_engine.clone();
        let store = self.store.clone();
        let session_id = session_id.to_string();

        // Send failures (receiver dropped) are ignored throughout: the
        // consumer going away simply ends the stream.
        tokio::spawn(async move {
            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::Initializing,
                    progress: 0.0,
                    message: Some("Starting message processing".to_string()),
                })
                .await;

            let user_msg = Message {
                id: uuid::Uuid::new_v4().to_string(),
                role: MessageRole::User,
                content: MessageContent::from_text(user_message.clone()),
                timestamp: chrono::Utc::now(),
                metadata: None,
                thread_id: None,
                parent_message_id: None,
                // Rough token estimate: ~4 characters per token.
                token_count: Some(user_message.len() / 4),
                reactions: Vec::new(),
                attachments: Vec::new(),
                rich_elements: Vec::new(),
            };

            let user_msg_id = user_msg.id.clone();

            // Store the user message; a failure here aborts the stream with a
            // non-recoverable error chunk.
            {
                let mut session_guard = session.lock().await;
                if let Err(e) = session_guard.add_message(user_msg) {
                    let _ = tx
                        .send(StreamResponseChunk::Error {
                            error: StructuredError {
                                error_type: ErrorType::InternalError,
                                message: format!("Failed to store user message: {e}"),
                                error_code: Some("MSG_STORE_FAILED".to_string()),
                                component: "ChatSession".to_string(),
                                timestamp: chrono::Utc::now(),
                                context: std::collections::HashMap::new(),
                            },
                            recoverable: false,
                        })
                        .await;
                    return;
                }
            }

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::RetrievingContext,
                    progress: 0.1,
                    message: Some("Retrieving relevant context from knowledge graph".to_string()),
                })
                .await;

            // RAG retrieval; failure ends the stream with a recoverable error.
            let assembled_context = {
                let mut rag_engine = rag_engine.lock().await;
                match rag_engine.retrieve(&user_message).await {
                    Ok(context) => context,
                    Err(e) => {
                        let _ = tx
                            .send(StreamResponseChunk::Error {
                                error: StructuredError {
                                    error_type: ErrorType::RagRetrievalError,
                                    message: format!("RAG retrieval failed: {e}"),
                                    error_code: Some("RAG_RETRIEVAL_FAILED".to_string()),
                                    component: "RagEngine".to_string(),
                                    timestamp: chrono::Utc::now(),
                                    context: std::collections::HashMap::new(),
                                },
                                recoverable: true,
                            })
                            .await;
                        return;
                    }
                }
            };

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::QuantumProcessing,
                    progress: 0.3,
                    message: Some("Context retrieval complete".to_string()),
                })
                .await;

            // Surface top retrieved facts/entities to the consumer early.
            if !assembled_context.semantic_results.is_empty() {
                let facts: Vec<String> = assembled_context
                    .semantic_results
                    .iter()
                    .take(5)
                    .map(|result| result.triple.to_string())
                    .collect();

                let entities: Vec<String> = assembled_context
                    .extracted_entities
                    .iter()
                    .take(10)
                    .map(|entity| entity.text.clone())
                    .collect();

                let _ = tx
                    .send(StreamResponseChunk::Context {
                        facts,
                        sparql_results: None,
                        entities,
                    })
                    .await;
            }

            // Simpler query detection than `is_sparql_query` (just two
            // trigger words); results are currently informational only.
            let (_sparql_query, _sparql_results) = if user_message.to_lowercase().contains("sparql")
                || user_message.to_lowercase().contains("query")
            {
                let _ = tx
                    .send(StreamResponseChunk::Status {
                        stage: ProcessingStage::GeneratingSparql,
                        progress: 0.5,
                        message: Some("Generating SPARQL query".to_string()),
                    })
                    .await;

                let mut nl2sparql = nl2sparql_engine.lock().await;
                let query_context = rag::QueryContext::new(session_id.clone()).add_message(
                    rag::ConversationMessage {
                        role: rag::MessageRole::User,
                        content: user_message.clone(),
                        timestamp: chrono::Utc::now(),
                    },
                );

                match nl2sparql.generate_sparql(&query_context).await {
                    Ok(sparql) => {
                        let _ = tx
                            .send(StreamResponseChunk::Context {
                                facts: vec!["Generated SPARQL query".to_string()],
                                sparql_results: None,
                                entities: vec![],
                            })
                            .await;

                        // Execute directly against the store; only the result
                        // count is reported to the stream.
                        let query_result = store.prepare_query(&sparql.query);
                        match query_result {
                            Ok(query) => match query.exec() {
                                Ok(results) => {
                                    let result_count = results.count();
                                    let _ = tx
                                        .send(StreamResponseChunk::Context {
                                            facts: vec![format!(
                                                "SPARQL query returned {} results",
                                                result_count
                                            )],
                                            sparql_results: None,
                                            entities: vec![],
                                        })
                                        .await;
                                    (Some(sparql), Some(Vec::<String>::new())) }
                                Err(_) => (Some(sparql), None),
                            },
                            Err(_) => (None, None),
                        }
                    }
                    Err(_) => (None, None),
                }
            } else {
                (None, None)
            };

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::GeneratingResponse,
                    progress: 0.7,
                    message: Some("Generating AI response".to_string()),
                })
                .await;

            // Slimmer prompt than the non-streaming path: top 3 facts only.
            let mut prompt = String::new();
            prompt.push_str("You are an advanced AI assistant with access to a knowledge graph. ");
            prompt.push_str(&format!("User Query: {user_message}\n\n"));

            if !assembled_context.semantic_results.is_empty() {
                prompt.push_str("Relevant Knowledge Graph Facts:\n");
                for (i, result) in assembled_context
                    .semantic_results
                    .iter()
                    .take(3)
                    .enumerate()
                {
                    prompt.push_str(&format!(
                        "{}. {} (relevance: {:.2})\n",
                        i + 1,
                        result.triple,
                        result.score
                    ));
                }
            }

            let response_text = {
                let mut llm_manager = llm_manager.lock().await;
                let llm_request = llm::LLMRequest {
                    messages: vec![llm::ChatMessage {
                        role: llm::ChatRole::User,
                        content: prompt,
                        metadata: None,
                    }],
                    system_prompt: Some("You are an advanced AI assistant.".to_string()),
                    temperature: 0.7,
                    max_tokens: Some(1000),
                    use_case: llm::UseCase::Conversation,
                    priority: llm::Priority::Normal,
                    timeout: None,
                };

                match llm_manager.generate_response(llm_request).await {
                    Ok(response) => response.content,
                    Err(e) => {
                        let _ = tx
                            .send(StreamResponseChunk::Error {
                                error: StructuredError {
                                    error_type: ErrorType::LlmGenerationError,
                                    message: format!("LLM generation failed: {e}"),
                                    error_code: Some("LLM_GENERATION_FAILED".to_string()),
                                    component: "LLMManager".to_string(),
                                    timestamp: chrono::Utc::now(),
                                    context: std::collections::HashMap::new(),
                                },
                                recoverable: true,
                            })
                            .await;
                        return;
                    }
                }
            };

            // Simulate token streaming: emit the full response 3 words at a
            // time with a small delay between chunks.
            let words: Vec<&str> = response_text.split_whitespace().collect();
            let chunk_size = 3; for (i, chunk) in words.chunks(chunk_size).enumerate() {
                let _progress = 0.8 + (0.2 * i as f32 / (words.len() / chunk_size) as f32);
                let _ = tx
                    .send(StreamResponseChunk::Content {
                        text: chunk.join(" ") + " ",
                        is_complete: false,
                    })
                    .await;

                tokio::time::sleep(Duration::from_millis(50)).await;
            }

            let response = Message {
                id: uuid::Uuid::new_v4().to_string(),
                role: MessageRole::Assistant,
                content: MessageContent::from_text(response_text.clone()),
                timestamp: chrono::Utc::now(),
                metadata: Some(messages::MessageMetadata {
                    source: Some("oxirs-chat-streaming".to_string()),
                    confidence: Some(assembled_context.context_score as f64),
                    processing_time_ms: Some(processing_start.elapsed().as_millis() as u64),
                    model_used: Some("oxirs-chat-ai-streaming".to_string()),
                    temperature: None,
                    max_tokens: None,
                    custom_fields: HashMap::new(),
                }),
                thread_id: None,
                parent_message_id: Some(user_msg_id),
                token_count: Some(response_text.len() / 4),
                reactions: Vec::new(),
                attachments: Vec::new(),
                rich_elements: Vec::new(),
            };

            // Best-effort persist of the assistant message into the session.
            {
                let mut session_guard = session.lock().await;
                let _ = session_guard.add_message(response.clone());
            }

            let _ = tx
                .send(StreamResponseChunk::Complete {
                    total_time: processing_start.elapsed(),
                    // Same ~4 chars/token estimate used elsewhere.
                    token_count: response_text.len() / 4, final_message: Some("Response generation complete".to_string()),
                })
                .await;
        });

        Ok(rx)
    }
1408}
1409
impl OxiRSChat {
    /// Convenience constructor for synchronous callers: builds a temporary
    /// Tokio runtime, creates a default in-memory store, and constructs the
    /// chat system with default configuration.
    ///
    /// NOTE(review): the runtime created here is dropped when this function
    /// returns, which cancels the background schema-discovery task spawned
    /// during construction — confirm that is intended. Also, calling this
    /// from inside an existing Tokio runtime will panic (nested `block_on`).
    pub fn create_default() -> Result<Self> {
        let rt = tokio::runtime::Runtime::new()?;
        rt.block_on(async {
            let store = Arc::new(oxirs_core::ConcreteStore::new()?);
            Self::new(ChatConfig::default(), store).await
        })
    }
}
1421
/// Crate version string, captured from Cargo metadata at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Crate name, captured from Cargo metadata at compile time.
pub const NAME: &str = env!("CARGO_PKG_NAME");
1425
/// Compile-time feature flags advertised by this build of the crate.
pub mod features {
    /// Retrieval-augmented generation pipeline available.
    pub const RAG_ENABLED: bool = true;
    /// Natural-language-to-SPARQL translation available.
    pub const NL2SPARQL_ENABLED: bool = true;
    /// Usage analytics available.
    pub const ANALYTICS_ENABLED: bool = true;
    /// Caching layer available.
    pub const CACHING_ENABLED: bool = true;
    /// Rich message content elements available.
    pub const RICH_CONTENT_ENABLED: bool = true;
}
1434
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: a freshly constructed chat system has no sessions.
    #[tokio::test]
    async fn test_chat_creation() {
        let store = Arc::new(oxirs_core::ConcreteStore::new().expect("Failed to create store"));
        let chat = OxiRSChat::new(ChatConfig::default(), store)
            .await
            .expect("Failed to create chat");

        assert_eq!(chat.session_count().await, 0);
    }

    // Create/lookup/remove round-trip for the session registry.
    #[tokio::test]
    async fn test_session_management() {
        let store = Arc::new(oxirs_core::ConcreteStore::new().expect("Failed to create store"));
        let chat = OxiRSChat::new(ChatConfig::default(), store)
            .await
            .expect("Failed to create chat");

        let session_id = "test-session".to_string();
        let _session = chat.create_session(session_id.clone()).await.unwrap();

        assert_eq!(chat.session_count().await, 1);
        assert!(chat.get_session(&session_id).await.is_some());

        let removed = chat.remove_session(&session_id).await;
        assert!(removed);
        assert_eq!(chat.session_count().await, 0);
    }
}