// zagens_core/engine/streaming.rs

//! Streaming response state and guardrails (P2 PR4 → `zagens-core`).
2
3use crate::chat::ToolCaller;
4
/// Kind of content block tracked as part of the streaming response state.
///
/// The enum is `Copy` and comparable, so it can be passed by value and
/// matched or compared cheaply.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ContentBlockKind {
    /// A plain text block.
    Text,
    /// A thinking block (NOTE(review): presumably model reasoning output —
    /// confirm against the stream event handler).
    Thinking,
    /// A tool-use block (see [`ToolUseState`] for the per-call state).
    ToolUse,
}
11
12#[derive(Debug, Clone)]
13pub struct ToolUseState {
14    pub id: String,
15    pub name: String,
16    pub input: serde_json::Value,
17    pub caller: Option<ToolCaller>,
18    pub input_buffer: String,
19}
20
/// Stream chunk timeout, in seconds.
/// NOTE(review): presumably the longest allowed wait for the next chunk of a
/// stream — the enforcement site is outside this file; confirm there.
pub const STREAM_CHUNK_TIMEOUT_SECS: u64 = 90;
/// Maximum stream content size, in bytes (10 MiB).
/// NOTE(review): presumably a cap on accumulated streamed content — confirm
/// at the use site.
pub const STREAM_MAX_CONTENT_BYTES: usize = 10 * 1024 * 1024;
/// Maximum stream duration, in seconds (1800 s = 30 minutes).
/// NOTE(review): presumably a wall-clock cap on a single stream — confirm at
/// the use site.
pub const STREAM_MAX_DURATION_SECS: u64 = 1800;
/// Stream error budget.
/// NOTE(review): by its name, the number of stream errors tolerated before
/// the stream is treated as failed — confirm whether the comparison at the
/// use site is inclusive.
pub const MAX_STREAM_ERRORS_BEFORE_FAIL: u32 = 5;
/// Exclusive upper bound on the `transparent_attempts` count for which
/// [`should_transparently_retry_stream`] still permits another retry.
pub const MAX_TRANSPARENT_STREAM_RETRIES: u32 = 2;
/// Outer turn-step retries when a stream dies with no actionable content (#103).
pub const MAX_STREAM_RETRIES: u32 = 3;
/// Max consecutive auto-continuations after the model hits the output
/// `max_tokens` cap (`finish_reason=length`) with no tool call to carry the
/// turn. Bounds runaway cost / an infinite cut→continue loop while still
/// letting a genuinely huge answer (or reasoning) finish across several rounds.
/// Reset to 0 on any step that does not end in a length truncation.
pub const MAX_LENGTH_CONTINUATIONS: u32 = 8;
/// Max times a long-horizon turn that exhausts its `max_steps` budget may be
/// granted another full step window to keep pursuing an incomplete task graph,
/// instead of silently ending at the step cap (step-exhaustion early-stop).
/// Each grant extends the budget by the original `max_steps`; bounded so a
/// runaway task can't loop forever (e.g. 3 → up to 4× the base step budget).
pub const MAX_STEP_LIMIT_CONTINUATIONS: u32 = 3;
/// Max times a long-horizon turn whose [`LoopGuard`](crate::engine::loop_guard::LoopGuard)
/// halts (a tool failed `FAILURE_HALT_THRESHOLD` consecutive times) may be
/// granted a "change approach" continuation instead of silently ending the
/// turn as `Completed`. Kept small — a halt means the model is genuinely stuck,
/// so we reset the failure counters and nudge it to switch strategy at most
/// this many times before accepting the stop.
pub const MAX_LOOP_GUARD_CONTINUATIONS: u32 = 2;
/// Max times an in-flight turn whose context overflows the model budget (and
/// can't be brought back under it by emergency compaction within
/// [`MAX_CONTEXT_RECOVERY_ATTEMPTS`](crate::engine::context::MAX_CONTEXT_RECOVERY_ATTEMPTS))
/// may roll a long-horizon **cycle handoff** instead of hard-failing the turn.
/// A handoff swaps the bloated message buffer for a small `<carry_forward>`
/// briefing seed plus preserved structured state, so the next step starts with
/// room to spare. Kept tiny: if even a fresh briefing seed can't fit, the task
/// is genuinely too large and we fall back to the hard failure.
pub const MAX_CONTEXT_CYCLE_HANDOFFS: u32 = 2;
/// Max **clean** in-turn cycle advances at a per-step safe boundary. The cycle
/// threshold / long-horizon early-advance band is normally only evaluated
/// *between turns*; a long-horizon turn that loops many tool steps without
/// returning never reaches that boundary, so a turn crossing ~75% would only
/// get the hard-overflow emergency handoff ([`MAX_CONTEXT_CYCLE_HANDOFFS`]),
/// never a clean early refresh. Evaluating the gate after each completed tool
/// step closes that gap. Each clean advance resets context to a small briefing
/// seed (so the gate won't immediately re-fire); this bound is the safety net
/// against a pathological seed that itself stays over threshold. Generous —
/// a genuinely long turn may legitimately refresh several times.
pub const MAX_IN_TURN_CYCLE_ADVANCES: u32 = 8;
67
68pub fn should_transparently_retry_stream(
69    any_content_received: bool,
70    transparent_attempts: u32,
71    cancelled: bool,
72) -> bool {
73    !any_content_received && transparent_attempts < MAX_TRANSPARENT_STREAM_RETRIES && !cancelled
74}
75
/// Opening markers of textual ("fake") tool-call wrappers that can appear in
/// plain model output instead of going through the API tool channel.
///
/// [`contains_fake_tool_wrapper`] checks text for any of these, and
/// [`filter_tool_call_delta`] starts suppressing output when it meets one.
/// Several entries are deliberately prefixes without a closing `>` (for
/// example `<tool_call` and `<invoke `), so tags carrying attributes match too.
pub const TOOL_CALL_START_MARKERS: [&str; 5] = [
    "[TOOL_CALL]",
    "<deepseek:tool_call",
    "<tool_call",
    "<invoke ",
    "<function_calls>",
];
83
/// Closing markers of textual ("fake") tool-call wrappers.
///
/// [`filter_tool_call_delta`] searches for these to find where a suppressed
/// wrapper ends. The entries mirror [`TOOL_CALL_START_MARKERS`] in order, but
/// the filter does not pair them by index: whichever end marker occurs first
/// closes the wrapper.
pub const TOOL_CALL_END_MARKERS: [&str; 5] = [
    "[/TOOL_CALL]",
    "</deepseek:tool_call>",
    "</tool_call>",
    "</invoke>",
    "</function_calls>",
];
91
/// Notice text stating that a non-API tool-call wrapper was stripped from
/// model output.
/// NOTE(review): where this notice is surfaced is not visible in this file —
/// confirm at the use site.
pub const FAKE_WRAPPER_NOTICE: &str =
    "Stripped non-API tool-call wrapper from model output (use the API tool channel)";
94
95pub fn contains_fake_tool_wrapper(text: &str) -> bool {
96    TOOL_CALL_START_MARKERS.iter().any(|m| text.contains(m))
97}
98
/// Finds the marker occurrence that starts earliest in `text`.
///
/// Returns `Some((byte_offset, marker_byte_len))` for that occurrence, or
/// `None` when no marker occurs in `text` (including when `markers` is empty).
/// If two markers match at the same offset, the one listed first in `markers`
/// wins, because `min_by_key` keeps the first of equal minima.
fn find_first_marker(text: &str, markers: &[&str]) -> Option<(usize, usize)> {
    markers
        .iter()
        .filter_map(|marker| text.find(marker).map(|idx| (idx, marker.len())))
        .min_by_key(|(idx, _)| *idx)
}
105
106pub fn filter_tool_call_delta(delta: &str, in_tool_call: &mut bool) -> String {
107    if delta.is_empty() {
108        return String::new();
109    }
110
111    let mut output = String::new();
112    let mut rest = delta;
113
114    loop {
115        if *in_tool_call {
116            let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_END_MARKERS) else {
117                break;
118            };
119            rest = &rest[idx + len..];
120            *in_tool_call = false;
121        } else {
122            let Some((idx, len)) = find_first_marker(rest, &TOOL_CALL_START_MARKERS) else {
123                output.push_str(rest);
124                break;
125            };
126            output.push_str(&rest[..idx]);
127            rest = &rest[idx + len..];
128            *in_tool_call = true;
129        }
130    }
131
132    output
133}