1use std::sync::Arc;
7
8use chrono::Utc;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::agent::core::agent::events::{TokenBudgetUsage, TokenUsage};
13use crate::agent::core::budget::{
14 prepare_hybrid_context, HeuristicTokenCounter, ModelLimitsRegistry, TokenBudget,
15};
16use crate::agent::core::tools::{
17 execute_tool_call, handle_tool_result_with_agentic_support, parse_tool_args, ToolExecutor,
18 ToolHandlingOutcome, ToolSchema,
19};
20use crate::agent::core::{AgentError, AgentEvent, Message, Session, TodoItemStatus};
21use crate::agent::llm::LLMProvider;
22use crate::agent::metrics::{
23 MetricsCollector, RoundStatus as MetricsRoundStatus, SessionStatus as MetricsSessionStatus,
24 TokenUsage as MetricsTokenUsage,
25};
26use crate::agent::tools::guide::{context::GuideBuildContext, EnhancedPromptBuilder};
27use crate::agent::tools::CreateTodoListTool;
28
29use crate::agent::loop_module::config::AgentLoopConfig;
30use crate::agent::loop_module::stream::handler::consume_llm_stream;
31use crate::agent::loop_module::todo_context::TodoLoopContext;
32
33pub type Result<T> = std::result::Result<T, AgentError>;
35
36pub async fn run_agent_loop_with_config(
56 session: &mut Session,
57 initial_message: String,
58 event_tx: mpsc::Sender<AgentEvent>,
59 llm: Arc<dyn LLMProvider>,
60 tools: Arc<dyn ToolExecutor>,
61 cancel_token: CancellationToken,
62 config: AgentLoopConfig,
63) -> Result<()> {
64 let debug_logger = DebugLogger::new(log::log_enabled!(log::Level::Debug));
65 let session_id = session.id.clone();
66 let metrics_collector = config.metrics_collector.clone();
67 let model_name = config
68 .model_name
69 .clone()
70 .unwrap_or_else(|| "unknown".to_string());
71
72 if let Some(metrics) = metrics_collector.as_ref() {
73 metrics.session_started(session_id.clone(), model_name.clone(), session.created_at);
74 metrics.session_message_count(
75 session_id.clone(),
76 session.messages.len() as u32,
77 Utc::now(),
78 );
79 }
80
81 log::debug!(
82 "[{}] Starting agent loop with message: {}",
83 session_id,
84 initial_message
85 );
86 debug_logger.log_event(
87 &session_id,
88 "agent_loop_start",
89 serde_json::json!({
90 "message": initial_message,
91 "max_rounds": config.max_rounds,
92 "initial_message_count": session.messages.len(),
93 }),
94 );
95
96 let skill_context = if let Some(skill_manager) = config.skill_manager.as_ref() {
97 let context = skill_manager
98 .build_skill_context(Some(session.id.as_str()))
99 .await;
100 if !context.is_empty() {
101 log::info!(
102 "[{}] Skill context loaded, length: {} chars",
103 session_id,
104 context.len()
105 );
106 log::debug!("[{}] Skill context content:\n{}", session_id, context);
107 } else {
108 log::info!("[{}] No skill context loaded (empty)", session_id);
109 }
110 context
111 } else {
112 log::info!("[{}] No skill manager configured", session_id);
113 String::new()
114 };
115
116 let base_prompt_for_language = config
118 .system_prompt
119 .as_deref()
120 .or_else(|| {
121 session
122 .messages
123 .iter()
124 .find(|message| matches!(message.role, crate::agent::core::Role::System))
125 .map(|message| message.content.as_str())
126 })
127 .unwrap_or_default();
128 let guide_context = GuideBuildContext::from_system_prompt(base_prompt_for_language);
129 let tool_schemas = resolve_available_tool_schemas(&config, tools.as_ref());
130 let tool_guide_context = EnhancedPromptBuilder::build(
131 Some(config.tool_registry.as_ref()),
132 &tool_schemas,
133 &guide_context,
134 );
135 log::info!(
136 "[{}] Tool guide context built, length: {} chars",
137 session_id,
138 tool_guide_context.len()
139 );
140
141 if let Some(system_message) = session
142 .messages
143 .iter_mut()
144 .find(|message| matches!(message.role, crate::agent::core::Role::System))
145 {
146 let base_prompt = config
147 .system_prompt
148 .as_deref()
149 .unwrap_or(&system_message.content);
150 system_message.content =
151 merge_system_prompt_with_contexts(base_prompt, &skill_context, &tool_guide_context);
152 } else {
153 let base_prompt = config.system_prompt.as_deref().unwrap_or_default();
154 let merged_prompt =
155 merge_system_prompt_with_contexts(base_prompt, &skill_context, &tool_guide_context);
156 if !merged_prompt.is_empty() {
157 session.messages.insert(0, Message::system(merged_prompt));
158 }
159 }
160
161 if !config.skip_initial_user_message {
162 session.add_message(Message::user(initial_message.clone()));
163 if let Some(metrics) = metrics_collector.as_ref() {
164 metrics.session_message_count(
165 session_id.clone(),
166 session.messages.len() as u32,
167 Utc::now(),
168 );
169 }
170 }
171
172 let mut sent_complete = false;
173
174 let mut todo_context = TodoLoopContext::from_session(session);
176 if todo_context.is_some() {
177 log::debug!("[{}] TodoLoopContext initialized", session_id);
178 }
179
180 for round in 0..config.max_rounds {
181 inject_todo_list_into_system_message(session);
183
184 if let Some(ref mut ctx) = todo_context {
186 ctx.current_round = round as u32;
187 ctx.max_rounds = config.max_rounds as u32;
188 }
189
190 let round_id = format!("{}-round-{}", session_id, round + 1);
191 let mut round_status = MetricsRoundStatus::Success;
192 let mut round_error: Option<String> = None;
193
194 debug_logger.log_event(
195 &session_id,
196 "round_start",
197 serde_json::json!({
198 "round": round + 1,
199 "total_rounds": config.max_rounds,
200 "message_count": session.messages.len(),
201 }),
202 );
203
204 if cancel_token.is_cancelled() {
205 if let Some(metrics) = metrics_collector.as_ref() {
206 metrics.session_message_count(
207 session_id.clone(),
208 session.messages.len() as u32,
209 Utc::now(),
210 );
211 metrics.session_completed(
212 session_id.clone(),
213 MetricsSessionStatus::Cancelled,
214 Utc::now(),
215 );
216 }
217 return Err(AgentError::Cancelled);
218 }
219
220 if let Some(metrics) = metrics_collector.as_ref() {
221 metrics.round_started(
222 round_id.clone(),
223 session_id.clone(),
224 model_name.clone(),
225 Utc::now(),
226 );
227 }
228
229 let tool_schemas = resolve_available_tool_schemas(&config, tools.as_ref());
230
231 let budget = resolve_token_budget(session, &config, &model_name);
233 let counter = HeuristicTokenCounter::default();
234
235 let prepared_context = match prepare_hybrid_context(session, &budget, &counter) {
236 Ok(ctx) => ctx,
237 Err(e) => {
238 let agent_error = AgentError::Budget(e.to_string());
239 round_status = MetricsRoundStatus::Error;
240 round_error = Some(agent_error.to_string());
241 if let Some(metrics) = metrics_collector.as_ref() {
242 metrics.round_completed(
243 round_id.clone(),
244 Utc::now(),
245 round_status,
246 MetricsTokenUsage::default(),
247 round_error.clone(),
248 );
249 metrics.session_message_count(
250 session_id.clone(),
251 session.messages.len() as u32,
252 Utc::now(),
253 );
254 metrics.session_completed(
255 session_id.clone(),
256 MetricsSessionStatus::Error,
257 Utc::now(),
258 );
259 }
260 return Err(agent_error);
261 }
262 };
263
264 if prepared_context.truncation_occurred {
265 log::info!(
266 "[{}] Context truncated: removed {} segments, using {} tokens of {} ({:.1}%)",
267 session_id,
268 prepared_context.segments_removed,
269 prepared_context.token_usage.total_tokens,
270 prepared_context.token_usage.budget_limit,
271 prepared_context.token_usage.usage_percentage()
272 );
273 }
274
275 let timer = Timer::new("llm_request");
276
277 let model = config.model_name.as_deref().ok_or_else(|| {
279 crate::agent::core::AgentError::LLM(
280 "model_name is required in AgentLoopConfig".to_string(),
281 )
282 })?;
283
284 let stream = match llm
285 .chat_stream(
286 &prepared_context.messages,
287 &tool_schemas,
288 Some(budget.max_output_tokens),
289 model,
290 )
291 .await
292 {
293 Ok(stream) => {
294 let usage = TokenBudgetUsage {
297 system_tokens: prepared_context.token_usage.system_tokens,
298 summary_tokens: prepared_context.token_usage.summary_tokens,
299 window_tokens: prepared_context.token_usage.window_tokens,
300 total_tokens: prepared_context.token_usage.total_tokens,
301 budget_limit: prepared_context.token_usage.budget_limit,
302 truncation_occurred: prepared_context.truncation_occurred,
303 segments_removed: prepared_context.segments_removed,
304 };
305
306 session.token_usage = Some(usage.clone());
308
309 let budget_event = AgentEvent::TokenBudgetUpdated { usage };
310 if let Err(e) = event_tx.send(budget_event).await {
311 log::warn!("[{}] Failed to send token budget event: {}", session_id, e);
312 }
313 stream
314 }
315 Err(error) => {
316 let agent_error = AgentError::LLM(error.to_string());
317 round_status = MetricsRoundStatus::Error;
318 round_error = Some(agent_error.to_string());
319 if let Some(metrics) = metrics_collector.as_ref() {
320 metrics.round_completed(
321 round_id.clone(),
322 Utc::now(),
323 round_status,
324 MetricsTokenUsage::default(),
325 round_error.clone(),
326 );
327 metrics.session_message_count(
328 session_id.clone(),
329 session.messages.len() as u32,
330 Utc::now(),
331 );
332 metrics.session_completed(
333 session_id.clone(),
334 MetricsSessionStatus::Error,
335 Utc::now(),
336 );
337 }
338 return Err(agent_error);
339 }
340 };
341
342 let stream_output =
343 match consume_llm_stream(stream, &event_tx, &cancel_token, &session_id).await {
344 Ok(output) => output,
345 Err(error) => {
346 round_status = if matches!(error, AgentError::Cancelled) {
347 MetricsRoundStatus::Cancelled
348 } else {
349 MetricsRoundStatus::Error
350 };
351 round_error = Some(error.to_string());
352 if let Some(metrics) = metrics_collector.as_ref() {
353 metrics.round_completed(
354 round_id.clone(),
355 Utc::now(),
356 round_status,
357 MetricsTokenUsage::default(),
358 round_error.clone(),
359 );
360 let session_status = if matches!(error, AgentError::Cancelled) {
361 MetricsSessionStatus::Cancelled
362 } else {
363 MetricsSessionStatus::Error
364 };
365 metrics.session_message_count(
366 session_id.clone(),
367 session.messages.len() as u32,
368 Utc::now(),
369 );
370 metrics.session_completed(session_id.clone(), session_status, Utc::now());
371 }
372 return Err(error);
373 }
374 };
375
376 let round_usage = MetricsTokenUsage {
377 prompt_tokens: 0,
378 completion_tokens: stream_output.token_count as u64,
379 total_tokens: stream_output.token_count as u64,
380 };
381
382 let llm_duration = timer.elapsed_ms();
383 timer.debug(&session_id);
384 log::debug!(
385 "[{}] LLM response completed in {}ms, {} tokens received",
386 session_id,
387 llm_duration,
388 stream_output.token_count
389 );
390
391 if stream_output.tool_calls.is_empty() {
392 session.add_message(Message::assistant(stream_output.content, None));
393
394 let _ = event_tx
395 .send(AgentEvent::Complete {
396 usage: TokenUsage {
397 prompt_tokens: 0,
398 completion_tokens: stream_output.token_count as u32,
399 total_tokens: stream_output.token_count as u32,
400 },
401 })
402 .await;
403
404 if let Some(metrics) = metrics_collector.as_ref() {
405 metrics.round_completed(
406 round_id.clone(),
407 Utc::now(),
408 MetricsRoundStatus::Success,
409 round_usage,
410 None,
411 );
412 metrics.session_message_count(
413 session_id.clone(),
414 session.messages.len() as u32,
415 Utc::now(),
416 );
417 }
418
419 sent_complete = true;
420 break;
421 }
422
423 session.add_message(Message::assistant(
424 stream_output.content,
425 Some(stream_output.tool_calls.clone()),
426 ));
427
428 let mut awaiting_clarification = false;
429
430 for tool_call in &stream_output.tool_calls {
431 let args = parse_tool_args(&tool_call.function.arguments)
432 .unwrap_or_else(|_| serde_json::json!({}));
433
434 send_event_with_metrics(
435 &event_tx,
436 metrics_collector.as_ref(),
437 &session_id,
438 &round_id,
439 AgentEvent::ToolStart {
440 tool_call_id: tool_call.id.clone(),
441 tool_name: tool_call.function.name.clone(),
442 arguments: args,
443 },
444 )
445 .await;
446
447 let tool_timer = Timer::new(format!("tool_{}", tool_call.function.name));
448
449 match execute_tool_call(
450 tool_call,
451 tools.as_ref(),
452 config.composition_executor.as_ref().map(Arc::clone),
453 )
454 .await
455 {
456 Ok(result) => {
457 if let Some(ref mut ctx) = todo_context {
459 ctx.auto_update_status(&tool_call.function.name, &result);
462
463 ctx.track_tool_execution(&tool_call.function.name, &result, round as u32);
464
465 let progress_event = if let Some(ref active_id) = ctx.active_item_id {
469 ctx.items.iter().find(|i| &i.id == active_id).map(|item| {
471 AgentEvent::TodoListItemProgress {
472 session_id: session_id.clone(),
473 item_id: item.id.clone(),
474 status: item.status.clone(),
475 tool_calls_count: item.tool_calls.len(),
476 version: ctx.version,
477 }
478 })
479 } else {
480 ctx.items
483 .iter()
484 .find(|item| item.status == TodoItemStatus::Completed)
485 .map(|item| AgentEvent::TodoListItemProgress {
486 session_id: session_id.clone(),
487 item_id: item.id.clone(),
488 status: item.status.clone(),
489 tool_calls_count: item.tool_calls.len(),
490 version: ctx.version,
491 })
492 };
493
494 if let Some(event) = progress_event {
495 let _ = event_tx.send(event).await;
496 }
497 }
498
499 if tool_call.function.name == "create_todo_list" && result.success {
501 if let Ok(args) =
502 serde_json::from_str::<serde_json::Value>(&tool_call.function.arguments)
503 {
504 if let Ok(todo_list) =
505 CreateTodoListTool::todo_list_from_args(&args, &session_id)
506 {
507 session.set_todo_list(todo_list.clone());
508 log::info!(
509 "[{}] Todo list '{}' created with {} items",
510 session_id,
511 todo_list.title,
512 todo_list.items.len()
513 );
514
515 if let Some(ref storage) = config.storage {
517 if let Err(e) = storage.save_session(session).await {
518 log::warn!("[{}] Failed to save session after todo list creation: {}", session_id, e);
519 } else {
520 log::debug!(
521 "[{}] Session saved after todo list creation",
522 session_id
523 );
524 }
525 }
526
527 let _ = event_tx
529 .send(AgentEvent::TodoListUpdated {
530 todo_list: todo_list.clone(),
531 })
532 .await;
533
534 todo_context = TodoLoopContext::from_session(session);
537 if todo_context.is_some() {
538 log::debug!("[{}] TodoLoopContext re-initialized after create_todo_list", session_id);
539 }
540 }
541 }
542 } else if tool_call.function.name == "update_todo_item" && result.success {
543 if let Ok(args) =
544 serde_json::from_str::<serde_json::Value>(&tool_call.function.arguments)
545 {
546 if let (Some(item_id), Some(status)) =
547 (args["item_id"].as_str(), args["status"].as_str())
548 {
549 let status_enum = match status {
550 "pending" => Some(crate::agent::core::TodoItemStatus::Pending),
551 "in_progress" => {
552 Some(crate::agent::core::TodoItemStatus::InProgress)
553 }
554 "completed" => {
555 Some(crate::agent::core::TodoItemStatus::Completed)
556 }
557 "blocked" => Some(crate::agent::core::TodoItemStatus::Blocked),
558 _ => None,
559 };
560 if let Some(s) = status_enum {
561 let notes = args["notes"].as_str();
562
563 if let Some(ref mut ctx) = todo_context {
566 ctx.update_item_status(item_id, s.clone());
567 }
568
569 if let Err(e) = session.update_todo_item(item_id, s, notes) {
570 log::warn!(
571 "[{}] Failed to update todo item: {}",
572 session_id,
573 e
574 );
575 } else {
576 log::info!(
577 "[{}] Updated todo item '{}' to '{}'",
578 session_id,
579 item_id,
580 status
581 );
582
583 if let Some(ref storage) = config.storage {
585 if let Err(e) = storage.save_session(session).await {
586 log::warn!("[{}] Failed to save session after todo item update: {}", session_id, e);
587 } else {
588 log::debug!(
589 "[{}] Session saved after todo item update",
590 session_id
591 );
592 }
593 }
594
595 if let Some(ref todo_list) = session.todo_list {
597 let _ = event_tx
598 .send(AgentEvent::TodoListUpdated {
599 todo_list: todo_list.clone(),
600 })
601 .await;
602 }
603 }
604 }
605 }
606 }
607 }
608
609 if tool_call.function.name == "ask_user" && result.success {
611 if let Ok(payload) =
612 serde_json::from_str::<serde_json::Value>(&result.result)
613 {
614 let question = payload["question"]
615 .as_str()
616 .unwrap_or("Please select:")
617 .to_string();
618 let options: Vec<String> = payload["options"]
619 .as_array()
620 .map(|arr| {
621 arr.iter()
622 .filter_map(|v| v.as_str().map(String::from))
623 .collect()
624 })
625 .unwrap_or_default();
626 let allow_custom = payload["allow_custom"].as_bool().unwrap_or(true);
627
628 log::info!(
629 "[{}] ask_user tool called, awaiting user response",
630 session_id
631 );
632
633 let tool_result_msg = Message::tool_result(
636 tool_call.id.clone(),
637 format!("Waiting for user response to: {}", question),
638 );
639 log::debug!("[{}] Adding tool result message for ask_user, tool_call_id: {}, message_id: {}",
640 session_id, tool_call.id, tool_result_msg.id);
641 session.add_message(tool_result_msg);
642
643 let _ = event_tx
645 .send(AgentEvent::NeedClarification {
646 question: question.clone(),
647 options: if options.is_empty() {
648 None
649 } else {
650 Some(options.clone())
651 },
652 })
653 .await;
654
655 session.set_pending_question(
657 tool_call.id.clone(),
658 question,
659 options,
660 allow_custom,
661 );
662
663 if let Some(ref storage) = config.storage {
665 if let Err(e) = storage.save_session(session).await {
666 log::warn!(
667 "[{}] Failed to save session after ask_user: {}",
668 session_id,
669 e
670 );
671 }
672 }
673
674 awaiting_clarification = true;
675 break;
676 }
677 }
678
679 send_event_with_metrics(
680 &event_tx,
681 metrics_collector.as_ref(),
682 &session_id,
683 &round_id,
684 AgentEvent::ToolComplete {
685 tool_call_id: tool_call.id.clone(),
686 result: result.clone(),
687 },
688 )
689 .await;
690
691 if !result.success && round_error.is_none() {
692 round_status = MetricsRoundStatus::Error;
693 round_error = Some(format!(
694 "Tool \"{}\" returned an unsuccessful result",
695 tool_call.function.name
696 ));
697 }
698
699 debug_logger.log_event(
700 &session_id,
701 "tool_complete",
702 serde_json::json!({
703 "tool_name": tool_call.function.name,
704 "tool_call_id": tool_call.id,
705 "duration_ms": tool_timer.elapsed_ms(),
706 "success": result.success,
707 }),
708 );
709
710 let outcome = handle_tool_result_with_agentic_support(
711 &result,
712 tool_call,
713 &event_tx,
714 session,
715 tools.as_ref(),
716 config.composition_executor.as_ref().map(Arc::clone),
717 )
718 .await;
719
720 if outcome == ToolHandlingOutcome::AwaitingClarification {
721 awaiting_clarification = true;
722 break;
723 }
724 }
725 Err(error) => {
726 let error_message = error.to_string();
727 round_status = MetricsRoundStatus::Error;
728 round_error = Some(error_message.clone());
729
730 send_event_with_metrics(
731 &event_tx,
732 metrics_collector.as_ref(),
733 &session_id,
734 &round_id,
735 AgentEvent::ToolError {
736 tool_call_id: tool_call.id.clone(),
737 error: error_message.clone(),
738 },
739 )
740 .await;
741
742 session.add_message(Message::tool_result(
743 tool_call.id.clone(),
744 format!("Error: {error_message}"),
745 ));
746 }
747 }
748 }
749
750 if awaiting_clarification {
751 if let Some(metrics) = metrics_collector.as_ref() {
752 metrics.round_completed(
753 round_id.clone(),
754 Utc::now(),
755 round_status,
756 round_usage,
757 round_error.clone(),
758 );
759 metrics.session_message_count(
760 session_id.clone(),
761 session.messages.len() as u32,
762 Utc::now(),
763 );
764 }
765 break;
766 }
767
768 debug_logger.log_event(
769 &session_id,
770 "round_complete",
771 serde_json::json!({
772 "round": round + 1,
773 "message_count": session.messages.len(),
774 }),
775 );
776
777 if let Some(ref ctx) = todo_context {
780 use crate::agent::loop_module::todo_evaluation::evaluate_todo_progress;
781
782 log::debug!(
783 "[{}] Evaluating todo list progress at end of round {}",
784 session_id,
785 round + 1
786 );
787
788 let model = config.model_name.as_deref().ok_or_else(|| {
790 crate::agent::core::AgentError::LLM(
791 "model_name is required in AgentLoopConfig".to_string(),
792 )
793 })?;
794
795 match evaluate_todo_progress(
796 ctx,
797 session,
798 llm.clone(),
799 &event_tx,
800 &session_id,
801 model, )
803 .await
804 {
805 Ok(evaluation_result) => {
806 if evaluation_result.needs_evaluation && !evaluation_result.updates.is_empty() {
807 log::info!(
808 "[{}] LLM evaluated {} todo item updates",
809 session_id,
810 evaluation_result.updates.len()
811 );
812
813 if let Some(ref mut ctx) = todo_context {
815 for update in evaluation_result.updates {
816 let status = update.status.clone();
817 ctx.update_item_status(&update.item_id, status);
818
819 if let Some(notes) = update.notes {
821 let _ = session.update_todo_item(
822 &update.item_id,
823 update.status,
824 Some(¬es),
825 );
826 } else {
827 let status = update.status.clone();
828 let _ = session.update_todo_item(&update.item_id, status, None);
829 }
830 }
831 }
832 }
833 }
834 Err(e) => {
835 log::warn!("[{}] Todo evaluation failed: {}", session_id, e);
836 }
837 }
838 }
839
840 if let Some(metrics) = metrics_collector.as_ref() {
841 metrics.round_completed(
842 round_id.clone(),
843 Utc::now(),
844 round_status,
845 round_usage,
846 round_error.clone(),
847 );
848 metrics.session_message_count(
849 session_id.clone(),
850 session.messages.len() as u32,
851 Utc::now(),
852 );
853 }
854 }
855
856 if let Some(ref ctx) = todo_context {
858 if ctx.is_all_completed() {
859 log::info!("[{}] All todo items completed", session_id);
860
861 let _ = event_tx
862 .send(AgentEvent::TodoListCompleted {
863 session_id: session_id.clone(),
864 completed_at: Utc::now(),
865 total_rounds: ctx.current_round + 1, total_tool_calls: ctx.items.iter().map(|i| i.tool_calls.len()).sum(),
867 })
868 .await;
869 }
870 }
871
872 if let Some(ctx) = todo_context {
874 let version = ctx.version;
876 session
877 .metadata
878 .insert("todo_list_version".to_string(), version.to_string());
879
880 session.todo_list = Some(ctx.into_todo_list());
881 session.updated_at = Utc::now();
882
883 log::debug!(
884 "[{}] Synced TodoLoopContext to session, version={}",
885 session_id,
886 version
887 );
888
889 if let Some(ref storage) = config.storage {
891 if let Err(e) = storage.save_session(session).await {
892 log::warn!(
893 "[{}] Failed to save session after agent loop: {}",
894 session_id,
895 e
896 );
897 } else {
898 log::debug!("[{}] Session saved with updated todo list", session_id);
899 }
900 }
901 }
902
903 if !sent_complete {
904 let _ = event_tx
905 .send(AgentEvent::Complete {
906 usage: TokenUsage {
907 prompt_tokens: 0,
908 completion_tokens: 0,
909 total_tokens: 0,
910 },
911 })
912 .await;
913 }
914
915 if let Some(metrics) = metrics_collector.as_ref() {
916 metrics.session_message_count(
917 session_id.clone(),
918 session.messages.len() as u32,
919 Utc::now(),
920 );
921 if !session.has_pending_question() {
922 metrics.session_completed(session_id, MetricsSessionStatus::Completed, Utc::now());
923 }
924 }
925
926 Ok(())
927}
928
929async fn send_event_with_metrics(
930 event_tx: &mpsc::Sender<AgentEvent>,
931 metrics_collector: Option<&MetricsCollector>,
932 session_id: &str,
933 round_id: &str,
934 event: AgentEvent,
935) {
936 if let Some(metrics) = metrics_collector {
937 metrics.record_agent_event(session_id, round_id, &event);
938 }
939
940 let _ = event_tx.send(event).await;
941}
942
943fn resolve_token_budget(
944 session: &Session,
945 config: &AgentLoopConfig,
946 model_name: &str,
947) -> TokenBudget {
948 if let Some(ref budget) = session.token_budget {
950 log::debug!("Using session-specific token budget");
951 return budget.clone();
952 }
953
954 if let Some(ref budget) = config.token_budget {
955 log::debug!("Using config token budget");
956 return budget.clone();
957 }
958
959 let registry = ModelLimitsRegistry::default();
961 let model_limit = registry.get_or_default(model_name);
962
963 TokenBudget::with_safety_margin(
964 model_limit.max_context_tokens,
965 model_limit.get_max_output_tokens(),
966 crate::agent::core::budget::BudgetStrategy::default(),
967 model_limit.get_safety_margin(),
968 )
969}
970
971fn resolve_available_tool_schemas(
972 config: &AgentLoopConfig,
973 tools: &dyn ToolExecutor,
974) -> Vec<ToolSchema> {
975 let mut tool_schemas = config.tool_registry.list_tools();
976 if tool_schemas.is_empty() {
977 tool_schemas = tools.list_tools();
978 }
979
980 tool_schemas.extend(config.additional_tool_schemas.clone());
981 tool_schemas.sort_by(|left, right| left.function.name.cmp(&right.function.name));
982 tool_schemas.dedup_by(|left, right| left.function.name == right.function.name);
983 tool_schemas
984}
985
986const SKILL_CONTEXT_MARKER: &str = "\n\n## Available Skills\n";
987const TOOL_GUIDE_MARKER: &str = "## Tool Usage Guidelines\n";
988
989fn merge_system_prompt_with_contexts(
990 base_prompt: &str,
991 skill_context: &str,
992 tool_guide_context: &str,
993) -> String {
994 let mut merged = strip_existing_tool_guide_context(&strip_existing_skill_context(base_prompt));
995
996 let sections: Vec<&str> = [skill_context, tool_guide_context]
997 .into_iter()
998 .map(str::trim)
999 .filter(|section| !section.is_empty())
1000 .collect();
1001
1002 if sections.is_empty() {
1003 return merged;
1004 }
1005
1006 if merged.trim().is_empty() {
1007 return sections.join("\n\n");
1008 }
1009
1010 for section in sections {
1011 merged.push_str("\n\n");
1012 merged.push_str(section);
1013 }
1014
1015 merged
1016}
1017
1018fn strip_existing_skill_context(prompt: &str) -> String {
1019 strip_existing_prompt_section(prompt, SKILL_CONTEXT_MARKER)
1020}
1021
1022fn strip_existing_tool_guide_context(prompt: &str) -> String {
1023 strip_existing_prompt_section(prompt, TOOL_GUIDE_MARKER)
1024}
1025
1026fn strip_existing_prompt_section(prompt: &str, marker: &str) -> String {
1027 if let Some(index) = prompt.find(marker) {
1028 prompt[..index].trim_end().to_string()
1029 } else {
1030 prompt.to_string()
1031 }
1032}
1033
1034const TODO_LIST_MARKER: &str = "\n\n## Current Task List:";
1035
1036fn inject_todo_list_into_system_message(session: &mut Session) {
1038 let todo_context = session.format_todo_list_for_prompt();
1039
1040 if let Some(system_message) = session
1041 .messages
1042 .iter_mut()
1043 .find(|message| matches!(message.role, crate::agent::core::Role::System))
1044 {
1045 let base_prompt = strip_existing_todo_list(&system_message.content);
1046
1047 if !todo_context.is_empty() {
1048 system_message.content = format!("{}\n{}", base_prompt, todo_context);
1049 log::info!(
1050 "Injected todo list into system message ({} chars)",
1051 todo_context.len()
1052 );
1053 } else {
1054 system_message.content = base_prompt;
1055 }
1056 } else if !todo_context.is_empty() {
1057 session
1059 .messages
1060 .insert(0, Message::system(todo_context.clone()));
1061 log::info!(
1062 "Created system message with todo list ({} chars)",
1063 todo_context.len()
1064 );
1065 }
1066}
1067
1068fn strip_existing_todo_list(prompt: &str) -> String {
1069 if let Some(index) = prompt.find(TODO_LIST_MARKER) {
1070 prompt[..index].trim_end().to_string()
1071 } else {
1072 prompt.to_string()
1073 }
1074}
1075
1076#[allow(dead_code)]
1077pub async fn run_agent_loop(
1078 session: &mut Session,
1079 initial_message: String,
1080 event_tx: mpsc::Sender<AgentEvent>,
1081 llm: Arc<dyn LLMProvider>,
1082 tools: Arc<dyn ToolExecutor>,
1083 cancel_token: CancellationToken,
1084 max_rounds: usize,
1085) -> Result<()> {
1086 run_agent_loop_with_config(
1087 session,
1088 initial_message,
1089 event_tx,
1090 llm,
1091 tools,
1092 cancel_token,
1093 AgentLoopConfig {
1094 max_rounds,
1095 skip_initial_user_message: false,
1096 ..Default::default()
1097 },
1098 )
1099 .await
1100}
1101
1102struct DebugLogger {
1103 enabled: bool,
1104}
1105
1106impl DebugLogger {
1107 fn new(enabled: bool) -> Self {
1108 Self { enabled }
1109 }
1110
1111 fn log_event(&self, session_id: &str, event_type: &str, details: serde_json::Value) {
1112 if !self.enabled {
1113 return;
1114 }
1115
1116 log::debug!("[{}] {}: {}", session_id, event_type, details);
1117 }
1118}
1119
1120struct Timer {
1121 name: String,
1122 start: std::time::Instant,
1123}
1124
1125impl Timer {
1126 fn new(name: impl Into<String>) -> Self {
1127 Self {
1128 name: name.into(),
1129 start: std::time::Instant::now(),
1130 }
1131 }
1132
1133 fn elapsed_ms(&self) -> u128 {
1134 self.start.elapsed().as_millis()
1135 }
1136
1137 fn debug(&self, session_id: &str) {
1138 log::debug!(
1139 "[{}] {} completed in {}ms",
1140 session_id,
1141 self.name,
1142 self.elapsed_ms()
1143 );
1144 }
1145}
1146
1147#[cfg(test)]
1148mod tests {
1149 use super::{
1150 merge_system_prompt_with_contexts, strip_existing_skill_context,
1151 strip_existing_tool_guide_context, AgentLoopConfig,
1152 };
1153
1154 #[test]
1155 fn merge_system_prompt_with_contexts_appends_both_contexts() {
1156 let merged = merge_system_prompt_with_contexts(
1157 "You are a helpful assistant.",
1158 "\n\n## Available Skills\n\n### Skill\nDetails",
1159 "## Tool Usage Guidelines\n\n### File Reading Tools\nDetails",
1160 );
1161 assert!(merged.starts_with("You are a helpful assistant."));
1162 assert!(merged.contains("## Available Skills"));
1163 assert!(merged.contains("## Tool Usage Guidelines"));
1164 }
1165
1166 #[test]
1167 fn merge_system_prompt_with_contexts_handles_empty_base_prompt() {
1168 let merged = merge_system_prompt_with_contexts(
1169 "",
1170 "\n\n## Available Skills\n\n### Skill",
1171 "## Tool Usage Guidelines\n\n### File Reading Tools",
1172 );
1173 assert_eq!(
1174 merged,
1175 "## Available Skills\n\n### Skill\n\n## Tool Usage Guidelines\n\n### File Reading Tools"
1176 );
1177 }
1178
1179 #[test]
1180 fn strip_existing_skill_context_removes_previous_section() {
1181 let stripped = strip_existing_skill_context(
1182 "Base prompt\n\n## Available Skills\n\n### One\nInstructions",
1183 );
1184 assert_eq!(stripped, "Base prompt");
1185 }
1186
1187 #[test]
1188 fn strip_existing_tool_guide_context_removes_previous_section() {
1189 let stripped = strip_existing_tool_guide_context(
1190 "Base prompt\n\n## Tool Usage Guidelines\n\n### File Reading Tools\nInstructions",
1191 );
1192 assert_eq!(stripped, "Base prompt");
1193 }
1194
1195 #[test]
1201 fn agent_loop_config_model_name_defaults_to_none() {
1202 let config = AgentLoopConfig::default();
1203 assert!(
1204 config.model_name.is_none(),
1205 "model_name should default to None, forcing explicit setting"
1206 );
1207 }
1208
1209 #[test]
1211 fn agent_loop_config_can_set_model_name() {
1212 let config = AgentLoopConfig {
1213 model_name: Some("kimi-for-coding".to_string()),
1214 ..Default::default()
1215 };
1216 assert_eq!(config.model_name, Some("kimi-for-coding".to_string()));
1217 }
1218
1219 #[test]
1222 fn model_must_come_from_config_not_session() {
1223 use crate::agent::core::Session;
1224
1225 let config = AgentLoopConfig {
1227 model_name: Some("config-model".to_string()),
1228 ..Default::default()
1229 };
1230
1231 let session = Session::new("test", "session-model");
1233
1234 let execution_model = config.model_name.as_deref().unwrap();
1236 assert_eq!(
1237 execution_model, "config-model",
1238 "Model must come from config.model_name, not session.model"
1239 );
1240
1241 assert_eq!(
1243 session.model, "session-model",
1244 "session.model is just for recording, not execution"
1245 );
1246 }
1247}