use std::sync::{Arc, Mutex};
use lash_core::PluginRuntimeEvent;
use lash_core::plugin::{
AssistantStreamHookContext, AssistantStreamTransform, PluginError, PluginRegistrar,
};
use crate::fence_scan::{FENCE_LANG, body_has_closing_fence, first_lashlang_fence_span};
pub fn register_stream_mask(reg: &mut PluginRegistrar) -> Result<(), PluginError> {
let state = Arc::new(Mutex::new(FenceDetector::new()));
let stream_state = Arc::clone(&state);
reg.output()
.stream(Arc::new(move |ctx: AssistantStreamHookContext| {
let state = Arc::clone(&stream_state);
Box::pin(async move {
let mut detector = state.lock().expect("fence detector lock");
Ok(detector.process_chunk(&ctx.chunk))
})
}));
let response_state = Arc::clone(&state);
reg.output().response(Arc::new(
move |ctx: lash_core::plugin::AssistantResponseHookContext| {
let state = Arc::clone(&response_state);
Box::pin(async move {
let mut response = ctx.response;
{
let mut detector = state.lock().expect("fence detector lock");
if detector.inside_fence {
let spliced = detector.splice_into(&response.full_text);
response.full_text = spliced.clone();
let needs_text_part = !response
.parts
.iter()
.any(|part| matches!(part, lash_core::LlmOutputPart::Text { .. }));
if needs_text_part {
response.parts.push(lash_core::LlmOutputPart::Text {
text: spliced,
response_meta: None,
});
}
}
detector.reset();
}
Ok(lash_core::plugin::AssistantResponseTransform {
response,
events: Vec::new(),
})
})
},
));
Ok(())
}
struct FenceDetector {
pending: String,
inside_fence: bool,
emitted_start: bool,
opener_len: usize,
fence_body: String,
fence_closed: bool,
}
impl FenceDetector {
fn new() -> Self {
Self {
pending: String::new(),
inside_fence: false,
emitted_start: false,
opener_len: 0,
fence_body: String::new(),
fence_closed: false,
}
}
fn reset(&mut self) {
self.pending.clear();
self.inside_fence = false;
self.emitted_start = false;
self.opener_len = 0;
self.fence_body.clear();
self.fence_closed = false;
}
fn splice_into(&self, visible: &str) -> String {
debug_assert!(self.inside_fence);
let ticks = "`".repeat(self.opener_len.max(3));
let mut spliced = visible.to_string();
if !spliced.is_empty() && !spliced.ends_with('\n') {
spliced.push('\n');
}
spliced.push_str(&ticks);
spliced.push_str(FENCE_LANG);
spliced.push('\n');
spliced.push_str(&self.fence_body);
if !self.fence_closed {
if !spliced.ends_with('\n') {
spliced.push('\n');
}
spliced.push_str(&ticks);
}
spliced
}
fn process_chunk(&mut self, chunk: &str) -> AssistantStreamTransform {
if self.inside_fence {
if self.fence_closed {
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
};
}
self.fence_body.push_str(chunk);
if body_has_closing_fence(&self.fence_body, self.opener_len) {
self.fence_closed = true;
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: true,
};
}
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
};
}
self.pending.push_str(chunk);
if let Some((fence_start, body_start, opener_len)) = find_fence_opener(&self.pending) {
self.inside_fence = true;
self.opener_len = opener_len;
let prose_before = self.pending[..fence_start].to_string();
let initial_body = self.pending[body_start..].to_string();
self.pending.clear();
let mut events = Vec::new();
if !self.emitted_start {
self.emitted_start = true;
events.push(PluginRuntimeEvent::Custom {
name: "rlm_fence_start".to_string(),
payload: serde_json::json!({}),
});
}
if !initial_body.is_empty() {
self.fence_body.push_str(&initial_body);
if body_has_closing_fence(&self.fence_body, self.opener_len) {
self.fence_closed = true;
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: non_empty_reasoning_delta(prose_before),
events,
abort_stream: true,
};
}
}
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: non_empty_reasoning_delta(prose_before),
events,
abort_stream: false,
};
}
let safe_len = self.pending.len() - possible_fence_opener_suffix_len(&self.pending);
if safe_len == 0 {
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
};
}
let flushed = self.pending[..safe_len].to_string();
self.pending = self.pending[safe_len..].to_string();
AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: non_empty_reasoning_delta(flushed),
events: Vec::new(),
abort_stream: false,
}
}
}
fn non_empty_reasoning_delta(text: String) -> Vec<String> {
if text.is_empty() {
Vec::new()
} else {
vec![text]
}
}
fn find_fence_opener(text: &str) -> Option<(usize, usize, usize)> {
let span = first_lashlang_fence_span(text)?;
Some((span.open_start, span.body_start, span.opener_len))
}
fn possible_fence_opener_suffix_len(text: &str) -> usize {
text.char_indices()
.find_map(|(idx, _)| {
let suffix = &text[idx..];
suffix_can_be_fence_opener_prefix(suffix).then_some(suffix.len())
})
.unwrap_or(0)
}
fn suffix_can_be_fence_opener_prefix(suffix: &str) -> bool {
if suffix.is_empty() {
return false;
}
let backtick_count = suffix.chars().take_while(|&c| c == '`').count();
let after_ticks: String = suffix.chars().skip(backtick_count).collect();
if after_ticks.is_empty() {
return backtick_count > 0;
}
if backtick_count < 3 {
return false;
}
let after_padding = after_ticks.trim_start_matches(' ');
after_padding.is_empty() || FENCE_LANG.starts_with(after_padding)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn prose_streams_as_reasoning_before_fence() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Hello, here's my plan.\n\n");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Hello, here's my plan.\n\n"]);
assert!(t.events.is_empty());
}
#[test]
fn short_prose_without_newline_streams_immediately() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Hi - what can I help with?");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Hi - what can I help with?"]);
assert!(d.pending.is_empty());
assert!(t.events.is_empty());
}
#[test]
fn only_possible_fence_suffix_is_held() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Plan. ```la");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Plan. "]);
assert_eq!(d.pending, "```la");
let t = d.process_chunk("shlang\n");
assert_eq!(t.chunk, "");
assert!(t.reasoning_deltas.is_empty());
assert!(d.inside_fence);
assert_eq!(t.events.len(), 1);
}
#[test]
fn non_lashlang_fence_flushes_after_it_stops_matching() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Example: ``");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Example: "]);
assert_eq!(d.pending, "``");
let t = d.process_chunk("`python\n");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["```python\n"]);
assert!(!d.inside_fence);
assert!(d.pending.is_empty());
}
#[test]
fn fence_in_single_chunk() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Thinking...\n\n```lashlang\ncode\n```\n");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Thinking...\n\n"]);
assert_eq!(t.events.len(), 1);
assert!(matches!(
&t.events[0],
PluginRuntimeEvent::Custom { name, .. } if name == "rlm_fence_start"
));
}
#[test]
fn fence_split_across_chunks() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Plan.\n\n");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Plan.\n\n"]);
assert_eq!(d.process_chunk("```").chunk, "");
let t = d.process_chunk("lashlang\n");
assert!(d.inside_fence);
assert_eq!(t.events.len(), 1);
assert_eq!(d.process_chunk("code\n```\n").chunk, "");
}
#[test]
fn token_by_token_streaming() {
let mut d = FenceDetector::new();
for tok in ["Let", " me", " check", ".\n\n"] {
let t = d.process_chunk(tok);
assert!(t.events.is_empty());
}
d.process_chunk("```");
d.process_chunk("lash");
let t = d.process_chunk("lang");
assert!(d.inside_fence);
assert_eq!(t.events.len(), 1);
assert_eq!(d.process_chunk("\n").chunk, "");
assert_eq!(
d.process_chunk("result = await tools.exec({ cmd: \"date\" })\n")
.chunk,
""
);
}
#[test]
fn non_lashlang_fence_streams_as_reasoning_without_masking() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Example:\n\n```python\nprint('hi')\n```\n");
assert!(t.chunk.is_empty());
assert!(!t.reasoning_deltas.is_empty());
assert!(t.events.is_empty());
}
#[test]
fn rlm_alias_does_not_trigger_masking() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Check:\n\n```rlm\nprint x\n```\n");
assert!(t.chunk.is_empty());
assert!(t.reasoning_deltas.join("").contains("```rlm"));
assert!(t.events.is_empty());
}
#[test]
fn inline_backticks_do_not_trigger() {
let mut d = FenceDetector::new();
let t = d.process_chunk("Use ```lashlang in your code.\n");
assert!(t.chunk.is_empty());
assert!(t.reasoning_deltas.join("").contains("lashlang"));
assert!(t.events.is_empty());
}
#[test]
fn reset_prevents_cross_response_leak() {
let mut d = FenceDetector::new();
d.process_chunk("Hi! How can I help you?");
d.reset();
let t = d.process_chunk("New response.\n\n```lashlang\ncode\n```\n");
assert_eq!(t.chunk, "");
let reasoning = t.reasoning_deltas.join("");
assert!(reasoning.starts_with("New response."));
assert!(!reasoning.contains("How can I help"));
}
#[test]
fn fence_opener_and_body_in_same_chunk_preserves_body() {
let mut d = FenceDetector::new();
d.process_chunk(
"```lashlang\nnow = await tools.exec({ cmd: \"date\" })?\nprint now.output\n",
);
assert!(d.inside_fence);
assert!(
d.fence_body
.starts_with("now = await tools.exec({ cmd: \"date\" })?")
);
}
#[test]
fn fence_opener_with_body_and_close_in_same_chunk_aborts() {
let mut d = FenceDetector::new();
let t = d.process_chunk("```lashlang\nsubmit \"hi\"\n```");
assert!(d.inside_fence);
assert!(d.fence_closed);
assert!(t.abort_stream);
assert!(d.fence_body.contains("submit \"hi\""));
}
#[test]
fn reset_clears_fence_state() {
let mut d = FenceDetector::new();
d.process_chunk("Plan.\n\n```lashlang\ncode\n```\n");
assert!(d.inside_fence);
d.reset();
assert!(!d.inside_fence);
assert!(d.pending.is_empty());
let t = d.process_chunk("Result.\n");
assert_eq!(t.chunk, "");
assert_eq!(t.reasoning_deltas, vec!["Result.\n"]);
}
fn stream_and_splice(chunks: &[&str]) -> String {
let mut d = FenceDetector::new();
let mut visible = String::new();
for chunk in chunks {
let t = d.process_chunk(chunk);
for delta in t.reasoning_deltas {
visible.push_str(&delta);
}
}
if d.inside_fence {
d.splice_into(&visible)
} else {
visible
}
}
#[test]
fn splice_reconstructs_plain_triple_backtick_block() {
let spliced = stream_and_splice(&["Quick check.\n\n```lashlang\nprint \"hi\"\n```\n"]);
let span = first_lashlang_fence_span(&spliced).expect("spliced block parses");
let code = spliced[span.body_start..span.body_end].trim_end_matches('\n');
assert_eq!(code, "print \"hi\"");
assert!(span.is_closed());
}
#[test]
fn splice_preserves_four_backtick_fence_with_embedded_triple() {
let spliced =
stream_and_splice(&["````lashlang\n", "print \"```\"\n", "submit 1\n", "````"]);
let span = first_lashlang_fence_span(&spliced).expect("spliced block parses");
assert_eq!(span.opener_len, 4, "opener length must survive the splice");
let code = spliced[span.body_start..span.body_end].trim_end_matches('\n');
assert_eq!(
code, "print \"```\"\nsubmit 1",
"embedded triple-backticks must not prematurely close the block"
);
}
#[test]
fn stream_and_finalize_agree_on_opener_at_end_of_text() {
let opener_at_eof = "Plan.\n\n```lashlang";
let mut d = FenceDetector::new();
d.process_chunk(opener_at_eof);
assert!(
d.inside_fence,
"streaming path must detect the end-of-text opener"
);
assert!(
first_lashlang_fence_span(opener_at_eof).is_some(),
"finalize path must detect the same end-of-text opener"
);
let spliced = d.splice_into("Plan.");
assert!(first_lashlang_fence_span(&spliced).is_some());
}
#[test]
fn splice_closes_unterminated_fence_with_matching_opener_length() {
let spliced = stream_and_splice(&["````lashlang\n", "print \"```\"\n", "submit 1\n"]);
let span = first_lashlang_fence_span(&spliced).expect("spliced block parses");
assert_eq!(span.opener_len, 4);
assert!(
span.is_closed(),
"unterminated stream gets a synthetic closer"
);
let code = spliced[span.body_start..span.body_end].trim_end_matches('\n');
assert_eq!(code, "print \"```\"\nsubmit 1");
}
}