Skip to main content

clark_agent/
thinking_filter.rs

1//! Streaming filter that splits assistant text into a hidden
2//! `<thought>` channel and visible content.
3//!
4//! A host typically prompts the model to begin every turn with exactly
5//! one `<thought>...</thought>` block (private scratch space — the typed
6//! record this filter extracts and routes off the visible stream),
7//! optionally followed by a short `<narrate>...</narrate>` sentence
8//! (user-visible diary text), and then the tool call(s). Emitting
9//! `<thought>` first preserves the audit record even if generation is
10//! cut short before any user-visible token streams. This filter sits
11//! in the OpenRouter SSE path and routes content tokens to the right
12//! place as they arrive:
13//!
14//! - Text outside any thinking tag flows through as visible text.
15//! - Text inside a recognized hidden tag (`<thought>`, `<thinking>`,
16//!   `<think>`, `<reasoning>`, `<reflection>`) is buffered separately
17//!   and surfaced via [`ThinkingTagStreamFilter::take_completed_thought`]
18//!   when the closing tag arrives.
19//!
20//! The filter is delta-aware: a tag may be split across SSE chunk
21//! boundaries (`<thi` then `nking>` then `hidden` then `</thinking>`).
22//! Ambiguous prefixes (anything starting with `<` that *could* be a
23//! thinking tag) are buffered until they can be confirmed or rejected.
24//!
25//! Why a fresh copy in clark-agent rather than reusing
26//! `clark_core::runtime_core::json_extract::ThinkingTagStreamFilter`:
27//! clark-agent is the lean loop crate (no redis, no chrono-tz, no
28//! sentry); pulling in clark-core would bloat its compile graph by an
29//! order of magnitude. The two implementations are kept narrow enough
30//! that drift is cheap to spot in review.
31
/// Tag names whose content is treated as hidden reasoning and
/// stripped from the visible assistant stream. Synonyms kept for
/// compatibility with diverse provider conventions; the canonical
/// Clark prompt asks for `<thought>`.
///
/// Entries are bare lowercase names without angle brackets: `classify`
/// wraps each one as `<name>` or `</name>` and compares it against a
/// candidate that has already been ASCII-lowercased.
const THINKING_TAGS: &[&str] = &["think", "thinking", "thought", "reasoning", "reflection"];
37
/// Streaming-aware filter that suppresses content inside thinking
/// XML tags as deltas arrive token-by-token.
///
/// All parsing state lives in the fields below, so a tag may be split
/// at any point across successive `feed` calls.
#[derive(Debug, Default, Clone)]
pub struct ThinkingTagStreamFilter {
    /// `true` between a recognized opening tag and its closing tag.
    inside: bool,
    /// Characters, starting at a `<`, that might still turn out to be a
    /// tag. Held back until the candidate is confirmed or rejected.
    pending: String,
    /// Text of the thought block currently being read. Set to an empty
    /// string when an opening tag completes and taken when it closes.
    thought_buffer: Option<String>,
    /// Most recently closed thought block, waiting to be collected via
    /// `take_completed_thought`.
    completed_thought: Option<String>,
}
47
48impl ThinkingTagStreamFilter {
49    pub fn new() -> Self {
50        Self::default()
51    }
52
53    /// Feed a delta and return the visible portion (which may be empty).
54    pub fn feed(&mut self, delta: &str) -> String {
55        let mut out = String::with_capacity(delta.len());
56        for ch in delta.chars() {
57            self.consume_char(ch, &mut out);
58        }
59        out
60    }
61
62    /// End-of-stream flush. If we're outside a tag and have a pending
63    /// prefix (a stray `<` that turned out not to start a tag), emit
64    /// it. If we're inside an unclosed tag, drop the trailing content
65    /// (mirrors [`strip_thinking_tags`]).
66    pub fn flush(&mut self) -> String {
67        let out = if self.inside {
68            self.thought_buffer.take();
69            String::new()
70        } else {
71            std::mem::take(&mut self.pending)
72        };
73        self.pending.clear();
74        self.inside = false;
75        self.completed_thought.take();
76        out
77    }
78
79    /// Reset between turns / bounces.
80    pub fn reset(&mut self) {
81        self.pending.clear();
82        self.inside = false;
83        self.thought_buffer.take();
84        self.completed_thought.take();
85    }
86
87    /// Take any thought block completed during the most recent
88    /// [`Self::feed`] call. Returns `None` if no thought closed since the
89    /// last call.
90    pub fn take_completed_thought(&mut self) -> Option<String> {
91        self.completed_thought.take()
92    }
93
94    fn consume_char(&mut self, ch: char, out: &mut String) {
95        if self.inside {
96            self.consume_inside(ch);
97        } else {
98            self.consume_outside(ch, out);
99        }
100    }
101
102    fn consume_outside(&mut self, ch: char, out: &mut String) {
103        if self.pending.is_empty() {
104            if ch == '<' {
105                self.pending.push(ch);
106            } else {
107                out.push(ch);
108            }
109            return;
110        }
111        self.pending.push(ch);
112        match classify(&self.pending, /* closing = */ false) {
113            TagMatch::Complete => {
114                self.inside = true;
115                self.pending.clear();
116                self.thought_buffer = Some(String::new());
117            }
118            TagMatch::Possible => {}
119            TagMatch::No => {
120                out.push_str(&self.pending);
121                self.pending.clear();
122            }
123        }
124    }
125
126    fn consume_inside(&mut self, ch: char) {
127        if self.pending.is_empty() {
128            if ch == '<' {
129                self.pending.push(ch);
130            } else if let Some(ref mut buf) = self.thought_buffer {
131                buf.push(ch);
132            }
133            return;
134        }
135        self.pending.push(ch);
136        match classify(&self.pending, /* closing = */ true) {
137            TagMatch::Complete => {
138                self.inside = false;
139                self.completed_thought = self.thought_buffer.take();
140                self.pending.clear();
141            }
142            TagMatch::Possible => {}
143            TagMatch::No => {
144                if let Some(ref mut buf) = self.thought_buffer {
145                    buf.push_str(&self.pending);
146                } else {
147                    self.thought_buffer = Some(std::mem::take(&mut self.pending));
148                }
149                self.pending.clear();
150            }
151        }
152    }
153}
154
/// Outcome of comparing a buffered tag candidate against the known
/// thinking tags (see `classify`).
enum TagMatch {
    /// The candidate is exactly one of the recognized tags.
    Complete,
    /// The candidate is a proper prefix of a recognized tag; more
    /// input is needed to decide.
    Possible,
    /// The candidate cannot become any recognized tag.
    No,
}
160
161fn classify(buf: &str, closing: bool) -> TagMatch {
162    let lower = canonicalize_tag_candidate(buf);
163    for tag in THINKING_TAGS {
164        let full = if closing {
165            format!("</{tag}>")
166        } else {
167            format!("<{tag}>")
168        };
169        if lower == full {
170            return TagMatch::Complete;
171        }
172        if full.starts_with(&lower) {
173            return TagMatch::Possible;
174        }
175    }
176    TagMatch::No
177}
178
/// Normalize a tag candidate so it can be compared with the canonical
/// `<name>` / `</name>` spellings.
///
/// Every character is ASCII-lowercased. When the candidate starts with
/// `<`, all ASCII whitespace after that first character is removed,
/// including whitespace around a `/` and inside the name, so
/// `"</ Thought >"` becomes `"</thought>"`. A candidate that does not
/// start with `<` keeps its whitespace. An empty input yields an empty
/// string.
fn canonicalize_tag_candidate(buf: &str) -> String {
    let mut rest = buf.chars();
    let mut canonical = String::with_capacity(buf.len());

    let Some(head) = rest.next() else {
        return canonical;
    };
    canonical.push(head.to_ascii_lowercase());

    // Whitespace is only insignificant inside something that looks like a tag.
    let strip_whitespace = head == '<';
    canonical.extend(
        rest.filter(|c| !(strip_whitespace && c.is_ascii_whitespace()))
            .map(|c| c.to_ascii_lowercase()),
    );
    canonical
}
212
213/// Remove XML-like thinking blocks from a complete string. Used as a
214/// safety net by [`crate::types::AssistantContent::plain_text`]'s
215/// callers when serializing assistant text back to the wire on the
216/// next request — defends against blocks that slipped past the
217/// streaming filter (provider buffering, malformed nesting, etc.).
218pub fn strip_thinking_tags(text: &str) -> String {
219    let mut filter = ThinkingTagStreamFilter::new();
220    let mut result = filter.feed(text);
221    result.push_str(&filter.flush());
222    result.trim().to_string()
223}
224
// Unit tests for the streaming filter and `strip_thinking_tags`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Feed every delta in order, then flush, and return the concatenated
    /// visible output. Because this ends with `flush`, any completed
    /// thought is no longer available from the filter afterwards.
    fn feed_all(filter: &mut ThinkingTagStreamFilter, deltas: &[&str]) -> String {
        let mut out = String::new();
        for d in deltas {
            out.push_str(&filter.feed(d));
        }
        out.push_str(&filter.flush());
        out
    }

    /// Plain text with no tags is emitted unchanged and yields no thought.
    #[test]
    fn passes_through_when_no_tag() {
        let mut f = ThinkingTagStreamFilter::new();
        assert_eq!(feed_all(&mut f, &["hello", " ", "world"]), "hello world");
        assert!(f.take_completed_thought().is_none());
    }

    /// A whole block arriving in one delta is removed from visible output.
    #[test]
    fn strips_complete_thought_block_in_one_delta() {
        let mut f = ThinkingTagStreamFilter::new();
        let visible = feed_all(&mut f, &["<thought>hidden</thought>visible"]);
        assert_eq!(visible, "visible");
    }

    /// The text between the tags is retrievable exactly once.
    #[test]
    fn captures_completed_thought_text() {
        let mut f = ThinkingTagStreamFilter::new();
        let _ = f.feed("<thought>recall and frame</thought>");
        assert_eq!(
            f.take_completed_thought().as_deref(),
            Some("recall and frame")
        );
        // Second take returns None — the buffer is consumed.
        assert!(f.take_completed_thought().is_none());
    }

    /// Opening and closing tags may be split across delta boundaries.
    #[test]
    fn handles_tag_split_across_deltas() {
        let mut f = ThinkingTagStreamFilter::new();
        let visible = feed_all(
            &mut f,
            &[
                "<thi", "nking", ">", "hidden ", "stuff", "</thi", "nking>", "out",
            ],
        );
        assert_eq!(visible, "out");
    }

    /// Whitespace inside the angle brackets and mixed case are tolerated.
    #[test]
    fn tolerant_thought_tags_are_hidden() {
        for raw in [
            "< thought >hidden</ thought >visible",
            "<\tthought>hidden</\tthought>visible",
            "<Thinking >hidden</ Thinking >visible",
        ] {
            let mut f = ThinkingTagStreamFilter::new();
            assert_eq!(feed_all(&mut f, &[raw]), "visible", "raw={raw:?}");
        }
    }

    /// A `<` that does not begin a recognized tag is emitted verbatim.
    #[test]
    fn stray_lt_is_emitted_when_not_a_tag() {
        let mut f = ThinkingTagStreamFilter::new();
        // `<` followed by non-tag chars should pass through.
        let visible = feed_all(&mut f, &["a < b > c"]);
        assert_eq!(visible, "a < b > c");
    }

    /// Content after an opening tag that never closes is not made visible.
    #[test]
    fn unclosed_tag_drops_trailing_content() {
        let mut f = ThinkingTagStreamFilter::new();
        let visible = feed_all(&mut f, &["before<thought>never closed"]);
        assert_eq!(visible, "before");
    }

    /// Every tag name listed here is recognized as a hidden block.
    #[test]
    fn synonyms_recognized() {
        for tag in &["think", "thinking", "thought", "reasoning", "reflection"] {
            let mut f = ThinkingTagStreamFilter::new();
            let raw = format!("a<{tag}>x</{tag}>b");
            assert_eq!(feed_all(&mut f, &[raw.as_str()]), "ab", "tag={tag}");
        }
    }

    /// Tag names match regardless of ASCII case.
    #[test]
    fn case_insensitive_tag_recognition() {
        let mut f = ThinkingTagStreamFilter::new();
        let visible = feed_all(&mut f, &["<Thought>hidden</Thought>visible"]);
        assert_eq!(visible, "visible");
    }

    /// The whole-string helper removes blocks, including whitespace-padded tags.
    #[test]
    fn strip_thinking_tags_removes_blocks() {
        assert_eq!(strip_thinking_tags("a<thought>x</thought>b"), "ab");
        assert_eq!(strip_thinking_tags("<reasoning>r</reasoning>tail"), "tail");
        assert_eq!(strip_thinking_tags("a< thought >x</ thought >b"), "ab");
    }

    /// The whole-string helper drops an unclosed block and what follows it.
    #[test]
    fn strip_thinking_tags_handles_unclosed() {
        // Unclosed tag → everything from the open tag onward is dropped.
        assert_eq!(strip_thinking_tags("keep<thought>drop the rest"), "keep");
    }

    /// `reset` abandons a half-read block so the next input is visible again.
    #[test]
    fn reset_clears_state() {
        let mut f = ThinkingTagStreamFilter::new();
        let _ = f.feed("<thought>partial");
        f.reset();
        assert_eq!(feed_all(&mut f, &["fresh"]), "fresh");
        assert!(f.take_completed_thought().is_none());
    }
}