1use super::client::{ChatSession, LLMClient};
26use super::provider::{ChatStream, LLMProvider};
27use super::tool_executor::ToolExecutor;
28use super::types::{ChatMessage, LLMError, LLMResult, Tool};
29use crate::prompt;
30use futures::{Stream, StreamExt};
31use mofa_kernel::agent::AgentMetadata;
32use mofa_kernel::agent::AgentState;
33use mofa_kernel::plugin::{AgentPlugin, PluginType};
34use mofa_plugins::tts::TTSPlugin;
35use std::collections::HashMap;
36use std::io::Write;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::time::Duration;
41use tokio::sync::{Mutex, RwLock};
42use crate::llm::{AnthropicConfig, AnthropicProvider, GeminiConfig, GeminiProvider};
43
/// Stream of synthesized TTS audio chunks: `(f32 samples, time taken)`.
/// The `Duration` is the per-chunk synthesis time reported by the engine
/// (consumers here ignore it as `_took`).
pub type TtsAudioStream = Pin<Box<dyn Stream<Item = (Vec<f32>, Duration)> + Send>>;
46
/// Shared cancellation flag.
///
/// Cloning (via `Clone` or [`CancellationToken::clone_token`]) produces a
/// handle to the *same* flag, so cancelling any handle is observed by all.
#[derive(Clone)]
struct CancellationToken {
    cancel: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token in the not-cancelled state.
    fn new() -> Self {
        Self {
            cancel: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Returns `true` once `cancel` has been called on any handle.
    fn is_cancelled(&self) -> bool {
        self.cancel.load(Ordering::Relaxed)
    }

    /// Sets the shared flag; all handles see the cancellation.
    fn cancel(&self) {
        self.cancel.store(true, Ordering::Relaxed);
    }

    /// Returns another handle to the same flag.
    ///
    /// Kept for existing callers; equivalent to `Clone::clone` now that the
    /// hand-rolled clone has been replaced by `#[derive(Clone)]`.
    fn clone_token(&self) -> CancellationToken {
        self.clone()
    }
}
73
/// Stream of incremental text chunks from the LLM; each item is a chunk or an
/// error (a sentinel error containing `__stream_end__` marks normal EOS).
pub type TextStream = Pin<Box<dyn Stream<Item = LLMResult<String>> + Send>>;
78
/// Pairs a Kokoro synthesis sink with the background task consuming its
/// audio stream.
#[cfg(feature = "kokoro")]
struct TTSStreamHandle {
    // Sink that accepts sentences to synthesize.
    sink: mofa_plugins::tts::kokoro_wrapper::SynthSink<String>,
    // Audio-forwarding task; retained so callers can await drain/teardown.
    _stream_handle: tokio::task::JoinHandle<()>,
}
87
88struct TTSSession {
90 cancellation_token: CancellationToken,
91 is_active: Arc<AtomicBool>,
92}
93
94impl TTSSession {
95 fn new(token: CancellationToken) -> Self {
96 let is_active = Arc::new(AtomicBool::new(true));
97 TTSSession {
98 cancellation_token: token,
99 is_active,
100 }
101 }
102
103 fn cancel(&self) {
104 self.cancellation_token.cancel();
105 self.is_active.store(false, Ordering::Relaxed);
106 }
107
108 fn is_active(&self) -> bool {
109 self.is_active.load(Ordering::Relaxed)
110 }
111}
112
/// Accumulates streamed text and emits complete sentences.
///
/// A sentence ends at one of the terminators `。 ! ? ! ?` (ASCII `.` is
/// deliberately not a terminator here; trailing text is recovered via
/// [`SentenceBuffer::flush`]).
struct SentenceBuffer {
    buffer: String,
}

impl SentenceBuffer {
    /// Creates an empty buffer.
    fn new() -> Self {
        Self {
            buffer: String::new(),
        }
    }

    /// Appends `text` and returns the first completed sentence, if any.
    ///
    /// BUGFIX: the previous version returned as soon as a terminator was
    /// seen, silently DROPPING the rest of the chunk. Now every character is
    /// buffered; text after the first completed sentence stays in the buffer
    /// and is emitted by a later `push` or by `flush`.
    fn push(&mut self, text: &str) -> Option<String> {
        let mut completed: Option<String> = None;
        for ch in text.chars() {
            self.buffer.push(ch);
            if completed.is_none() && matches!(ch, '。' | '!' | '?' | '!' | '?') {
                let sentence = self.buffer.trim().to_string();
                if !sentence.is_empty() {
                    self.buffer.clear();
                    completed = Some(sentence);
                }
            }
        }
        completed
    }

    /// Drains any trailing partial sentence (trimmed), or `None` if empty.
    fn flush(&mut self) -> Option<String> {
        if self.buffer.trim().is_empty() {
            None
        } else {
            let remaining = self.buffer.trim().to_string();
            self.buffer.clear();
            Some(remaining)
        }
    }
}
152
/// Incremental events emitted while streaming a chat response.
#[derive(Debug, Clone)]
pub enum StreamEvent {
    /// A chunk of assistant text.
    Text(String),
    /// The model started a tool call with the given call id and tool name.
    ToolCallStart { id: String, name: String },
    /// An incremental fragment of the arguments for tool call `id`
    /// (typically JSON — format depends on the provider).
    ToolCallDelta { id: String, arguments_delta: String },
    /// Stream finished; carries the final aggregated text, if any.
    Done(Option<String>),
}
165
/// Static configuration for an LLM agent.
///
/// `Debug` is derived so configs can be logged/inspected (all fields are
/// plain data).
#[derive(Debug, Clone)]
pub struct LLMAgentConfig {
    /// Stable identifier for the agent instance.
    pub agent_id: String,
    /// Human-readable agent name.
    pub name: String,
    /// Optional system prompt applied to new sessions.
    pub system_prompt: Option<String>,
    /// Sampling temperature; `None` defers to the provider default.
    pub temperature: Option<f32>,
    /// Maximum tokens per response; `None` defers to the provider default.
    pub max_tokens: Option<u32>,
    /// Free-form key/value settings for provider- or app-specific options.
    pub custom_config: HashMap<String, String>,
    /// Optional user identifier associated with this agent.
    pub user_id: Option<String>,
    /// Optional tenant identifier associated with this agent.
    pub tenant_id: Option<String>,
    /// Context window size passed to each session; `None` keeps the
    /// session's default behavior (presumably no truncation — confirm in
    /// `ChatSession`).
    pub context_window_size: Option<usize>,
}

impl Default for LLMAgentConfig {
    /// Defaults: id `"llm-agent"`, name `"LLM Agent"`, temperature `0.7`,
    /// `max_tokens` 4096; everything else unset/empty.
    fn default() -> Self {
        Self {
            agent_id: "llm-agent".to_string(),
            name: "LLM Agent".to_string(),
            system_prompt: None,
            temperature: Some(0.7),
            max_tokens: Some(4096),
            custom_config: HashMap::new(),
            user_id: None,
            tenant_id: None,
            context_window_size: None,
        }
    }
}
207
/// A multi-session chat agent built on top of an `LLMProvider`.
///
/// Supports tool calling, lifecycle event hooks, prompt-template and TTS
/// plugins, and optional message/session persistence.
pub struct LLMAgent {
    /// Static configuration (prompts, sampling parameters, ids).
    config: LLMAgentConfig,
    /// Kernel-facing metadata (id, name, capabilities, state).
    metadata: AgentMetadata,
    /// Client used for one-shot (session-less) requests such as `ask`.
    client: LLMClient,
    /// All chat sessions, keyed by session id.
    sessions: Arc<RwLock<HashMap<String, Arc<RwLock<ChatSession>>>>>,
    /// Id of the session used by `chat`/`chat_stream`/`history`.
    active_session_id: Arc<RwLock<String>>,
    /// Tools offered to the model; empty means "ask the executor" (see `ask`).
    tools: Vec<Tool>,
    /// Executor that runs tool calls, when configured.
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    /// Optional lifecycle hooks around chats/errors.
    event_handler: Option<Box<dyn LLMAgentEventHandler>>,
    /// General-purpose agent plugins.
    plugins: Vec<Box<dyn AgentPlugin>>,
    /// Agent lifecycle state.
    state: AgentState,
    /// Backing LLM provider.
    provider: Arc<dyn LLMProvider>,
    /// Optional prompt-template plugin; its rendered template overrides the
    /// static config system prompt when sessions are created.
    prompt_plugin: Option<Box<dyn prompt::PromptTemplatePlugin>>,
    /// Optional TTS plugin for spoken output.
    tts_plugin: Option<Arc<Mutex<TTSPlugin>>>,
    /// Cached clone of the Kokoro engine so later streams skip the plugin
    /// lock and downcast.
    #[cfg(feature = "kokoro")]
    cached_kokoro_engine: Arc<Mutex<Option<Arc<mofa_plugins::tts::kokoro_wrapper::KokoroTTS>>>>,
    /// The TTS session currently playing, if any (used by `interrupt_tts`).
    active_tts_session: Arc<Mutex<Option<TTSSession>>>,
    /// Optional message persistence backend.
    message_store: Option<Arc<dyn crate::persistence::MessageStore + Send + Sync>>,
    /// Optional session persistence backend.
    session_store: Option<Arc<dyn crate::persistence::SessionStore + Send + Sync>>,
    /// User id used when persisting sessions/messages.
    persistence_user_id: Option<uuid::Uuid>,
    /// Agent id used when persisting sessions/messages.
    persistence_agent_id: Option<uuid::Uuid>,
}
290
/// Hooks into the agent's chat lifecycle.
///
/// All hooks default to pass-through behavior. From `before_chat*`,
/// returning `Ok(None)` aborts the chat (the agent yields an empty
/// response/stream); from `after_chat*`, `Ok(None)` keeps the original
/// response; from `on_error`, `Ok(Some(text))` substitutes `text` as a
/// fallback response instead of propagating the error.
#[async_trait::async_trait]
pub trait LLMAgentEventHandler: Send + Sync {
    /// Object-safe clone; see the `Clone for Box<dyn LLMAgentEventHandler>`
    /// impl in this module.
    fn clone_box(&self) -> Box<dyn LLMAgentEventHandler>;

    /// Downcast support for concrete handler types.
    fn as_any(&self) -> &dyn std::any::Any;

    /// Called before sending; may rewrite the outgoing message or cancel
    /// the chat by returning `Ok(None)`.
    async fn before_chat(&self, message: &str) -> LLMResult<Option<String>> {
        Ok(Some(message.to_string()))
    }

    /// Model-aware variant of `before_chat`; default delegates and ignores
    /// the model name.
    async fn before_chat_with_model(
        &self,
        message: &str,
        _model: &str,
    ) -> LLMResult<Option<String>> {
        self.before_chat(message).await
    }

    /// Called after a response arrives; may rewrite it.
    async fn after_chat(&self, response: &str) -> LLMResult<Option<String>> {
        Ok(Some(response.to_string()))
    }

    /// Metadata-aware variant of `after_chat`; default delegates and
    /// ignores the metadata.
    async fn after_chat_with_metadata(
        &self,
        response: &str,
        _metadata: &super::types::LLMResponseMetadata,
    ) -> LLMResult<Option<String>> {
        self.after_chat(response).await
    }

    /// Called when the model requests a tool call; default observes only.
    async fn on_tool_call(&self, name: &str, arguments: &str) -> LLMResult<Option<String>> {
        let _ = (name, arguments);
        Ok(None)
    }

    /// Called when a chat fails; returning `Some(text)` substitutes `text`
    /// as the response instead of the error.
    async fn on_error(&self, error: &LLMError) -> LLMResult<Option<String>> {
        let _ = error;
        Ok(None)
    }
}

/// Enables `Clone` for boxed handlers by delegating to `clone_box`.
impl Clone for Box<dyn LLMAgentEventHandler> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}
354
355impl LLMAgent {
    /// Creates an agent with a fresh, auto-generated initial session.
    pub fn new(config: LLMAgentConfig, provider: Arc<dyn LLMProvider>) -> Self {
        Self::with_initial_session(config, provider, None)
    }
360
    /// Builds an agent with a single in-memory chat session.
    ///
    /// If `initial_session_id` is `Some`, the session is created with that
    /// id; otherwise a fresh id is generated. The config's system prompt and
    /// context window size are applied to the session. Persistence is not
    /// wired here — use `with_initial_session_async` for that.
    pub fn with_initial_session(
        config: LLMAgentConfig,
        provider: Arc<dyn LLMProvider>,
        initial_session_id: Option<String>,
    ) -> Self {
        let client = LLMClient::new(provider.clone());

        // Seed the first session, honoring a caller-supplied id when present.
        let mut session = if let Some(sid) = initial_session_id {
            ChatSession::with_id_str(&sid, LLMClient::new(provider.clone()))
        } else {
            ChatSession::new(LLMClient::new(provider.clone()))
        };

        if let Some(ref prompt) = config.system_prompt {
            session = session.with_system(prompt.clone());
        }

        session = session.with_context_window_size(config.context_window_size);

        let session_id = session.session_id().to_string();
        let session_arc = Arc::new(RwLock::new(session));

        let mut sessions = HashMap::new();
        sessions.insert(session_id.clone(), session_arc);

        let agent_id = config.agent_id.clone();
        let name = config.name.clone();

        // Capability tags advertised for agent discovery.
        let capabilities = mofa_kernel::agent::AgentCapabilities::builder()
            .tags(vec![
                "llm".to_string(),
                "chat".to_string(),
                "text-generation".to_string(),
                "multi-session".to_string(),
            ])
            .build();

        Self {
            config,
            metadata: AgentMetadata {
                id: agent_id,
                name,
                description: None,
                version: None,
                capabilities,
                state: AgentState::Created,
            },
            client,
            sessions: Arc::new(RwLock::new(sessions)),
            active_session_id: Arc::new(RwLock::new(session_id)),
            tools: Vec::new(),
            tool_executor: None,
            event_handler: None,
            plugins: Vec::new(),
            state: AgentState::Created,
            provider,
            prompt_plugin: None,
            tts_plugin: None,
            #[cfg(feature = "kokoro")]
            cached_kokoro_engine: Arc::new(Mutex::new(None)),
            active_tts_session: Arc::new(Mutex::new(None)),
            // Persistence disabled for this constructor.
            message_store: None,
            session_store: None,
            persistence_user_id: None,
            persistence_agent_id: None,
        }
    }
449
    /// Builds an agent whose initial session may be loaded from, or
    /// persisted to, the supplied message/session stores.
    ///
    /// Persistence is attempted only when ALL of `initial_session_id`,
    /// `message_store`, `session_store`, `persistence_user_id`,
    /// `persistence_tenant_id` and `persistence_agent_id` are provided;
    /// otherwise this degrades to a plain in-memory session (same as
    /// `with_initial_session`).
    ///
    /// Load/create strategy: try `ChatSession::load`; if that fails (e.g.
    /// the session does not exist), create-and-persist it; if persisting
    /// also fails, fall back to an in-memory session still bound to the
    /// stores.
    ///
    /// NOTE(review): `persistence_tenant_id` is only used while loading or
    /// creating the session and is not kept on the struct (there is no such
    /// field) — confirm whether a field is missing.
    pub async fn with_initial_session_async(
        config: LLMAgentConfig,
        provider: Arc<dyn LLMProvider>,
        initial_session_id: Option<String>,
        message_store: Option<Arc<dyn crate::persistence::MessageStore + Send + Sync>>,
        session_store: Option<Arc<dyn crate::persistence::SessionStore + Send + Sync>>,
        persistence_user_id: Option<uuid::Uuid>,
        persistence_tenant_id: Option<uuid::Uuid>,
        persistence_agent_id: Option<uuid::Uuid>,
    ) -> Self {
        let client = LLMClient::new(provider.clone());

        let initial_session_id_clone = initial_session_id.clone();

        // Full persistence path only when every ingredient is present.
        let session = if let (
            Some(sid),
            Some(msg_store),
            Some(sess_store),
            Some(user_id),
            Some(tenant_id),
            Some(agent_id),
        ) = (
            initial_session_id_clone,
            message_store.clone(),
            session_store.clone(),
            persistence_user_id,
            persistence_tenant_id,
            persistence_agent_id,
        ) {
            let msg_store_clone = msg_store.clone();
            let sess_store_clone = sess_store.clone();

            // A non-UUID session id is replaced with a fresh v7 UUID.
            let session_uuid = uuid::Uuid::parse_str(&sid).unwrap_or_else(|_| {
                tracing::warn!("⚠️ 无效的 session_id 格式 '{}', 将生成新的 UUID", sid);
                uuid::Uuid::now_v7()
            });

            match ChatSession::load(
                session_uuid,
                LLMClient::new(provider.clone()),
                user_id,
                agent_id,
                tenant_id,
                msg_store,
                sess_store,
                config.context_window_size,
            )
            .await
            {
                Ok(loaded_session) => {
                    tracing::info!(
                        "✅ 从数据库加载会话: {} ({} 条消息)",
                        sid,
                        loaded_session.messages().len()
                    );
                    // NOTE(review): the config system prompt is NOT applied
                    // to a loaded session — presumably it was persisted with
                    // the session; confirm.
                    loaded_session
                }
                Err(e) => {
                    tracing::info!("📝 创建新会话并持久化: {} (数据库中不存在: {})", sid, e);

                    let msg_store_clone2 = msg_store_clone.clone();
                    let sess_store_clone2 = sess_store_clone.clone();

                    // Session absent: create it and persist immediately.
                    match ChatSession::with_id_and_stores_and_persist(
                        session_uuid,
                        LLMClient::new(provider.clone()),
                        user_id,
                        agent_id,
                        tenant_id,
                        msg_store_clone,
                        sess_store_clone,
                        config.context_window_size,
                    )
                    .await
                    {
                        Ok(mut new_session) => {
                            if let Some(ref prompt) = config.system_prompt {
                                new_session = new_session.with_system(prompt.clone());
                            }
                            new_session
                        }
                        Err(persist_err) => {
                            // Persisting failed: keep the stores attached but
                            // run the session in memory.
                            tracing::error!("❌ 持久化会话失败: {}, 降级为内存会话", persist_err);
                            let new_session = ChatSession::with_id_and_stores(
                                session_uuid,
                                LLMClient::new(provider.clone()),
                                user_id,
                                agent_id,
                                tenant_id,
                                msg_store_clone2,
                                sess_store_clone2,
                                config.context_window_size,
                            );
                            if let Some(ref prompt) = config.system_prompt {
                                new_session.with_system(prompt.clone())
                            } else {
                                new_session
                            }
                        }
                    }
                }
            }
        } else {
            // Non-persistent path: plain in-memory session.
            let mut session = if let Some(sid) = initial_session_id {
                ChatSession::with_id_str(&sid, LLMClient::new(provider.clone()))
            } else {
                ChatSession::new(LLMClient::new(provider.clone()))
            };
            if let Some(ref prompt) = config.system_prompt {
                session = session.with_system(prompt.clone());
            }
            session.with_context_window_size(config.context_window_size)
        };

        let session_id = session.session_id().to_string();
        let session_arc = Arc::new(RwLock::new(session));

        let mut sessions = HashMap::new();
        sessions.insert(session_id.clone(), session_arc);

        let agent_id = config.agent_id.clone();
        let name = config.name.clone();

        // Capability tags advertised for agent discovery.
        let capabilities = mofa_kernel::agent::AgentCapabilities::builder()
            .tags(vec![
                "llm".to_string(),
                "chat".to_string(),
                "text-generation".to_string(),
                "multi-session".to_string(),
            ])
            .build();

        Self {
            config,
            metadata: AgentMetadata {
                id: agent_id,
                name,
                description: None,
                version: None,
                capabilities,
                state: AgentState::Created,
            },
            client,
            sessions: Arc::new(RwLock::new(sessions)),
            active_session_id: Arc::new(RwLock::new(session_id)),
            tools: Vec::new(),
            tool_executor: None,
            event_handler: None,
            plugins: Vec::new(),
            state: AgentState::Created,
            provider,
            prompt_plugin: None,
            tts_plugin: None,
            #[cfg(feature = "kokoro")]
            cached_kokoro_engine: Arc::new(Mutex::new(None)),
            active_tts_session: Arc::new(Mutex::new(None)),
            message_store,
            session_store,
            persistence_user_id,
            persistence_agent_id,
        }
    }
651
    /// Returns the agent's static configuration.
    pub fn config(&self) -> &LLMAgentConfig {
        &self.config
    }

    /// Returns the underlying LLM client used for one-shot requests.
    pub fn client(&self) -> &LLMClient {
        &self.client
    }

    /// Returns the id of the currently active session.
    pub async fn current_session_id(&self) -> String {
        self.active_session_id.read().await.clone()
    }
670
671 pub async fn create_session(&self) -> String {
682 let mut session = ChatSession::new(LLMClient::new(self.provider.clone()));
683
684 let mut system_prompt = self.config.system_prompt.clone();
686
687 if let Some(ref plugin) = self.prompt_plugin
688 && let Some(template) = plugin.get_current_template().await
689 {
690 system_prompt = match template.render(&[]) {
692 Ok(prompt) => Some(prompt),
693 Err(_) => self.config.system_prompt.clone(),
694 };
695 }
696
697 if let Some(ref prompt) = system_prompt {
698 session = session.with_system(prompt.clone());
699 }
700
701 session = session.with_context_window_size(self.config.context_window_size);
703
704 let session_id = session.session_id().to_string();
705 let session_arc = Arc::new(RwLock::new(session));
706
707 let mut sessions = self.sessions.write().await;
708 sessions.insert(session_id.clone(), session_arc);
709
710 session_id
711 }
712
    /// Creates a new session with a caller-chosen id and returns that id.
    ///
    /// The system prompt is resolved from the prompt plugin's current
    /// template when one renders successfully, otherwise from the config.
    ///
    /// # Errors
    /// Fails if a session with `session_id` already exists.
    ///
    /// NOTE(review): the existence check and the insert take the lock
    /// separately, so two concurrent calls with the same id can race and
    /// the later insert silently replaces the earlier session.
    pub async fn create_session_with_id(&self, session_id: impl Into<String>) -> LLMResult<String> {
        let session_id = session_id.into();

        // Reject duplicate ids (read lock released at end of scope).
        {
            let sessions = self.sessions.read().await;
            if sessions.contains_key(&session_id) {
                return Err(LLMError::Other(format!(
                    "Session with id '{}' already exists",
                    session_id
                )));
            }
        }

        let mut session =
            ChatSession::with_id_str(&session_id, LLMClient::new(self.provider.clone()));

        let mut system_prompt = self.config.system_prompt.clone();

        // Plugin template overrides the static prompt when it renders.
        if let Some(ref plugin) = self.prompt_plugin
            && let Some(template) = plugin.get_current_template().await
        {
            system_prompt = match template.render(&[]) {
                Ok(prompt) => Some(prompt),
                Err(_) => self.config.system_prompt.clone(),
            };
        }

        if let Some(ref prompt) = system_prompt {
            session = session.with_system(prompt.clone());
        }

        session = session.with_context_window_size(self.config.context_window_size);

        let session_arc = Arc::new(RwLock::new(session));

        let mut sessions = self.sessions.write().await;
        sessions.insert(session_id.clone(), session_arc);

        Ok(session_id)
    }
765
766 pub async fn switch_session(&self, session_id: &str) -> LLMResult<()> {
771 let sessions = self.sessions.read().await;
772 if !sessions.contains_key(session_id) {
773 return Err(LLMError::Other(format!(
774 "Session '{}' not found",
775 session_id
776 )));
777 }
778 drop(sessions);
779
780 let mut active = self.active_session_id.write().await;
781 *active = session_id.to_string();
782 Ok(())
783 }
784
785 pub async fn get_or_create_session(&self, session_id: impl Into<String>) -> String {
789 let session_id = session_id.into();
790
791 {
792 let sessions = self.sessions.read().await;
793 if sessions.contains_key(&session_id) {
794 return session_id;
795 }
796 }
797
798 let _ = self.create_session_with_id(&session_id).await;
800 session_id
801 }
802
803 pub async fn remove_session(&self, session_id: &str) -> LLMResult<()> {
808 let active = self.active_session_id.read().await.clone();
809 if active == session_id {
810 return Err(LLMError::Other(
811 "Cannot remove active session. Switch to another session first.".to_string(),
812 ));
813 }
814
815 let mut sessions = self.sessions.write().await;
816 if sessions.remove(session_id).is_none() {
817 return Err(LLMError::Other(format!(
818 "Session '{}' not found",
819 session_id
820 )));
821 }
822
823 Ok(())
824 }
825
826 pub async fn list_sessions(&self) -> Vec<String> {
828 let sessions = self.sessions.read().await;
829 sessions.keys().cloned().collect()
830 }
831
832 pub async fn session_count(&self) -> usize {
834 let sessions = self.sessions.read().await;
835 sessions.len()
836 }
837
838 pub async fn has_session(&self, session_id: &str) -> bool {
840 let sessions = self.sessions.read().await;
841 sessions.contains_key(session_id)
842 }
843
844 pub async fn tts_speak(&self, text: &str) -> LLMResult<()> {
856 let tts = self
857 .tts_plugin
858 .as_ref()
859 .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;
860
861 let mut tts_guard = tts.lock().await;
862 tts_guard
863 .synthesize_and_play(text)
864 .await
865 .map_err(|e| LLMError::Other(format!("TTS synthesis failed: {}", e)))
866 }
867
868 pub async fn tts_speak_streaming(
878 &self,
879 text: &str,
880 callback: Box<dyn Fn(Vec<u8>) + Send + Sync>,
881 ) -> LLMResult<()> {
882 let tts = self
883 .tts_plugin
884 .as_ref()
885 .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;
886
887 let mut tts_guard = tts.lock().await;
888 tts_guard
889 .synthesize_streaming(text, callback)
890 .await
891 .map_err(|e| LLMError::Other(format!("TTS streaming failed: {}", e)))
892 }
893
894 pub async fn tts_speak_f32_stream(
910 &self,
911 text: &str,
912 callback: Box<dyn Fn(Vec<f32>) + Send + Sync>,
913 ) -> LLMResult<()> {
914 let tts = self
915 .tts_plugin
916 .as_ref()
917 .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;
918
919 let mut tts_guard = tts.lock().await;
920 tts_guard
921 .synthesize_streaming_f32(text, callback)
922 .await
923 .map_err(|e| LLMError::Other(format!("TTS f32 streaming failed: {}", e)))
924 }
925
926 pub async fn tts_create_stream(&self, text: &str) -> LLMResult<TtsAudioStream> {
944 #[cfg(feature = "kokoro")]
945 {
946 use mofa_plugins::tts::kokoro_wrapper::KokoroTTS;
947
948 let cached_engine = {
950 let cache_guard = self.cached_kokoro_engine.lock().await;
951 cache_guard.clone()
952 };
953
954 let kokoro = if let Some(engine) = cached_engine {
955 engine
957 } else {
958 let tts = self
960 .tts_plugin
961 .as_ref()
962 .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;
963
964 let tts_guard = tts.lock().await;
965
966 let engine = tts_guard
967 .engine()
968 .ok_or_else(|| LLMError::Other("TTS engine not initialized".to_string()))?;
969
970 if let Some(kokoro_ref) = engine.as_any().downcast_ref::<KokoroTTS>() {
971 let cloned = kokoro_ref.clone();
973 let cloned_arc = Arc::new(cloned);
974
975 let voice = tts_guard
977 .stats()
978 .get("default_voice")
979 .and_then(|v| v.as_str())
980 .unwrap_or("default");
981
982 {
984 let mut cache_guard = self.cached_kokoro_engine.lock().await;
985 *cache_guard = Some(cloned_arc.clone());
986 }
987
988 cloned_arc
989 } else {
990 return Err(LLMError::Other("TTS engine is not KokoroTTS".to_string()));
991 }
992 };
993
994 let voice = "default"; let (mut sink, stream) = kokoro
997 .create_stream(voice)
998 .await
999 .map_err(|e| LLMError::Other(format!("Failed to create TTS stream: {}", e)))?;
1000
1001 sink.synth(text.to_string()).await.map_err(|e| {
1003 LLMError::Other(format!("Failed to submit text for synthesis: {}", e))
1004 })?;
1005
1006 return Ok(Box::pin(stream));
1008 }
1009
1010 #[cfg(not(feature = "kokoro"))]
1011 {
1012 Err(LLMError::Other("Kokoro feature not enabled".to_string()))
1013 }
1014 }
1015
    /// Synthesizes a pre-split batch of sentences through a single Kokoro
    /// stream, delivering f32 audio chunks to `callback` from a spawned task.
    ///
    /// Returns once every sentence has been submitted to the sink; audio
    /// keeps flowing to `callback` asynchronously after this returns.
    ///
    /// # Errors
    /// Fails when the plugin/engine is missing, the engine is not
    /// `KokoroTTS`, or stream creation / submission fails. Without the
    /// `kokoro` feature this always errors.
    ///
    /// NOTE(review): the plugin lock (`tts_guard`) is held across the
    /// per-sentence `synth` awaits — confirm no other code path needs the
    /// plugin while a batch is being submitted.
    pub async fn tts_speak_f32_stream_batch(
        &self,
        sentences: Vec<String>,
        callback: Box<dyn Fn(Vec<f32>) + Send + Sync>,
    ) -> LLMResult<()> {
        let tts = self
            .tts_plugin
            .as_ref()
            .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;

        let tts_guard = tts.lock().await;

        #[cfg(feature = "kokoro")]
        {
            use mofa_plugins::tts::kokoro_wrapper::KokoroTTS;

            let engine = tts_guard
                .engine()
                .ok_or_else(|| LLMError::Other("TTS engine not initialized".to_string()))?;

            if let Some(kokoro) = engine.as_any().downcast_ref::<KokoroTTS>() {
                // Voice comes from the plugin's `default_voice` stat.
                let voice = tts_guard
                    .stats()
                    .get("default_voice")
                    .and_then(|v| v.as_str())
                    .unwrap_or("default")
                    .to_string();

                let (mut sink, mut stream) = kokoro
                    .create_stream(&voice)
                    .await
                    .map_err(|e| LLMError::Other(format!("Failed to create TTS stream: {}", e)))?;

                // Forward synthesized chunks to the caller in the background.
                tokio::spawn(async move {
                    while let Some((audio, _took)) = stream.next().await {
                        callback(audio);
                    }
                });

                for sentence in sentences {
                    sink.synth(sentence)
                        .await
                        .map_err(|e| LLMError::Other(format!("Failed to submit text: {}", e)))?;
                }

                return Ok(());
            }

            return Err(LLMError::Other("TTS engine is not KokoroTTS".to_string()));
        }

        #[cfg(not(feature = "kokoro"))]
        {
            Err(LLMError::Other("Kokoro feature not enabled".to_string()))
        }
    }
1097
1098 pub fn has_tts(&self) -> bool {
1100 self.tts_plugin.is_some()
1101 }
1102
1103 pub async fn interrupt_tts(&self) -> LLMResult<()> {
1115 let mut session_guard = self.active_tts_session.lock().await;
1116 if let Some(session) = session_guard.take() {
1117 session.cancel();
1118 }
1119 Ok(())
1120 }
1121
    /// Streams a chat response for `session_id` to stdout.
    ///
    /// No audio callback is attached, so even with the `kokoro` feature this
    /// path only prints text (see `chat_with_tts_internal`); use
    /// `chat_with_tts_callback` to receive synthesized audio.
    pub async fn chat_with_tts(
        &self,
        session_id: &str,
        message: impl Into<String>,
    ) -> LLMResult<()> {
        self.chat_with_tts_internal(session_id, message, None).await
    }

    /// Streams a chat response, printing text and delivering synthesized f32
    /// audio samples to `callback` (audio requires the `kokoro` feature).
    pub async fn chat_with_tts_callback(
        &self,
        session_id: &str,
        message: impl Into<String>,
        callback: impl Fn(Vec<f32>) + Send + Sync + 'static,
    ) -> LLMResult<()> {
        self.chat_with_tts_internal(session_id, message, Some(Box::new(callback)))
            .await
    }
1164
    /// Builds a `TTSStreamHandle`: a Kokoro synthesis sink plus a spawned
    /// task that pushes each synthesized audio chunk into `callback`.
    ///
    /// The forwarding task stops when the audio stream ends or, if a
    /// `cancellation_token` was supplied, as soon as it is cancelled.
    ///
    /// # Errors
    /// Fails when the TTS plugin/engine is missing, the engine is not
    /// `KokoroTTS`, or stream creation fails.
    #[cfg(feature = "kokoro")]
    async fn create_tts_stream_handle(
        &self,
        callback: Box<dyn Fn(Vec<f32>) + Send + Sync>,
        cancellation_token: Option<CancellationToken>,
    ) -> LLMResult<TTSStreamHandle> {
        use mofa_plugins::tts::kokoro_wrapper::KokoroTTS;

        let tts = self
            .tts_plugin
            .as_ref()
            .ok_or_else(|| LLMError::Other("TTS plugin not configured".to_string()))?;

        let tts_guard = tts.lock().await;
        let engine = tts_guard
            .engine()
            .ok_or_else(|| LLMError::Other("TTS engine not initialized".to_string()))?;

        let kokoro = engine
            .as_any()
            .downcast_ref::<KokoroTTS>()
            .ok_or_else(|| LLMError::Other("TTS engine is not KokoroTTS".to_string()))?;

        // Voice comes from the plugin's `default_voice` stat, with fallback.
        let voice = tts_guard
            .stats()
            .get("default_voice")
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .to_string();

        let (sink, mut stream) = kokoro
            .create_stream(&voice)
            .await
            .map_err(|e| LLMError::Other(format!("Failed to create TTS stream: {}", e)))?;

        let token_clone = cancellation_token.as_ref().map(|t| t.clone_token());

        // Forward audio until the stream ends or cancellation fires.
        let stream_handle = tokio::spawn(async move {
            while let Some((audio, _took)) = stream.next().await {
                if let Some(ref token) = token_clone {
                    if token.is_cancelled() {
                        break;
                    }
                }
                callback(audio);
            }
        });

        Ok(TTSStreamHandle {
            sink,
            _stream_handle: stream_handle,
        })
    }
1230
    /// Shared implementation behind `chat_with_tts` / `chat_with_tts_callback`.
    ///
    /// Streams the LLM response, printing each chunk to stdout. With the
    /// `kokoro` feature AND a `callback`, complete sentences are fed to the
    /// TTS engine and synthesized f32 audio is delivered to `callback`;
    /// without a callback (or without the feature) the response is text-only.
    async fn chat_with_tts_internal(
        &self,
        session_id: &str,
        message: impl Into<String>,
        callback: Option<Box<dyn Fn(Vec<f32>) + Send + Sync>>,
    ) -> LLMResult<()> {
        #[cfg(feature = "kokoro")]
        {
            use mofa_plugins::tts::kokoro_wrapper::KokoroTTS;

            // No callback: text-only streaming, no TTS involved.
            let callback = match callback {
                Some(cb) => cb,
                None => {
                    let mut text_stream =
                        self.chat_stream_with_session(session_id, message).await?;
                    while let Some(result) = text_stream.next().await {
                        match result {
                            Ok(text_chunk) => {
                                print!("{}", text_chunk);
                                std::io::stdout().flush().map_err(|e| {
                                    LLMError::Other(format!("Failed to flush stdout: {}", e))
                                })?;
                            }
                            // "__stream_end__" is the sentinel marking
                            // normal end-of-stream, not a real error.
                            Err(e) if e.to_string().contains("__stream_end__") => break,
                            Err(e) => return Err(e),
                        }
                    }
                    println!();
                    return Ok(());
                }
            };

            // Stop any TTS session that is already playing.
            self.interrupt_tts().await?;

            let cancellation_token = CancellationToken::new();

            let mut tts_handle = self
                .create_tts_stream_handle(callback, Some(cancellation_token.clone_token()))
                .await?;

            let session = TTSSession::new(cancellation_token);

            // Publish the session so `interrupt_tts` can cancel it.
            {
                let mut active_session = self.active_tts_session.lock().await;
                *active_session = Some(session);
            }

            let mut buffer = SentenceBuffer::new();

            let mut text_stream = self.chat_stream_with_session(session_id, message).await?;

            while let Some(result) = text_stream.next().await {
                match result {
                    Ok(text_chunk) => {
                        // Bail out quietly if another call interrupted us.
                        {
                            let active_session = self.active_tts_session.lock().await;
                            if let Some(ref session) = *active_session {
                                if !session.is_active() {
                                    return Ok(());
                                }
                            }
                        }

                        print!("{}", text_chunk);
                        std::io::stdout().flush().map_err(|e| {
                            LLMError::Other(format!("Failed to flush stdout: {}", e))
                        })?;

                        // Feed each completed sentence to the synthesizer;
                        // submission failures are logged, not fatal.
                        if let Some(sentence) = buffer.push(&text_chunk) {
                            if let Err(e) = tts_handle.sink.synth(sentence).await {
                                eprintln!("[TTS Error] Failed to submit sentence: {}", e);
                            }
                        }
                    }
                    Err(e) if e.to_string().contains("__stream_end__") => break,
                    Err(e) => return Err(e),
                }
            }

            // Flush any trailing partial sentence.
            if let Some(remaining) = buffer.flush() {
                if let Err(e) = tts_handle.sink.synth(remaining).await {
                    eprintln!("[TTS Error] Failed to submit final sentence: {}", e);
                }
            }

            // Un-publish the session; playback is winding down.
            {
                let mut active_session = self.active_tts_session.lock().await;
                *active_session = None;
            }

            // Wait (up to 30s) for the audio-forwarding task to drain; the
            // outcome is deliberately ignored so cleanup never fails the chat.
            let _ = tokio::time::timeout(
                tokio::time::Duration::from_secs(30),
                tts_handle._stream_handle,
            )
            .await
            .map_err(|_| LLMError::Other("TTS stream processing timeout".to_string()))
            .and_then(|r| r.map_err(|e| LLMError::Other(format!("TTS stream task failed: {}", e))));

            Ok(())
        }

        #[cfg(not(feature = "kokoro"))]
        {
            // Without kokoro: stream and print the text, then just echo the
            // sentences that would have been synthesized.
            let mut text_stream = self.chat_stream_with_session(session_id, message).await?;
            let mut buffer = SentenceBuffer::new();
            let mut sentences = Vec::new();

            while let Some(result) = text_stream.next().await {
                match result {
                    Ok(text_chunk) => {
                        print!("{}", text_chunk);
                        std::io::stdout().flush().map_err(|e| {
                            LLMError::Other(format!("Failed to flush stdout: {}", e))
                        })?;

                        if let Some(sentence) = buffer.push(&text_chunk) {
                            sentences.push(sentence);
                        }
                    }
                    Err(e) if e.to_string().contains("__stream_end__") => break,
                    Err(e) => return Err(e),
                }
            }

            if let Some(remaining) = buffer.flush() {
                sentences.push(remaining);
            }

            if !sentences.is_empty()
                && let Some(cb) = callback
            {
                for sentence in &sentences {
                    println!("\n[TTS] {}", sentence);
                }
                let _ = cb;
            }

            Ok(())
        }
    }
1396
1397 async fn get_session_arc(&self, session_id: &str) -> LLMResult<Arc<RwLock<ChatSession>>> {
1399 let sessions = self.sessions.read().await;
1400 sessions
1401 .get(session_id)
1402 .cloned()
1403 .ok_or_else(|| LLMError::Other(format!("Session '{}' not found", session_id)))
1404 }
1405
    /// Sends `message` in the currently active session and returns the
    /// final response text.
    pub async fn chat(&self, message: impl Into<String>) -> LLMResult<String> {
        let session_id = self.active_session_id.read().await.clone();
        self.chat_with_session(&session_id, message).await
    }
1415
    /// Sends `message` in the given session and returns the final response.
    ///
    /// Event-handler flow: `before_chat_with_model` may rewrite the message
    /// or abort the chat (the method then returns `Ok("")`); on failure,
    /// `on_error` may supply a fallback response; on success,
    /// `after_chat_with_metadata` (or `after_chat` when no metadata is
    /// available) may rewrite the response.
    ///
    /// NOTE(review): the session write lock is held across the provider
    /// call and the after-chat hooks, serializing all chats on one session.
    pub async fn chat_with_session(
        &self,
        session_id: &str,
        message: impl Into<String>,
    ) -> LLMResult<String> {
        let message = message.into();

        let model = self.provider.default_model();

        // Pre-chat hook: may rewrite or cancel the outgoing message.
        let processed_message = if let Some(ref handler) = self.event_handler {
            match handler.before_chat_with_model(&message, model).await? {
                Some(msg) => msg,
                None => return Ok(String::new()),
            }
        } else {
            message
        };

        let session = self.get_session_arc(session_id).await?;

        let mut session_guard = session.write().await;
        let response = match session_guard.send(&processed_message).await {
            Ok(resp) => resp,
            Err(e) => {
                // Error hook may convert a failure into a fallback reply.
                if let Some(ref handler) = self.event_handler
                    && let Some(fallback) = handler.on_error(&e).await?
                {
                    return Ok(fallback);
                }
                return Err(e);
            }
        };

        // Post-chat hooks: prefer the metadata-aware variant when available.
        let final_response = if let Some(ref handler) = self.event_handler {
            let metadata = session_guard.last_response_metadata();
            if let Some(meta) = metadata {
                match handler.after_chat_with_metadata(&response, meta).await? {
                    Some(resp) => resp,
                    None => response,
                }
            } else {
                match handler.after_chat(&response).await? {
                    Some(resp) => resp,
                    None => response,
                }
            }
        } else {
            response
        };

        Ok(final_response)
    }
1487
    /// One-shot question that bypasses session history entirely.
    ///
    /// Resolves the system prompt (prompt-plugin template first, config
    /// fallback), applies temperature/max_tokens, and — when a tool
    /// executor is configured — offers tools and resolves tool calls via
    /// `send_with_tools`.
    ///
    /// # Errors
    /// Propagates provider errors; fails with "No content in response"
    /// when the reply carries no textual content.
    pub async fn ask(&self, question: impl Into<String>) -> LLMResult<String> {
        let question = question.into();

        let mut builder = self.client.chat();

        let mut system_prompt = self.config.system_prompt.clone();

        // Plugin template overrides the static prompt when it renders.
        if let Some(ref plugin) = self.prompt_plugin
            && let Some(template) = plugin.get_current_template().await
        {
            match template.render(&[]) {
                Ok(prompt) => system_prompt = Some(prompt),
                Err(_) => {
                    system_prompt = self.config.system_prompt.clone();
                }
            }
        }

        if let Some(ref system) = system_prompt {
            builder = builder.system(system.clone());
        }

        if let Some(temp) = self.config.temperature {
            builder = builder.temperature(temp);
        }

        if let Some(tokens) = self.config.max_tokens {
            builder = builder.max_tokens(tokens);
        }

        builder = builder.user(question);

        if let Some(ref executor) = self.tool_executor {
            // An empty configured tool list means "use whatever the
            // executor currently offers".
            let tools = if self.tools.is_empty() {
                executor.available_tools().await?
            } else {
                self.tools.clone()
            };

            if !tools.is_empty() {
                builder = builder.tools(tools);
            }

            builder = builder.with_tool_executor(executor.clone());
            let response = builder.send_with_tools().await?;
            return response
                .content()
                .map(|s| s.to_string())
                .ok_or_else(|| LLMError::Other("No content in response".to_string()));
        }

        let response = builder.send().await?;
        response
            .content()
            .map(|s| s.to_string())
            .ok_or_else(|| LLMError::Other("No content in response".to_string()))
    }
1551
1552 pub async fn set_prompt_scenario(&self, scenario: impl Into<String>) {
1554 let scenario = scenario.into();
1555
1556 if let Some(ref plugin) = self.prompt_plugin {
1557 plugin.set_active_scenario(&scenario).await;
1558 }
1559 }
1560
1561 pub async fn clear_history(&self) {
1563 let session_id = self.active_session_id.read().await.clone();
1564 let _ = self.clear_session_history(&session_id).await;
1565 }
1566
1567 pub async fn clear_session_history(&self, session_id: &str) -> LLMResult<()> {
1569 let session = self.get_session_arc(session_id).await?;
1570 let mut session_guard = session.write().await;
1571 session_guard.clear();
1572 Ok(())
1573 }
1574
1575 pub async fn history(&self) -> Vec<ChatMessage> {
1577 let session_id = self.active_session_id.read().await.clone();
1578 self.get_session_history(&session_id)
1579 .await
1580 .unwrap_or_default()
1581 }
1582
1583 pub async fn get_session_history(&self, session_id: &str) -> LLMResult<Vec<ChatMessage>> {
1585 let session = self.get_session_arc(session_id).await?;
1586 let session_guard = session.read().await;
1587 Ok(session_guard.messages().to_vec())
1588 }
1589
    /// Registers the tool list and the executor that runs them.
    ///
    /// An empty `tools` vector means "ask the executor for its available
    /// tools" at call time (see `ask`).
    pub fn set_tools(&mut self, tools: Vec<Tool>, executor: Arc<dyn ToolExecutor>) {
        self.tools = tools;
        self.tool_executor = Some(executor);
    }

    /// Installs the lifecycle event handler (replaces any previous one).
    pub fn set_event_handler(&mut self, handler: Box<dyn LLMAgentEventHandler>) {
        self.event_handler = Some(handler);
    }

    /// Adds a single plugin to the agent.
    pub fn add_plugin<P: AgentPlugin + 'static>(&mut self, plugin: P) {
        self.plugins.push(Box::new(plugin));
    }

    /// Adds several plugins at once.
    pub fn add_plugins(&mut self, plugins: Vec<Box<dyn AgentPlugin>>) {
        self.plugins.extend(plugins);
    }
1613
1614 pub async fn ask_stream(&self, question: impl Into<String>) -> LLMResult<TextStream> {
1636 let question = question.into();
1637
1638 let mut builder = self.client.chat();
1639
1640 if let Some(ref system) = self.config.system_prompt {
1641 builder = builder.system(system.clone());
1642 }
1643
1644 if let Some(temp) = self.config.temperature {
1645 builder = builder.temperature(temp);
1646 }
1647
1648 if let Some(tokens) = self.config.max_tokens {
1649 builder = builder.max_tokens(tokens);
1650 }
1651
1652 builder = builder.user(question);
1653
1654 let chunk_stream = builder.send_stream().await?;
1656
1657 Ok(Self::chunk_stream_to_text_stream(chunk_stream))
1659 }
1660
1661 pub async fn chat_stream(&self, message: impl Into<String>) -> LLMResult<TextStream> {
1684 let session_id = self.active_session_id.read().await.clone();
1685 self.chat_stream_with_session(&session_id, message).await
1686 }
1687
    /// Streams a chat reply within the given session, updating the session
    /// history as a side effect.
    ///
    /// The user message is appended to the session only after the provider
    /// stream is opened, and the assistant reply is appended once the stream
    /// completes (see `create_history_updating_stream`). Returns an empty
    /// stream if the event handler's `before_chat_with_model` hook vetoes
    /// the message by returning `None`.
    pub async fn chat_stream_with_session(
        &self,
        session_id: &str,
        message: impl Into<String>,
    ) -> LLMResult<TextStream> {
        let message = message.into();

        let model = self.provider.default_model();

        // Give the event handler a chance to rewrite or veto the message.
        let processed_message = if let Some(ref handler) = self.event_handler {
            match handler.before_chat_with_model(&message, model).await? {
                Some(msg) => msg,
                None => return Ok(Box::pin(futures::stream::empty())),
            }
        } else {
            message
        };

        let session = self.get_session_arc(session_id).await?;

        // Snapshot the history under a short-lived read lock so the lock is
        // not held across the network call below.
        let history = {
            let session_guard = session.read().await;
            session_guard.messages().to_vec()
        };

        let mut builder = self.client.chat();

        if let Some(ref system) = self.config.system_prompt {
            builder = builder.system(system.clone());
        }

        if let Some(temp) = self.config.temperature {
            builder = builder.temperature(temp);
        }

        if let Some(tokens) = self.config.max_tokens {
            builder = builder.max_tokens(tokens);
        }

        builder = builder.messages(history);
        builder = builder.user(processed_message.clone());

        let chunk_stream = builder.send_stream().await?;

        // Record the user message only after the stream opened successfully,
        // so a failed request leaves the history untouched.
        {
            let mut session_guard = session.write().await;
            session_guard
                .messages_mut()
                .push(ChatMessage::user(&processed_message));
        }

        let event_handler = self.event_handler.clone().map(Arc::new);
        let wrapped_stream =
            Self::create_history_updating_stream(chunk_stream, session, event_handler);

        Ok(wrapped_stream)
    }
1759
1760 pub async fn ask_stream_raw(&self, question: impl Into<String>) -> LLMResult<ChatStream> {
1764 let question = question.into();
1765
1766 let mut builder = self.client.chat();
1767
1768 if let Some(ref system) = self.config.system_prompt {
1769 builder = builder.system(system.clone());
1770 }
1771
1772 if let Some(temp) = self.config.temperature {
1773 builder = builder.temperature(temp);
1774 }
1775
1776 if let Some(tokens) = self.config.max_tokens {
1777 builder = builder.max_tokens(tokens);
1778 }
1779
1780 builder = builder.user(question);
1781
1782 builder.send_stream().await
1783 }
1784
1785 pub async fn chat_stream_with_full(
1806 &self,
1807 message: impl Into<String>,
1808 ) -> LLMResult<(TextStream, tokio::sync::oneshot::Receiver<String>)> {
1809 let session_id = self.active_session_id.read().await.clone();
1810 self.chat_stream_with_full_session(&session_id, message)
1811 .await
1812 }
1813
    /// Streams a chat reply within the given session and also delivers the
    /// complete (possibly handler-post-processed) reply through a oneshot
    /// receiver once the stream is drained.
    ///
    /// If the `before_chat_with_model` hook vetoes the message, the returned
    /// stream is empty and the receiver resolves to an empty string.
    pub async fn chat_stream_with_full_session(
        &self,
        session_id: &str,
        message: impl Into<String>,
    ) -> LLMResult<(TextStream, tokio::sync::oneshot::Receiver<String>)> {
        let message = message.into();

        let model = self.provider.default_model();

        // Give the event handler a chance to rewrite or veto the message.
        let processed_message = if let Some(ref handler) = self.event_handler {
            match handler.before_chat_with_model(&message, model).await? {
                Some(msg) => msg,
                None => {
                    let (tx, rx) = tokio::sync::oneshot::channel();
                    let _ = tx.send(String::new());
                    return Ok((Box::pin(futures::stream::empty()), rx));
                }
            }
        } else {
            message
        };

        let session = self.get_session_arc(session_id).await?;

        // Snapshot history under a short-lived read lock.
        let history = {
            let session_guard = session.read().await;
            session_guard.messages().to_vec()
        };

        let mut builder = self.client.chat();

        if let Some(ref system) = self.config.system_prompt {
            builder = builder.system(system.clone());
        }

        if let Some(temp) = self.config.temperature {
            builder = builder.temperature(temp);
        }

        if let Some(tokens) = self.config.max_tokens {
            builder = builder.max_tokens(tokens);
        }

        builder = builder.messages(history);
        builder = builder.user(processed_message.clone());

        let chunk_stream = builder.send_stream().await?;

        // Record the user message only after the stream opened successfully.
        {
            let mut session_guard = session.write().await;
            session_guard
                .messages_mut()
                .push(ChatMessage::user(&processed_message));
        }

        let (tx, rx) = tokio::sync::oneshot::channel();

        let event_handler = self.event_handler.clone().map(Arc::new);
        let wrapped_stream =
            Self::create_collecting_stream(chunk_stream, session, tx, event_handler);

        Ok((wrapped_stream, rx))
    }
1890
1891 fn chunk_stream_to_text_stream(chunk_stream: ChatStream) -> TextStream {
1897 use futures::StreamExt;
1898
1899 let text_stream = chunk_stream.filter_map(|result| async move {
1900 match result {
1901 Ok(chunk) => {
1902 if let Some(choice) = chunk.choices.first()
1904 && let Some(ref content) = choice.delta.content
1905 && !content.is_empty()
1906 {
1907 return Some(Ok(content.clone()));
1908 }
1909 None
1910 }
1911 Err(e) => Some(Err(e)),
1912 }
1913 });
1914
1915 Box::pin(text_stream)
1916 }
1917
1918 fn create_history_updating_stream(
1920 chunk_stream: ChatStream,
1921 session: Arc<RwLock<ChatSession>>,
1922 event_handler: Option<Arc<Box<dyn LLMAgentEventHandler>>>,
1923 ) -> TextStream {
1924 use super::types::LLMResponseMetadata;
1925
1926 let collected = Arc::new(tokio::sync::Mutex::new(String::new()));
1927 let collected_clone = collected.clone();
1928 let event_handler_clone = event_handler.clone();
1929 let metadata_collected = Arc::new(tokio::sync::Mutex::new(None::<LLMResponseMetadata>));
1930 let metadata_collected_clone = metadata_collected.clone();
1931
1932 let stream = chunk_stream.filter_map(move |result| {
1933 let collected = collected.clone();
1934 let event_handler = event_handler.clone();
1935 let metadata_collected = metadata_collected.clone();
1936 async move {
1937 match result {
1938 Ok(chunk) => {
1939 if let Some(choice) = chunk.choices.first() {
1940 if choice.finish_reason.is_some() {
1941 let metadata = LLMResponseMetadata::from(&chunk);
1943 *metadata_collected.lock().await = Some(metadata);
1944 return None;
1945 }
1946 if let Some(ref content) = choice.delta.content
1947 && !content.is_empty()
1948 {
1949 let mut collected = collected.lock().await;
1950 collected.push_str(content);
1951 return Some(Ok(content.clone()));
1952 }
1953 }
1954 None
1955 }
1956 Err(e) => {
1957 if let Some(handler) = event_handler {
1958 let _ = handler.on_error(&e).await;
1959 }
1960 Some(Err(e))
1961 }
1962 }
1963 }
1964 });
1965
1966 let stream = stream
1967 .chain(futures::stream::once(async move {
1968 let full_response = collected_clone.lock().await.clone();
1969 let metadata = metadata_collected_clone.lock().await.clone();
1970 if !full_response.is_empty() {
1971 let mut session = session.write().await;
1972 session
1973 .messages_mut()
1974 .push(ChatMessage::assistant(&full_response));
1975
1976 let window_size = session.context_window_size();
1978 if window_size.is_some() {
1979 let current_messages = session.messages().to_vec();
1980 *session.messages_mut() = ChatSession::apply_sliding_window_static(
1981 ¤t_messages,
1982 window_size,
1983 );
1984 }
1985
1986 if let Some(handler) = event_handler_clone {
1987 if let Some(meta) = &metadata {
1988 let _ = handler.after_chat_with_metadata(&full_response, meta).await;
1989 } else {
1990 let _ = handler.after_chat(&full_response).await;
1991 }
1992 }
1993 }
1994 Err(LLMError::Other("__stream_end__".to_string()))
1995 }))
1996 .filter_map(|result| async move {
1997 match result {
1998 Ok(s) => Some(Ok(s)),
1999 Err(e) if e.to_string() == "__stream_end__" => None,
2000 Err(e) => Some(Err(e)),
2001 }
2002 });
2003
2004 Box::pin(stream)
2005 }
2006
2007 fn create_collecting_stream(
2009 chunk_stream: ChatStream,
2010 session: Arc<RwLock<ChatSession>>,
2011 tx: tokio::sync::oneshot::Sender<String>,
2012 event_handler: Option<Arc<Box<dyn LLMAgentEventHandler>>>,
2013 ) -> TextStream {
2014 use super::types::LLMResponseMetadata;
2015 use futures::StreamExt;
2016
2017 let collected = Arc::new(tokio::sync::Mutex::new(String::new()));
2018 let collected_clone = collected.clone();
2019 let event_handler_clone = event_handler.clone();
2020 let metadata_collected = Arc::new(tokio::sync::Mutex::new(None::<LLMResponseMetadata>));
2021 let metadata_collected_clone = metadata_collected.clone();
2022
2023 let stream = chunk_stream.filter_map(move |result| {
2024 let collected = collected.clone();
2025 let event_handler = event_handler.clone();
2026 let metadata_collected = metadata_collected.clone();
2027 async move {
2028 match result {
2029 Ok(chunk) => {
2030 if let Some(choice) = chunk.choices.first() {
2031 if choice.finish_reason.is_some() {
2032 let metadata = LLMResponseMetadata::from(&chunk);
2034 *metadata_collected.lock().await = Some(metadata);
2035 return None;
2036 }
2037 if let Some(ref content) = choice.delta.content
2038 && !content.is_empty()
2039 {
2040 let mut collected = collected.lock().await;
2041 collected.push_str(content);
2042 return Some(Ok(content.clone()));
2043 }
2044 }
2045 None
2046 }
2047 Err(e) => {
2048 if let Some(handler) = event_handler {
2049 let _ = handler.on_error(&e).await;
2050 }
2051 Some(Err(e))
2052 }
2053 }
2054 }
2055 });
2056
2057 let stream = stream
2059 .chain(futures::stream::once(async move {
2060 let full_response = collected_clone.lock().await.clone();
2061 let mut processed_response = full_response.clone();
2062 let metadata = metadata_collected_clone.lock().await.clone();
2063
2064 if !full_response.is_empty() {
2065 let mut session = session.write().await;
2066 session
2067 .messages_mut()
2068 .push(ChatMessage::assistant(&processed_response));
2069
2070 let window_size = session.context_window_size();
2072 if window_size.is_some() {
2073 let current_messages = session.messages().to_vec();
2074 *session.messages_mut() = ChatSession::apply_sliding_window_static(
2075 ¤t_messages,
2076 window_size,
2077 );
2078 }
2079
2080 if let Some(handler) = event_handler_clone {
2082 if let Some(meta) = &metadata {
2083 if let Ok(Some(resp)) = handler
2084 .after_chat_with_metadata(&processed_response, meta)
2085 .await
2086 {
2087 processed_response = resp;
2088 }
2089 } else if let Ok(Some(resp)) = handler.after_chat(&processed_response).await
2090 {
2091 processed_response = resp;
2092 }
2093 }
2094 }
2095
2096 let _ = tx.send(processed_response);
2097
2098 Err(LLMError::Other("__stream_end__".to_string()))
2099 }))
2100 .filter_map(|result| async move {
2101 match result {
2102 Ok(s) => Some(Ok(s)),
2103 Err(e) if e.to_string() == "__stream_end__" => None,
2104 Err(e) => Some(Err(e)),
2105 }
2106 });
2107
2108 Box::pin(stream)
2109 }
2110}
2111
/// Fluent builder for `LLMAgent`.
pub struct LLMAgentBuilder {
    /// Stable agent identifier; defaults to a fresh UUID v7 in `new()`.
    agent_id: String,
    /// Display name; falls back to `agent_id` at build time.
    name: Option<String>,
    /// LLM backend; required — `build()` panics / `try_build()` errors
    /// when unset.
    provider: Option<Arc<dyn LLMProvider>>,
    system_prompt: Option<String>,
    temperature: Option<f32>,
    max_tokens: Option<u32>,
    /// Tools offered to the model; only applied when `tool_executor` is set.
    tools: Vec<Tool>,
    tool_executor: Option<Arc<dyn ToolExecutor>>,
    event_handler: Option<Box<dyn LLMAgentEventHandler>>,
    plugins: Vec<Box<dyn AgentPlugin>>,
    custom_config: HashMap<String, String>,
    prompt_plugin: Option<Box<dyn prompt::PromptTemplatePlugin>>,
    /// Initial session id; a default session is created when unset.
    session_id: Option<String>,
    user_id: Option<String>,
    tenant_id: Option<String>,
    /// Sliding-window size (message count) for session history.
    context_window_size: Option<usize>,
    // Persistence wiring, normally populated via `with_persistence_plugin`.
    message_store: Option<Arc<dyn crate::persistence::MessageStore + Send + Sync>>,
    session_store: Option<Arc<dyn crate::persistence::SessionStore + Send + Sync>>,
    persistence_user_id: Option<uuid::Uuid>,
    persistence_tenant_id: Option<uuid::Uuid>,
    persistence_agent_id: Option<uuid::Uuid>,
}
2137
2138impl LLMAgentBuilder {
2139 pub fn new() -> Self {
2141 Self {
2142 agent_id: uuid::Uuid::now_v7().to_string(),
2143 name: None,
2144 provider: None,
2145 system_prompt: None,
2146 temperature: None,
2147 max_tokens: None,
2148 tools: Vec::new(),
2149 tool_executor: None,
2150 event_handler: None,
2151 plugins: Vec::new(),
2152 custom_config: HashMap::new(),
2153 prompt_plugin: None,
2154 session_id: None,
2155 user_id: None,
2156 tenant_id: None,
2157 context_window_size: None,
2158 message_store: None,
2159 session_store: None,
2160 persistence_user_id: None,
2161 persistence_tenant_id: None,
2162 persistence_agent_id: None,
2163 }
2164 }
2165
2166 pub fn with_id(mut self, id: impl Into<String>) -> Self {
2168 self.agent_id = id.into();
2169 self
2170 }
2171
2172 pub fn with_name(mut self, name: impl Into<String>) -> Self {
2174 self.name = Some(name.into());
2175 self
2176 }
2177
2178 pub fn with_provider(mut self, provider: Arc<dyn LLMProvider>) -> Self {
2180 self.provider = Some(provider);
2181 self
2182 }
2183
2184 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
2186 self.system_prompt = Some(prompt.into());
2187 self
2188 }
2189
2190 pub fn with_temperature(mut self, temperature: f32) -> Self {
2192 self.temperature = Some(temperature);
2193 self
2194 }
2195
2196 pub fn with_max_tokens(mut self, max_tokens: u32) -> Self {
2198 self.max_tokens = Some(max_tokens);
2199 self
2200 }
2201
2202 pub fn with_tool(mut self, tool: Tool) -> Self {
2204 self.tools.push(tool);
2205 self
2206 }
2207
2208 pub fn with_tools(mut self, tools: Vec<Tool>) -> Self {
2210 self.tools = tools;
2211 self
2212 }
2213
2214 pub fn with_tool_executor(mut self, executor: Arc<dyn ToolExecutor>) -> Self {
2216 self.tool_executor = Some(executor);
2217 self
2218 }
2219
2220 pub fn with_event_handler(mut self, handler: Box<dyn LLMAgentEventHandler>) -> Self {
2222 self.event_handler = Some(handler);
2223 self
2224 }
2225
2226 pub fn with_plugin(mut self, plugin: impl AgentPlugin + 'static) -> Self {
2228 self.plugins.push(Box::new(plugin));
2229 self
2230 }
2231
2232 pub fn with_plugins(mut self, plugins: Vec<Box<dyn AgentPlugin>>) -> Self {
2234 self.plugins.extend(plugins);
2235 self
2236 }
2237
2238 pub fn with_persistence_plugin(
2276 mut self,
2277 plugin: crate::persistence::PersistencePlugin,
2278 ) -> Self {
2279 self.message_store = Some(plugin.message_store());
2280 self.session_store = plugin.session_store();
2281 self.persistence_user_id = Some(plugin.user_id());
2282 self.persistence_tenant_id = Some(plugin.tenant_id());
2283 self.persistence_agent_id = Some(plugin.agent_id());
2284
2285 let plugin_box: Box<dyn AgentPlugin> = Box::new(plugin.clone());
2288 let event_handler: Box<dyn LLMAgentEventHandler> = Box::new(plugin);
2289 self.plugins.push(plugin_box);
2290 self.event_handler = Some(event_handler);
2291 self
2292 }
2293
2294 pub fn with_prompt_plugin(
2296 mut self,
2297 plugin: impl prompt::PromptTemplatePlugin + 'static,
2298 ) -> Self {
2299 self.prompt_plugin = Some(Box::new(plugin));
2300 self
2301 }
2302
2303 pub fn with_hot_reload_prompt_plugin(
2305 mut self,
2306 plugin: prompt::HotReloadableRhaiPromptPlugin,
2307 ) -> Self {
2308 self.prompt_plugin = Some(Box::new(plugin));
2309 self
2310 }
2311
2312 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
2314 self.custom_config.insert(key.into(), value.into());
2315 self
2316 }
2317
2318 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
2329 self.session_id = Some(session_id.into());
2330 self
2331 }
2332
2333 pub fn with_user(mut self, user_id: impl Into<String>) -> Self {
2346 self.user_id = Some(user_id.into());
2347 self
2348 }
2349
2350 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
2363 self.tenant_id = Some(tenant_id.into());
2364 self
2365 }
2366
2367 pub fn with_sliding_window(mut self, size: usize) -> Self {
2390 self.context_window_size = Some(size);
2391 self
2392 }
2393
2394 pub fn from_env() -> LLMResult<Self> {
2415 use super::openai::{OpenAIConfig, OpenAIProvider};
2416
2417 let api_key = std::env::var("OPENAI_API_KEY").map_err(|_| {
2418 LLMError::ConfigError("OPENAI_API_KEY environment variable not set".to_string())
2419 })?;
2420
2421 let mut config = OpenAIConfig::new(api_key);
2422
2423 if let Ok(base_url) = std::env::var("OPENAI_BASE_URL") {
2424 config = config.with_base_url(&base_url);
2425 }
2426
2427 if let Ok(model) = std::env::var("OPENAI_MODEL") {
2428 config = config.with_model(&model);
2429 }
2430
2431 Ok(Self::new()
2432 .with_provider(Arc::new(OpenAIProvider::with_config(config)))
2433 .with_temperature(0.7)
2434 .with_max_tokens(4096))
2435 }
2436
    /// Builds the agent, consuming the builder.
    ///
    /// # Panics
    /// Panics if no provider has been set; use [`Self::try_build`] for a
    /// fallible variant.
    pub fn build(self) -> LLMAgent {
        let provider = self
            .provider
            .expect("LLM provider must be set before building");

        let config = LLMAgentConfig {
            agent_id: self.agent_id.clone(),
            // The display name defaults to the agent id.
            name: self.name.unwrap_or_else(|| self.agent_id.clone()),
            system_prompt: self.system_prompt,
            temperature: self.temperature,
            max_tokens: self.max_tokens,
            custom_config: self.custom_config,
            user_id: self.user_id,
            tenant_id: self.tenant_id,
            context_window_size: self.context_window_size,
        };

        let mut agent = LLMAgent::with_initial_session(config, provider, self.session_id);

        agent.prompt_plugin = self.prompt_plugin;

        // Tools are only applied when an executor was provided.
        if let Some(executor) = self.tool_executor {
            agent.set_tools(self.tools, executor);
        }

        if let Some(handler) = self.event_handler {
            agent.set_event_handler(handler);
        }

        // Extract TTS plugins from the generic plugin list: the agent keeps
        // them in a dedicated slot instead of the plugin pipeline. Reverse
        // iteration keeps the remove-by-index positions valid.
        let mut plugins = self.plugins;
        let mut tts_plugin = None;

        for i in (0..plugins.len()).rev() {
            if plugins[i].as_any().is::<mofa_plugins::tts::TTSPlugin>() {
                let plugin = plugins.remove(i);
                if let Ok(tts) = plugin.into_any().downcast::<mofa_plugins::tts::TTSPlugin>() {
                    tts_plugin = Some(Arc::new(Mutex::new(*tts)));
                }
            }
        }

        agent.add_plugins(plugins);

        agent.tts_plugin = tts_plugin;

        agent
    }
2496
    /// Fallible variant of [`Self::build`].
    ///
    /// # Errors
    /// Returns a `ConfigError` when no provider has been set.
    ///
    /// NOTE(review): unlike `build`, this path does not transfer
    /// `prompt_plugin` to the agent — confirm whether that is intentional.
    pub fn try_build(self) -> LLMResult<LLMAgent> {
        let provider = self
            .provider
            .ok_or_else(|| LLMError::ConfigError("LLM provider not set".to_string()))?;

        let config = LLMAgentConfig {
            agent_id: self.agent_id.clone(),
            // The display name defaults to the agent id.
            name: self.name.unwrap_or_else(|| self.agent_id.clone()),
            system_prompt: self.system_prompt,
            temperature: self.temperature,
            max_tokens: self.max_tokens,
            custom_config: self.custom_config,
            user_id: self.user_id,
            tenant_id: self.tenant_id,
            context_window_size: self.context_window_size,
        };

        let mut agent = LLMAgent::with_initial_session(config, provider, self.session_id);

        // Tools are only applied when an executor was provided.
        if let Some(executor) = self.tool_executor {
            agent.set_tools(self.tools, executor);
        }

        if let Some(handler) = self.event_handler {
            agent.set_event_handler(handler);
        }

        // Extract TTS plugins into the agent's dedicated slot; reverse
        // iteration keeps the remove-by-index positions valid.
        let mut plugins = self.plugins;
        let mut tts_plugin = None;

        for i in (0..plugins.len()).rev() {
            if plugins[i].as_any().is::<mofa_plugins::tts::TTSPlugin>() {
                let plugin = plugins.remove(i);
                if let Ok(tts) = plugin.into_any().downcast::<mofa_plugins::tts::TTSPlugin>() {
                    tts_plugin = Some(Arc::new(Mutex::new(*tts)));
                }
            }
        }

        agent.add_plugins(plugins);

        agent.tts_plugin = tts_plugin;

        Ok(agent)
    }
2552
    /// Async variant of [`Self::build`] that also wires up persistence:
    /// the initial session can be restored from the configured stores, and
    /// missing tool definitions are fetched from the executor.
    ///
    /// # Panics
    /// Panics if no provider has been set.
    pub async fn build_async(mut self) -> LLMAgent {
        let provider = self
            .provider
            .expect("LLM provider must be set before building");

        // Keep a copy before `self.tenant_id` is moved into the config.
        let tenant_id_for_persistence = self.tenant_id.clone();

        let config = LLMAgentConfig {
            agent_id: self.agent_id.clone(),
            name: self.name.unwrap_or_else(|| self.agent_id.clone()),
            system_prompt: self.system_prompt,
            temperature: self.temperature,
            max_tokens: self.max_tokens,
            custom_config: self.custom_config,
            user_id: self.user_id,
            tenant_id: self.tenant_id,
            context_window_size: self.context_window_size,
        };

        // If persistence is enabled but no persistence tenant uuid was set
        // explicitly, try to derive it from the string tenant id (silently
        // dropped when it is not a valid UUID).
        let persistence_tenant_id = if self.session_store.is_some()
            && self.persistence_tenant_id.is_none()
            && tenant_id_for_persistence.is_some()
        {
            uuid::Uuid::parse_str(&tenant_id_for_persistence.unwrap()).ok()
        } else {
            self.persistence_tenant_id
        };

        let mut agent = LLMAgent::with_initial_session_async(
            config,
            provider,
            self.session_id,
            self.message_store,
            self.session_store,
            self.persistence_user_id,
            persistence_tenant_id,
            self.persistence_agent_id,
        )
        .await;

        agent.prompt_plugin = self.prompt_plugin;

        // When no tools were configured explicitly, ask the executor for
        // its advertised tool list (best effort).
        if self.tools.is_empty() {
            if let Some(executor) = self.tool_executor.as_ref() {
                if let Ok(tools) = executor.available_tools().await {
                    self.tools = tools;
                }
            }
        }

        if let Some(executor) = self.tool_executor {
            agent.set_tools(self.tools, executor);
        }

        // Extract TTS plugins into the agent's dedicated slot; reverse
        // iteration keeps the remove-by-index positions valid.
        let mut plugins = self.plugins;
        let mut tts_plugin = None;
        // NOTE(review): this flag is never set to true, so the branch below
        // always runs — confirm whether it is a leftover placeholder.
        let history_loaded_from_plugin = false;

        for i in (0..plugins.len()).rev() {
            if plugins[i].as_any().is::<mofa_plugins::tts::TTSPlugin>() {
                let plugin = plugins.remove(i);
                if let Ok(tts) = plugin.into_any().downcast::<mofa_plugins::tts::TTSPlugin>() {
                    tts_plugin = Some(Arc::new(Mutex::new(*tts)));
                }
            }
        }

        // Detect a message-persistence storage plugin; history itself is
        // loaded after agent initialization (see log message).
        if !history_loaded_from_plugin {
            for plugin in &plugins {
                if plugin.metadata().plugin_type == PluginType::Storage
                    && plugin
                        .metadata()
                        .capabilities
                        .contains(&"message_persistence".to_string())
                {
                    tracing::info!("📦 检测到持久化插件,将在 agent 初始化后加载历史");
                    break;
                }
            }
        }

        agent.add_plugins(plugins);

        agent.tts_plugin = tts_plugin;

        if let Some(handler) = self.event_handler {
            agent.set_event_handler(handler);
        }

        agent
    }
2693}
2694
2695impl LLMAgentBuilder {
2700 pub fn from_config_file(path: impl AsRef<std::path::Path>) -> LLMResult<Self> {
2711 let config = crate::config::AgentYamlConfig::from_file(path)
2712 .map_err(|e| LLMError::ConfigError(e.to_string()))?;
2713 Self::from_yaml_config(config)
2714 }
2715
2716 pub fn from_yaml_config(config: crate::config::AgentYamlConfig) -> LLMResult<Self> {
2718 let mut builder = Self::new()
2719 .with_id(&config.agent.id)
2720 .with_name(&config.agent.name);
2721 if let Some(llm_config) = config.llm {
2723 let provider = create_provider_from_config(&llm_config)?;
2724 builder = builder.with_provider(Arc::new(provider));
2725
2726 if let Some(temp) = llm_config.temperature {
2727 builder = builder.with_temperature(temp);
2728 }
2729 if let Some(tokens) = llm_config.max_tokens {
2730 builder = builder.with_max_tokens(tokens);
2731 }
2732 if let Some(prompt) = llm_config.system_prompt {
2733 builder = builder.with_system_prompt(prompt);
2734 }
2735 }
2736
2737 Ok(builder)
2738 }
2739
2740 #[cfg(feature = "persistence-postgres")]
2766 pub async fn from_database<S>(store: &S, agent_code: &str) -> LLMResult<Self>
2767 where
2768 S: crate::persistence::AgentStore + Send + Sync,
2769 {
2770 let config = store
2771 .get_agent_by_code_with_provider(agent_code)
2772 .await
2773 .map_err(|e| LLMError::Other(format!("Failed to load agent from database: {}", e)))?
2774 .ok_or_else(|| {
2775 LLMError::Other(format!(
2776 "Agent with code '{}' not found in database",
2777 agent_code
2778 ))
2779 })?;
2780
2781 Self::from_agent_config(&config)
2782 }
2783
2784 #[cfg(feature = "persistence-postgres")]
2809 pub async fn from_database_with_tenant<S>(
2810 store: &S,
2811 tenant_id: uuid::Uuid,
2812 agent_code: &str,
2813 ) -> LLMResult<Self>
2814 where
2815 S: crate::persistence::AgentStore + Send + Sync,
2816 {
2817 let config = store
2818 .get_agent_by_code_and_tenant_with_provider(tenant_id, agent_code)
2819 .await
2820 .map_err(|e| LLMError::Other(format!("Failed to load agent from database: {}", e)))?
2821 .ok_or_else(|| {
2822 LLMError::Other(format!(
2823 "Agent with code '{}' not found for tenant {}",
2824 agent_code, tenant_id
2825 ))
2826 })?;
2827
2828 Self::from_agent_config(&config)
2829 }
2830
2831 #[cfg(feature = "persistence-postgres")]
2845 pub async fn with_database_agent<S>(store: &S, agent_code: &str) -> LLMResult<Self>
2846 where
2847 S: crate::persistence::AgentStore + Send + Sync,
2848 {
2849 Self::from_database(store, agent_code).await
2850 }
2851
    /// Converts a persisted agent + provider configuration into a builder.
    ///
    /// # Errors
    /// Fails when the agent or its provider is disabled, or when the
    /// provider type is unsupported.
    #[cfg(feature = "persistence-postgres")]
    pub fn from_agent_config(config: &crate::persistence::AgentConfig) -> LLMResult<Self> {
        use super::openai::{OpenAIConfig, OpenAIProvider};

        let agent = &config.agent;
        let provider = &config.provider;

        // Disabled records are rejected up front.
        if !agent.agent_status {
            return Err(LLMError::Other(format!(
                "Agent '{}' is disabled (agent_status = false)",
                agent.agent_code
            )));
        }

        if !provider.enabled {
            return Err(LLMError::Other(format!(
                "Provider '{}' is disabled (enabled = false)",
                provider.provider_name
            )));
        }

        // Map the stored provider type onto a concrete LLMProvider. All
        // OpenAI-compatible backends share the OpenAI client.
        let llm_provider: Arc<dyn super::LLMProvider> = match provider.provider_type.as_str() {
            "openai" | "azure" | "ollama" | "compatible" | "local" => {
                let mut openai_config = OpenAIConfig::new(provider.api_key.clone());
                openai_config = openai_config.with_base_url(&provider.api_base);
                openai_config = openai_config.with_model(&agent.model_name);

                if let Some(temp) = agent.temperature {
                    openai_config = openai_config.with_temperature(temp);
                }

                if let Some(max_tokens) = agent.max_completion_tokens {
                    openai_config = openai_config.with_max_tokens(max_tokens as u32);
                }

                Arc::new(OpenAIProvider::with_config(openai_config))
            }
            "anthropic" => {
                let mut cfg = AnthropicConfig::new(provider.api_key.clone());
                cfg = cfg.with_base_url(&provider.api_base);
                cfg = cfg.with_model(&agent.model_name);

                if let Some(temp) = agent.temperature {
                    cfg = cfg.with_temperature(temp);
                }
                if let Some(tokens) = agent.max_completion_tokens {
                    cfg = cfg.with_max_tokens(tokens as u32);
                }

                Arc::new(AnthropicProvider::with_config(cfg))
            }
            "gemini" => {
                let mut cfg = GeminiConfig::new(provider.api_key.clone());
                cfg = cfg.with_base_url(&provider.api_base);
                cfg = cfg.with_model(&agent.model_name);

                if let Some(temp) = agent.temperature {
                    cfg = cfg.with_temperature(temp);
                }
                if let Some(tokens) = agent.max_completion_tokens {
                    cfg = cfg.with_max_tokens(tokens as u32);
                }

                Arc::new(GeminiProvider::with_config(cfg))
            }
            other => {
                return Err(LLMError::Other(format!(
                    "Unsupported provider type: {}",
                    other
                )));
            }
        };

        // Agent-level generation settings are also mirrored onto the
        // builder (in addition to the provider config above).
        let mut builder = Self::new()
            .with_id(agent.id.clone())
            .with_name(agent.agent_name.clone())
            .with_provider(llm_provider)
            .with_system_prompt(agent.system_prompt.clone())
            .with_tenant(agent.tenant_id.to_string());

        if let Some(temp) = agent.temperature {
            builder = builder.with_temperature(temp);
        }
        if let Some(tokens) = agent.max_completion_tokens {
            builder = builder.with_max_tokens(tokens as u32);
        }
        if let Some(limit) = agent.context_limit {
            builder = builder.with_sliding_window(limit as usize);
        }

        // Flatten arbitrary JSON custom params into string config entries.
        if let Some(ref params) = agent.custom_params {
            if let Some(obj) = params.as_object() {
                for (key, value) in obj.iter() {
                    let value_str: String = match value {
                        serde_json::Value::String(s) => s.clone(),
                        serde_json::Value::Bool(b) => b.to_string(),
                        serde_json::Value::Number(n) => n.to_string(),
                        _ => value.to_string(),
                    };
                    builder = builder.with_config(key.as_str(), value_str);
                }
            }
        }

        if let Some(ref format) = agent.response_format {
            builder = builder.with_config("response_format", format);
        }

        if let Some(stream) = agent.stream {
            builder = builder.with_config("stream", if stream { "true" } else { "false" });
        }

        Ok(builder)
    }
2975}
2976
2977fn create_provider_from_config(
2980 config: &crate::config::LLMYamlConfig,
2981) -> LLMResult<super::openai::OpenAIProvider> {
2982 use super::openai::{OpenAIConfig, OpenAIProvider};
2983
2984 match config.provider.as_str() {
2985 "openai" => {
2986 let api_key = config
2987 .api_key
2988 .clone()
2989 .or_else(|| std::env::var("OPENAI_API_KEY").ok())
2990 .ok_or_else(|| LLMError::ConfigError("OpenAI API key not set".to_string()))?;
2991
2992 let mut openai_config = OpenAIConfig::new(api_key);
2993
2994 if let Some(ref model) = config.model {
2995 openai_config = openai_config.with_model(model);
2996 }
2997 if let Some(ref base_url) = config.base_url {
2998 openai_config = openai_config.with_base_url(base_url);
2999 }
3000 if let Some(temp) = config.temperature {
3001 openai_config = openai_config.with_temperature(temp);
3002 }
3003 if let Some(tokens) = config.max_tokens {
3004 openai_config = openai_config.with_max_tokens(tokens);
3005 }
3006
3007 Ok(OpenAIProvider::with_config(openai_config))
3008 }
3009 "ollama" => {
3010 let model = config.model.clone().unwrap_or_else(|| "llama2".to_string());
3011 Ok(OpenAIProvider::ollama(model))
3012 }
3013 "azure" => {
3014 let endpoint = config.base_url.clone().ok_or_else(|| {
3015 LLMError::ConfigError("Azure endpoint (base_url) not set".to_string())
3016 })?;
3017 let api_key = config
3018 .api_key
3019 .clone()
3020 .or_else(|| std::env::var("AZURE_OPENAI_API_KEY").ok())
3021 .ok_or_else(|| LLMError::ConfigError("Azure API key not set".to_string()))?;
3022 let deployment = config
3023 .deployment
3024 .clone()
3025 .or_else(|| config.model.clone())
3026 .ok_or_else(|| {
3027 LLMError::ConfigError("Azure deployment name not set".to_string())
3028 })?;
3029
3030 Ok(OpenAIProvider::azure(endpoint, api_key, deployment))
3031 }
3032 "compatible" | "local" => {
3033 let base_url = config.base_url.clone().ok_or_else(|| {
3034 LLMError::ConfigError("base_url not set for compatible provider".to_string())
3035 })?;
3036 let model = config
3037 .model
3038 .clone()
3039 .unwrap_or_else(|| "default".to_string());
3040
3041 Ok(OpenAIProvider::local(base_url, model))
3042 }
3043 other => Err(LLMError::ConfigError(format!(
3044 "Unknown provider: {}",
3045 other
3046 ))),
3047 }
3048}
3049
3050#[async_trait::async_trait]
3055impl mofa_kernel::agent::MoFAAgent for LLMAgent {
3056 fn id(&self) -> &str {
3057 &self.metadata.id
3058 }
3059
3060 fn name(&self) -> &str {
3061 &self.metadata.name
3062 }
3063
    /// Static capability descriptor for LLM agents.
    ///
    /// NOTE(review): the descriptor is cached in a process-wide `OnceLock`,
    /// so all `LLMAgent` instances share one identical capability set.
    fn capabilities(&self) -> &mofa_kernel::agent::AgentCapabilities {
        use mofa_kernel::agent::AgentCapabilities;

        static CAPABILITIES: std::sync::OnceLock<AgentCapabilities> = std::sync::OnceLock::new();

        // Built lazily on first access, then reused for every call.
        CAPABILITIES.get_or_init(|| {
            AgentCapabilities::builder()
                .tag("llm")
                .tag("chat")
                .tag("text-generation")
                .input_type(mofa_kernel::agent::InputType::Text)
                .output_type(mofa_kernel::agent::OutputType::Text)
                .supports_streaming(true)
                .supports_tools(true)
                .build()
        })
    }
3088
3089 async fn initialize(
3090 &mut self,
3091 ctx: &mofa_kernel::agent::AgentContext,
3092 ) -> mofa_kernel::agent::AgentResult<()> {
3093 let mut plugin_config = mofa_kernel::plugin::PluginConfig::new();
3095 for (k, v) in &self.config.custom_config {
3096 plugin_config.set(k, v);
3097 }
3098 if let Some(user_id) = &self.config.user_id {
3099 plugin_config.set("user_id", user_id);
3100 }
3101 if let Some(tenant_id) = &self.config.tenant_id {
3102 plugin_config.set("tenant_id", tenant_id);
3103 }
3104 let session_id = self.active_session_id.read().await.clone();
3105 plugin_config.set("session_id", session_id);
3106
3107 let plugin_ctx =
3108 mofa_kernel::plugin::PluginContext::new(self.id()).with_config(plugin_config);
3109
3110 for plugin in &mut self.plugins {
3111 plugin
3112 .load(&plugin_ctx)
3113 .await
3114 .map_err(|e| mofa_kernel::agent::AgentError::InitializationFailed(e.to_string()))?;
3115 plugin
3116 .init_plugin()
3117 .await
3118 .map_err(|e| mofa_kernel::agent::AgentError::InitializationFailed(e.to_string()))?;
3119 }
3120 self.state = mofa_kernel::agent::AgentState::Ready;
3121
3122 let _ = ctx;
3124
3125 Ok(())
3126 }
3127
3128 async fn execute(
3129 &mut self,
3130 input: mofa_kernel::agent::AgentInput,
3131 _ctx: &mofa_kernel::agent::AgentContext,
3132 ) -> mofa_kernel::agent::AgentResult<mofa_kernel::agent::AgentOutput> {
3133 use mofa_kernel::agent::{AgentError, AgentInput, AgentOutput};
3134
3135 let message = match input {
3137 AgentInput::Text(text) => text,
3138 AgentInput::Json(json) => json.to_string(),
3139 _ => {
3140 return Err(AgentError::ValidationFailed(
3141 "Unsupported input type for LLMAgent".to_string(),
3142 ));
3143 }
3144 };
3145
3146 let response = self
3148 .chat(&message)
3149 .await
3150 .map_err(|e| AgentError::ExecutionFailed(format!("LLM chat failed: {}", e)))?;
3151
3152 Ok(AgentOutput::text(response))
3154 }
3155
3156 async fn shutdown(&mut self) -> mofa_kernel::agent::AgentResult<()> {
3157 for plugin in &mut self.plugins {
3159 plugin
3160 .unload()
3161 .await
3162 .map_err(|e| mofa_kernel::agent::AgentError::ShutdownFailed(e.to_string()))?;
3163 }
3164 self.state = mofa_kernel::agent::AgentState::Shutdown;
3165 Ok(())
3166 }
3167
3168 fn state(&self) -> mofa_kernel::agent::AgentState {
3169 self.state.clone()
3170 }
3171}
3172
3173pub fn simple_llm_agent(
3192 agent_id: impl Into<String>,
3193 provider: Arc<dyn LLMProvider>,
3194 system_prompt: impl Into<String>,
3195) -> LLMAgent {
3196 LLMAgentBuilder::new()
3197 .with_id(agent_id)
3198 .with_provider(provider)
3199 .with_system_prompt(system_prompt)
3200 .build()
3201}
3202
3203pub fn agent_from_config(path: impl AsRef<std::path::Path>) -> LLMResult<LLMAgent> {
3214 LLMAgentBuilder::from_config_file(path)?.try_build()
3215}