use std::sync::{Arc, Mutex};
use lash_core::PluginRuntimeEvent;
use lash_core::plugin::{
AssistantStreamHookContext, AssistantStreamTransform, PluginError, PluginRegistrar,
};
use crate::cell_scan::{
complete_lashlang_end_tag_span, complete_lashlang_start_tag_span,
possible_lashlang_start_tag_suffix_len, render_lashlang_cell_text,
};
pub fn register_stream_mask(reg: &mut PluginRegistrar) -> Result<(), PluginError> {
let state = Arc::new(Mutex::new(CellDetector::new()));
let stream_state = Arc::clone(&state);
reg.output()
.stream(Arc::new(move |ctx: AssistantStreamHookContext| {
let state = Arc::clone(&stream_state);
Box::pin(async move {
let mut detector = state.lock().expect("cell detector lock");
Ok(detector.process_chunk(&ctx.chunk))
})
}));
let response_state = Arc::clone(&state);
reg.output().response(Arc::new(
move |ctx: lash_core::plugin::AssistantResponseHookContext| {
let state = Arc::clone(&response_state);
Box::pin(async move {
let response = {
let mut detector = state.lock().expect("cell detector lock");
let response = transform_final_response(&detector, ctx.response);
detector.reset();
response
};
Ok(lash_core::plugin::AssistantResponseTransform {
response,
events: Vec::new(),
})
})
},
));
Ok(())
}
fn transform_final_response(
detector: &CellDetector,
mut response: lash_core::LlmResponse,
) -> lash_core::LlmResponse {
if !detector.cell_closed {
return response;
}
let spliced = detector.spliced_response_text();
response.full_text = spliced.clone();
response
.parts
.retain(|part| !matches!(part, lash_core::LlmOutputPart::Text { .. }));
response.parts.push(lash_core::LlmOutputPart::Text {
text: spliced,
response_meta: None,
});
response
}
struct CellDetector {
pending: String,
inside_cell: bool,
cell_closed: bool,
emitted_start: bool,
emitted_end: bool,
visible_prose: String,
cell_body: String,
}
impl CellDetector {
fn new() -> Self {
Self {
pending: String::new(),
inside_cell: false,
cell_closed: false,
emitted_start: false,
emitted_end: false,
visible_prose: String::new(),
cell_body: String::new(),
}
}
fn reset(&mut self) {
self.pending.clear();
self.inside_cell = false;
self.cell_closed = false;
self.emitted_start = false;
self.emitted_end = false;
self.visible_prose.clear();
self.cell_body.clear();
}
fn splice_into_visible(&self, visible: &str) -> String {
debug_assert!(self.cell_closed);
render_lashlang_cell_text(visible, &self.cell_body)
}
fn spliced_response_text(&self) -> String {
self.splice_into_visible(&self.visible_prose)
}
fn process_chunk(&mut self, chunk: &str) -> AssistantStreamTransform {
if self.cell_closed {
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
};
}
if self.inside_cell {
return self.capture_cell_body_chunk(chunk, String::new(), Vec::new());
}
self.pending.push_str(chunk);
if let Some(span) = complete_lashlang_start_tag_span(&self.pending) {
self.inside_cell = true;
let prose_before = self.pending[..span.start_tag_start].to_string();
self.visible_prose.push_str(&prose_before);
let body_suffix = self.pending[span.body_start..span.body_end].to_string();
self.pending.clear();
let events = vec![self.start_event()];
return self.capture_cell_body_chunk(&body_suffix, prose_before, events);
}
let safe_len = self.pending.len() - possible_lashlang_start_tag_suffix_len(&self.pending);
if safe_len == 0 {
return AssistantStreamTransform {
chunk: String::new(),
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
};
}
let flushed = self.pending[..safe_len].to_string();
self.pending = self.pending[safe_len..].to_string();
self.visible_prose.push_str(&flushed);
AssistantStreamTransform {
chunk: flushed,
reasoning_deltas: Vec::new(),
events: Vec::new(),
abort_stream: false,
}
}
fn capture_cell_body_chunk(
&mut self,
chunk: &str,
visible_chunk: String,
mut events: Vec<PluginRuntimeEvent>,
) -> AssistantStreamTransform {
self.cell_body.push_str(chunk);
let abort_stream = if let Some(span) = complete_lashlang_end_tag_span(&self.cell_body, true)
{
self.cell_body = self.cell_body[..span.body_end].to_string();
self.cell_closed = true;
events.push(self.end_event());
true
} else {
false
};
AssistantStreamTransform {
chunk: visible_chunk,
reasoning_deltas: Vec::new(),
events,
abort_stream,
}
}
fn start_event(&mut self) -> PluginRuntimeEvent {
debug_assert!(!self.emitted_start);
self.emitted_start = true;
PluginRuntimeEvent::Custom {
name: "rlm_lashlang_cell_start".to_string(),
payload: serde_json::json!({}),
}
}
fn end_event(&mut self) -> PluginRuntimeEvent {
debug_assert!(!self.emitted_end);
self.emitted_end = true;
PluginRuntimeEvent::Custom {
name: "rlm_lashlang_cell_end".to_string(),
payload: serde_json::json!({}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cell_scan::first_lashlang_cell_span;
#[test]
fn prose_streams_as_assistant_text_before_cell() {
let mut d = CellDetector::new();
let t = d.process_chunk("Hello, here's my plan.\n\n");
assert_eq!(t.chunk, "Hello, here's my plan.\n\n");
assert!(t.reasoning_deltas.is_empty());
assert!(t.events.is_empty());
assert!(!t.abort_stream);
}
#[test]
fn short_prose_without_newline_streams_immediately() {
let mut d = CellDetector::new();
let t = d.process_chunk("Hi - what can I help with?");
assert_eq!(t.chunk, "Hi - what can I help with?");
assert!(d.pending.is_empty());
}
#[test]
fn possible_start_tag_suffix_is_held() {
let mut d = CellDetector::new();
let t = d.process_chunk("Plan.\n<lash");
assert_eq!(t.chunk, "Plan.\n");
assert_eq!(d.pending, "<lash");
let t = d.process_chunk("lang>\n");
assert_eq!(t.chunk, "");
assert!(d.inside_cell);
assert!(!d.cell_closed);
assert_eq!(t.events.len(), 1);
assert!(!t.abort_stream);
}
#[test]
fn indented_start_tag_split_after_whitespace_is_held() {
let mut d = CellDetector::new();
let t = d.process_chunk("Plan.\n ");
assert_eq!(t.chunk, "Plan.\n");
assert_eq!(d.pending, " ");
let t = d.process_chunk("<lashlang>\nsubmit 1");
assert_eq!(t.chunk, "");
assert!(d.inside_cell);
assert!(!d.cell_closed);
assert_eq!(d.cell_body, "submit 1");
}
#[test]
fn start_tag_and_body_in_same_chunk_preserves_body_and_does_not_abort_before_close() {
let mut d = CellDetector::new();
let t = d.process_chunk("Thinking...\n\n<lashlang>\ncode\n```markdown\ninside\n```\n");
assert_eq!(t.chunk, "Thinking...\n\n");
assert!(d.inside_cell);
assert_eq!(d.cell_body, "code\n```markdown\ninside\n```\n");
assert!(!t.abort_stream);
}
#[test]
fn body_after_start_tag_is_suppressed_until_close() {
let mut d = CellDetector::new();
assert_eq!(d.process_chunk("<lashlang>\n").chunk, "");
let t = d.process_chunk("submit \"hi\"\n");
assert_eq!(t.chunk, "");
assert!(!t.abort_stream);
assert_eq!(d.cell_body, "submit \"hi\"\n");
}
#[test]
fn inline_start_tag_text_does_not_trigger() {
let mut d = CellDetector::new();
let t = d.process_chunk("Use <lashlang> here.\n");
assert_eq!(t.chunk, "Use <lashlang> here.\n");
assert!(!d.inside_cell);
assert!(t.events.is_empty());
}
#[test]
fn incomplete_start_tag_can_become_visible_prose() {
let mut d = CellDetector::new();
assert_eq!(d.process_chunk("<lashlang>").chunk, "");
let t = d.process_chunk(" here\n");
assert_eq!(t.chunk, "<lashlang> here\n");
assert!(!d.inside_cell);
}
#[test]
fn reset_prevents_cross_response_leak() {
let mut d = CellDetector::new();
d.process_chunk("Hi! How can I help you?");
d.reset();
let t = d.process_chunk("New response.\n\n<lashlang>\ncode\n");
assert_eq!(t.chunk, "New response.\n\n");
assert!(!t.chunk.contains("How can I help"));
}
#[test]
fn close_tag_split_across_chunks_aborts_stream() {
let mut d = CellDetector::new();
assert_eq!(d.process_chunk("<lashlang>\nsubmit 1\n</lash").chunk, "");
let t = d.process_chunk("lang>");
assert_eq!(t.chunk, "");
assert!(t.abort_stream);
assert!(d.cell_closed);
assert_eq!(d.cell_body, "submit 1");
assert_eq!(event_names(&t.events), vec!["rlm_lashlang_cell_end"]);
}
#[test]
fn close_tag_plus_trailing_prose_in_same_chunk_aborts_and_drops_suffix() {
let mut d = CellDetector::new();
let t = d.process_chunk("Visible.\n<lashlang>\nsubmit 1\n</lashlang>\nTrailing prose.");
assert_eq!(t.chunk, "Visible.\n");
assert!(t.abort_stream);
assert!(d.cell_closed);
assert_eq!(d.cell_body, "submit 1");
assert_eq!(
event_names(&t.events),
vec!["rlm_lashlang_cell_start", "rlm_lashlang_cell_end"]
);
assert_eq!(
d.spliced_response_text(),
"Visible.\n<lashlang>\nsubmit 1\n</lashlang>"
);
}
#[test]
fn incomplete_block_does_not_abort_and_does_not_close() {
let mut d = CellDetector::new();
let t = d.process_chunk("Visible.\n<lashlang>\nsubmit 1");
assert_eq!(t.chunk, "Visible.\n");
assert!(!t.abort_stream);
assert!(d.inside_cell);
assert!(!d.cell_closed);
assert_eq!(d.cell_body, "submit 1");
}
fn stream_chunks(chunks: &[&str]) -> (CellDetector, String) {
let mut d = CellDetector::new();
let mut visible = String::new();
for chunk in chunks {
let t = d.process_chunk(chunk);
visible.push_str(&t.chunk);
assert!(t.reasoning_deltas.is_empty());
if t.abort_stream {
break;
}
}
(d, visible)
}
fn response_with_text(text: &str) -> lash_core::LlmResponse {
lash_core::LlmResponse {
full_text: text.to_string(),
parts: vec![lash_core::LlmOutputPart::Text {
text: text.to_string(),
response_meta: None,
}],
..lash_core::LlmResponse::default()
}
}
#[test]
fn final_response_splice_reconstructs_cell_with_exact_body() {
let (d, visible) = stream_chunks(&[
"Quick check.\n\n<lashlang>\n",
"print \"hi\"\n",
"submit 1\n</lashlang>",
]);
assert_eq!(visible, "Quick check.\n\n");
let spliced = d.spliced_response_text();
let span = first_lashlang_cell_span(&spliced).expect("spliced cell parses");
let code = &spliced[span.body_start..span.body_end];
assert_eq!(code, "print \"hi\"\nsubmit 1");
}
#[test]
fn final_response_splice_ignores_raw_provider_full_text_with_suffix() {
let raw_final = "Visible before code.\n<lashlang>\nsubmit \"ok\"\n</lashlang>\nignored";
let (d, visible) = stream_chunks(&[
"Visible before",
" code.\n<lash",
"lang>\nsubmit ",
"\"ok\"\n</lashlang>\nignored",
]);
assert_eq!(visible, "Visible before code.\n");
assert!(raw_final.contains("ignored"));
let spliced = d.spliced_response_text();
assert_eq!(
spliced,
"Visible before code.\n<lashlang>\nsubmit \"ok\"\n</lashlang>"
);
let span = first_lashlang_cell_span(&spliced).expect("spliced cell parses");
assert_eq!(&spliced[span.body_start..span.body_end], "submit \"ok\"");
assert!(!spliced.contains("ignored"));
}
#[test]
fn final_response_transform_never_splices_using_raw_provider_text() {
let raw_final = "Visible before code.\n<lashlang>\nsubmit \"ok\"\n</lashlang>\nignored";
let (d, visible) = stream_chunks(&[
"Visible before",
" code.\n%%",
" ordinary prose\n<lashlang>\nsubmit ",
"\"ok\"\n</lashlang>\nignored",
]);
assert_eq!(visible, "Visible before code.\n%% ordinary prose\n");
let response = transform_final_response(&d, response_with_text(raw_final));
assert_eq!(
response.full_text,
"Visible before code.\n%% ordinary prose\n<lashlang>\nsubmit \"ok\"\n</lashlang>"
);
assert_eq!(response.full_text.matches("<lashlang>").count(), 1);
assert_eq!(response.full_text.matches("</lashlang>").count(), 1);
let span = first_lashlang_cell_span(&response.full_text).expect("cell parses");
assert_eq!(
&response.full_text[span.body_start..span.body_end],
"submit \"ok\""
);
assert!(
!response.full_text[span.end_tag_end..].contains("ignored"),
"suffix after the close tag must not survive streaming abort normalization"
);
let text_parts = response
.parts
.iter()
.filter_map(|part| match part {
lash_core::LlmOutputPart::Text { text, .. } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(text_parts, vec![response.full_text.as_str()]);
}
#[test]
fn final_response_transform_replaces_raw_text_parts_but_preserves_reasoning_parts() {
let raw_final = "Plan.\n<lashlang>\nsubmit \"ok\"\n</lashlang>\nignored";
let (d, visible) = stream_chunks(&["Plan.\n<lash", "lang>\nsubmit \"ok\"\n</lashlang>"]);
assert_eq!(visible, "Plan.\n");
let response = lash_core::LlmResponse {
full_text: raw_final.to_string(),
parts: vec![
lash_core::LlmOutputPart::Text {
text: raw_final.to_string(),
response_meta: None,
},
lash_core::LlmOutputPart::Reasoning {
text: "brief reasoning summary".to_string(),
replay: None,
},
lash_core::LlmOutputPart::Text {
text: "stale provider text".to_string(),
response_meta: None,
},
],
..lash_core::LlmResponse::default()
};
let response = transform_final_response(&d, response);
assert_eq!(
response.full_text,
"Plan.\n<lashlang>\nsubmit \"ok\"\n</lashlang>"
);
assert_eq!(response.full_text.matches("<lashlang>").count(), 1);
assert!(matches!(
response.parts.first(),
Some(lash_core::LlmOutputPart::Reasoning { text, .. })
if text == "brief reasoning summary"
));
let text_parts = response
.parts
.iter()
.filter_map(|part| match part {
lash_core::LlmOutputPart::Text { text, .. } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(text_parts, vec![response.full_text.as_str()]);
}
#[test]
fn final_response_transform_is_noop_without_detected_cell() {
let mut d = CellDetector::new();
assert_eq!(d.process_chunk("Visible only").chunk, "Visible only");
let response = response_with_text("Visible only");
let transformed = transform_final_response(&d, response.clone());
assert_eq!(transformed.full_text, response.full_text);
assert_eq!(transformed.parts, response.parts);
}
#[test]
fn final_response_splice_also_handles_already_transformed_visible_text() {
let (d, visible) = stream_chunks(&["Visible.\n", "<lashlang>\nsubmit \"ok\"\n</lashlang>"]);
assert_eq!(visible, "Visible.\n");
let spliced = d.spliced_response_text();
assert_eq!(spliced, "Visible.\n<lashlang>\nsubmit \"ok\"\n</lashlang>");
let span = first_lashlang_cell_span(&spliced).expect("spliced cell parses");
assert_eq!(&spliced[span.body_start..span.body_end], "submit \"ok\"");
}
#[test]
fn final_response_splice_preserves_start_tag_line_split_across_chunks() {
let (d, visible) = stream_chunks(&[
"Line one.",
"\n ",
"<las",
"hlang> \n",
"payload = r\"\"\"```markdown\nbody\n```\"\"\"\n",
"submit payload\n </lash",
"lang> ",
]);
assert_eq!(visible, "Line one.\n");
let spliced = d.spliced_response_text();
assert_eq!(
spliced,
"Line one.\n<lashlang>\npayload = r\"\"\"```markdown\nbody\n```\"\"\"\nsubmit payload\n</lashlang>"
);
let span = first_lashlang_cell_span(&spliced).expect("spliced cell parses");
assert_eq!(
&spliced[span.body_start..span.body_end],
"payload = r\"\"\"```markdown\nbody\n```\"\"\"\nsubmit payload"
);
}
#[test]
fn start_tag_only_without_newline_is_left_to_final_parser() {
let mut d = CellDetector::new();
let t = d.process_chunk("<lashlang>");
assert_eq!(t.chunk, "");
assert!(!d.inside_cell);
assert_eq!(d.splice_or_visible_for_test(""), "<lashlang>");
}
#[test]
fn final_response_transform_is_noop_for_incomplete_streamed_block() {
let mut d = CellDetector::new();
assert_eq!(
d.process_chunk("Visible.\n<lashlang>\nsubmit 1").chunk,
"Visible.\n"
);
assert!(d.inside_cell);
assert!(!d.cell_closed);
let response = response_with_text("Visible.\n<lashlang>\nsubmit 1");
let transformed = transform_final_response(&d, response.clone());
assert_eq!(transformed.full_text, response.full_text);
assert_eq!(transformed.parts, response.parts);
}
#[test]
fn old_percent_marker_streams_as_plain_prose() {
let mut d = CellDetector::new();
let t = d.process_chunk("%%lashlang\nsubmit 1\n");
assert_eq!(t.chunk, "%%lashlang\nsubmit 1\n");
assert!(!d.inside_cell);
assert!(!t.abort_stream);
}
impl CellDetector {
fn splice_or_visible_for_test(&self, visible: &str) -> String {
if self.inside_cell {
self.splice_into_visible(visible)
} else {
let mut out = visible.to_string();
out.push_str(&self.pending);
out
}
}
}
fn event_names(events: &[PluginRuntimeEvent]) -> Vec<&str> {
events
.iter()
.map(|event| match event {
PluginRuntimeEvent::Custom { name, .. } => name.as_str(),
_ => panic!("unexpected event: {event:?}"),
})
.collect()
}
}