1use crate::config::KodaConfig;
40use crate::db::{Database, Role};
41use crate::engine::{EngineCommand, EngineEvent, EngineSink};
42use crate::file_tracker::FileTracker;
43use crate::inference_helpers::{
44 AUTO_COMPACT_THRESHOLD, CONTEXT_WARN_THRESHOLD, RATE_LIMIT_MAX_RETRIES, assemble_messages,
45 estimate_tokens, is_context_overflow_error, is_image_rejection_error, is_rate_limit_error,
46 is_server_error, rate_limit_backoff,
47};
48use crate::loop_guard::{LoopAction, LoopDetector};
49use crate::persistence::Persistence;
50use crate::providers::{
51 ChatMessage, ImageData, LlmProvider, StreamChunk, TokenUsage, ToolCall, ToolDefinition,
52 stream_collector::SseCollector,
53};
54use crate::skill_scope::SkillToolScope;
55use crate::tool_dispatch::{
56 can_parallelize, execute_tools_parallel, execute_tools_sequential, execute_tools_split_batch,
57};
58use crate::tools::ToolRegistry;
59use crate::trust::TrustMode;
60
61use anyhow::{Context, Result};
62use std::path::Path;
63use std::time::Instant;
64use tokio::sync::mpsc;
65use tokio_util::sync::CancellationToken;
66
/// Borrowed, per-iteration bundle of everything needed to assemble and send
/// one request to the LLM provider. Built fresh each trip around
/// `inference_loop` so the system message and tool set can change per turn.
struct TurnState<'a> {
    // Conversation store; history is reloaded from here on every turn.
    db: &'a Database,
    // Session whose messages form the conversation context.
    session_id: &'a str,
    // Fully rendered system message prepended to the request.
    system_message: &'a ChatMessage,
    // Images attached to the user's message; only injected on iteration 0
    // (see `assemble_context`).
    pending_images: Option<&'a [ImageData]>,
    // Zero-based iteration counter within the current inference loop.
    iteration: u32,
    config: &'a KodaConfig,
    provider: &'a dyn LlmProvider,
    // Tool definitions advertised to the model (possibly skill-scoped).
    tool_defs: &'a [ToolDefinition],
    // Event sink for UI / progress notifications.
    sink: &'a dyn EngineSink,
    // Cancellation token used to abort in-flight provider calls.
    cancel: &'a CancellationToken,
}
88
/// Everything accumulated while draining a single provider response stream
/// in `collect_stream`.
struct StreamResult {
    // Concatenated assistant-visible text deltas.
    text: String,
    // Concatenated "thinking" / reasoning deltas (may be empty).
    thinking_content: String,
    // All tool calls announced by the model during the stream.
    tool_calls: Vec<ToolCall>,
    // Results of read-only tools executed eagerly mid-stream:
    // (tool_call_id, output, success, optional full/untruncated output).
    eager_results: Vec<(String, String, bool, Option<String>)>,
    // Token usage reported by the provider in the final `Done` chunk;
    // stays at default if the stream ended early.
    usage: TokenUsage,
    // Total bytes of streamed text (fallback for token-rate estimates).
    char_count: usize,
    // True when the stream ended because the user cancelled.
    interrupted: bool,
    // Set when the connection dropped mid-stream; the turn is discarded.
    network_error: Option<String>,
}
119
/// Load the session history and build the full message list for one request.
///
/// Side effects: updates the global context gauge (`crate::context::update`),
/// emits a `ContextUsage` event, and emits a one-shot `Warn` when usage is in
/// the warn band (between `CONTEXT_WARN_THRESHOLD` and
/// `AUTO_COMPACT_THRESHOLD`). Pending images are attached to the most recent
/// user message, but only on iteration 0 of the loop.
async fn assemble_context(turn: &TurnState<'_>) -> Result<Vec<ChatMessage>> {
    let history = turn.db.load_context(turn.session_id).await?;

    // Debug-level breakdown of what is eating the context window.
    let analysis = crate::context_analysis::analyze_context(&history);
    if analysis.total > 0 {
        tracing::debug!(
            "Context analysis: {} total, {}% tool results, {}% duplicate reads",
            analysis.total,
            analysis.tool_result_percent(),
            analysis.duplicate_read_percent(),
        );
        for (tool, tokens) in analysis.top_tool_results(3) {
            tracing::debug!(" {tool}: ~{tokens} tokens");
        }
    }

    let mut messages = assemble_messages(turn.system_message, &history);

    // First iteration only: attach pending images to the latest user message
    // so the model sees them exactly once.
    if turn.iteration == 0
        && let Some(imgs) = turn.pending_images
        && !imgs.is_empty()
        && let Some(last_user) = messages.iter_mut().rev().find(|m| m.role == "user")
    {
        last_user.images = Some(imgs.to_vec());
    }

    // Estimate token usage, publish it to the global gauge and the UI.
    let context_used = estimate_tokens(&messages);
    crate::context::update(context_used, turn.config.max_context_tokens);
    turn.sink.emit(EngineEvent::ContextUsage {
        used: context_used,
        max: turn.config.max_context_tokens,
    });

    // In the warn band (but below auto-compact), nudge the user with the top
    // context consumers and any duplicate-read waste.
    let ctx_pct = crate::context::percentage();
    if (CONTEXT_WARN_THRESHOLD..AUTO_COMPACT_THRESHOLD).contains(&ctx_pct) {
        let mut warning = format!("Context at {ctx_pct}% — approaching limit.");
        let top = analysis.top_tool_results(2);
        if !top.is_empty() {
            let hogs: Vec<String> = top
                .iter()
                .map(|(name, tokens)| format!("{name} (~{tokens} tok)"))
                .collect();
            warning.push_str(&format!(" Top consumers: {}.", hogs.join(", ")));
        }
        let waste = analysis.total_duplicate_waste();
        if waste > 500 {
            warning.push_str(&format!(" ~{waste} tokens wasted on duplicate file reads."));
        }
        warning.push_str(" Run /compact to free up space.");
        turn.sink.emit(EngineEvent::Warn { message: warning });
    }

    Ok(messages)
}
186
/// If context usage has crossed `AUTO_COMPACT_THRESHOLD`, try to compact the
/// session before sending the request; otherwise pass `messages` through
/// unchanged.
///
/// On a successful compact the context is reassembled from scratch via
/// `assemble_context`. On skip or failure the original `messages` are
/// returned so the turn still proceeds. Repeated failures trip a circuit
/// breaker (`crate::compact`) that disables further auto-compact attempts.
async fn preflight_compact_if_needed(
    turn: &TurnState<'_>,
    messages: Vec<ChatMessage>,
) -> Result<Vec<ChatMessage>> {
    let ctx_pct = crate::context::percentage();
    if ctx_pct < AUTO_COMPACT_THRESHOLD {
        return Ok(messages);
    }

    // Circuit breaker: stop retrying compaction once it has failed too often.
    if crate::compact::is_compact_circuit_broken() {
        tracing::warn!("Pre-flight: context at {ctx_pct}% but circuit breaker tripped — skipping");
        return Ok(messages);
    }

    tracing::warn!("Pre-flight: context at {ctx_pct}%, attempting auto-compact");
    turn.sink.emit(EngineEvent::Info {
        message: format!("\u{1f4e6} Context at {ctx_pct}% \u{2014} compacting before sending..."),
    });

    // Outer Result = hard error, inner Result = deliberate skip.
    match crate::compact::compact_session_with_provider(
        turn.db,
        turn.session_id,
        turn.config.max_context_tokens,
        &turn.config.model_settings,
        turn.provider,
    )
    .await
    {
        Ok(Ok(result)) => {
            turn.sink.emit(EngineEvent::Info {
                message: format!(
                    "\u{2705} Compacted {} messages (~{} token summary)",
                    result.deleted, result.summary_tokens
                ),
            });
            // History changed on disk — rebuild the message list.
            assemble_context(turn).await
        }
        Ok(Err(skip)) => {
            tracing::info!("Pre-flight compact skipped: {skip:?}");
            // A too-large history counts as a failure for the circuit breaker,
            // since retrying will never succeed on this model.
            if matches!(skip, crate::compact::CompactSkip::HistoryTooLarge) {
                crate::compact::record_compact_failure();
                turn.sink.emit(EngineEvent::Warn {
                    message: "\u{26a0}\u{fe0f} Context is full but history is too large for \
                              this model to summarize. Start a new session (/session) or \
                              switch to a model with a larger context window."
                        .to_string(),
                });
            }
            Ok(messages)
        }
        Err(e) => {
            tracing::warn!("Pre-flight compact failed: {e:#}");
            let tripped = crate::compact::record_compact_failure();
            let suffix = if tripped {
                " Auto-compact disabled after repeated failures."
            } else {
                " Continuing anyway..."
            };
            turn.sink.emit(EngineEvent::Warn {
                message: format!("Compact failed: {e:#}.{suffix}"),
            });
            Ok(messages)
        }
    }
}
257
258async fn try_with_rate_limit(
263 provider: &dyn LlmProvider,
264 messages: &[ChatMessage],
265 tool_defs: &[ToolDefinition],
266 model_settings: &crate::config::ModelSettings,
267 cancel: &CancellationToken,
268 sink: &dyn EngineSink,
269) -> Result<Option<SseCollector>> {
270 let mut last_err = None;
271 for attempt in 0..RATE_LIMIT_MAX_RETRIES {
272 let result = tokio::select! {
273 result = provider.chat_stream(messages, tool_defs, model_settings) => result,
274 _ = cancel.cancelled() => return Ok(None),
275 };
276 match result {
277 Ok(collector) => return Ok(Some(collector)),
278 Err(e) if is_rate_limit_error(&e) && attempt + 1 < RATE_LIMIT_MAX_RETRIES => {
279 let delay = rate_limit_backoff(attempt);
280 sink.emit(EngineEvent::SpinnerStop);
281 sink.emit(EngineEvent::Warn {
282 message: format!("\u{23f3} Rate limited. Retrying in {}s...", delay.as_secs()),
283 });
284 tracing::warn!(
285 "Rate limit (attempt {}/{}): {e:#}",
286 attempt + 1,
287 RATE_LIMIT_MAX_RETRIES
288 );
289 tokio::time::sleep(delay).await;
290 sink.emit(EngineEvent::SpinnerStart {
291 message: format!("Retrying (attempt {})...", attempt + 2),
292 });
293 last_err = Some(e);
294 }
295 Err(e) => return Err(e),
296 }
297 }
298 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("Rate limit retries exhausted")))
299}
300
/// Recovery path for a provider-side context-overflow rejection: compact the
/// session, rebuild the message list, and retry the stream once.
///
/// Returns `Ok(None)` if cancelled during the retry, `Ok(Some((collector,
/// messages)))` on success, and an error (wrapping `original_err`) when
/// compaction did not help.
async fn try_overflow_recovery(
    turn: &TurnState<'_>,
    original_err: anyhow::Error,
) -> Result<Option<(SseCollector, Vec<ChatMessage>)>> {
    turn.sink.emit(EngineEvent::SpinnerStop);
    turn.sink.emit(EngineEvent::Warn {
        message: "\u{26a0}\u{fe0f} Provider rejected request (context overflow). \
                  Compacting and retrying..."
            .to_string(),
    });
    tracing::warn!("Context overflow from provider: {original_err:#}");

    // Only a fully successful compact (outer and inner Ok) lets us retry;
    // any skip or failure surfaces the original overflow error with context.
    match crate::compact::compact_session_with_provider(
        turn.db,
        turn.session_id,
        turn.config.max_context_tokens,
        &turn.config.model_settings,
        turn.provider,
    )
    .await
    {
        Ok(Ok(result)) => {
            turn.sink.emit(EngineEvent::Info {
                message: format!(
                    "\u{2705} Compacted {} messages. Retrying...",
                    result.deleted
                ),
            });
        }
        _ => {
            return Err(original_err)
                .context("LLM inference failed (context overflow, compaction unsuccessful)");
        }
    }

    // History changed — rebuild the request from scratch.
    let messages = assemble_context(turn).await?;

    turn.sink.emit(EngineEvent::SpinnerStart {
        message: "Retrying...".into(),
    });
    // Single retry, still abortable via the turn's cancellation token.
    let collector = tokio::select! {
        result = turn.provider.chat_stream(&messages, turn.tool_defs, &turn.config.model_settings) => {
            result.context("LLM inference failed after compaction retry")?
        }
        _ = turn.cancel.cancelled() => return Ok(None),
    };
    Ok(Some((collector, messages)))
}
354
/// Drain a provider stream chunk-by-chunk, forwarding deltas to `sink` and
/// accumulating the final `StreamResult`.
///
/// Responsibilities:
/// - text / thinking delta accumulation with banner + spinner bookkeeping;
/// - eager execution of read-only, auto-approved tools announced mid-stream
///   via `ToolCallReady` (results carried in `eager_results`);
/// - clean early exits on cancellation (`interrupted = true`) and on
///   mid-stream network failure (`network_error = Some(..)`).
async fn collect_stream(
    rx: &mut mpsc::Receiver<StreamChunk>,
    sink: &dyn EngineSink,
    cancel: &CancellationToken,
    tools: &ToolRegistry,
    mode: TrustMode,
    project_root: &Path,
) -> StreamResult {
    let mut full_text = String::new();
    let mut tool_calls: Vec<ToolCall> = Vec::new();
    let mut eager_results: Vec<(String, String, bool, Option<String>)> = Vec::new();
    let mut usage = TokenUsage::default();
    // True until the first TextDelta arrives; gates the one-time SpinnerStop.
    let mut first_token = true;
    let mut char_count: usize = 0;
    let mut thinking_content = String::new();
    // Whether we are currently inside a thinking block (need ThinkingDone
    // before emitting other content).
    let mut in_thinking_block = false;
    let mut response_banner_shown = false;
    let mut thinking_banner_shown = false;
    let mut interrupted = false;

    loop {
        // Race the next chunk against cancellation; on cancel we fall through
        // with `chunk = None` and the interrupted flag set.
        let chunk = tokio::select! {
            c = rx.recv() => c,
            _ = cancel.cancelled() => {
                interrupted = true;
                None
            }
        };

        // Also honor a cancel that raced a successfully received chunk.
        if interrupted || cancel.is_cancelled() {
            sink.emit(EngineEvent::SpinnerStop);
            if !full_text.is_empty() {
                sink.emit(EngineEvent::TextDone);
            }
            sink.emit(EngineEvent::Warn {
                message: "Interrupted".into(),
            });
            // Return whatever was accumulated so the caller can persist it.
            return StreamResult {
                text: full_text,
                thinking_content,
                tool_calls,
                eager_results,
                usage,
                char_count,
                interrupted: true,
                network_error: None,
            };
        }

        // Channel closed without a Done chunk — treat as end of stream.
        let Some(chunk) = chunk else { break };

        match chunk {
            StreamChunk::TextDelta(delta) => {
                if first_token {
                    // Close out any open thinking block before visible text.
                    if in_thinking_block {
                        sink.emit(EngineEvent::SpinnerStop);
                        sink.emit(EngineEvent::ThinkingDone);
                        in_thinking_block = false;
                        thinking_banner_shown = true;
                    }
                    sink.emit(EngineEvent::SpinnerStop);
                    first_token = false;
                }

                // Defer the response banner until the first non-blank delta.
                if !response_banner_shown && !delta.trim().is_empty() {
                    sink.emit(EngineEvent::ResponseStart);
                    response_banner_shown = true;
                }

                full_text.push_str(&delta);
                char_count += delta.len();
                sink.emit(EngineEvent::TextDelta {
                    text: delta.clone(),
                });
            }
            StreamChunk::ThinkingDelta(delta) => {
                if !thinking_banner_shown {
                    sink.emit(EngineEvent::SpinnerStop);
                    sink.emit(EngineEvent::ThinkingStart);
                    thinking_banner_shown = true;
                }
                in_thinking_block = true;
                sink.emit(EngineEvent::ThinkingDelta {
                    text: delta.clone(),
                });
                thinking_content.push_str(&delta);
            }
            StreamChunk::ToolCallReady(tc) => {
                if in_thinking_block {
                    sink.emit(EngineEvent::SpinnerStop);
                    sink.emit(EngineEvent::ThinkingDone);
                    in_thinking_block = false;
                }
                let args: serde_json::Value =
                    serde_json::from_str(&tc.arguments).unwrap_or_default();
                // Eager dispatch criteria: read-only tool, auto-approved under
                // the current trust mode, and not a sub-agent invocation.
                let is_read_only = !crate::tools::is_mutating_tool(&tc.function_name);
                let is_auto_approved = !matches!(
                    crate::trust::check_tool(&tc.function_name, &args, mode, Some(project_root),),
                    crate::trust::ToolApproval::NeedsConfirmation
                        | crate::trust::ToolApproval::Blocked
                );

                if is_read_only && is_auto_approved && tc.function_name != "InvokeAgent" {
                    tracing::debug!("Eager dispatch: {} (id={})", tc.function_name, tc.id);
                    // Execute while the rest of the stream keeps arriving.
                    let r = tools
                        .execute(&tc.function_name, &tc.arguments, None, None)
                        .await;
                    eager_results.push((tc.id.clone(), r.output, r.success, r.full_output));
                }
                tool_calls.push(tc);
            }
            StreamChunk::ToolCalls(tcs) => {
                if in_thinking_block {
                    sink.emit(EngineEvent::SpinnerStop);
                    sink.emit(EngineEvent::ThinkingDone);
                    in_thinking_block = false;
                }
                sink.emit(EngineEvent::SpinnerStop);
                tool_calls.extend(tcs);
            }
            StreamChunk::Done(u) => {
                if in_thinking_block {
                    sink.emit(EngineEvent::SpinnerStop);
                    sink.emit(EngineEvent::ThinkingDone);
                }
                usage = u;
                break;
            }
            StreamChunk::NetworkError(err) => {
                sink.emit(EngineEvent::SpinnerStop);
                if !full_text.is_empty() {
                    sink.emit(EngineEvent::TextDone);
                }
                sink.emit(EngineEvent::Warn {
                    message: format!("Connection lost mid-stream — turn discarded ({err})"),
                });
                // Partial content is returned, but the caller discards the
                // turn when `network_error` is set.
                return StreamResult {
                    text: full_text,
                    thinking_content,
                    tool_calls,
                    eager_results,
                    usage,
                    char_count,
                    interrupted: false,
                    network_error: Some(err),
                };
            }
        }
    }

    sink.emit(EngineEvent::TextDone);

    // No text ever arrived — make sure the spinner is stopped.
    if first_token {
        sink.emit(EngineEvent::SpinnerStop);
    }

    StreamResult {
        text: full_text,
        thinking_content,
        tool_calls,
        eager_results,
        usage,
        char_count,
        interrupted: false,
        network_error: None,
    }
}
548
/// All state and collaborators needed to run one user turn through
/// [`inference_loop`]. Destructured immediately at the top of the loop.
pub struct InferenceContext<'a> {
    /// Root of the project the agent operates on.
    pub project_root: &'a Path,
    /// Global configuration (model settings, iteration caps, context limits).
    pub config: &'a KodaConfig,
    /// Conversation store backing this session.
    pub db: &'a Database,
    /// Identifier of the session being driven.
    pub session_id: &'a str,
    /// Base system prompt; git context is appended per iteration.
    pub system_prompt: &'a str,
    /// LLM backend used for streaming chat completions.
    pub provider: &'a dyn LlmProvider,
    /// Registry of executable tools.
    pub tools: &'a ToolRegistry,
    /// Tool definitions advertised to the model (pre skill-scoping).
    pub tool_defs: &'a [ToolDefinition],
    /// Images attached to the user's message, consumed on the first iteration.
    pub pending_images: Option<Vec<ImageData>>,
    /// Trust mode governing tool approval.
    pub mode: TrustMode,
    /// Event sink for UI / progress output.
    pub sink: &'a dyn EngineSink,
    /// Token that aborts the turn when cancelled.
    pub cancel: CancellationToken,
    /// Command channel (steering, interrupts, loop-cap decisions).
    pub cmd_rx: &'a mut mpsc::Receiver<EngineCommand>,
    /// Tracks file reads/writes for parallelization safety and dedup analysis.
    pub file_tracker: &'a mut FileTracker,
    /// Registry of background agents whose results get injected into the turn.
    pub bg_agents: &'a std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
    /// Cache shared with sub-agent invocations.
    pub sub_agent_cache: &'a crate::sub_agent_cache::SubAgentCache,
}
593
/// Drive one user turn to completion: repeatedly assemble context, stream a
/// model response, execute requested tools, and loop until the model stops
/// calling tools (or a cap / loop guard / interruption ends the turn).
///
/// Per-iteration responsibilities, in order: drain background-agent events
/// and results, inject queued steering messages, enforce the iteration hard
/// cap (with an interactive extension prompt), rebuild the system message
/// with fresh git context, apply skill scoping to the tool set, assemble and
/// (if needed) compact the context, stream the response with rate-limit /
/// overflow / server-error handling, persist the assistant message, dispatch
/// tool calls (eager, parallel, split-batch, or sequential), update the skill
/// scope, and run the loop detector.
#[tracing::instrument(skip_all, fields(session_id = %ctx.session_id, agent = %ctx.config.agent_name))]
pub async fn inference_loop(ctx: InferenceContext<'_>) -> Result<()> {
    let InferenceContext {
        project_root,
        config,
        db,
        session_id,
        system_prompt,
        provider,
        tools,
        tool_defs,
        pending_images,
        mode,
        sink,
        cancel,
        cmd_rx,
        file_tracker,
        bg_agents,
        sub_agent_cache,
    } = ctx;

    // `hard_cap` is mutable: the user can grant extra iterations when the
    // cap is reached (see the LoopCapReached handling below).
    let mut hard_cap = config.max_iterations;
    let mut iteration = 0u32;
    let mut made_tool_calls = false;
    // One-shot retry flag for an empty response following tool use.
    let mut retried_empty = false;
    let mut loop_detector = LoopDetector::new();
    let mut skill_scope = SkillToolScope::new();
    // Running totals across all iterations of this turn (for the footer).
    let mut total_prompt_tokens: i64 = 0;
    let mut total_completion_tokens: i64 = 0;
    let mut total_cache_read_tokens: i64 = 0;
    let mut total_thinking_tokens: i64 = 0;
    let mut total_char_count: usize = 0;
    let loop_start = Instant::now();

    let base_system_prompt = system_prompt.to_string();

    // Best-effort cleanup of stale tool results before the turn starts.
    if let Ok(Some(mc)) = crate::microcompact::microcompact_session(db, session_id).await {
        sink.emit(EngineEvent::Info {
            message: format!(
                "\u{1f9f9} Microcompact: cleared {} old tool results (~{} tokens)",
                mc.cleared, mc.tokens_saved,
            ),
        });
    }

    loop {
        // Forward any status events produced by background agents.
        for ev in bg_agents.drain_status_events() {
            sink.emit(ev);
        }

        // Inject completed background-agent results into the conversation as
        // user messages so the model can act on them this iteration.
        for bg_result in bg_agents.drain_completed() {
            let status = if bg_result.success {
                "completed"
            } else {
                "failed"
            };
            let injection = format!(
                "[Background agent '{}' {status}]\n\
                 Original task: {}\n\
                 Result:\n{}",
                bg_result.agent_name, bg_result.prompt, bg_result.output
            );
            let mut msg = format!(
                " \u{2705} Background agent '{}' {status}",
                bg_result.agent_name
            );
            if !bg_result.events.is_empty() {
                msg.push('\n');
                msg.push_str(&bg_result.events.join("\n"));
            }
            sink.emit(EngineEvent::Info { message: msg });
            db.insert_message(session_id, &Role::User, Some(&injection), None, None, None)
                .await?;
        }

        // Drain queued steering messages (non-blocking) and inject them as a
        // single combined user message. Any other command type arriving here
        // is unexpected and discarded with a warning.
        {
            let mut next_texts: Vec<String> = Vec::new();
            while let Ok(cmd) = cmd_rx.try_recv() {
                match cmd {
                    EngineCommand::QueueNext { text } => next_texts.push(text),
                    other => {
                        tracing::warn!(
                            "inference_loop: unexpected command at iteration start (discarded): {:?}",
                            std::mem::discriminant(&other)
                        );
                    }
                }
            }
            if !next_texts.is_empty() {
                let combined = next_texts.join("\n\n");
                sink.emit(EngineEvent::Info {
                    message: format!(
                        " \u{1f4e5} Injecting {} steer{} into current turn",
                        next_texts.len(),
                        if next_texts.len() == 1 { "" } else { "s" },
                    ),
                });
                db.insert_message(session_id, &Role::User, Some(&combined), None, None, None)
                    .await?;
            }
        }

        // Iteration hard cap: announce it and block until the user decides
        // whether to grant extra iterations or stop.
        if iteration >= hard_cap {
            let recent = loop_detector.recent_names();
            sink.emit(EngineEvent::LoopCapReached {
                cap: hard_cap,
                recent_tools: recent,
            });

            let extra = loop {
                tokio::select! {
                    cmd = cmd_rx.recv() => match cmd {
                        Some(EngineCommand::LoopDecision { action }) => {
                            break action.extra_iterations();
                        }
                        Some(EngineCommand::Interrupt) => {
                            cancel.cancel();
                            break 0;
                        }
                        // Channel closed — treat as "stop".
                        None => break 0,
                        // Ignore unrelated commands while waiting.
                        _ => continue,
                    },
                    _ = cancel.cancelled() => break 0,
                }
            };

            if extra == 0 {
                break Ok(());
            }
            hard_cap += extra;
        }

        // Rebuild the system message each iteration so git status stays fresh.
        let git_line = crate::git::git_context(project_root)
            .map(|ctx| format!("\n{ctx}"))
            .unwrap_or_default();
        let system_prompt_full = format!("{base_system_prompt}{git_line}");
        let system_message = ChatMessage::text("system", &system_prompt_full);

        // Restrict the advertised tool set while a skill scope is active.
        let scoped_tool_defs = skill_scope.filter_tool_defs(tool_defs);

        let active_tool_defs: &[ToolDefinition] = &scoped_tool_defs;

        let turn = TurnState {
            db,
            session_id,
            system_message: &system_message,
            pending_images: pending_images.as_deref(),
            iteration,
            config,
            provider,
            tool_defs: active_tool_defs,
            sink,
            cancel: &cancel,
        };

        let messages = assemble_context(&turn).await?;

        let messages = preflight_compact_if_needed(&turn, messages).await?;

        // Remember whether images were attached, to give a targeted error if
        // the provider rejects them.
        let had_images = turn
            .pending_images
            .map(|imgs| !imgs.is_empty())
            .unwrap_or(false);

        sink.emit(EngineEvent::SpinnerStart {
            message: "Thinking...".into(),
        });

        let stream_result = try_with_rate_limit(
            provider,
            &messages,
            active_tool_defs,
            &config.model_settings,
            &cancel,
            sink,
        )
        .await;

        // Ok(None) means the user cancelled while waiting on the provider.
        let stream_result: Result<SseCollector> = match stream_result {
            Ok(Some(c)) => Ok(c),
            Ok(None) => {
                sink.emit(EngineEvent::SpinnerStop);
                sink.emit(EngineEvent::Warn {
                    message: "Interrupted".into(),
                });
                return Ok(());
            }
            Err(e) => Err(e),
        };

        // Triage stream-start errors: overflow gets a compact-and-retry,
        // server errors and image rejections end the turn with guidance,
        // anything else propagates.
        let SseCollector {
            mut rx,
            handle: sse_handle,
        } = match stream_result {
            Ok(c) => c,
            Err(e) if is_context_overflow_error(&e) => {
                match try_overflow_recovery(&turn, e).await? {
                    Some((rx, _updated)) => rx,
                    None => {
                        sink.emit(EngineEvent::SpinnerStop);
                        sink.emit(EngineEvent::Warn {
                            message: "Interrupted".into(),
                        });
                        return Ok(());
                    }
                }
            }
            Err(e) if is_server_error(&e) => {
                sink.emit(EngineEvent::SpinnerStop);
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "Provider returned a server error: {e:#}. \
                         This often means the model can't handle the current \
                         conversation state. Try a different model or start a new session."
                    ),
                });
                return Ok(());
            }
            Err(e) if had_images && is_image_rejection_error(&e) => {
                sink.emit(EngineEvent::SpinnerStop);
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "⚠ This model rejected the image attachment — \
                         it likely does not support vision input. \
                         Switch to a vision-capable model such as \
                         claude-sonnet, gemini-flash, or gpt-4o. ({e})"
                    ),
                });
                return Ok(());
            }
            Err(e) => {
                return Err(e).context("LLM inference failed");
            }
        };

        let stream_result = collect_stream(&mut rx, sink, &cancel, tools, mode, project_root).await;

        // User interrupt mid-stream: persist any partial text / thinking so
        // it is not lost, then end the turn.
        if stream_result.interrupted {
            sse_handle.abort();
            let has_text = !stream_result.text.is_empty();
            let has_thinking = !stream_result.thinking_content.is_empty();
            if has_text || has_thinking {
                let mid = db
                    .insert_message(
                        session_id,
                        &Role::Assistant,
                        if has_text {
                            Some(stream_result.text.as_str())
                        } else {
                            None
                        },
                        None,
                        None,
                        None,
                    )
                    .await?;
                if has_thinking {
                    db.update_message_thinking_content(mid, &stream_result.thinking_content)
                        .await?;
                }
            }
            return Ok(());
        }

        // Network failure mid-stream: discard the turn entirely.
        if stream_result.network_error.is_some() {
            sse_handle.abort();
            return Ok(());
        }

        let full_text = stream_result.text;
        let stream_thinking = stream_result.thinking_content;
        let tool_calls = crate::tool_normalize::normalize_tool_calls(stream_result.tool_calls);
        let usage = stream_result.usage;
        let char_count = stream_result.char_count;

        // One retry for a completely empty response after tool use (not
        // caused by hitting max_tokens).
        if tool_calls.is_empty()
            && made_tool_calls
            && full_text.trim().is_empty()
            && usage.stop_reason != "max_tokens"
            && !retried_empty
        {
            retried_empty = true;
            sink.emit(EngineEvent::SpinnerStart {
                message: "Empty response — retrying...".into(),
            });
            continue;
        }

        let content = if full_text.is_empty() {
            None
        } else {
            Some(full_text.as_str())
        };
        let tool_calls_json = if tool_calls.is_empty() {
            None
        } else {
            Some(serde_json::to_string(&tool_calls)?)
        };

        // Persist the assistant message (text + tool calls + usage) first,
        // then mark complete and attach thinking content.
        let msg_id = db
            .insert_message(
                session_id,
                &Role::Assistant,
                content,
                tool_calls_json.as_deref(),
                None,
                Some(&usage),
            )
            .await?;

        db.mark_message_complete(msg_id).await?;

        if !stream_thinking.is_empty() {
            db.update_message_thinking_content(msg_id, &stream_thinking)
                .await?;
        }

        // No tool calls: the model is done. Emit the footer and finish —
        // unless the response was truncated (max_tokens), in which case we
        // warn and loop again.
        if tool_calls.is_empty() {
            if usage.stop_reason == "max_tokens" {
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "Model {} hit max_tokens limit — response was truncated. \
                         The context may be too large. Try /compact or start a new session.",
                        config.model,
                    ),
                });
                continue;
            } else if made_tool_calls && full_text.trim().is_empty() {
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "Model {} produced an empty response after tool use. \
                         Try rephrasing, run /compact, or switch models with /model.",
                        config.model,
                    ),
                });
            }
            let last_prompt_tokens = usage.prompt_tokens;
            total_prompt_tokens += usage.prompt_tokens;
            total_completion_tokens += usage.completion_tokens;
            total_cache_read_tokens += usage.cache_read_tokens;
            total_thinking_tokens += usage.thinking_tokens;
            total_char_count += char_count;

            // Fall back to a ~4 chars/token estimate when the provider did
            // not report completion tokens.
            let display_tokens = if total_completion_tokens > 0 {
                total_completion_tokens
            } else {
                (total_char_count / 4) as i64
            };

            let total_elapsed = loop_start.elapsed();
            let total_secs = total_elapsed.as_secs_f64();
            let rate = if total_secs > 0.0 && display_tokens > 0 {
                display_tokens as f64 / total_secs
            } else {
                0.0
            };

            // NOTE(review): the footer string is rendered BEFORE the context
            // gauge is updated with the final prompt-token count below, so it
            // reflects the pre-update snapshot — confirm this is intentional.
            let context = crate::context::format_footer();

            crate::context::update(last_prompt_tokens as usize, config.max_context_tokens);
            sink.emit(EngineEvent::ContextUsage {
                used: last_prompt_tokens as usize,
                max: config.max_context_tokens,
            });

            sink.emit(EngineEvent::Footer {
                prompt_tokens: total_prompt_tokens,
                completion_tokens: total_completion_tokens,
                cache_read_tokens: total_cache_read_tokens,
                thinking_tokens: total_thinking_tokens,
                total_chars: total_char_count,
                elapsed_ms: total_elapsed.as_millis() as u64,
                rate,
                context,
            });

            return Ok(());
        }

        // Tool-call path: accumulate usage and keep looping.
        total_prompt_tokens += usage.prompt_tokens;
        total_completion_tokens += usage.completion_tokens;
        total_cache_read_tokens += usage.cache_read_tokens;
        total_thinking_tokens += usage.thinking_tokens;
        total_char_count += char_count;

        made_tool_calls = true;

        // Tools already executed eagerly during streaming: just record their
        // results (and emit start events) without re-running them.
        let eager_ids: std::collections::HashSet<String> = stream_result
            .eager_results
            .iter()
            .map(|(id, _, _, _)| id.clone())
            .collect();

        if !eager_ids.is_empty() {
            tracing::info!(
                "{} tool(s) executed eagerly during streaming",
                eager_ids.len()
            );
            for (tc_id, result, success, full_output) in &stream_result.eager_results {
                if let Some(tc) = tool_calls.iter().find(|tc| tc.id == *tc_id) {
                    sink.emit(EngineEvent::ToolCallStart {
                        id: tc_id.clone(),
                        name: tc.function_name.clone(),
                        args: serde_json::from_str(&tc.arguments).unwrap_or_default(),
                        is_sub_agent: false,
                    });
                    crate::tool_dispatch::record_tool_result(
                        tc,
                        result,
                        *success,
                        full_output.as_deref(),
                        db,
                        session_id,
                        tools.caps.tool_result_chars,
                        project_root,
                        file_tracker,
                        sink,
                    )
                    .await?;
                }
            }
        }

        let remaining_tools: Vec<ToolCall> = tool_calls
            .iter()
            .filter(|tc| !eager_ids.contains(&tc.id))
            .cloned()
            .collect();

        // While a skill scope is active, out-of-scope tool calls are rejected
        // with an error result instead of being executed.
        let remaining_tools = if skill_scope.is_active() {
            let mut allowed = Vec::with_capacity(remaining_tools.len());
            for tc in remaining_tools {
                if let Some(err_msg) = skill_scope.check_tool_call(&tc.function_name) {
                    let parsed_args: serde_json::Value =
                        serde_json::from_str(&tc.arguments).unwrap_or_default();
                    sink.emit(EngineEvent::ToolCallStart {
                        id: tc.id.clone(),
                        name: tc.function_name.clone(),
                        args: parsed_args,
                        is_sub_agent: false,
                    });
                    crate::tool_dispatch::record_tool_result(
                        &tc,
                        &err_msg,
                        false,
                        None,
                        db,
                        session_id,
                        tools.caps.tool_result_chars,
                        project_root,
                        file_tracker,
                        sink,
                    )
                    .await?;
                } else {
                    allowed.push(tc);
                }
            }
            allowed
        } else {
            remaining_tools
        };

        // Dispatch strategy: fully parallel when safe, otherwise split-batch
        // for multiple calls, otherwise sequential for a single call.
        if remaining_tools.len() > 1
            && can_parallelize(&remaining_tools, mode, project_root, Some(file_tracker))
        {
            execute_tools_parallel(
                &remaining_tools,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel.clone(),
                sub_agent_cache,
                file_tracker,
                bg_agents,
                None,
            )
            .await?;
        } else if remaining_tools.len() > 1 {
            execute_tools_split_batch(
                &remaining_tools,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel.clone(),
                cmd_rx,
                sub_agent_cache,
                file_tracker,
                bg_agents,
                None,
            )
            .await?;
        } else if !remaining_tools.is_empty() {
            execute_tools_sequential(
                &remaining_tools,
                project_root,
                config,
                db,
                session_id,
                tools,
                mode,
                sink,
                cancel.clone(),
                cmd_rx,
                sub_agent_cache,
                file_tracker,
                bg_agents,
                None,
            )
            .await?;
        }

        // Update the skill scope from this batch of tool calls and announce
        // activation / deactivation transitions.
        {
            let scope_calls: Vec<(String, serde_json::Value)> = tool_calls
                .iter()
                .map(|tc| {
                    let args: serde_json::Value =
                        serde_json::from_str(&tc.arguments).unwrap_or_default();
                    (tc.function_name.clone(), args)
                })
                .collect();
            let was_active = skill_scope.is_active();
            skill_scope.update_from_tool_calls(&scope_calls, &tools.skill_registry);
            match (was_active, skill_scope.is_active()) {
                (false, true) => {
                    sink.emit(EngineEvent::Info {
                        message: "\u{1f512} Skill tool scope activated — tool set restricted"
                            .into(),
                    });
                }
                (true, false) => {
                    sink.emit(EngineEvent::Info {
                        message: "\u{1f513} Skill tool scope cleared — full tool set restored"
                            .into(),
                    });
                }
                _ => {}
            }
        }

        // Loop guard: nudge the model once via an injected message, hard-stop
        // if it keeps repeating itself.
        match loop_detector.record(&tool_calls) {
            LoopAction::Ok => {}
            LoopAction::InjectFeedback(detail) => {
                tracing::warn!(%detail, "Loop detected — injecting feedback");
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "Loop detected: {detail}. Injecting feedback to nudge the model."
                    ),
                });
                db.insert_message(
                    session_id,
                    &Role::User,
                    Some(&format!(
                        "System: Potential loop detected — {detail}. \
                         Please take a step back and confirm you're making forward progress. \
                         If not, analyze your previous actions and try a different approach. \
                         Avoid repeating the same tool calls without new results."
                    )),
                    None,
                    None,
                    None,
                )
                .await?;
                loop_detector.clear_after_feedback();
            }
            LoopAction::HardStop(detail) => {
                sink.emit(EngineEvent::Warn {
                    message: format!(
                        "Loop guard: {detail} — model ignored feedback, stopping. \
                         Send a follow-up message to continue."
                    ),
                });
                break Ok(());
            }
        }

        iteration += 1;
    }
}
1312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::sink::TestSink;
    use crate::providers::{StreamChunk, TokenUsage, ToolCall};
    use crate::trust::TrustMode;
    use tokio::sync::mpsc;

    // Registry rooted at a temp dir with a generous output cap.
    fn test_tools(root: &Path) -> ToolRegistry {
        ToolRegistry::new(root.to_path_buf(), 100_000)
    }

    // Feed `chunks` through a channel and run `collect_stream` against them,
    // with an optional pre-made cancellation token.
    async fn run_collect(
        chunks: Vec<StreamChunk>,
        cancel: Option<CancellationToken>,
    ) -> StreamResult {
        let (tx, mut rx) = mpsc::channel(32);
        let sink = TestSink::new();
        let cancel = cancel.unwrap_or_default();
        let tmp = tempfile::tempdir().unwrap();
        let tools = test_tools(tmp.path());

        tokio::spawn(async move {
            for chunk in chunks {
                let _ = tx.send(chunk).await;
            }
        });

        collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await
    }

    // Text deltas concatenate in order; char_count counts bytes.
    #[tokio::test]
    async fn collect_stream_accumulates_text_deltas() {
        let result = run_collect(
            vec![
                StreamChunk::TextDelta("Hello ".into()),
                StreamChunk::TextDelta("world!".into()),
                StreamChunk::Done(TokenUsage::default()),
            ],
            None,
        )
        .await;

        assert_eq!(result.text, "Hello world!");
        assert!(!result.interrupted);
        assert!(result.network_error.is_none());
        assert!(result.tool_calls.is_empty());
        assert_eq!(result.char_count, 12);
    }

    // A bare Done chunk yields an empty, clean result.
    #[tokio::test]
    async fn collect_stream_empty_stream_returns_empty() {
        let result = run_collect(vec![StreamChunk::Done(TokenUsage::default())], None).await;

        assert!(result.text.is_empty());
        assert!(!result.interrupted);
        assert!(result.tool_calls.is_empty());
    }

    // Usage from the Done chunk is carried through verbatim.
    #[tokio::test]
    async fn collect_stream_preserves_usage_from_done() {
        let usage = TokenUsage {
            prompt_tokens: 42,
            completion_tokens: 17,
            stop_reason: "end_turn".into(),
            ..Default::default()
        };
        let result = run_collect(
            vec![
                StreamChunk::TextDelta("hi".into()),
                StreamChunk::Done(usage),
            ],
            None,
        )
        .await;

        assert_eq!(result.usage.prompt_tokens, 42);
        assert_eq!(result.usage.completion_tokens, 17);
        assert_eq!(result.usage.stop_reason, "end_turn");
    }

    // Thinking and text deltas accumulate in their separate buffers.
    #[tokio::test]
    async fn collect_stream_thinking_then_text() {
        let result = run_collect(
            vec![
                StreamChunk::ThinkingDelta("Let me think...".into()),
                StreamChunk::TextDelta("Answer!".into()),
                StreamChunk::Done(TokenUsage::default()),
            ],
            None,
        )
        .await;

        assert_eq!(result.thinking_content, "Let me think...");
        assert_eq!(result.text, "Answer!");
    }

    // A batched ToolCalls chunk is recorded without producing text.
    #[tokio::test]
    async fn collect_stream_tool_calls_batch() {
        let tc = ToolCall {
            id: "tc_1".into(),
            function_name: "Bash".into(),
            arguments: r#"{"command":"echo hi"}"#.into(),
            thought_signature: None,
        };
        let result = run_collect(
            vec![
                StreamChunk::ToolCalls(vec![tc]),
                StreamChunk::Done(TokenUsage::default()),
            ],
            None,
        )
        .await;

        assert_eq!(result.tool_calls.len(), 1);
        assert_eq!(result.tool_calls[0].function_name, "Bash");
        assert!(result.text.is_empty());
    }

    // A read-only, auto-approved tool announced via ToolCallReady is executed
    // eagerly during streaming and its result captured.
    #[tokio::test]
    async fn collect_stream_eager_executes_read_only_tool() {
        let tmp = tempfile::tempdir().unwrap();
        let test_file = tmp.path().join("hello.txt");
        std::fs::write(&test_file, "file content").unwrap();

        let tc = ToolCall {
            id: "tc_eager".into(),
            function_name: "Read".into(),
            arguments: serde_json::json!({"file_path": test_file.to_string_lossy()}).to_string(),
            thought_signature: None,
        };

        let (tx, mut rx) = mpsc::channel(32);
        let sink = TestSink::new();
        let cancel = CancellationToken::new();
        let tools = test_tools(tmp.path());

        tokio::spawn(async move {
            let _ = tx.send(StreamChunk::ToolCallReady(tc)).await;
            let _ = tx.send(StreamChunk::ToolCalls(vec![])).await;
            let _ = tx.send(StreamChunk::Done(TokenUsage::default())).await;
        });

        let result =
            collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;

        assert_eq!(result.tool_calls.len(), 1, "tool call should be recorded");
        assert_eq!(result.eager_results.len(), 1, "should have 1 eager result");
        let (id, output, success, _) = &result.eager_results[0];
        assert_eq!(id, "tc_eager");
        assert!(output.contains("file content"), "eager result: {output}");
        assert!(success);
    }

    // Mutating tools must never run eagerly, even when announced early.
    #[tokio::test]
    async fn collect_stream_does_not_eagerly_execute_mutating_tool() {
        let tc = ToolCall {
            id: "tc_write".into(),
            function_name: "Write".into(),
            arguments: r#"{"file_path":"/tmp/x","content":"y"}"#.into(),
            thought_signature: None,
        };
        let result = run_collect(
            vec![
                StreamChunk::ToolCallReady(tc),
                StreamChunk::ToolCalls(vec![]),
                StreamChunk::Done(TokenUsage::default()),
            ],
            None,
        )
        .await;

        assert_eq!(result.tool_calls.len(), 1);
        assert!(
            result.eager_results.is_empty(),
            "Write should NOT be eagerly executed"
        );
    }

    // Cancelling mid-stream sets `interrupted` and keeps partial text;
    // chunks sent after the cancel are ignored.
    #[tokio::test]
    async fn collect_stream_cancellation_sets_interrupted() {
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();

        let (tx, mut rx) = mpsc::channel(32);
        let sink = TestSink::new();
        let tmp = tempfile::tempdir().unwrap();
        let tools = test_tools(tmp.path());

        tokio::spawn(async move {
            let _ = tx.send(StreamChunk::TextDelta("partial".into())).await;
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            cancel_clone.cancel();
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = tx.send(StreamChunk::TextDelta(" ignored".into())).await;
        });

        let result =
            collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;

        assert!(result.interrupted);
        assert!(result.network_error.is_none());
        assert!(result.text.contains("partial"));
    }

    // A mid-stream network error keeps the partial text and reports the error.
    #[tokio::test]
    async fn collect_stream_network_error_preserves_partial() {
        let result = run_collect(
            vec![
                StreamChunk::TextDelta("partial response".into()),
                StreamChunk::NetworkError("connection reset".into()),
            ],
            None,
        )
        .await;

        assert!(!result.interrupted);
        assert_eq!(result.network_error.as_deref(), Some("connection reset"));
        assert_eq!(result.text, "partial response");
    }

    // Network error with no prior text still surfaces the error.
    #[tokio::test]
    async fn collect_stream_network_error_with_no_text() {
        let result = run_collect(vec![StreamChunk::NetworkError("timeout".into())], None).await;

        assert!(result.text.is_empty());
        assert!(result.network_error.is_some());
    }
}