1#![allow(dead_code)]
10use anyhow::{Context, Result};
174use std::{collections::HashMap, sync::Arc, time::Duration};
175use tokio::sync::{Mutex, RwLock};
176use tracing::{debug, error, info, warn};
177
178pub mod analytics;
181pub mod cache;
182pub mod chat;
183pub mod chat_session;
185pub mod collaboration; pub mod collaboration_server; pub mod context;
188pub mod custom_prompts; pub mod custom_tools; pub mod dashboard; pub mod dashboard_server; pub mod enterprise_integration;
193pub mod error;
194pub mod explanation;
195pub mod exploration_guidance; pub mod external_services;
197pub mod memory_compression;
198pub mod graph_exploration;
200pub mod health_monitoring;
201pub mod i18n; pub mod knowledge_bases; pub mod llm;
204pub mod message_analytics;
205pub mod messages;
206pub mod nl2sparql;
207pub mod nlp; pub mod performance;
209pub mod persistence;
211pub mod query_refinement; pub mod rag;
213pub mod schema_introspection; pub mod export; pub mod plugins; pub mod rich_content;
218pub mod server;
219pub mod session;
220pub mod session_manager;
221pub mod sparql_optimizer;
222pub mod suggestions; pub mod types;
224pub mod utils; pub mod visualization; pub mod voice; pub mod webhooks; pub mod workflow;
229
230pub mod finetuning;
232pub mod history; pub mod memory_optimization; pub mod providers; pub mod resilience; pub mod revolutionary_chat; pub mod security; pub mod sso; pub mod conversation_summarizer;
242
243pub mod prompt_template;
245
246pub mod intent_classifier;
248
249pub mod response_ranker;
251
252pub mod knowledge_retriever;
254
255pub mod conversation_history;
257
258pub mod tool_registry;
260
261pub mod prompt_builder;
263pub use prompt_builder::{PromptBuilder, PromptError, PromptTemplate};
264
265pub mod conversation_state;
267
268pub mod context_window;
270
271pub mod intent_detector;
273
274pub mod memory_store;
276
277pub mod response_cache;
279
280pub mod dialogue_manager;
282
283pub mod session_store;
285
286pub use chat_session::{ChatSession, SessionStatistics};
288pub use messages::{Message, MessageAttachment, MessageContent, MessageRole, RichContentElement};
289pub use session::*;
290pub use session_manager::{
291 ChatConfig, ContextWindow, SessionData, SessionMetrics, SessionState, TopicTracker,
292};
293pub use types::*;
294pub use types::{SessionStats, ThreadInfo};
295
296pub use rag::{AssembledContext, QueryContext, RAGConfig, RAGSystem};
298
299pub use schema_introspection::{
301 DiscoveredSchema, IntrospectionConfig, RdfClass, RdfProperty, SchemaIntrospector,
302};
303
/// Convenience alias: the enhanced LLM manager from the `llm` module doubles as the chat manager.
pub type ChatManager = llm::manager::EnhancedLLMManager;
306
307pub use llm::{
309 CircuitBreakerConfig, CircuitBreakerState, CircuitBreakerStats, LLMConfig, LLMResponse,
310};
311
312pub use collaboration::{
314 AccessControl, CollaborationConfig, CollaborationManager, CollaborationStats,
315 CollaborationUpdate, CursorPosition, Participant, ParticipantRole, ParticipantStatus,
316 SharedSession, TextRange,
317};
318
319pub use voice::{
321 AudioFormat, SpeechToTextProvider, SttProviderType, SttResult, SttStreamResult,
322 TextToSpeechProvider, TtsProviderType, TtsResult, VoiceConfig, VoiceInterface, WordTimestamp,
323};
324
325pub use dashboard::{
327 ActivityDataPoint, DashboardAnalytics, DashboardConfig, DashboardOverview, ExportFormat,
328 HealthAnalytics, HealthDataPoint, QueryAnalytics, QueryRecord, QueryType, SystemHealthMetrics,
329 TimeRange, UserActivity, UserActivityTracker, UserAnalytics,
330};
331
/// Top-level chat orchestrator: owns the RDF store, the per-session map, and the
/// RAG / LLM / NL2SPARQL engines used to answer user messages.
pub struct OxiRSChat {
    /// Chat/session configuration (re-exported from `session_manager`).
    pub config: ChatConfig,
    /// Shared handle to the underlying RDF store.
    pub store: Arc<dyn oxirs_core::Store>,
    /// Active sessions keyed by session id; each session carries its own lock.
    sessions: Arc<RwLock<HashMap<String, Arc<Mutex<ChatSession>>>>>,
    /// Idle time after which a session is eligible for `cleanup_expired_sessions`.
    session_timeout: Duration,
    /// Retrieval-augmented-generation engine (quantum/consciousness flags enabled at construction).
    rag_engine: Arc<Mutex<rag::RagEngine>>,
    /// LLM provider manager; also exposes circuit-breaker stats/reset.
    llm_manager: Arc<Mutex<llm::LLMManager>>,
    /// NL→SPARQL translator; shared with a background schema-discovery task spawned at construction.
    nl2sparql_engine: Arc<Mutex<nl2sparql::NL2SPARQLSystem>>,
}
361
362impl OxiRSChat {
    /// Creates a chat instance with the default LLM configuration.
    ///
    /// Delegates to [`Self::new_with_llm_config`] with `llm_config = None`.
    pub async fn new(config: ChatConfig, store: Arc<dyn oxirs_core::Store>) -> Result<Self> {
        Self::new_with_llm_config(config, store, None).await
    }
367
    /// Creates a chat instance with an explicit (or default) LLM configuration.
    ///
    /// Initializes the RAG engine (with quantum/consciousness retrieval features
    /// enabled), the LLM manager, and the NL2SPARQL system, then spawns a
    /// background task that runs schema discovery so SPARQL generation can be
    /// schema-aware. Schema discovery failure is logged but does not fail
    /// construction.
    ///
    /// # Errors
    /// Returns an error if RAG initialization, LLM manager construction, or
    /// NL2SPARQL construction fails.
    pub async fn new_with_llm_config(
        config: ChatConfig,
        store: Arc<dyn oxirs_core::Store>,
        llm_config: Option<llm::LLMConfig>,
    ) -> Result<Self> {
        // RAG config with the advanced retrieval features switched on.
        let rag_config = rag::RagConfig {
            retrieval: rag::RetrievalConfig {
                enable_quantum_enhancement: true,
                enable_consciousness_integration: true,
                ..Default::default()
            },
            quantum: rag::QuantumConfig {
                enabled: true,
                ..Default::default()
            },
            consciousness: rag::consciousness::ConsciousnessConfig {
                enabled: true,
                ..Default::default()
            },
            ..Default::default()
        };

        let mut rag_engine =
            rag::RagEngine::new(rag_config, store.clone() as Arc<dyn oxirs_core::Store>);
        rag_engine
            .initialize()
            .await
            .context("Failed to initialize RAG engine")?;

        let llm_config = llm_config.unwrap_or_default();
        let llm_manager = llm::LLMManager::new(llm_config)?;

        let nl2sparql_config = nl2sparql::NL2SPARQLConfig::default();
        let nl2sparql_engine =
            nl2sparql::NL2SPARQLSystem::with_store(nl2sparql_config, None, store.clone())?;

        // Wrap the NL2SPARQL engine so the background discovery task and this
        // struct share the same instance.
        let nl2sparql_for_schema = Arc::new(Mutex::new(nl2sparql_engine));
        let nl2sparql_clone = nl2sparql_for_schema.clone();

        // Fire-and-forget schema discovery; best-effort, outcome only logged.
        tokio::spawn(async move {
            let mut engine = nl2sparql_clone.lock().await;
            if let Err(e) = engine.discover_schema().await {
                warn!("Failed to discover schema for NL2SPARQL: {}", e);
            } else {
                info!("Schema discovery completed for NL2SPARQL enhancement");
            }
        });

        Ok(Self {
            config,
            store,
            sessions: Arc::new(RwLock::new(HashMap::new())),
            // 1 hour idle timeout; used by cleanup_expired_sessions.
            session_timeout: Duration::from_secs(3600), rag_engine: Arc::new(Mutex::new(rag_engine)),
            llm_manager: Arc::new(Mutex::new(llm_manager)),
            nl2sparql_engine: nl2sparql_for_schema,
        })
    }
433
434 pub async fn discover_schema(&self) -> Result<()> {
436 let mut nl2sparql = self.nl2sparql_engine.lock().await;
437 nl2sparql.discover_schema().await
438 }
439
440 pub async fn get_discovered_schema(&self) -> Option<DiscoveredSchema> {
442 let nl2sparql = self.nl2sparql_engine.lock().await;
443 nl2sparql.get_schema().cloned()
444 }
445
446 pub async fn create_session(&self, session_id: String) -> Result<Arc<Mutex<ChatSession>>> {
448 let session = Arc::new(Mutex::new(ChatSession::new(
449 session_id.clone(),
450 self.store.clone(),
451 )));
452
453 let mut sessions = self.sessions.write().await;
454 sessions.insert(session_id, session.clone());
455
456 Ok(session)
457 }
458
459 pub async fn get_session(&self, session_id: &str) -> Option<Arc<Mutex<ChatSession>>> {
461 let sessions = self.sessions.read().await;
462 sessions.get(session_id).cloned()
463 }
464
465 pub async fn remove_session(&self, session_id: &str) -> bool {
467 let mut sessions = self.sessions.write().await;
468 sessions.remove(session_id).is_some()
469 }
470
471 pub async fn list_sessions(&self) -> Vec<String> {
473 let sessions = self.sessions.read().await;
474 sessions.keys().cloned().collect()
475 }
476
477 pub async fn cleanup_expired_sessions(&self) -> usize {
479 let mut sessions = self.sessions.write().await;
480 let mut expired_sessions = Vec::new();
481
482 for (session_id, session) in sessions.iter() {
483 if let Ok(session_guard) = session.try_lock() {
484 if session_guard.should_expire(
485 chrono::Duration::from_std(self.session_timeout)
486 .unwrap_or(chrono::Duration::seconds(3600)),
487 ) {
488 expired_sessions.push(session_id.clone());
489 }
490 }
491 }
492
493 for session_id in &expired_sessions {
494 sessions.remove(session_id);
495 }
496
497 expired_sessions.len()
498 }
499
500 pub async fn session_count(&self) -> usize {
502 let sessions = self.sessions.read().await;
503 sessions.len()
504 }
505
    /// Serializes every active session to `<persistence_path>/<session_id>.json`.
    ///
    /// Best-effort: sessions whose lock is currently held are skipped with a
    /// warning, and per-session serialization/IO failures are logged without
    /// aborting the run. Only a failure to create the target directory is fatal.
    ///
    /// Returns the number of sessions actually written.
    pub async fn save_sessions<P: AsRef<std::path::Path>>(
        &self,
        persistence_path: P,
    ) -> Result<usize> {
        use std::fs;

        let sessions = self.sessions.read().await;
        let mut saved_count = 0;

        // Ensure the target directory exists before writing any files.
        let persistence_dir = persistence_path.as_ref();
        if !persistence_dir.exists() {
            fs::create_dir_all(persistence_dir)
                .context("Failed to create persistence directory")?;
        }

        info!(
            "Saving {} active sessions to {:?}",
            sessions.len(),
            persistence_dir
        );

        for (session_id, session_arc) in sessions.iter() {
            // try_lock so an in-use session never blocks the save pass.
            match session_arc.try_lock() {
                Ok(session) => {
                    let session_data = session.to_data();
                    let session_file = persistence_dir.join(format!("{session_id}.json"));

                    match serde_json::to_string_pretty(&session_data) {
                        Ok(json_data) => {
                            if let Err(e) = fs::write(&session_file, json_data) {
                                error!("Failed to save session {}: {}", session_id, e);
                            } else {
                                debug!("Saved session {} to {:?}", session_id, session_file);
                                saved_count += 1;
                            }
                        }
                        Err(e) => {
                            error!("Failed to serialize session {}: {}", session_id, e);
                        }
                    }
                }
                Err(_) => {
                    warn!("Session {} is locked, skipping save", session_id);
                }
            }
        }

        info!(
            "Successfully saved {} out of {} sessions",
            saved_count,
            sessions.len()
        );
        Ok(saved_count)
    }
562
    /// Loads previously saved sessions from `persistence_path` into the session map.
    ///
    /// Each `*.json` file is deserialized into `SessionData` and registered under
    /// the file stem as its session id; unreadable or malformed files are logged
    /// and skipped. A missing directory is not an error (returns `Ok(0)`).
    ///
    /// Returns the number of sessions successfully loaded.
    ///
    /// NOTE(review): an already-registered session with the same id is overwritten
    /// by the loaded one — confirm that is the intended precedence.
    pub async fn load_sessions<P: AsRef<std::path::Path>>(
        &self,
        persistence_path: P,
    ) -> Result<usize> {
        use crate::chat_session::ChatSession;
        use crate::session_manager::SessionData;
        use std::fs;

        let persistence_dir = persistence_path.as_ref();
        if !persistence_dir.exists() {
            info!(
                "Persistence directory {:?} does not exist, no sessions to load",
                persistence_dir
            );
            return Ok(0);
        }

        let mut loaded_count = 0;
        let mut sessions = self.sessions.write().await;

        info!("Loading sessions from {:?}", persistence_dir);

        for entry in fs::read_dir(persistence_dir)? {
            let entry = entry?;
            let path = entry.path();

            // Only consider files saved by save_sessions (extension "json").
            if path.extension().and_then(|s| s.to_str()) == Some("json") {
                // Session id is recovered from the file name.
                let session_id = path
                    .file_stem()
                    .and_then(|s| s.to_str())
                    .unwrap_or("unknown");

                match fs::read_to_string(&path) {
                    Ok(json_data) => match serde_json::from_str::<SessionData>(&json_data) {
                        Ok(session_data) => {
                            let session = ChatSession::from_data(session_data, self.store.clone());
                            sessions.insert(session_id.to_string(), Arc::new(Mutex::new(session)));
                            loaded_count += 1;
                            debug!("Loaded session {} from {:?}", session_id, path);
                        }
                        Err(e) => {
                            error!("Failed to deserialize session from {:?}: {}", path, e);
                        }
                    },
                    Err(e) => {
                        error!("Failed to read session file {:?}: {}", path, e);
                    }
                }
            }
        }

        info!("Successfully loaded {} sessions", loaded_count);
        Ok(loaded_count)
    }
618
    /// Processes one user message end-to-end and returns the assistant reply.
    ///
    /// Pipeline: store the user message in the session → RAG retrieval →
    /// (optionally) NL2SPARQL translation + query execution when the message
    /// looks like a query → LLM response generation → attach rich content
    /// elements and metadata → store and return the assistant message.
    ///
    /// # Errors
    /// Fails if the session does not exist, RAG retrieval fails, or LLM
    /// generation fails. NL2SPARQL/SPARQL failures are downgraded to warnings.
    ///
    /// NOTE(review): the session lock is held for the whole pipeline, so two
    /// messages to the same session are processed strictly sequentially.
    pub async fn process_message(&self, session_id: &str, user_message: String) -> Result<Message> {
        let processing_start = std::time::Instant::now();
        info!(
            "Processing message for session {}: {}",
            session_id,
            user_message.chars().take(100).collect::<String>()
        );

        let session = self
            .get_session(session_id)
            .await
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        let mut session = session.lock().await;

        let user_msg = Message {
            id: uuid::Uuid::new_v4().to_string(),
            role: MessageRole::User,
            content: MessageContent::from_text(user_message.clone()),
            timestamp: chrono::Utc::now(),
            metadata: None,
            thread_id: None,
            parent_message_id: None,
            // Rough token estimate: ~4 bytes per token.
            token_count: Some(user_message.len() / 4), reactions: Vec::new(),
            attachments: Vec::new(),
            rich_elements: Vec::new(),
        };

        let user_msg_id = user_msg.id.clone();

        session.add_message(user_msg)?;

        debug!("Starting advanced RAG retrieval with quantum and consciousness capabilities");
        // Scope the rag_engine lock to just the retrieval call.
        let assembled_context = {
            let mut rag_engine = self.rag_engine.lock().await;
            rag_engine
                .retrieve(&user_message)
                .await
                .context("Failed to perform advanced RAG retrieval")?
        };

        // Query path: heuristic detection → NL2SPARQL → execution.
        // Failures here are non-fatal; the LLM answers without SPARQL results.
        let (sparql_query, sparql_results) = if self.is_sparql_query(&user_message) {
            debug!("Detected SPARQL query, performing NL2SPARQL translation");
            let mut nl2sparql = self.nl2sparql_engine.lock().await;
            let query_context = rag::QueryContext::new(session_id.to_string()).add_message(
                rag::ConversationMessage {
                    role: rag::MessageRole::User,
                    content: user_message.clone(),
                    timestamp: chrono::Utc::now(),
                },
            );
            match nl2sparql.generate_sparql(&query_context).await {
                Ok(sparql) => {
                    debug!("Generated SPARQL: {}", sparql.query);
                    match self.execute_sparql(&sparql.query).await {
                        Ok(results) => (Some(sparql), Some(results)),
                        Err(e) => {
                            warn!("SPARQL execution failed: {}", e);
                            (Some(sparql), None)
                        }
                    }
                }
                Err(e) => {
                    warn!("NL2SPARQL translation failed: {}", e);
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        debug!("Generating response using LLM with assembled context");
        let response_text = {
            let mut llm_manager = self.llm_manager.lock().await;
            self.generate_enhanced_response(
                &mut llm_manager,
                &user_message,
                &assembled_context,
                sparql_query.as_ref(),
                sparql_results.as_ref(),
            )
            .await
            .context("Failed to generate enhanced response")?
        };

        // Build rich content elements from whichever enhancements produced output.
        let mut rich_elements = Vec::new();

        if let Some(ref quantum_results) = assembled_context.quantum_results {
            if !quantum_results.is_empty() {
                rich_elements.push(RichContentElement::QuantumVisualization {
                    results: quantum_results.clone(),
                    entanglement_map: HashMap::new(),
                });
            }
        }

        if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
            if !consciousness_insights.is_empty() {
                rich_elements.push(RichContentElement::ConsciousnessInsights {
                    insights: consciousness_insights.clone(),
                    // Fixed awareness level — presumably a placeholder; TODO confirm.
                    awareness_level: 0.8, });
            }
        }

        if let Some(ref reasoning_results) = assembled_context.reasoning_results {
            rich_elements.push(RichContentElement::ReasoningChain {
                reasoning_steps: reasoning_results.primary_chain.steps.clone(),
                confidence_score: reasoning_results.reasoning_quality.overall_quality,
            });
        }

        if let Some(ref results) = sparql_results {
            rich_elements.push(RichContentElement::SPARQLResults {
                query: sparql_query.map(|s| s.query).unwrap_or_default(),
                results: results.clone(),
                execution_time: processing_start.elapsed(),
            });
        }

        let response_text_len = response_text.len();
        let response = Message {
            id: uuid::Uuid::new_v4().to_string(),
            role: MessageRole::Assistant,
            content: MessageContent::from_text(response_text),
            timestamp: chrono::Utc::now(),
            metadata: Some(messages::MessageMetadata {
                source: Some("oxirs-chat".to_string()),
                confidence: Some(assembled_context.context_score as f64),
                processing_time_ms: Some(processing_start.elapsed().as_millis() as u64),
                model_used: Some("oxirs-chat-ai".to_string()),
                temperature: None,
                max_tokens: None,
                // Flatten the string metadata map into JSON string values.
                custom_fields: self
                    .create_response_metadata(&assembled_context, processing_start.elapsed())
                    .into_iter()
                    .map(|(k, v)| (k, serde_json::Value::String(v)))
                    .collect(),
            }),
            thread_id: None,
            parent_message_id: Some(user_msg_id),
            // Same ~4-bytes-per-token estimate as for the user message.
            token_count: Some(response_text_len / 4), reactions: Vec::new(),
            attachments: Vec::new(),
            rich_elements,
        };

        session.add_message(response.clone())?;

        info!(
            "Advanced AI processing completed in {:?} with context score: {:.3}",
            processing_start.elapsed(),
            assembled_context.context_score
        );

        Ok(response)
    }
794
795 fn is_sparql_query(&self, message: &str) -> bool {
797 let sparql_keywords = [
798 "select",
799 "construct",
800 "ask",
801 "describe",
802 "insert",
803 "delete",
804 "where",
805 "prefix",
806 "base",
807 "distinct",
808 "reduced",
809 "from",
810 "named",
811 "graph",
812 "optional",
813 "union",
814 "minus",
815 "bind",
816 "values",
817 "filter",
818 "order by",
819 "group by",
820 "having",
821 "limit",
822 "offset",
823 ];
824
825 let lowercase_message = message.to_lowercase();
826 sparql_keywords
827 .iter()
828 .any(|&keyword| lowercase_message.contains(keyword))
829 || lowercase_message.contains("sparql")
830 || lowercase_message.contains("query")
831 || lowercase_message.contains("find all")
832 || lowercase_message.contains("show me")
833 || lowercase_message.contains("list")
834 }
835
    /// Executes a SPARQL query against the store and flattens each solution into
    /// a `variable-name → term-string` map.
    ///
    /// # Errors
    /// Fails if the query cannot be prepared or executed by the store.
    async fn execute_sparql(&self, sparql: &str) -> Result<Vec<HashMap<String, String>>> {
        debug!("Executing SPARQL query: {}", sparql);

        let query = self
            .store
            .prepare_query(sparql)
            .context("Failed to prepare SPARQL query")?;

        let results = query.exec().context("Failed to execute SPARQL query")?;

        let mut result_maps = Vec::new();

        // One map per solution row: variable name → stringified term.
        for solution in results {
            let mut result_map = HashMap::new();
            for (var, term) in solution.iter() {
                result_map.insert(var.to_string(), term.to_string());
            }
            result_maps.push(result_map);
        }

        debug!("SPARQL query returned {} results", result_maps.len());
        Ok(result_maps)
    }
864
    /// Builds a single LLM prompt from the user message, retrieved context, and
    /// any SPARQL artifacts, then asks the LLM manager for a response.
    ///
    /// Prompt sections (each included only when non-empty): system framing, the
    /// user query, top-5 semantic facts, top-10 extracted entities, top-3
    /// reasoning steps, top-3 consciousness insights, the generated SPARQL
    /// query with up to 10 result rows, a quantum-results summary, and closing
    /// instructions.
    ///
    /// # Errors
    /// Fails only if LLM generation itself fails.
    async fn generate_enhanced_response(
        &self,
        llm_manager: &mut llm::LLMManager,
        user_message: &str,
        assembled_context: &rag::AssembledContext,
        sparql_query: Option<&nl2sparql::SPARQLGenerationResult>,
        sparql_results: Option<&Vec<HashMap<String, String>>>,
    ) -> Result<String> {
        let mut prompt = String::new();

        // Framing preamble.
        prompt.push_str("You are an advanced AI assistant with access to a knowledge graph. ");
        prompt.push_str("You have quantum-enhanced retrieval, consciousness-aware processing, ");
        prompt.push_str("and advanced reasoning capabilities. ");
        prompt.push_str("Provide helpful, accurate, and insightful responses based on the available context.\n\n");

        prompt.push_str(&format!("User Query: {user_message}\n\n"));

        // Top semantic retrieval hits with their relevance scores.
        if !assembled_context.semantic_results.is_empty() {
            prompt.push_str("Relevant Knowledge Graph Facts:\n");
            for (i, result) in assembled_context
                .semantic_results
                .iter()
                .take(5)
                .enumerate()
            {
                prompt.push_str(&format!(
                    "{}. {} (relevance: {:.2})\n",
                    i + 1,
                    result.triple,
                    result.score
                ));
            }
            prompt.push('\n');
        }

        // Named entities recognized in the user message.
        if !assembled_context.extracted_entities.is_empty() {
            prompt.push_str("Extracted Entities:\n");
            for entity in assembled_context.extracted_entities.iter().take(10) {
                prompt.push_str(&format!(
                    "- {} (type: {:?}, confidence: {:.2})\n",
                    entity.text, entity.entity_type, entity.confidence
                ));
            }
            prompt.push('\n');
        }

        // Reasoning-chain steps, if the reasoning subsystem produced any.
        if let Some(ref reasoning_results) = assembled_context.reasoning_results {
            prompt.push_str("Advanced Reasoning Analysis:\n");
            for step in reasoning_results.primary_chain.steps.iter().take(3) {
                prompt.push_str(&format!(
                    "- {:?}: {:?} (confidence: {:.2})\n",
                    step.reasoning_type, step.conclusion_triple, step.confidence
                ));
            }
            prompt.push('\n');
        }

        if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
            if !consciousness_insights.is_empty() {
                prompt.push_str("Consciousness-Aware Insights:\n");
                for insight in consciousness_insights.iter().take(3) {
                    prompt.push_str(&format!(
                        "- {} (confidence: {:.2})\n",
                        insight.content, insight.confidence
                    ));
                }
                prompt.push('\n');
            }
        }

        // SPARQL artifacts: query text plus (when execution succeeded) result rows.
        if let Some(sparql) = sparql_query {
            prompt.push_str(&format!("Generated SPARQL Query:\n{}\n\n", sparql.query));

            if let Some(results) = sparql_results {
                prompt.push_str("SPARQL Query Results:\n");
                for (i, result) in results.iter().take(10).enumerate() {
                    prompt.push_str(&format!("{}. ", i + 1));
                    for (key, value) in result {
                        prompt.push_str(&format!("{key}: {value} "));
                    }
                    prompt.push('\n');
                }
                prompt.push('\n');
            }
        }

        if let Some(ref quantum_results) = assembled_context.quantum_results {
            if !quantum_results.is_empty() {
                prompt.push_str("Quantum-Enhanced Retrieval Information:\n");
                prompt.push_str(&format!(
                    "Found {} quantum-optimized results with enhanced relevance scoring.\n\n",
                    quantum_results.len()
                ));
            }
        }

        // Closing instructions to the model.
        prompt.push_str(
            "Please provide a comprehensive, helpful response based on this information. ",
        );
        prompt.push_str(
            "If SPARQL results are available, incorporate them naturally into your answer. ",
        );
        prompt.push_str("Highlight any interesting patterns or insights you discover.");

        debug!(
            "Generating LLM response with context length: {} chars",
            prompt.len()
        );
        let llm_request = llm::LLMRequest {
            messages: vec![llm::ChatMessage {
                role: llm::ChatRole::User,
                content: prompt.clone(),
                metadata: None,
            }],
            system_prompt: Some(
                "You are an advanced AI assistant with access to a knowledge graph.".to_string(),
            ),
            temperature: 0.7,
            max_tokens: Some(1000),
            use_case: llm::UseCase::Conversation,
            priority: llm::Priority::Normal,
            timeout: None,
        };

        let response = llm_manager
            .generate_response(llm_request)
            .await
            .context("Failed to generate LLM response")?;

        Ok(response.content)
    }
1007
1008 fn create_response_metadata(
1010 &self,
1011 assembled_context: &rag::AssembledContext,
1012 processing_time: Duration,
1013 ) -> HashMap<String, String> {
1014 let mut metadata = HashMap::new();
1015
1016 metadata.insert(
1017 "context_score".to_string(),
1018 assembled_context.context_score.to_string(),
1019 );
1020 metadata.insert(
1021 "processing_time_ms".to_string(),
1022 processing_time.as_millis().to_string(),
1023 );
1024 metadata.insert(
1025 "semantic_results_count".to_string(),
1026 assembled_context.semantic_results.len().to_string(),
1027 );
1028 metadata.insert(
1029 "graph_results_count".to_string(),
1030 assembled_context.graph_results.len().to_string(),
1031 );
1032 metadata.insert(
1033 "extracted_entities_count".to_string(),
1034 assembled_context.extracted_entities.len().to_string(),
1035 );
1036 metadata.insert(
1037 "assembly_time_ms".to_string(),
1038 assembled_context.assembly_time.as_millis().to_string(),
1039 );
1040
1041 if let Some(ref quantum_results) = assembled_context.quantum_results {
1043 metadata.insert(
1044 "quantum_results_count".to_string(),
1045 quantum_results.len().to_string(),
1046 );
1047 metadata.insert("quantum_enhanced".to_string(), "true".to_string());
1048 }
1049
1050 if let Some(ref consciousness_insights) = assembled_context.consciousness_insights {
1052 metadata.insert(
1053 "consciousness_insights_count".to_string(),
1054 consciousness_insights.len().to_string(),
1055 );
1056 metadata.insert("consciousness_enhanced".to_string(), "true".to_string());
1057 }
1058
1059 if let Some(ref reasoning_results) = assembled_context.reasoning_results {
1061 metadata.insert(
1062 "reasoning_quality".to_string(),
1063 reasoning_results
1064 .reasoning_quality
1065 .overall_quality
1066 .to_string(),
1067 );
1068 metadata.insert("reasoning_enhanced".to_string(), "true".to_string());
1069 }
1070
1071 if let Some(ref extracted_knowledge) = assembled_context.extracted_knowledge {
1073 metadata.insert(
1074 "extracted_knowledge_score".to_string(),
1075 extracted_knowledge.confidence_score.to_string(),
1076 );
1077 metadata.insert(
1078 "knowledge_extraction_enhanced".to_string(),
1079 "true".to_string(),
1080 );
1081 }
1082
1083 metadata.insert("oxirs_chat_version".to_string(), VERSION.to_string());
1084 metadata.insert("advanced_ai_enabled".to_string(), "true".to_string());
1085
1086 metadata
1087 }
1088
1089 pub async fn get_session_statistics(&self, session_id: &str) -> Result<SessionStatistics> {
1091 let session = self
1092 .get_session(session_id)
1093 .await
1094 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
1095
1096 let session = session.lock().await;
1097 Ok(session.get_statistics())
1098 }
1099
1100 pub async fn export_session(&self, session_id: &str) -> Result<SessionData> {
1102 let session = self
1103 .get_session(session_id)
1104 .await
1105 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;
1106
1107 let session = session.lock().await;
1108 Ok(session.export_data())
1109 }
1110
1111 pub async fn import_session(&self, session_data: SessionData) -> Result<()> {
1113 let session = Arc::new(Mutex::new(ChatSession::from_data(
1114 session_data.clone(),
1115 self.store.clone(),
1116 )));
1117
1118 let mut sessions = self.sessions.write().await;
1119 sessions.insert(session_data.id, session);
1120
1121 Ok(())
1122 }
1123
1124 pub async fn get_circuit_breaker_stats(
1126 &self,
1127 ) -> Result<HashMap<String, llm::CircuitBreakerStats>> {
1128 let llm_manager = self.llm_manager.lock().await;
1129 Ok(llm_manager.get_circuit_breaker_stats().await)
1130 }
1131
1132 pub async fn reset_circuit_breaker(&self, provider_name: &str) -> Result<()> {
1134 let llm_manager = self.llm_manager.lock().await;
1135 llm_manager.reset_circuit_breaker(provider_name).await
1136 }
1137
    /// Streaming variant of [`Self::process_message`]: spawns a background task
    /// that performs the pipeline and emits progress/content chunks over a
    /// bounded mpsc channel (capacity 100), returning the receiver immediately.
    ///
    /// Chunk sequence: `Status(Initializing)` → `Status(RetrievingContext)` →
    /// `Status(QuantumProcessing)` + optional `Context` → optional SPARQL stages
    /// → `Status(GeneratingResponse)` → `Content` chunks (3 words each, 50 ms
    /// apart) → `Complete`. Failures emit an `Error` chunk and end the task.
    ///
    /// Unlike `process_message`, SPARQL detection here is a simpler
    /// substring check ("sparql"/"query") and SPARQL results are not fed back
    /// into the prompt.
    ///
    /// # Errors
    /// Returns an error only when the session does not exist; all later
    /// failures are reported through the channel.
    pub async fn process_message_stream(
        &self,
        session_id: &str,
        user_message: String,
    ) -> Result<tokio::sync::mpsc::Receiver<StreamResponseChunk>> {
        let processing_start = std::time::Instant::now();
        info!(
            "Processing streaming message for session {}: {}",
            session_id,
            user_message.chars().take(100).collect::<String>()
        );

        let (tx, rx) = tokio::sync::mpsc::channel(100);

        let session = self
            .get_session(session_id)
            .await
            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", session_id))?;

        // Clone the shared engine handles so the spawned task owns its own Arcs.
        let rag_engine = self.rag_engine.clone();
        let llm_manager = self.llm_manager.clone();
        let nl2sparql_engine = self.nl2sparql_engine.clone();
        let store = self.store.clone();
        let session_id = session_id.to_string();

        tokio::spawn(async move {
            // Send errors on the channel are ignored (`let _`): a dropped
            // receiver simply ends the stream.
            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::Initializing,
                    progress: 0.0,
                    message: Some("Starting message processing".to_string()),
                })
                .await;

            let user_msg = Message {
                id: uuid::Uuid::new_v4().to_string(),
                role: MessageRole::User,
                content: MessageContent::from_text(user_message.clone()),
                timestamp: chrono::Utc::now(),
                metadata: None,
                thread_id: None,
                parent_message_id: None,
                // Rough token estimate: ~4 bytes per token.
                token_count: Some(user_message.len() / 4),
                reactions: Vec::new(),
                attachments: Vec::new(),
                rich_elements: Vec::new(),
            };

            let user_msg_id = user_msg.id.clone();

            // Store the user message; abort the stream with an Error chunk on failure.
            {
                let mut session_guard = session.lock().await;
                if let Err(e) = session_guard.add_message(user_msg) {
                    let _ = tx
                        .send(StreamResponseChunk::Error {
                            error: StructuredError {
                                error_type: ErrorType::InternalError,
                                message: format!("Failed to store user message: {e}"),
                                error_code: Some("MSG_STORE_FAILED".to_string()),
                                component: "ChatSession".to_string(),
                                timestamp: chrono::Utc::now(),
                                context: std::collections::HashMap::new(),
                            },
                            recoverable: false,
                        })
                        .await;
                    return;
                }
            }

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::RetrievingContext,
                    progress: 0.1,
                    message: Some("Retrieving relevant context from knowledge graph".to_string()),
                })
                .await;

            // RAG retrieval; failure is reported as a recoverable Error chunk.
            let assembled_context = {
                let mut rag_engine = rag_engine.lock().await;
                match rag_engine.retrieve(&user_message).await {
                    Ok(context) => context,
                    Err(e) => {
                        let _ = tx
                            .send(StreamResponseChunk::Error {
                                error: StructuredError {
                                    error_type: ErrorType::RagRetrievalError,
                                    message: format!("RAG retrieval failed: {e}"),
                                    error_code: Some("RAG_RETRIEVAL_FAILED".to_string()),
                                    component: "RagEngine".to_string(),
                                    timestamp: chrono::Utc::now(),
                                    context: std::collections::HashMap::new(),
                                },
                                recoverable: true,
                            })
                            .await;
                        return;
                    }
                }
            };

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::QuantumProcessing,
                    progress: 0.3,
                    message: Some("Context retrieval complete".to_string()),
                })
                .await;

            // Surface the top retrieved facts/entities to the client early.
            if !assembled_context.semantic_results.is_empty() {
                let facts: Vec<String> = assembled_context
                    .semantic_results
                    .iter()
                    .take(5)
                    .map(|result| result.triple.to_string())
                    .collect();

                let entities: Vec<String> = assembled_context
                    .extracted_entities
                    .iter()
                    .take(10)
                    .map(|entity| entity.text.clone())
                    .collect();

                let _ = tx
                    .send(StreamResponseChunk::Context {
                        facts,
                        sparql_results: None,
                        entities,
                    })
                    .await;
            }

            // Simplified SPARQL detection for the streaming path; the generated
            // query is executed only to report a result count.
            let (_sparql_query, _sparql_results) = if user_message.to_lowercase().contains("sparql")
                || user_message.to_lowercase().contains("query")
            {
                let _ = tx
                    .send(StreamResponseChunk::Status {
                        stage: ProcessingStage::GeneratingSparql,
                        progress: 0.5,
                        message: Some("Generating SPARQL query".to_string()),
                    })
                    .await;

                let mut nl2sparql = nl2sparql_engine.lock().await;
                let query_context = rag::QueryContext::new(session_id.clone()).add_message(
                    rag::ConversationMessage {
                        role: rag::MessageRole::User,
                        content: user_message.clone(),
                        timestamp: chrono::Utc::now(),
                    },
                );

                match nl2sparql.generate_sparql(&query_context).await {
                    Ok(sparql) => {
                        let _ = tx
                            .send(StreamResponseChunk::Context {
                                facts: vec!["Generated SPARQL query".to_string()],
                                sparql_results: None,
                                entities: vec![],
                            })
                            .await;

                        let query_result = store.prepare_query(&sparql.query);
                        match query_result {
                            Ok(query) => match query.exec() {
                                Ok(results) => {
                                    let result_count = results.count();
                                    let _ = tx
                                        .send(StreamResponseChunk::Context {
                                            facts: vec![format!(
                                                "SPARQL query returned {} results",
                                                result_count
                                            )],
                                            sparql_results: None,
                                            entities: vec![],
                                        })
                                        .await;
                                    // Results themselves are not forwarded here.
                                    (Some(sparql), Some(Vec::<String>::new())) }
                                Err(_) => (Some(sparql), None),
                            },
                            Err(_) => (None, None),
                        }
                    }
                    Err(_) => (None, None),
                }
            } else {
                (None, None)
            };

            let _ = tx
                .send(StreamResponseChunk::Status {
                    stage: ProcessingStage::GeneratingResponse,
                    progress: 0.7,
                    message: Some("Generating AI response".to_string()),
                })
                .await;

            // Abbreviated prompt (framing + query + top-3 facts only).
            let mut prompt = String::new();
            prompt.push_str("You are an advanced AI assistant with access to a knowledge graph. ");
            prompt.push_str(&format!("User Query: {user_message}\n\n"));

            if !assembled_context.semantic_results.is_empty() {
                prompt.push_str("Relevant Knowledge Graph Facts:\n");
                for (i, result) in assembled_context
                    .semantic_results
                    .iter()
                    .take(3)
                    .enumerate()
                {
                    prompt.push_str(&format!(
                        "{}. {} (relevance: {:.2})\n",
                        i + 1,
                        result.triple,
                        result.score
                    ));
                }
            }

            let response_text = {
                let mut llm_manager = llm_manager.lock().await;
                let llm_request = llm::LLMRequest {
                    messages: vec![llm::ChatMessage {
                        role: llm::ChatRole::User,
                        content: prompt,
                        metadata: None,
                    }],
                    system_prompt: Some("You are an advanced AI assistant.".to_string()),
                    temperature: 0.7,
                    max_tokens: Some(1000),
                    use_case: llm::UseCase::Conversation,
                    priority: llm::Priority::Normal,
                    timeout: None,
                };

                match llm_manager.generate_response(llm_request).await {
                    Ok(response) => response.content,
                    Err(e) => {
                        let _ = tx
                            .send(StreamResponseChunk::Error {
                                error: StructuredError {
                                    error_type: ErrorType::LlmGenerationError,
                                    message: format!("LLM generation failed: {e}"),
                                    error_code: Some("LLM_GENERATION_FAILED".to_string()),
                                    component: "LLMManager".to_string(),
                                    timestamp: chrono::Utc::now(),
                                    context: std::collections::HashMap::new(),
                                },
                                recoverable: true,
                            })
                            .await;
                        return;
                    }
                }
            };

            // Simulated token streaming: 3 words per chunk, 50 ms apart.
            let words: Vec<&str> = response_text.split_whitespace().collect();
            let chunk_size = 3; for (i, chunk) in words.chunks(chunk_size).enumerate() {
                // Computed but currently unused; kept for future progress reporting.
                let _progress = 0.8 + (0.2 * i as f32 / (words.len() / chunk_size) as f32);
                let _ = tx
                    .send(StreamResponseChunk::Content {
                        text: chunk.join(" ") + " ",
                        is_complete: false,
                    })
                    .await;

                tokio::time::sleep(Duration::from_millis(50)).await;
            }

            let response = Message {
                id: uuid::Uuid::new_v4().to_string(),
                role: MessageRole::Assistant,
                content: MessageContent::from_text(response_text.clone()),
                timestamp: chrono::Utc::now(),
                metadata: Some(messages::MessageMetadata {
                    source: Some("oxirs-chat-streaming".to_string()),
                    confidence: Some(assembled_context.context_score as f64),
                    processing_time_ms: Some(processing_start.elapsed().as_millis() as u64),
                    model_used: Some("oxirs-chat-ai-streaming".to_string()),
                    temperature: None,
                    max_tokens: None,
                    custom_fields: HashMap::new(),
                }),
                thread_id: None,
                parent_message_id: Some(user_msg_id),
                token_count: Some(response_text.len() / 4),
                reactions: Vec::new(),
                attachments: Vec::new(),
                rich_elements: Vec::new(),
            };

            // Persist the assembled assistant message; failure here is ignored
            // since the content was already streamed.
            {
                let mut session_guard = session.lock().await;
                let _ = session_guard.add_message(response.clone());
            }

            let _ = tx
                .send(StreamResponseChunk::Complete {
                    total_time: processing_start.elapsed(),
                    token_count: response_text.len() / 4, final_message: Some("Response generation complete".to_string()),
                })
                .await;
        });

        Ok(rx)
    }
1466}
1467
impl OxiRSChat {
    /// Blocking constructor for non-async callers: spins up a temporary Tokio
    /// runtime and builds a chat instance over a fresh `ConcreteStore`.
    ///
    /// NOTE(review): the temporary runtime is dropped when this returns, which
    /// aborts the background schema-discovery task spawned during construction —
    /// confirm callers do not depend on schema discovery completing.
    /// NOTE(review): `Runtime::new` + `block_on` will panic if this is called
    /// from within an existing Tokio runtime.
    pub fn create_default() -> Result<Self> {
        let rt = tokio::runtime::Runtime::new()?;
        rt.block_on(async {
            let store = Arc::new(oxirs_core::ConcreteStore::new()?);
            Self::new(ChatConfig::default(), store).await
        })
    }
}
1479
/// Crate version, captured from Cargo.toml at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Crate name, captured from Cargo.toml at compile time.
pub const NAME: &str = env!("CARGO_PKG_NAME");
1483
/// Compile-time capability flags advertised to consumers of this crate.
pub mod features {
    pub const RAG_ENABLED: bool = true;
    pub const NL2SPARQL_ENABLED: bool = true;
    pub const ANALYTICS_ENABLED: bool = true;
    pub const CACHING_ENABLED: bool = true;
    pub const RICH_CONTENT_ENABLED: bool = true;
}
1492
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: a freshly constructed chat instance has no sessions.
    #[tokio::test]
    async fn test_chat_creation() {
        let store = Arc::new(oxirs_core::ConcreteStore::new().expect("Failed to create store"));
        let chat = OxiRSChat::new(ChatConfig::default(), store)
            .await
            .expect("Failed to create chat");

        assert_eq!(chat.session_count().await, 0);
    }

    // Round-trip: create → lookup → remove through the session map.
    #[tokio::test]
    async fn test_session_management() {
        let store = Arc::new(oxirs_core::ConcreteStore::new().expect("Failed to create store"));
        let chat = OxiRSChat::new(ChatConfig::default(), store)
            .await
            .expect("Failed to create chat");

        let session_id = "test-session".to_string();
        let _session = chat
            .create_session(session_id.clone())
            .await
            .expect("should succeed");

        assert_eq!(chat.session_count().await, 1);
        assert!(chat.get_session(&session_id).await.is_some());

        let removed = chat.remove_session(&session_id).await;
        assert!(removed);
        assert_eq!(chat.session_count().await, 0);
    }
}