1use std::io;
7use std::sync::Arc;
8
9use tokio::runtime::Runtime;
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13use crate::controller::{
14 ControllerEvent, ControllerInputPayload, Executable, LLMController, LLMSessionConfig,
15 LLMTool, ListSkillsTool, PermissionRegistry, ToolRegistry, UserInteractionRegistry,
16};
17use crate::skills::{SkillDiscovery, SkillDiscoveryError, SkillRegistry, SkillReloadResult};
18
19use super::config::{load_config, AgentConfig, LLMRegistry};
20use super::error::AgentError;
21use super::logger::Logger;
22use super::messages::channels::DEFAULT_CHANNEL_SIZE;
23use super::messages::UiMessage;
24use super::router::InputRouter;
25
26pub type ToControllerTx = mpsc::Sender<ControllerInputPayload>;
28pub type ToControllerRx = mpsc::Receiver<ControllerInputPayload>;
30pub type FromControllerTx = mpsc::Sender<UiMessage>;
32pub type FromControllerRx = mpsc::Receiver<UiMessage>;
34
35pub struct AgentCore {
78 #[allow(dead_code)]
81 logger: Logger,
82
83 name: String,
85
86 version: String,
88
89 runtime: Runtime,
91
92 controller: Arc<LLMController>,
94
95 llm_registry: Option<LLMRegistry>,
97
98 to_controller_tx: ToControllerTx,
100
101 to_controller_rx: Option<ToControllerRx>,
103
104 from_controller_tx: FromControllerTx,
106
107 from_controller_rx: Option<FromControllerRx>,
109
110 cancel_token: CancellationToken,
112
113 user_interaction_registry: Arc<UserInteractionRegistry>,
115
116 permission_registry: Arc<PermissionRegistry>,
118
119 tool_definitions: Vec<LLMTool>,
121
122 error_no_session: Option<String>,
124
125 skill_registry: Arc<SkillRegistry>,
127
128 skill_discovery: SkillDiscovery,
130}
131
132impl AgentCore {
133 pub fn new<C: AgentConfig>(config: &C) -> io::Result<Self> {
143 let logger = Logger::new(config.log_prefix())?;
144 tracing::info!("{} agent initialized", config.name());
145
146 let llm_registry = load_config(config);
148 if llm_registry.is_empty() {
149 tracing::warn!(
150 "No LLM providers configured. Set ANTHROPIC_API_KEY or create ~/{}",
151 config.config_path()
152 );
153 } else {
154 tracing::info!(
155 "Loaded {} LLM provider(s): {:?}",
156 llm_registry.providers().len(),
157 llm_registry.providers()
158 );
159 }
160
161 let runtime = Runtime::new().map_err(|e| {
163 io::Error::new(
164 io::ErrorKind::Other,
165 format!("Failed to create runtime: {}", e),
166 )
167 })?;
168
169 let channel_size = config.channel_buffer_size().unwrap_or(DEFAULT_CHANNEL_SIZE);
171 tracing::debug!("Using channel buffer size: {}", channel_size);
172
173 let (to_controller_tx, to_controller_rx) =
175 mpsc::channel::<ControllerInputPayload>(channel_size);
176 let (from_controller_tx, from_controller_rx) =
177 mpsc::channel::<UiMessage>(channel_size);
178
179 let (interaction_event_tx, mut interaction_event_rx) =
181 mpsc::channel::<ControllerEvent>(channel_size);
182
183 let user_interaction_registry =
185 Arc::new(UserInteractionRegistry::new(interaction_event_tx));
186
187 let ui_tx_for_interactions = from_controller_tx.clone();
190 runtime.spawn(async move {
191 while let Some(event) = interaction_event_rx.recv().await {
192 let msg = convert_controller_event_to_ui_message(event);
193 if let Err(e) = ui_tx_for_interactions.send(msg).await {
194 tracing::warn!("Failed to send user interaction event to UI: {}", e);
195 }
196 }
197 });
198
199 let (permission_event_tx, mut permission_event_rx) =
201 mpsc::channel::<ControllerEvent>(channel_size);
202
203 let permission_registry = Arc::new(PermissionRegistry::new(permission_event_tx));
205
206 let ui_tx_for_permissions = from_controller_tx.clone();
209 runtime.spawn(async move {
210 while let Some(event) = permission_event_rx.recv().await {
211 let msg = convert_controller_event_to_ui_message(event);
212 if let Err(e) = ui_tx_for_permissions.send(msg).await {
213 tracing::warn!("Failed to send permission event to UI: {}", e);
214 }
215 }
216 });
217
218 let controller = Arc::new(LLMController::new(
223 permission_registry.clone(),
224 Some(from_controller_tx.clone()),
225 Some(channel_size),
226 ));
227 let cancel_token = CancellationToken::new();
228
229 Ok(Self {
230 logger,
231 name: config.name().to_string(),
232 version: "0.1.0".to_string(),
233 runtime,
234 controller,
235 llm_registry: Some(llm_registry),
236 to_controller_tx,
237 to_controller_rx: Some(to_controller_rx),
238 from_controller_tx,
239 from_controller_rx: Some(from_controller_rx),
240 cancel_token,
241 user_interaction_registry,
242 permission_registry,
243 tool_definitions: Vec::new(),
244 error_no_session: None,
245 skill_registry: Arc::new(SkillRegistry::new()),
246 skill_discovery: SkillDiscovery::new(),
247 })
248 }
249
250 pub fn set_error_no_session(&mut self, message: impl Into<String>) -> &mut Self {
260 self.error_no_session = Some(message.into());
261 self
262 }
263
264 pub fn error_no_session(&self) -> Option<&str> {
266 self.error_no_session.as_deref()
267 }
268
269 pub fn set_version(&mut self, version: impl Into<String>) {
271 self.version = version.into();
272 }
273
274 pub fn version(&self) -> &str {
276 &self.version
277 }
278
279 pub fn load_environment_context(&mut self) -> &mut Self {
297 if let Some(registry) = self.llm_registry.take() {
298 self.llm_registry = Some(registry.with_environment_context());
299 tracing::info!("Environment context loaded into system prompt");
300 }
301 self
302 }
303
304 pub fn register_tools<F>(&mut self, f: F) -> Result<(), AgentError>
317 where
318 F: FnOnce(
319 &Arc<ToolRegistry>,
320 &Arc<UserInteractionRegistry>,
321 &Arc<PermissionRegistry>,
322 ) -> Result<Vec<LLMTool>, String>,
323 {
324 let tool_defs = f(
325 self.controller.tool_registry(),
326 &self.user_interaction_registry,
327 &self.permission_registry,
328 )
329 .map_err(AgentError::ToolRegistration)?;
330 self.tool_definitions = tool_defs;
331 Ok(())
332 }
333
334 pub fn register_tools_async<F, Fut>(&mut self, f: F) -> Result<(), AgentError>
347 where
348 F: FnOnce(Arc<ToolRegistry>, Arc<UserInteractionRegistry>, Arc<PermissionRegistry>) -> Fut,
349 Fut: std::future::Future<Output = Result<Vec<LLMTool>, String>>,
350 {
351 let tool_defs = self.runtime.block_on(f(
352 self.controller.tool_registry().clone(),
353 self.user_interaction_registry.clone(),
354 self.permission_registry.clone(),
355 ))
356 .map_err(AgentError::ToolRegistration)?;
357 self.tool_definitions = tool_defs;
358 Ok(())
359 }
360
361 pub fn start_background_tasks(&mut self) {
366 tracing::info!("{} starting background tasks", self.name);
367
368 let controller = self.controller.clone();
370 self.runtime.spawn(async move {
371 controller.start().await;
372 });
373 tracing::info!("Controller started");
374
375 if let Some(to_controller_rx) = self.to_controller_rx.take() {
377 let router = InputRouter::new(
378 self.controller.clone(),
379 to_controller_rx,
380 self.cancel_token.clone(),
381 );
382 self.runtime.spawn(async move {
383 router.run().await;
384 });
385 tracing::info!("InputRouter started");
386 }
387 }
388
389 async fn create_session_internal(
391 controller: &Arc<LLMController>,
392 mut config: LLMSessionConfig,
393 tools: &[LLMTool],
394 skill_registry: &Arc<SkillRegistry>,
395 ) -> Result<i64, crate::client::error::LlmError> {
396 let skills_xml = skill_registry.to_prompt_xml();
398 if !skills_xml.is_empty() {
399 config.system_prompt = Some(match config.system_prompt {
400 Some(prompt) => format!("{}\n\n{}", prompt, skills_xml),
401 None => skills_xml,
402 });
403 }
404
405 let id = controller.create_session(config).await?;
406
407 if !tools.is_empty() {
409 if let Some(session) = controller.get_session(id).await {
410 session.set_tools(tools.to_vec()).await;
411 }
412 }
413
414 Ok(id)
415 }
416
417 pub fn create_initial_session(&mut self) -> Result<(i64, String, i32), AgentError> {
421 let registry = self.llm_registry.as_ref().ok_or_else(|| {
422 AgentError::NoConfiguration("No LLM registry available".to_string())
423 })?;
424
425 let config = registry.get_default().ok_or_else(|| {
426 AgentError::NoConfiguration("No default LLM provider configured".to_string())
427 })?;
428
429 let model = config.model.clone();
430 let context_limit = config.context_limit;
431
432 let controller = self.controller.clone();
433 let tool_definitions = self.tool_definitions.clone();
434 let skill_registry = self.skill_registry.clone();
435
436 let session_id = self.runtime.block_on(Self::create_session_internal(
437 &controller,
438 config.clone(),
439 &tool_definitions,
440 &skill_registry,
441 ))?;
442
443 tracing::info!(
444 session_id = session_id,
445 model = %model,
446 "Created initial session"
447 );
448
449 Ok((session_id, model, context_limit))
450 }
451
452 pub fn create_session(&self, config: LLMSessionConfig) -> Result<i64, AgentError> {
456 let controller = self.controller.clone();
457 let tool_definitions = self.tool_definitions.clone();
458 let skill_registry = self.skill_registry.clone();
459
460 self.runtime
461 .block_on(Self::create_session_internal(
462 &controller,
463 config,
464 &tool_definitions,
465 &skill_registry,
466 ))
467 .map_err(AgentError::from)
468 }
469
470 pub fn shutdown(&self) {
472 tracing::info!("{} shutting down", self.name);
473 self.cancel_token.cancel();
474
475 let controller = self.controller.clone();
476 self.runtime.block_on(async move {
477 controller.shutdown().await;
478 });
479
480 tracing::info!("{} shutdown complete", self.name);
481 }
482
483 pub fn to_controller_tx(&self) -> ToControllerTx {
487 self.to_controller_tx.clone()
488 }
489
490 pub fn take_from_controller_rx(&mut self) -> Option<FromControllerRx> {
492 self.from_controller_rx.take()
493 }
494
495 pub fn controller(&self) -> &Arc<LLMController> {
497 &self.controller
498 }
499
500 pub fn runtime(&self) -> &Runtime {
502 &self.runtime
503 }
504
505 pub fn runtime_handle(&self) -> tokio::runtime::Handle {
507 self.runtime.handle().clone()
508 }
509
510 pub fn user_interaction_registry(&self) -> &Arc<UserInteractionRegistry> {
512 &self.user_interaction_registry
513 }
514
515 pub fn permission_registry(&self) -> &Arc<PermissionRegistry> {
517 &self.permission_registry
518 }
519
520 pub async fn remove_session(&self, session_id: i64) -> bool {
534 let removed = self.controller.remove_session(session_id).await;
536
537 self.permission_registry.cancel_session(session_id).await;
539
540 self.user_interaction_registry.cancel_session(session_id).await;
542
543 self.controller.tool_registry().cleanup_session(session_id).await;
545
546 if removed {
547 tracing::info!(session_id, "Session removed with full cleanup");
548 }
549
550 removed
551 }
552
553 pub fn llm_registry(&self) -> Option<&LLMRegistry> {
555 self.llm_registry.as_ref()
556 }
557
558 pub fn take_llm_registry(&mut self) -> Option<LLMRegistry> {
560 self.llm_registry.take()
561 }
562
563 pub fn cancel_token(&self) -> CancellationToken {
565 self.cancel_token.clone()
566 }
567
568 pub fn name(&self) -> &str {
570 &self.name
571 }
572
573 pub fn from_controller_tx(&self) -> FromControllerTx {
577 self.from_controller_tx.clone()
578 }
579
580 pub fn tool_definitions(&self) -> &[LLMTool] {
582 &self.tool_definitions
583 }
584
585 pub fn skill_registry(&self) -> &Arc<SkillRegistry> {
589 &self.skill_registry
590 }
591
592 pub fn register_list_skills_tool(&mut self) -> Result<LLMTool, AgentError> {
600 let tool = ListSkillsTool::new(self.skill_registry.clone());
601 let llm_tool = tool.to_llm_tool();
602
603 self.runtime.block_on(async {
604 self.controller
605 .tool_registry()
606 .register(Arc::new(tool))
607 .await
608 }).map_err(|e| AgentError::ToolRegistration(e.to_string()))?;
609
610 self.tool_definitions.push(llm_tool.clone());
611 tracing::info!("Registered list_skills tool");
612
613 Ok(llm_tool)
614 }
615
616 pub fn add_skill_path(&mut self, path: std::path::PathBuf) -> &mut Self {
621 self.skill_discovery.add_path(path);
622 self
623 }
624
625 pub fn load_skills(&mut self) -> (usize, Vec<SkillDiscoveryError>) {
632 let results = self.skill_discovery.discover();
633 self.register_discovered_skills(results)
634 }
635
636 pub fn load_skills_from(&self, paths: Vec<std::path::PathBuf>) -> (usize, Vec<SkillDiscoveryError>) {
645 let mut discovery = SkillDiscovery::empty();
646 for path in paths {
647 discovery.add_path(path);
648 }
649
650 let results = discovery.discover();
651 self.register_discovered_skills(results)
652 }
653
654 fn register_discovered_skills(
658 &self,
659 results: Vec<Result<crate::skills::Skill, SkillDiscoveryError>>,
660 ) -> (usize, Vec<SkillDiscoveryError>) {
661 let mut errors = Vec::new();
662 let mut count = 0;
663
664 for result in results {
665 match result {
666 Ok(skill) => {
667 let skill_name = skill.metadata.name.clone();
668 let skill_path = skill.path.clone();
669 let replaced = self.skill_registry.register(skill);
670
671 if let Some(old_skill) = replaced {
672 tracing::warn!(
673 skill_name = %skill_name,
674 new_path = %skill_path.display(),
675 old_path = %old_skill.path.display(),
676 "Duplicate skill name detected - replaced existing skill"
677 );
678 }
679
680 tracing::info!(
681 skill_name = %skill_name,
682 skill_path = %skill_path.display(),
683 "Loaded skill"
684 );
685 count += 1;
686 }
687 Err(e) => {
688 tracing::warn!(
689 path = %e.path.display(),
690 error = %e.message,
691 "Failed to load skill"
692 );
693 errors.push(e);
694 }
695 }
696 }
697
698 tracing::info!("Loaded {} skill(s)", count);
699 (count, errors)
700 }
701
702 pub fn reload_skills(&mut self) -> SkillReloadResult {
711 let current_names: std::collections::HashSet<String> =
712 self.skill_registry.names().into_iter().collect();
713
714 let results = self.skill_discovery.discover();
715 let mut discovered_names = std::collections::HashSet::new();
716 let mut result = SkillReloadResult::default();
717
718 for discovery_result in results {
720 match discovery_result {
721 Ok(skill) => {
722 let name = skill.metadata.name.clone();
723 discovered_names.insert(name.clone());
724
725 if !current_names.contains(&name) {
726 tracing::info!(skill_name = %name, "Added new skill");
727 result.added.push(name);
728 }
729 self.skill_registry.register(skill);
730 }
731 Err(e) => {
732 tracing::warn!(
733 path = %e.path.display(),
734 error = %e.message,
735 "Failed to load skill during reload"
736 );
737 result.errors.push(e);
738 }
739 }
740 }
741
742 for name in ¤t_names {
744 if !discovered_names.contains(name) {
745 tracing::info!(skill_name = %name, "Removed skill");
746 self.skill_registry.unregister(name);
747 result.removed.push(name.clone());
748 }
749 }
750
751 tracing::info!(
752 added = result.added.len(),
753 removed = result.removed.len(),
754 errors = result.errors.len(),
755 "Skills reloaded"
756 );
757
758 result
759 }
760
761 pub fn skills_prompt_xml(&self) -> String {
766 self.skill_registry.to_prompt_xml()
767 }
768
769 pub async fn refresh_session_skills(&self, session_id: i64) -> Result<(), AgentError> {
777 let skills_xml = self.skills_prompt_xml();
778 if skills_xml.is_empty() {
779 return Ok(());
780 }
781
782 let session = self
783 .controller
784 .get_session(session_id)
785 .await
786 .ok_or_else(|| AgentError::SessionNotFound(session_id))?;
787
788 let current_prompt = session.system_prompt().await.unwrap_or_default();
789
790 let new_prompt = if current_prompt.contains("<available_skills>") {
792 replace_skills_section(¤t_prompt, &skills_xml)
794 } else if current_prompt.is_empty() {
795 skills_xml
797 } else {
798 format!("{}\n\n{}", current_prompt, skills_xml)
800 };
801
802 session.set_system_prompt(new_prompt).await;
803 tracing::debug!(session_id, "Refreshed session skills");
804 Ok(())
805 }
806}
807
808fn replace_skills_section(prompt: &str, new_skills_xml: &str) -> String {
810 if let Some(start) = prompt.find("<available_skills>") {
811 if let Some(end) = prompt.find("</available_skills>") {
812 let end = end + "</available_skills>".len();
813 let mut result = String::with_capacity(prompt.len());
814 result.push_str(&prompt[..start]);
815 result.push_str(new_skills_xml);
816 result.push_str(&prompt[end..]);
817 return result;
818 }
819 }
820 format!("{}\n\n{}", prompt, new_skills_xml)
822}
823
824pub fn convert_controller_event_to_ui_message(event: ControllerEvent) -> UiMessage {
841 match event {
842 ControllerEvent::StreamStart { session_id, .. } => {
843 UiMessage::System {
845 session_id,
846 message: String::new(),
847 }
848 }
849 ControllerEvent::TextChunk {
850 session_id,
851 text,
852 turn_id,
853 } => UiMessage::TextChunk {
854 session_id,
855 turn_id,
856 text,
857 input_tokens: 0,
858 output_tokens: 0,
859 },
860 ControllerEvent::ToolUseStart {
861 session_id,
862 tool_name,
863 turn_id,
864 ..
865 } => UiMessage::Display {
866 session_id,
867 turn_id,
868 message: format!("Executing tool: {}", tool_name),
869 },
870 ControllerEvent::ToolUse {
871 session_id,
872 tool,
873 display_name,
874 display_title,
875 turn_id,
876 } => UiMessage::ToolExecuting {
877 session_id,
878 turn_id,
879 tool_use_id: tool.id.clone(),
880 display_name: display_name.unwrap_or_else(|| tool.name.clone()),
881 display_title: display_title.unwrap_or_default(),
882 },
883 ControllerEvent::Complete {
884 session_id,
885 turn_id,
886 stop_reason,
887 } => UiMessage::Complete {
888 session_id,
889 turn_id,
890 input_tokens: 0,
891 output_tokens: 0,
892 stop_reason,
893 },
894 ControllerEvent::Error {
895 session_id,
896 error,
897 turn_id,
898 } => UiMessage::Error {
899 session_id,
900 turn_id,
901 error,
902 },
903 ControllerEvent::TokenUpdate {
904 session_id,
905 input_tokens,
906 output_tokens,
907 context_limit,
908 } => UiMessage::TokenUpdate {
909 session_id,
910 turn_id: None,
911 input_tokens,
912 output_tokens,
913 context_limit,
914 },
915 ControllerEvent::ToolResult {
916 session_id,
917 tool_use_id,
918 status,
919 error,
920 turn_id,
921 ..
922 } => UiMessage::ToolCompleted {
923 session_id,
924 turn_id,
925 tool_use_id,
926 status,
927 error,
928 },
929 ControllerEvent::CommandComplete {
930 session_id,
931 command,
932 success,
933 message,
934 } => UiMessage::CommandComplete {
935 session_id,
936 command,
937 success,
938 message,
939 },
940 ControllerEvent::UserInteractionRequired {
941 session_id,
942 tool_use_id,
943 request,
944 turn_id,
945 } => UiMessage::UserInteractionRequired {
946 session_id,
947 tool_use_id,
948 request,
949 turn_id,
950 },
951 ControllerEvent::PermissionRequired {
952 session_id,
953 tool_use_id,
954 request,
955 turn_id,
956 } => UiMessage::PermissionRequired {
957 session_id,
958 tool_use_id,
959 request,
960 turn_id,
961 },
962 ControllerEvent::BatchPermissionRequired {
963 session_id,
964 batch,
965 turn_id,
966 } => UiMessage::BatchPermissionRequired {
967 session_id,
968 batch,
969 turn_id,
970 },
971 }
972}
973
974#[cfg(test)]
975mod tests {
976 use super::*;
977 use crate::controller::TurnId;
978
979 #[test]
980 fn test_convert_text_chunk_event() {
981 let event = ControllerEvent::TextChunk {
982 session_id: 1,
983 text: "Hello".to_string(),
984 turn_id: Some(TurnId::new_user_turn(1)),
985 };
986
987 let msg = convert_controller_event_to_ui_message(event);
988
989 match msg {
990 UiMessage::TextChunk {
991 session_id, text, ..
992 } => {
993 assert_eq!(session_id, 1);
994 assert_eq!(text, "Hello");
995 }
996 _ => panic!("Expected TextChunk message"),
997 }
998 }
999
1000 #[test]
1001 fn test_convert_error_event() {
1002 let event = ControllerEvent::Error {
1003 session_id: 1,
1004 error: "Test error".to_string(),
1005 turn_id: None,
1006 };
1007
1008 let msg = convert_controller_event_to_ui_message(event);
1009
1010 match msg {
1011 UiMessage::Error {
1012 session_id, error, ..
1013 } => {
1014 assert_eq!(session_id, 1);
1015 assert_eq!(error, "Test error");
1016 }
1017 _ => panic!("Expected Error message"),
1018 }
1019 }
1020
1021 #[test]
1022 fn test_replace_skills_section_replaces_existing() {
1023 let prompt = "System prompt.\n\n<available_skills>\n <skill>old</skill>\n</available_skills>\n\nMore text.";
1024 let new_xml = "<available_skills>\n <skill>new</skill>\n</available_skills>";
1025
1026 let result = replace_skills_section(prompt, new_xml);
1027
1028 assert!(result.contains("<skill>new</skill>"));
1029 assert!(!result.contains("<skill>old</skill>"));
1030 assert!(result.contains("System prompt."));
1031 assert!(result.contains("More text."));
1032 }
1033
1034 #[test]
1035 fn test_replace_skills_section_no_existing() {
1036 let prompt = "System prompt without skills.";
1037 let new_xml = "<available_skills>\n <skill>new</skill>\n</available_skills>";
1038
1039 let result = replace_skills_section(prompt, new_xml);
1040
1041 assert!(result.contains("System prompt without skills."));
1043 assert!(result.contains("<skill>new</skill>"));
1044 }
1045
1046 #[test]
1047 fn test_replace_skills_section_malformed_no_closing_tag() {
1048 let prompt = "System prompt.\n\n<available_skills>\n <skill>old</skill>\n\nNo closing tag.";
1049 let new_xml = "<available_skills>\n <skill>new</skill>\n</available_skills>";
1050
1051 let result = replace_skills_section(prompt, new_xml);
1052
1053 assert!(result.contains("<skill>old</skill>"));
1055 assert!(result.contains("<skill>new</skill>"));
1056 }
1057
1058 #[test]
1059 fn test_replace_skills_section_at_end() {
1060 let prompt = "System prompt.\n\n<available_skills>\n <skill>old</skill>\n</available_skills>";
1061 let new_xml = "<available_skills>\n <skill>new</skill>\n</available_skills>";
1062
1063 let result = replace_skills_section(prompt, new_xml);
1064
1065 assert!(result.contains("<skill>new</skill>"));
1066 assert!(!result.contains("<skill>old</skill>"));
1067 assert!(result.starts_with("System prompt."));
1068 }
1069}