1use std::io;
7use std::sync::Arc;
8
9use tokio::runtime::Runtime;
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13use crate::controller::{
14 ControllerEvent, ControllerInputPayload, LLMController, LLMSessionConfig, LLMTool,
15 PermissionRegistry, ToolRegistry, UserInteractionRegistry,
16};
17
18use super::config::{load_config, AgentConfig, LLMRegistry};
19use super::error::AgentError;
20use super::logger::Logger;
21use super::messages::channels::DEFAULT_CHANNEL_SIZE;
22use super::messages::UiMessage;
23use super::router::InputRouter;
24
25pub type ToControllerTx = mpsc::Sender<ControllerInputPayload>;
27pub type ToControllerRx = mpsc::Receiver<ControllerInputPayload>;
29pub type FromControllerTx = mpsc::Sender<UiMessage>;
31pub type FromControllerRx = mpsc::Receiver<UiMessage>;
33
34pub struct AgentCore {
77 #[allow(dead_code)]
80 logger: Logger,
81
82 name: String,
84
85 version: String,
87
88 runtime: Runtime,
90
91 controller: Arc<LLMController>,
93
94 llm_registry: Option<LLMRegistry>,
96
97 to_controller_tx: ToControllerTx,
99
100 to_controller_rx: Option<ToControllerRx>,
102
103 from_controller_tx: FromControllerTx,
105
106 from_controller_rx: Option<FromControllerRx>,
108
109 cancel_token: CancellationToken,
111
112 user_interaction_registry: Arc<UserInteractionRegistry>,
114
115 permission_registry: Arc<PermissionRegistry>,
117
118 tool_definitions: Vec<LLMTool>,
120
121 error_no_session: Option<String>,
123}
124
125impl AgentCore {
126 pub fn new<C: AgentConfig>(config: &C) -> io::Result<Self> {
136 let logger = Logger::new(config.log_prefix())?;
137 tracing::info!("{} agent initialized", config.name());
138
139 let llm_registry = load_config(config);
141 if llm_registry.is_empty() {
142 tracing::warn!(
143 "No LLM providers configured. Set ANTHROPIC_API_KEY or create ~/{}",
144 config.config_path()
145 );
146 } else {
147 tracing::info!(
148 "Loaded {} LLM provider(s): {:?}",
149 llm_registry.providers().len(),
150 llm_registry.providers()
151 );
152 }
153
154 let runtime = Runtime::new().map_err(|e| {
156 io::Error::new(
157 io::ErrorKind::Other,
158 format!("Failed to create runtime: {}", e),
159 )
160 })?;
161
162 let channel_size = config.channel_buffer_size().unwrap_or(DEFAULT_CHANNEL_SIZE);
164 tracing::debug!("Using channel buffer size: {}", channel_size);
165
166 let (to_controller_tx, to_controller_rx) =
168 mpsc::channel::<ControllerInputPayload>(channel_size);
169 let (from_controller_tx, from_controller_rx) =
170 mpsc::channel::<UiMessage>(channel_size);
171
172 let (interaction_event_tx, mut interaction_event_rx) =
174 mpsc::channel::<ControllerEvent>(channel_size);
175
176 let user_interaction_registry =
178 Arc::new(UserInteractionRegistry::new(interaction_event_tx));
179
180 let ui_tx_for_interactions = from_controller_tx.clone();
183 runtime.spawn(async move {
184 while let Some(event) = interaction_event_rx.recv().await {
185 let msg = convert_controller_event_to_ui_message(event);
186 if let Err(e) = ui_tx_for_interactions.send(msg).await {
187 tracing::warn!("Failed to send user interaction event to UI: {}", e);
188 }
189 }
190 });
191
192 let (permission_event_tx, mut permission_event_rx) =
194 mpsc::channel::<ControllerEvent>(channel_size);
195
196 let permission_registry = Arc::new(PermissionRegistry::new(permission_event_tx));
198
199 let ui_tx_for_permissions = from_controller_tx.clone();
202 runtime.spawn(async move {
203 while let Some(event) = permission_event_rx.recv().await {
204 let msg = convert_controller_event_to_ui_message(event);
205 if let Err(e) = ui_tx_for_permissions.send(msg).await {
206 tracing::warn!("Failed to send permission event to UI: {}", e);
207 }
208 }
209 });
210
211 let controller = Arc::new(LLMController::new(
216 permission_registry.clone(),
217 Some(from_controller_tx.clone()),
218 Some(channel_size),
219 ));
220 let cancel_token = CancellationToken::new();
221
222 Ok(Self {
223 logger,
224 name: config.name().to_string(),
225 version: "0.1.0".to_string(),
226 runtime,
227 controller,
228 llm_registry: Some(llm_registry),
229 to_controller_tx,
230 to_controller_rx: Some(to_controller_rx),
231 from_controller_tx,
232 from_controller_rx: Some(from_controller_rx),
233 cancel_token,
234 user_interaction_registry,
235 permission_registry,
236 tool_definitions: Vec::new(),
237 error_no_session: None,
238 })
239 }
240
241 pub fn set_error_no_session(&mut self, message: impl Into<String>) -> &mut Self {
251 self.error_no_session = Some(message.into());
252 self
253 }
254
255 pub fn error_no_session(&self) -> Option<&str> {
257 self.error_no_session.as_deref()
258 }
259
260 pub fn set_version(&mut self, version: impl Into<String>) {
262 self.version = version.into();
263 }
264
265 pub fn version(&self) -> &str {
267 &self.version
268 }
269
270 pub fn load_environment_context(&mut self) -> &mut Self {
288 if let Some(registry) = self.llm_registry.take() {
289 self.llm_registry = Some(registry.with_environment_context());
290 tracing::info!("Environment context loaded into system prompt");
291 }
292 self
293 }
294
295 pub fn register_tools<F>(&mut self, f: F) -> Result<(), AgentError>
308 where
309 F: FnOnce(
310 &Arc<ToolRegistry>,
311 &Arc<UserInteractionRegistry>,
312 &Arc<PermissionRegistry>,
313 ) -> Result<Vec<LLMTool>, String>,
314 {
315 let tool_defs = f(
316 self.controller.tool_registry(),
317 &self.user_interaction_registry,
318 &self.permission_registry,
319 )
320 .map_err(AgentError::ToolRegistration)?;
321 self.tool_definitions = tool_defs;
322 Ok(())
323 }
324
325 pub fn register_tools_async<F, Fut>(&mut self, f: F) -> Result<(), AgentError>
338 where
339 F: FnOnce(Arc<ToolRegistry>, Arc<UserInteractionRegistry>, Arc<PermissionRegistry>) -> Fut,
340 Fut: std::future::Future<Output = Result<Vec<LLMTool>, String>>,
341 {
342 let tool_defs = self.runtime.block_on(f(
343 self.controller.tool_registry().clone(),
344 self.user_interaction_registry.clone(),
345 self.permission_registry.clone(),
346 ))
347 .map_err(AgentError::ToolRegistration)?;
348 self.tool_definitions = tool_defs;
349 Ok(())
350 }
351
352 pub fn start_background_tasks(&mut self) {
357 tracing::info!("{} starting background tasks", self.name);
358
359 let controller = self.controller.clone();
361 self.runtime.spawn(async move {
362 controller.start().await;
363 });
364 tracing::info!("Controller started");
365
366 if let Some(to_controller_rx) = self.to_controller_rx.take() {
368 let router = InputRouter::new(
369 self.controller.clone(),
370 to_controller_rx,
371 self.cancel_token.clone(),
372 );
373 self.runtime.spawn(async move {
374 router.run().await;
375 });
376 tracing::info!("InputRouter started");
377 }
378 }
379
380 async fn create_session_internal(
382 controller: &Arc<LLMController>,
383 config: LLMSessionConfig,
384 tools: &[LLMTool],
385 ) -> Result<i64, crate::client::error::LlmError> {
386 let id = controller.create_session(config).await?;
387
388 if !tools.is_empty() {
390 if let Some(session) = controller.get_session(id).await {
391 session.set_tools(tools.to_vec()).await;
392 }
393 }
394
395 Ok(id)
396 }
397
398 pub fn create_initial_session(&mut self) -> Result<(i64, String, i32), AgentError> {
402 let registry = self.llm_registry.as_ref().ok_or_else(|| {
403 AgentError::NoConfiguration("No LLM registry available".to_string())
404 })?;
405
406 let config = registry.get_default().ok_or_else(|| {
407 AgentError::NoConfiguration("No default LLM provider configured".to_string())
408 })?;
409
410 let model = config.model.clone();
411 let context_limit = config.context_limit;
412
413 let controller = self.controller.clone();
414 let tool_definitions = self.tool_definitions.clone();
415
416 let session_id = self.runtime.block_on(Self::create_session_internal(
417 &controller,
418 config.clone(),
419 &tool_definitions,
420 ))?;
421
422 tracing::info!(
423 session_id = session_id,
424 model = %model,
425 "Created initial session"
426 );
427
428 Ok((session_id, model, context_limit))
429 }
430
431 pub fn create_session(&self, config: LLMSessionConfig) -> Result<i64, AgentError> {
435 let controller = self.controller.clone();
436 let tool_definitions = self.tool_definitions.clone();
437
438 self.runtime
439 .block_on(Self::create_session_internal(
440 &controller,
441 config,
442 &tool_definitions,
443 ))
444 .map_err(AgentError::from)
445 }
446
447 pub fn shutdown(&self) {
449 tracing::info!("{} shutting down", self.name);
450 self.cancel_token.cancel();
451
452 let controller = self.controller.clone();
453 self.runtime.block_on(async move {
454 controller.shutdown().await;
455 });
456
457 tracing::info!("{} shutdown complete", self.name);
458 }
459
460 pub fn to_controller_tx(&self) -> ToControllerTx {
464 self.to_controller_tx.clone()
465 }
466
467 pub fn take_from_controller_rx(&mut self) -> Option<FromControllerRx> {
469 self.from_controller_rx.take()
470 }
471
472 pub fn controller(&self) -> &Arc<LLMController> {
474 &self.controller
475 }
476
477 pub fn runtime(&self) -> &Runtime {
479 &self.runtime
480 }
481
482 pub fn runtime_handle(&self) -> tokio::runtime::Handle {
484 self.runtime.handle().clone()
485 }
486
487 pub fn user_interaction_registry(&self) -> &Arc<UserInteractionRegistry> {
489 &self.user_interaction_registry
490 }
491
492 pub fn permission_registry(&self) -> &Arc<PermissionRegistry> {
494 &self.permission_registry
495 }
496
497 pub async fn remove_session(&self, session_id: i64) -> bool {
511 let removed = self.controller.remove_session(session_id).await;
513
514 self.permission_registry.cancel_session(session_id).await;
516
517 self.user_interaction_registry.cancel_session(session_id).await;
519
520 self.controller.tool_registry().cleanup_session(session_id).await;
522
523 if removed {
524 tracing::info!(session_id, "Session removed with full cleanup");
525 }
526
527 removed
528 }
529
530 pub fn llm_registry(&self) -> Option<&LLMRegistry> {
532 self.llm_registry.as_ref()
533 }
534
535 pub fn take_llm_registry(&mut self) -> Option<LLMRegistry> {
537 self.llm_registry.take()
538 }
539
540 pub fn cancel_token(&self) -> CancellationToken {
542 self.cancel_token.clone()
543 }
544
545 pub fn name(&self) -> &str {
547 &self.name
548 }
549
550 pub fn from_controller_tx(&self) -> FromControllerTx {
554 self.from_controller_tx.clone()
555 }
556
557 pub fn tool_definitions(&self) -> &[LLMTool] {
559 &self.tool_definitions
560 }
561}
562
563pub fn convert_controller_event_to_ui_message(event: ControllerEvent) -> UiMessage {
580 match event {
581 ControllerEvent::StreamStart { session_id, .. } => {
582 UiMessage::System {
584 session_id,
585 message: String::new(),
586 }
587 }
588 ControllerEvent::TextChunk {
589 session_id,
590 text,
591 turn_id,
592 } => UiMessage::TextChunk {
593 session_id,
594 turn_id,
595 text,
596 input_tokens: 0,
597 output_tokens: 0,
598 },
599 ControllerEvent::ToolUseStart {
600 session_id,
601 tool_name,
602 turn_id,
603 ..
604 } => UiMessage::Display {
605 session_id,
606 turn_id,
607 message: format!("Executing tool: {}", tool_name),
608 },
609 ControllerEvent::ToolUse {
610 session_id,
611 tool,
612 display_name,
613 display_title,
614 turn_id,
615 } => UiMessage::ToolExecuting {
616 session_id,
617 turn_id,
618 tool_use_id: tool.id.clone(),
619 display_name: display_name.unwrap_or_else(|| tool.name.clone()),
620 display_title: display_title.unwrap_or_default(),
621 },
622 ControllerEvent::Complete {
623 session_id,
624 turn_id,
625 stop_reason,
626 } => UiMessage::Complete {
627 session_id,
628 turn_id,
629 input_tokens: 0,
630 output_tokens: 0,
631 stop_reason,
632 },
633 ControllerEvent::Error {
634 session_id,
635 error,
636 turn_id,
637 } => UiMessage::Error {
638 session_id,
639 turn_id,
640 error,
641 },
642 ControllerEvent::TokenUpdate {
643 session_id,
644 input_tokens,
645 output_tokens,
646 context_limit,
647 } => UiMessage::TokenUpdate {
648 session_id,
649 turn_id: None,
650 input_tokens,
651 output_tokens,
652 context_limit,
653 },
654 ControllerEvent::ToolResult {
655 session_id,
656 tool_use_id,
657 status,
658 error,
659 turn_id,
660 ..
661 } => UiMessage::ToolCompleted {
662 session_id,
663 turn_id,
664 tool_use_id,
665 status,
666 error,
667 },
668 ControllerEvent::CommandComplete {
669 session_id,
670 command,
671 success,
672 message,
673 } => UiMessage::CommandComplete {
674 session_id,
675 command,
676 success,
677 message,
678 },
679 ControllerEvent::UserInteractionRequired {
680 session_id,
681 tool_use_id,
682 request,
683 turn_id,
684 } => UiMessage::UserInteractionRequired {
685 session_id,
686 tool_use_id,
687 request,
688 turn_id,
689 },
690 ControllerEvent::PermissionRequired {
691 session_id,
692 tool_use_id,
693 request,
694 turn_id,
695 } => UiMessage::PermissionRequired {
696 session_id,
697 tool_use_id,
698 request,
699 turn_id,
700 },
701 ControllerEvent::BatchPermissionRequired {
702 session_id,
703 batch,
704 turn_id,
705 } => UiMessage::BatchPermissionRequired {
706 session_id,
707 batch,
708 turn_id,
709 },
710 }
711}
712
#[cfg(test)]
mod tests {
    use super::*;
    use crate::controller::TurnId;

    /// A text chunk event keeps its session id and text after conversion.
    #[test]
    fn test_convert_text_chunk_event() {
        let converted = convert_controller_event_to_ui_message(ControllerEvent::TextChunk {
            session_id: 1,
            text: "Hello".to_string(),
            turn_id: Some(TurnId::new_user_turn(1)),
        });

        if let UiMessage::TextChunk {
            session_id, text, ..
        } = converted
        {
            assert_eq!(session_id, 1);
            assert_eq!(text, "Hello");
        } else {
            panic!("Expected TextChunk message");
        }
    }

    /// An error event keeps its session id and error text after conversion.
    #[test]
    fn test_convert_error_event() {
        let converted = convert_controller_event_to_ui_message(ControllerEvent::Error {
            session_id: 1,
            error: "Test error".to_string(),
            turn_id: None,
        });

        if let UiMessage::Error {
            session_id, error, ..
        } = converted
        {
            assert_eq!(session_id, 1);
            assert_eq!(error, "Test error");
        } else {
            panic!("Expected Error message");
        }
    }
}