1use super::api::{build_request_with_tools, call_llm_non_stream, create_llm_client};
2use super::config::{AgentLoopConfig, AgentLoopSharedState};
3use super::retry::{backoff_delay_ms, retry_policy_for};
4use super::tool_processor::{
5 ToolCallContext, clear_channels, drain_pending_user_messages, flush_streaming_as_message,
6 process_tool_calls, push_both, sync_context_full,
7};
8use crate::chat_error::ChatError;
9use crate::context::compact::{self, AutoCompactParams};
10use crate::infra::hook::{HookContext, HookEvent};
11use crate::message_types::{StreamMsg, ToolResultMsg};
12use crate::storage::{
13 ChatMessage, DisplayHint, MessageRole, SessionEvent, SessionMetrics, ToolCallItem,
14 append_session_event, write_session_metrics,
15};
16use crate::util::log::{write_error_log, write_info_log};
17use crate::util::safe_lock;
18use futures::StreamExt;
19use rand::Rng;
20use std::collections::BTreeMap;
21use std::env::current_dir;
22use std::mem::take;
23use std::sync::{Arc, Mutex, mpsc};
24use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
25
/// Number of leading stream chunks per LLM call whose per-choice details
/// (index, finish reason, content/tool-call presence) are written to the info log.
const DEBUG_LOG_CHUNK_LIMIT: u32 = 3;
/// Reasoning-accumulation progress is logged only while the buffered reasoning
/// text is shorter than this many bytes (`String::len`).
const REASONING_LOG_THRESHOLD: usize = 50;
30
31fn push_compact_tool_messages(
44 messages: &mut Vec<ChatMessage>,
45 display: &Arc<Mutex<Vec<ChatMessage>>>,
46 context: &Arc<Mutex<Vec<ChatMessage>>>,
47 compact_result: &compact::CompactResult,
48) {
49 let tool_call_id = format!("compact_auto_{}", compact_result.messages_before);
50
51 for msg in &compact_result.recent_user_messages {
54 push_both(display, context, msg.clone());
55 }
56
57 let tool_call_item = ToolCallItem {
59 id: tool_call_id.clone(),
60 name: "Compact".to_string(),
61 arguments: r#"{"reason":"auto_compact"}"#.to_string(),
62 };
63 let tool_call_msg = ChatMessage {
64 role: MessageRole::Assistant,
65 content: String::new(),
66 tool_calls: Some(vec![tool_call_item]),
67 tool_call_id: None,
68 images: None,
69 reasoning_content: None,
70 sender_name: None,
71 recipient_name: None,
72 display_hint: DisplayHint::Normal,
73 };
74 messages.push(tool_call_msg.clone());
75 push_both(display, context, tool_call_msg);
76
77 let result_content = format!(
79 "📦 上下文已压缩 ({} 条消息 → 摘要, transcript: {})\n\n{}",
80 compact_result.messages_before, compact_result.transcript_path, compact_result.summary,
81 );
82 let tool_msg = ChatMessage {
83 role: MessageRole::Tool,
84 content: result_content,
85 tool_calls: None,
86 tool_call_id: Some(tool_call_id),
87 images: None,
88 reasoning_content: None,
89 sender_name: None,
90 recipient_name: None,
91 display_hint: DisplayHint::Normal,
92 };
93 messages.push(tool_msg.clone());
94 push_both(display, context, tool_msg);
95}
96
/// Accumulator for one tool call that arrives piecewise over streamed chunks.
///
/// The agent loop keeps one of these per tool-call `index` and appends each
/// chunk's fragments to it until the stream ends.
struct StreamingToolCallPart {
    /// Tool-call id taken from the first chunk that carries one; stays empty
    /// if no chunk provides an id (the loop then generates a random one).
    call_id: String,
    /// Function name, built by concatenating the name fragments of each chunk.
    function_name: String,
    /// Raw argument text, built by concatenating the argument fragments of each chunk.
    function_arguments: String,
}
103
/// Bundle of inputs consumed by `run_main_agent_loop`.
pub struct MainAgentLoopParams {
    /// Per-run configuration; the loop destructures it into the provider,
    /// round limit, compaction settings, hook manager, disabled hooks and
    /// cancel token.
    pub config: AgentLoopConfig,
    /// Handles to the shared state the loop reads and updates (streaming
    /// buffers, pending user messages, display/context message lists, token
    /// estimate, tool registry, and so on).
    pub shared: AgentLoopSharedState,
    /// Conversation history the loop works on; it is sent with each LLM
    /// request and mutated in place (pending user messages, tool results,
    /// compaction).
    pub messages: Vec<ChatMessage>,
    /// Invoked at the start of every round to produce the system prompt, if any.
    pub system_prompt_fn: Arc<dyn Fn() -> Option<String> + Send + Sync>,
    /// Channel on which the loop emits `StreamMsg` events (chunks, retries,
    /// compaction notices, errors, cancellation).
    pub tx: mpsc::Sender<StreamMsg>,
    /// Receiver for `ToolResultMsg` values; handed to the tool-call
    /// processing context.
    pub tool_result_rx: mpsc::Receiver<ToolResultMsg>,
}
119
120pub async fn run_main_agent_loop(params: MainAgentLoopParams) {
122 let MainAgentLoopParams {
123 config,
124 shared,
125 mut messages,
126 system_prompt_fn,
127 tx,
128 tool_result_rx,
129 } = params;
130 let AgentLoopConfig {
131 provider,
132 max_llm_rounds,
133 compact_config,
134 hook_manager,
135 disabled_hooks,
136 cancel_token,
137 } = config;
138 let AgentLoopSharedState {
139 streaming_content,
140 streaming_reasoning_content,
141 pending_user_messages,
142 background_manager: _,
143 todo_manager,
144 display_messages,
145 context_messages,
146 estimated_context_tokens,
147 invoked_skills,
148 session_id,
149 derived_system_prompt,
150 tool_registry,
151 disabled_tools,
152 tools_enabled,
153 sub_agent_metrics,
154 deferred_tools,
155 session_loaded_deferred: _,
156 } = shared;
157
158 let client = create_llm_client(&provider);
159
160 let mut metrics = SessionMetrics {
162 session_start_ms: SystemTime::now()
163 .duration_since(UNIX_EPOCH)
164 .unwrap_or_default()
165 .as_millis() as u64,
166 ..Default::default()
167 };
168 let mut context_tokens_peak: usize = 0;
169
170 let tool_ctx = ToolCallContext {
171 stream_msg_sender: &tx,
172 tool_result_receiver: &tool_result_rx,
173 pending_user_messages: &pending_user_messages,
174 hook_manager: &hook_manager,
175 disabled_hooks: &disabled_hooks,
176 supports_vision: provider.supports_vision,
177 display_messages: &display_messages,
178 context_messages: &context_messages,
179 streaming_content: &streaming_content,
180 session_id: &session_id,
181 };
182
183 let mut final_round_idx: usize = 0;
184 'round: for round_idx in 0..max_llm_rounds {
185 final_round_idx = round_idx;
186
187 let tools = if tools_enabled {
192 let deferred: Vec<String> = match deferred_tools.lock() {
193 Ok(guard) => guard.clone(),
194 Err(e) => e.into_inner().clone(),
195 };
196 tool_registry.to_llm_tools_non_deferred(&disabled_tools, &deferred)
197 } else {
198 vec![]
199 };
200
201 write_info_log(
202 "agent_loop",
203 &format!(
204 "========== 第 {} 轮开始 (max={}) ==========",
205 round_idx, max_llm_rounds
206 ),
207 );
208
209 write_info_log(
210 "agent_loop",
211 &format!(
212 "第 {} 轮可用工具: [{}] (count={})",
213 round_idx,
214 tools
215 .iter()
216 .map(|t| t.function.name.as_str())
217 .collect::<Vec<_>>()
218 .join(", "),
219 tools.len()
220 ),
221 );
222
223 let mut system_prompt = system_prompt_fn();
225
226 {
228 if let Ok(mut sp) = derived_system_prompt.lock() {
229 *sp = system_prompt.clone();
230 }
231 }
232
233 let pending_count_before = safe_lock(&pending_user_messages, "agent::pending_count").len();
235 drain_pending_user_messages(&mut messages, &pending_user_messages);
236 if pending_count_before > 0 {
237 write_info_log(
238 "agent_loop",
239 &format!("drain 了 {} 条用户增量消息", pending_count_before),
240 );
241 }
242
243 if compact_config.enabled {
247 let mut compact_aborted = false;
248
249 if hook_manager.has_hooks_for(HookEvent::PreMicroCompact) {
251 let ctx = HookContext {
252 event: HookEvent::PreMicroCompact,
253 messages: Some(messages.clone()),
254 model: Some(provider.model.clone()),
255 session_id: Some(session_id.clone()),
256 ..Default::default()
257 };
258 if let Some(result) =
259 hook_manager.execute(HookEvent::PreMicroCompact, ctx, &disabled_hooks)
260 && result.is_stop()
261 {
262 write_info_log(
263 "PreMicroCompact hook",
264 "compact 子管线被 hook 中止(跳过 micro + auto)",
265 );
266 compact_aborted = true;
267 }
268 }
269
270 if !compact_aborted {
271 compact::micro_compact(
272 &mut messages,
273 compact_config.keep_recent,
274 &compact_config.micro_compact_exempt_tools,
275 );
276 metrics.micro_compact_count += 1;
277
278 if hook_manager.has_hooks_for(HookEvent::PostMicroCompact) {
280 let ctx = HookContext {
281 event: HookEvent::PostMicroCompact,
282 messages: Some(messages.clone()),
283 session_id: Some(session_id.clone()),
284 ..Default::default()
285 };
286 if let Some(result) =
287 hook_manager.execute(HookEvent::PostMicroCompact, ctx, &disabled_hooks)
288 && let Some(new_msgs) = result.messages
289 {
290 messages = new_msgs;
291 }
292 }
293
294 if compact::estimate_tokens(&messages) > compact_config.effective_token_threshold()
295 {
296 write_info_log(
297 "agent_loop",
298 "auto_compact triggered (token threshold exceeded)",
299 );
300
301 let mut protected_context: Option<String> = None;
303 if hook_manager.has_hooks_for(HookEvent::PreAutoCompact) {
304 let ctx = HookContext {
305 event: HookEvent::PreAutoCompact,
306 messages: Some(messages.clone()),
307 system_prompt: system_prompt.clone(),
308 model: Some(provider.model.clone()),
309 session_id: Some(session_id.clone()),
310 ..Default::default()
311 };
312 if let Some(result) =
313 hook_manager.execute(HookEvent::PreAutoCompact, ctx, &disabled_hooks)
314 {
315 if result.is_stop() {
316 write_info_log("PreAutoCompact hook", "auto_compact 被 hook 中止");
317 compact_aborted = true;
318 }
319 if let Some(ac) = result.additional_context {
320 protected_context = Some(ac);
321 }
322 }
323 }
324
325 if !compact_aborted {
326 let _ = tx.send(StreamMsg::Compacting);
327 match compact::auto_compact(
328 &mut messages,
329 &AutoCompactParams {
330 provider: &provider,
331 invoked_skills: &invoked_skills,
332 session_id: &session_id,
333 protected_context: protected_context.as_deref(),
334 },
335 )
336 .await
337 {
338 Err(e) => {
339 write_error_log(
340 "agent_loop",
341 &format!("auto_compact failed: {}", e),
342 );
343 }
344 Ok(result) => {
345 clear_channels(&display_messages, &context_messages);
346 push_compact_tool_messages(
347 &mut messages,
348 &display_messages,
349 &context_messages,
350 &result,
351 );
352 metrics.auto_compact_count += 1;
353 let _ = tx.send(StreamMsg::Compacted {
354 messages_before: result.messages_before,
355 });
356 if hook_manager.has_hooks_for(HookEvent::PostAutoCompact) {
358 let ctx = HookContext {
359 event: HookEvent::PostAutoCompact,
360 messages: Some(messages.clone()),
361 session_id: Some(session_id.clone()),
362 ..Default::default()
363 };
364 if let Some(hook_result) = hook_manager.execute(
365 HookEvent::PostAutoCompact,
366 ctx,
367 &disabled_hooks,
368 ) && let Some(new_msgs) = hook_result.messages
369 {
370 messages = new_msgs;
371 sync_context_full(
373 &display_messages,
374 &context_messages,
375 &messages,
376 );
377 }
378 }
379 }
380 }
381 }
382 }
383 }
384 }
385
386 todo_manager.increment_turn();
388
389 {
391 let mut stream_buf = safe_lock(&streaming_content, "agent::streaming_content_clear");
392 stream_buf.clear();
393 }
394 {
395 let mut reason_buf = safe_lock(
396 &streaming_reasoning_content,
397 "agent::streaming_reasoning_clear",
398 );
399 reason_buf.clear();
400 }
401
402 {
404 let mut log_content = String::new();
405 if let Some(ref system_prompt) = system_prompt {
406 log_content.push_str(&format!("[System] {}\n", system_prompt));
407 }
408 for msg in &messages {
409 match msg.role {
410 MessageRole::Assistant => {
411 if !msg.content.is_empty() {
412 log_content.push_str(&format!("[Assistant] {}\n", msg.content));
413 }
414 if let Some(ref tool_calls) = msg.tool_calls {
415 for tool_call in tool_calls {
416 log_content.push_str(&format!(
417 "[Assistant/ToolCall] {}: {}\n",
418 tool_call.name, tool_call.arguments
419 ));
420 }
421 }
422 }
423 MessageRole::Tool => {
424 let id = msg.tool_call_id.as_deref().unwrap_or("?");
425 let tool_name = msg
426 .tool_calls
427 .as_ref()
428 .and_then(|tool_calls| tool_calls.first())
429 .map(|tool_call| tool_call.name.as_str())
430 .unwrap_or("unknown");
431 log_content.push_str(&format!(
432 "[Tool/Result({} with id `{}`)] result:\n{}\n",
433 tool_name, id, msg.content
434 ));
435 }
436 MessageRole::User => {
437 log_content.push_str(&format!("[User] {}\n", msg.content));
438 }
439 other => {
440 log_content.push_str(&format!("[{}] {}\n", other, msg.content));
441 }
442 }
443 }
444 write_info_log("Chat 请求", &log_content);
445 }
446
447 if hook_manager.has_hooks_for(HookEvent::PreLlmRequest) {
449 let ctx = HookContext {
450 event: HookEvent::PreLlmRequest,
451 messages: Some(messages.clone()),
452 system_prompt: system_prompt.clone(),
453 model: Some(provider.model.clone()),
454 session_id: Some(session_id.clone()),
455 cwd: current_dir()
456 .map(|p| p.display().to_string())
457 .unwrap_or_else(|_| ".".to_string()),
458 ..Default::default()
459 };
460 if let Some(result) =
461 hook_manager.execute(HookEvent::PreLlmRequest, ctx, &disabled_hooks)
462 {
463 if result.is_stop() {
464 let _ = tx.send(StreamMsg::Error(ChatError::HookAborted));
465 return;
466 }
467 if let Some(new_msgs) = result.messages {
468 messages = new_msgs;
469 }
470 if let Some(new_prompt) = result.system_prompt {
471 system_prompt = Some(new_prompt);
472 }
473 if let Some(inject) = result.inject_messages {
474 messages.extend(inject);
475 }
476 }
477 }
478
479 {
481 let tokens = compact::estimate_tokens(&messages);
482 if let Ok(mut ct) = estimated_context_tokens.lock() {
483 *ct = tokens;
484 }
485 if tokens > context_tokens_peak {
486 context_tokens_peak = tokens;
487 }
488 }
489
490 {
492 let has_images = messages
493 .iter()
494 .any(|m| m.images.as_ref().is_some_and(|imgs| !imgs.is_empty()));
495 write_info_log(
496 "agent_loop",
497 &format!(
498 "第 {} 轮请求: messages={}, has_images={}, supports_vision={}",
499 round_idx,
500 messages.len(),
501 has_images,
502 provider.supports_vision
503 ),
504 );
505 }
506
507 {
514 let cleaned = super::api::sanitize_messages(&messages);
515 let changed = cleaned.len() != messages.len()
516 || cleaned.iter().zip(messages.iter()).any(|(a, b)| {
517 let a_count = a.tool_calls.as_ref().map(|tc| tc.len()).unwrap_or(0);
518 let b_count = b.tool_calls.as_ref().map(|tc| tc.len()).unwrap_or(0);
519 a_count != b_count
520 });
521 if changed {
522 write_info_log(
523 "agent_loop",
524 &format!(
525 "已就地修复孤立 tool_call/tool_result:{} → {} 条消息",
526 messages.len(),
527 cleaned.len()
528 ),
529 );
530 messages = cleaned;
531 }
532 }
533
534 let request = match build_request_with_tools(
535 &provider,
536 &messages,
537 tools.clone(),
538 system_prompt.as_deref(),
539 ) {
540 Ok(req) => {
541 for (i, m) in messages.iter().enumerate() {
543 if let Some(rc) = &m.reasoning_content {
544 write_info_log(
545 "agent_loop",
546 &format!(
547 "messages[{}] role={:?} has reasoning_content len={}",
548 i,
549 m.role,
550 rc.len()
551 ),
552 );
553 }
554 }
555 write_info_log("agent_loop", "build_request_with_tools 成功");
556 req
557 }
558 Err(e) => {
559 let _ = tx.send(StreamMsg::Error(e));
560 return;
561 }
562 };
563
564 let mut retry_attempt: u32 = 0;
567
568 'api_retry: loop {
569 retry_attempt += 1;
570 let call_start = Instant::now();
571
572 write_info_log(
574 "agent_loop",
575 &format!("开始创建流式请求 (attempt={})...", retry_attempt),
576 );
577 let mut stream = match client.chat_completion_stream(&request).await {
578 Ok(s) => {
579 write_info_log("agent_loop", "流式请求创建成功");
580 s
581 }
582 Err(e) => {
583 let err = ChatError::from(e);
584 write_error_log("Chat API 流式请求创建", &err.to_string());
585 if let Some(policy) = retry_policy_for(&err)
586 && retry_attempt <= policy.max_attempts
587 {
588 let delay_ms =
589 backoff_delay_ms(retry_attempt, policy.base_ms, policy.cap_ms);
590 write_info_log(
591 "agent_loop",
592 &format!(
593 "流式创建失败,{}ms 后重试 ({}/{})",
594 delay_ms, retry_attempt, policy.max_attempts
595 ),
596 );
597 let _ = tx.send(StreamMsg::Retrying {
598 attempt: retry_attempt,
599 max_attempts: policy.max_attempts,
600 delay_ms,
601 error: err.display_message(),
602 });
603 tokio::select! {
604 _ = tokio::time::sleep(Duration::from_millis(delay_ms)) => {
605 continue 'api_retry;
606 }
607 _ = cancel_token.cancelled() => {
608 let _ = tx.send(StreamMsg::Cancelled);
609 return;
610 }
611 }
612 }
613 let _ = tx.send(StreamMsg::Error(err));
614 return;
615 }
616 };
617
618 let mut finish_reason: Option<String> = None;
620 let mut assistant_text = String::new();
621 let mut assistant_reasoning = String::new();
622 let mut active_tool_call_parts: BTreeMap<u32, StreamingToolCallPart> = BTreeMap::new();
624 let mut deserialize_failed = false;
625 let mut needs_compact_for_tool_id_mismatch = false;
627 let mut stream_retriable_error: Option<ChatError> = None;
629
630 let mut received_chunks: u32 = 0;
631
632 'stream: loop {
633 tokio::select! {
634 result = stream.next() => {
635 match result {
636 Some(Ok(response)) => {
637 received_chunks += 1;
638 if received_chunks == 1 {
639 metrics.ttft_ms_per_call.push(call_start.elapsed().as_millis() as u64);
640 }
641 if let Some(ref usage) = response.usage {
643 metrics.total_input_tokens += usage.prompt_tokens;
644 metrics.total_output_tokens += usage.completion_tokens;
645 }
646 if received_chunks <= DEBUG_LOG_CHUNK_LIMIT {
648 let choices_debug: Vec<String> = response.choices.iter().map(|choice| {
649 format!(
650 "idx={}, finish_reason={:?}, has_content={}, has_tool_calls={}",
651 choice.index,
652 choice.finish_reason,
653 choice.delta.content.is_some(),
654 choice.delta.tool_calls.is_some(),
655 )
656 }).collect();
657 write_info_log(
658 "stream_chunk",
659 &format!("chunk #{}: choices=[{}]", received_chunks, choices_debug.join("; ")),
660 );
661 }
662 for choice in &response.choices {
663 if let Some(ref content) = choice.delta.content {
664 assistant_text.push_str(content);
665 let mut stream_buf = safe_lock(&streaming_content, "agent::stream_chunk");
666 stream_buf.push_str(content);
667 drop(stream_buf);
668 let _ = tx.send(StreamMsg::Chunk);
669 }
670 if let Some(ref reasoning) = choice.delta.reasoning_content {
671 assistant_reasoning.push_str(reasoning);
672 {
674 let mut reason_buf = safe_lock(&streaming_reasoning_content, "agent::stream_reasoning");
675 reason_buf.push_str(reasoning);
676 }
677 let _ = tx.send(StreamMsg::Chunk);
678 }
679 if !assistant_reasoning.is_empty() && choice.delta.reasoning_content.is_some() && assistant_reasoning.len() < REASONING_LOG_THRESHOLD {
680 write_info_log("agent_loop", &format!("reasoning积累中 len={}", assistant_reasoning.len()));
681 }
682 if let Some(ref toolcall_chunks) = choice.delta.tool_calls {
684 for chunk in toolcall_chunks {
685 let entry =
686 active_tool_call_parts.entry(chunk.index).or_insert_with(|| {
687 StreamingToolCallPart {
688 call_id: chunk.id.clone().unwrap_or_default(),
689 function_name: String::new(),
690 function_arguments: String::new(),
691 }
692 });
693 if entry.call_id.is_empty()
694 && let Some(ref id) = chunk.id {
695 entry.call_id = id.clone();
696 }
697 if let Some(ref tool_function) = chunk.function {
698 if let Some(ref name) = tool_function.name {
699 entry.function_name.push_str(name);
700 }
701 if let Some(ref args) = tool_function.arguments {
702 entry.function_arguments.push_str(args);
703 }
704 }
705 }
706 }
707 if let Some(ref finish_reason_val) = choice.finish_reason {
708 finish_reason = Some(finish_reason_val.clone());
709 }
710 }
711 }
712 Some(Err(e)) => {
713 let error_str = e.to_string();
714 write_error_log("Chat API 流式响应 error", &error_str);
715 let err = ChatError::from(e);
716 if matches!(err, ChatError::StreamDeserialize(_))
719 || error_str.contains("missing field `index`")
720 || error_str.contains("tool_calls")
721 {
722 write_info_log(
723 "Chat API 流式响应",
724 &format!("检测到反序列化错误,将 fallback 到非流式: {}", err),
725 );
726 deserialize_failed = true;
727 break 'stream;
728 }
729 if matches!(&err, ChatError::ApiBadRequest(msg) if msg.contains("tool_call_id")) {
732 write_error_log(
733 "Chat API 流式响应",
734 &format!("检测到 tool_call_id 不一致错误,将压缩上下文后重试: {}", err),
735 );
736 needs_compact_for_tool_id_mismatch = true;
737 break 'stream;
738 }
739 if retry_policy_for(&err).is_some() {
741 stream_retriable_error = Some(err);
742 break 'stream;
743 }
744 write_error_log("Chat API 流式响应(不可重试)", &err.to_string());
746 let _ = tx.send(StreamMsg::Error(err));
747 return;
748 }
749 None => {
750 write_info_log("agent_loop", "流式结束 (stream returned None)");
751 break 'stream;
752 }
753 }
754 }
755 _ = cancel_token.cancelled() => {
756 let _ = tx.send(StreamMsg::Cancelled);
757 return;
758 }
759 }
760 }
761
762 if needs_compact_for_tool_id_mismatch {
764 write_info_log(
765 "agent_loop",
766 "tool_call_id 不一致错误:将执行 auto_compact 压缩上下文后重试",
767 );
768 {
770 let mut stream_buf =
771 safe_lock(&streaming_content, "agent::tool_id_error_clear");
772 stream_buf.clear();
773 }
774 {
775 let mut reason_buf = safe_lock(
776 &streaming_reasoning_content,
777 "agent::tool_id_error_reason_clear",
778 );
779 reason_buf.clear();
780 }
781 if compact_config.enabled {
783 let _ = tx.send(StreamMsg::Compacting);
784 match compact::auto_compact(
785 &mut messages,
786 &AutoCompactParams {
787 provider: &provider,
788 invoked_skills: &invoked_skills,
789 session_id: &session_id,
790 protected_context: None,
791 },
792 )
793 .await
794 {
795 Err(e) => {
796 write_error_log(
797 "agent_loop",
798 &format!("tool_call_id 恢复时 auto_compact 失败: {}", e),
799 );
800 let _ = tx.send(StreamMsg::Error(ChatError::Other(format!(
801 "消息历史损坏且自动修复失败: {}",
802 e
803 ))));
804 return;
805 }
806 Ok(result) => {
807 clear_channels(&display_messages, &context_messages);
808 push_compact_tool_messages(
809 &mut messages,
810 &display_messages,
811 &context_messages,
812 &result,
813 );
814 metrics.auto_compact_count += 1;
815 let _ = tx.send(StreamMsg::Compacted {
816 messages_before: result.messages_before,
817 });
818 }
819 }
820 continue 'round;
821 } else {
822 let _ = tx.send(StreamMsg::Error(ChatError::Other(
824 "消息历史中 tool_call_id 不一致,且 compact 未启用,无法自动恢复"
825 .to_string(),
826 )));
827 return;
828 }
829 }
830
831 if let Some(err) = stream_retriable_error {
833 write_error_log("Chat API 流式响应(将重试)", &err.to_string());
834 if let Some(policy) = retry_policy_for(&err)
835 && retry_attempt <= policy.max_attempts
836 {
837 {
839 let mut stream_buf =
840 safe_lock(&streaming_content, "agent::stream_retry_clear");
841 stream_buf.clear();
842 }
843 {
844 let mut reason_buf = safe_lock(
845 &streaming_reasoning_content,
846 "agent::stream_retry_reason_clear",
847 );
848 reason_buf.clear();
849 }
850 let delay_ms = backoff_delay_ms(retry_attempt, policy.base_ms, policy.cap_ms);
851 write_info_log(
852 "agent_loop",
853 &format!(
854 "流式中断,{}ms 后重试 ({}/{})",
855 delay_ms, retry_attempt, policy.max_attempts
856 ),
857 );
858 let _ = tx.send(StreamMsg::Retrying {
859 attempt: retry_attempt,
860 max_attempts: policy.max_attempts,
861 delay_ms,
862 error: err.display_message(),
863 });
864 tokio::select! {
865 _ = tokio::time::sleep(Duration::from_millis(delay_ms)) => {
866 continue 'api_retry;
867 }
868 _ = cancel_token.cancelled() => {
869 let _ = tx.send(StreamMsg::Cancelled);
870 return;
871 }
872 }
873 }
874 let _ = tx.send(StreamMsg::Error(err));
876 return;
877 }
878
879 if !assistant_text.is_empty() {
881 write_info_log("Sprite 回复", &assistant_text);
882 }
883
884 write_info_log(
885 "agent_loop",
886 &format!(
887 "流式循环结束: finish_reason={:?}, assistant_text_len={}, active_tool_call_parts={}, deserialize_failed={}",
888 finish_reason,
889 assistant_text.len(),
890 active_tool_call_parts.len(),
891 deserialize_failed
892 ),
893 );
894
895 let stream_empty = finish_reason.is_none()
899 && assistant_text.is_empty()
900 && active_tool_call_parts.is_empty();
901 write_info_log(
902 "agent_loop",
903 &format!(
904 "流式结果分析: stream_empty={}, deserialize_failed={}, received_chunks={}",
905 stream_empty, deserialize_failed, received_chunks
906 ),
907 );
908 if deserialize_failed || stream_empty {
909 if stream_empty {
910 write_info_log(
911 "agent_loop",
912 &format!(
913 "流式返回空响应 (chunks={}, finish_reason=None, 无内容),fallback 到非流式重试",
914 received_chunks
915 ),
916 );
917 }
918 {
920 let mut stream_buf = safe_lock(&streaming_content, "agent::fallback_clear");
921 stream_buf.clear();
922 }
923 {
924 let mut reason_buf =
925 safe_lock(&streaming_reasoning_content, "agent::fallback_reason_clear");
926 reason_buf.clear();
927 }
928 let fallback_result = loop {
930 let create_fut = call_llm_non_stream(&provider, &request);
931 let result = tokio::select! {
932 result = create_fut => result,
933 _ = cancel_token.cancelled() => {
934 let _ = tx.send(StreamMsg::Cancelled);
935 return;
936 }
937 };
938 match result {
939 Ok(r) => {
940 metrics
941 .ttft_ms_per_call
942 .push(call_start.elapsed().as_millis() as u64);
943 if let Some(ref usage) = r.usage {
944 metrics.total_input_tokens += usage.prompt_tokens;
945 metrics.total_output_tokens += usage.completion_tokens;
946 }
947 metrics.total_llm_calls += 1;
948 metrics.total_llm_elapsed_ms += call_start.elapsed().as_millis() as u64;
949 break r;
950 }
951 Err(e) => {
952 write_error_log("Sprite API fallback 非流式", &e.to_string());
953 if let Some(policy) = retry_policy_for(&e)
954 && retry_attempt <= policy.max_attempts
955 {
956 let delay_ms =
957 backoff_delay_ms(retry_attempt, policy.base_ms, policy.cap_ms);
958 write_info_log(
959 "agent_loop",
960 &format!(
961 "fallback 非流式失败,{}ms 后重试 ({}/{})",
962 delay_ms, retry_attempt, policy.max_attempts
963 ),
964 );
965 let _ = tx.send(StreamMsg::Retrying {
966 attempt: retry_attempt,
967 max_attempts: policy.max_attempts,
968 delay_ms,
969 error: e.display_message(),
970 });
971 tokio::select! {
972 _ = tokio::time::sleep(Duration::from_millis(delay_ms)) => {
973 retry_attempt += 1;
974 continue;
975 }
976 _ = cancel_token.cancelled() => {
977 let _ = tx.send(StreamMsg::Cancelled);
978 return;
979 }
980 }
981 }
982 let _ = tx.send(StreamMsg::Error(e));
983 return;
984 }
985 }
986 };
987
988 write_info_log(
989 "agent_loop",
990 &format!(
991 "fallback 非流式结果: has_tool_calls={}, has_content={}, finish_reason={:?}",
992 fallback_result.has_tool_calls(),
993 fallback_result
994 .content
995 .as_ref()
996 .map(|content| content.len())
997 .unwrap_or(0),
998 fallback_result.finish_reason
999 ),
1000 );
1001
1002 if fallback_result.has_tool_calls()
1003 && let Some(tool_items) = fallback_result.tool_calls
1004 {
1005 if tool_items.is_empty() {
1006 write_info_log("agent_loop", "fallback tool_calls 为空列表,break 'round");
1007 break 'round;
1008 }
1009 let assistant_text: String =
1010 fallback_result.content.clone().unwrap_or_default();
1011 metrics.total_tool_calls += tool_items.len() as u32;
1012 let tool_start = Instant::now();
1013 match process_tool_calls(
1014 tool_items,
1015 assistant_text,
1016 &mut messages,
1017 &tool_ctx,
1018 fallback_result.reasoning_content.clone(),
1019 ) {
1020 Ok(result) => {
1021 metrics.total_tool_elapsed_ms +=
1022 tool_start.elapsed().as_millis() as u64;
1023 if result.compact_requested && compact_config.enabled {
1025 let _ = tx.send(StreamMsg::Compacting);
1026 if let Ok(compact_result) = compact::auto_compact(
1027 &mut messages,
1028 &AutoCompactParams {
1029 provider: &provider,
1030 invoked_skills: &invoked_skills,
1031 session_id: &session_id,
1032 protected_context: None,
1033 },
1034 )
1035 .await
1036 {
1037 clear_channels(&display_messages, &context_messages);
1038 push_compact_tool_messages(
1039 &mut messages,
1040 &display_messages,
1041 &context_messages,
1042 &compact_result,
1043 );
1044 metrics.auto_compact_count += 1;
1045 let _ = tx.send(StreamMsg::Compacted {
1046 messages_before: compact_result.messages_before,
1047 });
1048 }
1049 }
1050 if let Some(ref plan_content) = result.plan_with_context_clear {
1052 write_info_log(
1053 "agent_loop",
1054 "Clearing context after plan approval (fallback path)",
1055 );
1056 messages.clear();
1058 if let Ok(mut shared) = display_messages.lock() {
1059 shared.clear();
1060 }
1061 if let Ok(mut shared) = context_messages.lock() {
1062 shared.clear();
1063 }
1064 let plan_msg = ChatMessage::text(
1067 MessageRole::User,
1068 format!("以下计划已获批准,请按计划执行:\n\n{}", plan_content),
1069 );
1070 messages.push(plan_msg);
1071 }
1072 continue 'round;
1073 }
1074 Err(e) => {
1075 write_error_log(
1076 "agent_loop",
1077 &format!("process_tool_calls failed: {}", e),
1078 );
1079 return;
1080 }
1081 }
1082 }
1083
1084 if let Some(ref content) = fallback_result.content
1086 && !content.is_empty()
1087 {
1088 write_info_log("Sprite 回复", content);
1089 let mut stream_buf = safe_lock(&streaming_content, "agent::fallback_content");
1090 stream_buf.push_str(content);
1091 drop(stream_buf);
1092 let _ = tx.send(StreamMsg::Chunk);
1093 }
1094 if let Some(ref reason) = fallback_result.finish_reason
1096 && !matches!(
1097 reason.as_str(),
1098 "stop" | "length" | "tool_calls" | "content_filter" | "function_call"
1099 )
1100 && fallback_result
1101 .content
1102 .as_deref()
1103 .unwrap_or_default()
1104 .is_empty()
1105 {
1106 let error_msg = ChatError::AbnormalFinish(reason.clone());
1107 write_error_log("Sprite API fallback 非流式", &error_msg.to_string());
1108 let _ = tx.send(StreamMsg::Error(error_msg));
1109 return;
1110 }
1111
1112 let has_pending =
1114 !safe_lock(&pending_user_messages, "agent::pending_check_fallback").is_empty();
1115 write_info_log(
1116 "agent_loop",
1117 &format!("fallback 正常结束,pending_user_messages={}", has_pending),
1118 );
1119 if has_pending {
1120 flush_streaming_as_message(
1121 &streaming_content,
1122 &streaming_reasoning_content,
1123 &mut messages,
1124 &display_messages,
1125 &context_messages,
1126 fallback_result.reasoning_content.clone(),
1127 );
1128 write_info_log("agent_loop", "有用户增量消息,continue 'round");
1129 continue 'round;
1130 }
1131 write_info_log("agent_loop", "无用户增量消息,break 'round (fallback 路径)");
1132
1133 flush_streaming_as_message(
1135 &streaming_content,
1136 &streaming_reasoning_content,
1137 &mut messages,
1138 &display_messages,
1139 &context_messages,
1140 fallback_result.reasoning_content.clone(),
1141 );
1142 break 'round;
1143 }
1144
1145 let has_tool_calls = !active_tool_call_parts.is_empty();
1151 write_info_log(
1152 "agent_loop",
1153 &format!(
1154 "流式路径决策: has_tool_calls={}, finish_reason={:?}",
1155 has_tool_calls, finish_reason
1156 ),
1157 );
1158
1159 if has_tool_calls {
1160 let finish_reason_is_tool_calls = finish_reason.as_deref() == Some("tool_calls");
1162 if !finish_reason_is_tool_calls {
1163 write_info_log(
1164 "agent_loop",
1165 &format!(
1166 "finish_reason={:?} 不是 ToolCalls 但 active_tool_call_parts 非空({}),仍处理工具调用",
1167 finish_reason,
1168 active_tool_call_parts.len()
1169 ),
1170 );
1171 }
1172
1173 let tool_items: Vec<ToolCallItem> = active_tool_call_parts
1174 .into_values()
1175 .map(|part| {
1176 let id = if part.call_id.is_empty() {
1180 let rand_id =
1181 format!("call_{:016x}", rand::thread_rng().r#gen::<u64>());
1182 write_info_log(
1183 "agent_loop",
1184 &format!(
1185 "tool_call id 为空(API 未在流式 chunk 中返回),已生成随机 id: {}",
1186 rand_id
1187 ),
1188 );
1189 rand_id
1190 } else {
1191 part.call_id
1192 };
1193 ToolCallItem { id, name: part.function_name, arguments: part.function_arguments }
1194 })
1195 .collect();
1196
1197 if tool_items.is_empty() {
1198 write_info_log("agent_loop", "流式 tool_items 转换后为空,break 'round");
1199 metrics.total_llm_calls += 1;
1200 metrics.total_llm_elapsed_ms += call_start.elapsed().as_millis() as u64;
1201 break 'round;
1202 }
1203
1204 write_info_log(
1205 "agent_loop",
1206 &format!(
1207 "开始处理 {} 个工具调用: [{}]",
1208 tool_items.len(),
1209 tool_items
1210 .iter()
1211 .map(|t| t.name.as_str())
1212 .collect::<Vec<_>>()
1213 .join(", ")
1214 ),
1215 );
1216 let reasoning_opt = {
1217 let r: String = take(&mut assistant_reasoning);
1218 if r.is_empty() { None } else { Some(r) }
1219 };
1220 metrics.total_llm_calls += 1;
1221 metrics.total_llm_elapsed_ms += call_start.elapsed().as_millis() as u64;
1222 metrics.total_tool_calls += tool_items.len() as u32;
1223 let tool_start = Instant::now();
1224 match process_tool_calls(
1225 tool_items,
1226 assistant_text,
1227 &mut messages,
1228 &tool_ctx,
1229 reasoning_opt,
1230 ) {
1231 Ok(result) => {
1232 metrics.total_tool_elapsed_ms += tool_start.elapsed().as_millis() as u64;
1233 if result.compact_requested && compact_config.enabled {
1235 let _ = tx.send(StreamMsg::Compacting);
1236 if let Ok(compact_result) = compact::auto_compact(
1237 &mut messages,
1238 &AutoCompactParams {
1239 provider: &provider,
1240 invoked_skills: &invoked_skills,
1241 session_id: &session_id,
1242 protected_context: None,
1243 },
1244 )
1245 .await
1246 {
1247 clear_channels(&display_messages, &context_messages);
1248 push_compact_tool_messages(
1249 &mut messages,
1250 &display_messages,
1251 &context_messages,
1252 &compact_result,
1253 );
1254 let _ = tx.send(StreamMsg::Compacted {
1255 messages_before: compact_result.messages_before,
1256 });
1257 }
1258 }
1259 if let Some(ref plan_content) = result.plan_with_context_clear {
1261 write_info_log(
1262 "agent_loop",
1263 "Clearing context after plan approval (stream path)",
1264 );
1265 messages.clear();
1267 if let Ok(mut shared) = display_messages.lock() {
1268 shared.clear();
1269 }
1270 if let Ok(mut shared) = context_messages.lock() {
1271 shared.clear();
1272 }
1273 let plan_msg = ChatMessage::text(
1276 MessageRole::User,
1277 format!("以下计划已获批准,请按计划执行:\n\n{}", plan_content),
1278 );
1279 messages.push(plan_msg);
1280 }
1281 continue 'round;
1282 }
1283 Err(e) => {
1284 write_error_log("agent_loop", &format!("process_tool_calls failed: {}", e));
1285 return;
1286 }
1287 }
1288 } else {
1289 let has_pending =
1291 !safe_lock(&pending_user_messages, "agent::pending_check_stream").is_empty();
1292 write_info_log(
1293 "agent_loop",
1294 &format!(
1295 "LLM 未调用工具 (finish_reason={:?}, text_len={}),pending_user_messages={}",
1296 finish_reason,
1297 assistant_text.len(),
1298 has_pending
1299 ),
1300 );
1301 if has_pending {
1302 let reasoning_for_flush: Option<String> = {
1303 let r = take(&mut assistant_reasoning);
1304 if r.is_empty() { None } else { Some(r) }
1305 };
1306 flush_streaming_as_message(
1307 &streaming_content,
1308 &streaming_reasoning_content,
1309 &mut messages,
1310 &display_messages,
1311 &context_messages,
1312 reasoning_for_flush,
1313 );
1314 write_info_log("agent_loop", "有用户增量消息,continue 'round");
1315 continue 'round;
1316 }
1317
1318 if hook_manager.has_hooks_for(HookEvent::Stop) {
1320 let reasoning_for_flush: Option<String> = {
1321 let r = take(&mut assistant_reasoning);
1322 if r.is_empty() { None } else { Some(r) }
1323 };
1324 flush_streaming_as_message(
1325 &streaming_content,
1326 &streaming_reasoning_content,
1327 &mut messages,
1328 &display_messages,
1329 &context_messages,
1330 reasoning_for_flush,
1331 );
1332 let stop_ctx = HookContext {
1333 event: HookEvent::Stop,
1334 messages: Some(messages.clone()),
1335 system_prompt: system_prompt.clone(),
1336 model: Some(provider.model.clone()),
1337 user_input: Some(assistant_text.clone()),
1338 session_id: Some(session_id.clone()),
1339 ..Default::default()
1340 };
1341 if let Some(result) =
1342 hook_manager.execute(HookEvent::Stop, stop_ctx, &disabled_hooks)
1343 {
1344 if let Some(ref ctx_text) = result.additional_context {
1346 let current = system_prompt.unwrap_or_default();
1347 system_prompt = Some(format!("{}\n\n{}", current, ctx_text));
1348 }
1349 if let Some(ref feedback) = result.retry_feedback {
1351 write_info_log("Stop hook", &format!("纠查官反馈: {}", feedback));
1352 let feedback_msg =
1353 ChatMessage::text(MessageRole::User, feedback.clone());
1354 messages.push(feedback_msg.clone());
1355 push_both(&display_messages, &context_messages, feedback_msg);
1356 continue 'round;
1357 }
1358 if result.is_stop() {
1360 let _ = tx.send(StreamMsg::Error(ChatError::HookAborted));
1361 return;
1362 }
1363 }
1364 }
1365
1366 write_info_log(
1367 "agent_loop",
1368 &format!(
1369 "break 'round: LLM 返回 Stop 且无工具调用,无待处理消息 (round={}, text_len={})",
1370 round_idx,
1371 assistant_text.len()
1372 ),
1373 );
1374
1375 let reasoning_for_flush: Option<String> = {
1378 let r = take(&mut assistant_reasoning);
1379 if r.is_empty() { None } else { Some(r) }
1380 };
1381 flush_streaming_as_message(
1382 &streaming_content,
1383 &streaming_reasoning_content,
1384 &mut messages,
1385 &display_messages,
1386 &context_messages,
1387 reasoning_for_flush,
1388 );
1389
1390 metrics.total_llm_calls += 1;
1391 metrics.total_llm_elapsed_ms += call_start.elapsed().as_millis() as u64;
1392 break 'round;
1393 }
1394
1395 #[allow(unreachable_code)]
1397 {
1398 break 'api_retry;
1399 }
1400 } } write_info_log(
1404 "agent_loop",
1405 &format!(
1406 "agent loop 结束,发送 Done (共执行 {} 轮后退出 'round)",
1407 final_round_idx + 1
1408 ),
1409 );
1410
1411 metrics.session_end_ms = SystemTime::now()
1413 .duration_since(UNIX_EPOCH)
1414 .unwrap_or_default()
1415 .as_millis() as u64;
1416 metrics.estimated_context_tokens_peak = context_tokens_peak;
1417 metrics.skill_loads = {
1418 if let Ok(skills) = invoked_skills.lock() {
1419 skills.keys().cloned().collect()
1420 } else {
1421 vec![]
1422 }
1423 };
1424
1425 if let Ok(sub) = sub_agent_metrics.lock() {
1427 metrics.total_llm_calls += sub.total_llm_calls;
1428 metrics.total_tool_calls += sub.total_tool_calls;
1429 metrics.total_input_tokens += sub.total_input_tokens;
1430 metrics.total_output_tokens += sub.total_output_tokens;
1431 metrics.total_llm_elapsed_ms += sub.total_llm_elapsed_ms;
1432 metrics.total_tool_elapsed_ms += sub.total_tool_elapsed_ms;
1433 metrics
1434 .ttft_ms_per_call
1435 .extend(&sub.llm_elapsed_ms_per_call);
1436 }
1437
1438 let _ = write_session_metrics(&session_id, &metrics);
1439 let metrics_event = SessionEvent::Metrics { metrics };
1440 let _ = append_session_event(&session_id, &metrics_event);
1441
1442 let _ = tx.send(StreamMsg::Done);
1443}