use std::collections::HashMap;
use std::sync::Arc;
use regex::Regex;
use std::sync::Mutex;
use tokio::sync::Notify;
use crate::observe::structured::BufferEventKind;
use crate::observe::structured::EventSeq;
use crate::observe::structured::StructuredLogBuilder;
use crate::observe::structured::Utf8Stream;
use crate::vm::context::FailPattern;
/// Describes a fail pattern that was found in the buffered output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailPatternHit {
// Source text of the pattern: the regex source for a regex pattern, or the
// literal string itself for a literal pattern.
pub(crate) pattern: String,
// `true` when the hit came from a regex pattern, `false` for a literal.
pub(crate) is_regex: bool,
// The text that matched; for a literal pattern this equals `pattern`.
pub(crate) matched_text: String,
}
/// Text surrounding a match, as `(before, matched, after)`.
pub type MatchContext = (String, String, String);
/// Returns the tail of `s`, limited to at most `max` bytes.
///
/// A string that already fits is returned unchanged. Otherwise the leading
/// part is dropped and replaced by a `...` prefix; the cut is moved forward to
/// the next character boundary so a multi-byte character is never split.
fn truncate_before(s: &str, max: usize) -> String {
    // Number of leading bytes that have to go; `None` when `s` already fits.
    let Some(excess) = s.len().checked_sub(max).filter(|&n| n > 0) else {
        return s.to_owned();
    };

    // First character boundary at or after `excess` (the end of the string at worst).
    let cut = (excess..s.len())
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(s.len());

    let mut out = String::with_capacity(3 + s.len() - cut);
    out.push_str("...");
    out.push_str(&s[cut..]);
    out
}
pub(crate) fn regex_error_summary(e: ®ex::Error) -> String {
let full = e.to_string();
full.lines()
.rev()
.find(|l| !l.is_empty())
.unwrap_or(&full)
.strip_prefix("error: ")
.unwrap_or(&full)
.to_string()
}
/// Builds the `(before, matched, after)` context for a match in `text`.
///
/// `before` is everything ahead of byte offset `pos`, `after` is everything
/// from byte offset `end_pos` onwards, and `matched` is copied as given.
/// Both offsets must lie on character boundaries of `text`, otherwise the
/// slicing panics.
fn match_context(text: &str, pos: usize, end_pos: usize, matched: &str) -> MatchContext {
    let before = String::from(&text[..pos]);
    let after = String::from(&text[end_pos..]);
    (before, matched.to_owned(), after)
}
/// Marker trait (no methods) for the payload types a [`Match`] can carry.
pub trait MatchKind {}
/// Payload of a literal match: the needle string that was found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LiteralMatch(pub String);
impl MatchKind for LiteralMatch {}
/// Payload of a regex match: captured text keyed by the capture group index
/// rendered as a decimal string (`"0"` is the whole match). Groups that did
/// not participate in the match have no entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegexMatch(pub HashMap<String, String>);
impl MatchKind for RegexMatch {}
/// A successful match against the output buffer.
#[derive(Debug, Clone)]
pub struct Match<T: MatchKind> {
// Byte offset of the match start, counted over all decoded text the buffer
// has ever held (already-drained text included).
pub start: usize,
// Byte offset one past the match end, in the same absolute coordinates.
pub end: usize,
// Number of bytes removed from the front of the buffer by this match
// (everything up to and including the match).
pub consumed: usize,
// Match-kind specific payload.
pub value: T,
}
/// Mutable state of an [`OutputBuffer`], kept behind its mutex.
struct BufferInner {
// Decoded text that has not been consumed yet.
decoded: String,
// Total number of bytes already removed from the front of `decoded`;
// added to buffer-relative positions to produce absolute offsets.
base: usize,
// Streaming decoder that turns appended bytes into text.
utf8: Utf8Stream,
}
/// Buffer of decoded output that can be searched and consumed.
///
/// Cloning is cheap: clones share the same underlying state and notifier
/// through `Arc`.
#[derive(Clone)]
pub struct OutputBuffer {
// Shared buffer state.
inner: Arc<Mutex<BufferInner>>,
// Signalled after every `append` call.
pub(crate) notify: Arc<Notify>,
// Destination for buffer events; `None` disables event logging.
log: Option<StructuredLogBuilder>,
// Passed along with every logged buffer event.
shell_name: String,
// Passed along with every logged buffer event.
shell_marker: String,
}
impl OutputBuffer {
pub fn new(log: StructuredLogBuilder, shell_name: String, shell_marker: String) -> Self {
Self {
inner: Arc::new(Mutex::new(BufferInner {
decoded: String::new(),
base: 0,
utf8: Utf8Stream::new(),
})),
notify: Arc::new(Notify::new()),
log: Some(log),
shell_name,
shell_marker,
}
}
#[cfg(test)]
pub fn for_tests() -> Self {
Self {
inner: Arc::new(Mutex::new(BufferInner {
decoded: String::new(),
base: 0,
utf8: Utf8Stream::new(),
})),
notify: Arc::new(Notify::new()),
log: None,
shell_name: String::new(),
shell_marker: String::new(),
}
}
pub async fn append(&self, bytes: &[u8]) {
let mut inner = self.inner.lock().unwrap();
let decoded = inner.utf8.feed(bytes);
if !decoded.is_empty() {
inner.decoded.push_str(&decoded);
if let Some(log) = &self.log {
log.push_buffer_event(
&self.shell_name,
&self.shell_marker,
BufferEventKind::Grew { data: decoded },
);
}
}
drop(inner);
self.notify.notify_waiters();
}
pub async fn consume_literal(&self, needle: &str) -> Option<(Match<LiteralMatch>, EventSeq)> {
let mut inner = self.inner.lock().unwrap();
let pos = inner.decoded.find(needle)?;
let end_pos = pos + needle.len();
let (before, matched_str, after) = match_context(&inner.decoded, pos, end_pos, needle);
let consumed = end_pos;
let m = Match {
start: inner.base + pos,
end: inner.base + end_pos,
consumed,
value: LiteralMatch(needle.to_string()),
};
inner.decoded.drain(..end_pos);
inner.base += end_pos;
let buffer_seq = self.emit_matched(before, matched_str, after);
Some((m, buffer_seq))
}
pub async fn consume_regex(&self, re: &Regex) -> Option<(Match<RegexMatch>, EventSeq)> {
let mut inner = self.inner.lock().unwrap();
let (pos, end_pos, matched_str, captures) = {
let cap = re.captures(&inner.decoded)?;
let whole = cap.get(0)?;
let pos = whole.start();
let end_pos = whole.end();
if is_partial_line_match(re, end_pos, &inner.decoded) {
return None;
}
let matched_str = whole.as_str().to_string();
let mut captures = HashMap::new();
for i in 0..cap.len() {
if let Some(m) = cap.get(i) {
captures.insert(i.to_string(), m.as_str().to_string());
}
}
(pos, end_pos, matched_str, captures)
};
let (before, _, after) = match_context(&inner.decoded, pos, end_pos, &matched_str);
let consumed = end_pos;
let m = Match {
start: inner.base + pos,
end: inner.base + end_pos,
consumed,
value: RegexMatch(captures),
};
inner.decoded.drain(..end_pos);
inner.base += end_pos;
let buffer_seq = self.emit_matched(before, matched_str, after);
Some((m, buffer_seq))
}
pub async fn fail_check_consume_literal(
&self,
needle: &str,
fail_pattern: Option<&FailPattern>,
) -> Result<Option<(Match<LiteralMatch>, EventSeq)>, FailPatternHit> {
let mut inner = self.inner.lock().unwrap();
if let Some(fp) = fail_pattern
&& let Some(hit) = check_fail_in_buffer(&inner.decoded, fp)
{
return Err(hit);
}
let Some(pos) = inner.decoded.find(needle) else {
return Ok(None);
};
let end_pos = pos + needle.len();
let (before, matched_str, after) = match_context(&inner.decoded, pos, end_pos, needle);
let consumed = end_pos;
let m = Match {
start: inner.base + pos,
end: inner.base + end_pos,
consumed,
value: LiteralMatch(needle.to_string()),
};
inner.decoded.drain(..end_pos);
inner.base += end_pos;
let buffer_seq = self.emit_matched(before, matched_str, after);
Ok(Some((m, buffer_seq)))
}
pub async fn fail_check_consume_regex(
&self,
re: &Regex,
fail_pattern: Option<&FailPattern>,
) -> Result<Option<(Match<RegexMatch>, EventSeq)>, FailPatternHit> {
let mut inner = self.inner.lock().unwrap();
if let Some(fp) = fail_pattern
&& let Some(hit) = check_fail_in_buffer(&inner.decoded, fp)
{
return Err(hit);
}
let (pos, end_pos, matched_str, captures) = {
let Some(cap) = re.captures(&inner.decoded) else {
return Ok(None);
};
let Some(whole) = cap.get(0) else {
return Ok(None);
};
let pos = whole.start();
let end_pos = whole.end();
if is_partial_line_match(re, end_pos, &inner.decoded) {
return Ok(None);
}
let matched_str = whole.as_str().to_string();
let mut captures = HashMap::new();
for i in 0..cap.len() {
if let Some(m) = cap.get(i) {
captures.insert(i.to_string(), m.as_str().to_string());
}
}
(pos, end_pos, matched_str, captures)
};
let (before, _, after) = match_context(&inner.decoded, pos, end_pos, &matched_str);
let consumed = end_pos;
let m = Match {
start: inner.base + pos,
end: inner.base + end_pos,
consumed,
value: RegexMatch(captures),
};
inner.decoded.drain(..end_pos);
inner.base += end_pos;
let buffer_seq = self.emit_matched(before, matched_str, after);
Ok(Some((m, buffer_seq)))
}
fn emit_matched(&self, before: String, matched: String, after: String) -> EventSeq {
if let Some(log) = &self.log {
log.push_buffer_event(
&self.shell_name,
&self.shell_marker,
BufferEventKind::Matched {
before,
matched,
after,
},
)
} else {
0
}
}
pub async fn check_fail_pattern(
&self,
fail_pattern: Option<&FailPattern>,
) -> Option<FailPatternHit> {
let fp = fail_pattern?;
let inner = self.inner.lock().unwrap();
check_fail_in_buffer(&inner.decoded, fp)
}
pub async fn clear(&self) -> String {
let mut inner = self.inner.lock().unwrap();
let consumed = std::mem::take(&mut inner.decoded);
inner.base += consumed.len();
if let Some(log) = &self.log {
log.push_buffer_event(
&self.shell_name,
&self.shell_marker,
BufferEventKind::Reset {
consumed: consumed.clone(),
},
);
}
consumed
}
pub async fn snapshot_tail(&self, n: usize) -> String {
let inner = self.inner.lock().unwrap();
truncate_before(&inner.decoded, n)
}
pub async fn remaining(&self) -> Vec<u8> {
let inner = self.inner.lock().unwrap();
inner.decoded.as_bytes().to_vec()
}
}
/// Reports whether a match should be deferred because the line it ends on may
/// still be incomplete.
///
/// That is the case when the match reaches the very end of `text`, `text`
/// does not end with a newline, and the pattern source ends in an unescaped
/// `$` anchor.
fn is_partial_line_match(re: &Regex, match_end: usize, text: &str) -> bool {
    // A match that stops short of the end, or a newline-terminated buffer,
    // can never be a partial-line match.
    if match_end != text.len() || text.ends_with('\n') {
        return false;
    }
    has_trailing_anchor(re.as_str())
}
/// Reports whether the pattern source `src` ends with an unescaped `$`.
///
/// The final `$` counts as escaped when it is preceded by an odd number of
/// consecutive backslashes (an even run is made of escaped backslashes only).
fn has_trailing_anchor(src: &str) -> bool {
    match src.strip_suffix('$') {
        None => false,
        Some(body) => {
            // Length of the backslash run sitting directly before the `$`.
            let escapes = body.len() - body.trim_end_matches('\\').len();
            escapes % 2 == 0
        }
    }
}
/// Searches `text` for `pattern` and describes the first hit, if any.
///
/// For a regex pattern the hit carries the regex source and the text it
/// matched; for a literal pattern both fields hold the literal itself.
fn check_fail_in_buffer(text: &str, pattern: &FailPattern) -> Option<FailPatternHit> {
    let (source, is_regex, matched_text) = match pattern {
        FailPattern::Regex(re) => {
            let found = re.find(text)?;
            (re.as_str().to_string(), true, found.as_str().to_string())
        }
        FailPattern::Literal(literal) => {
            if !text.contains(literal.as_str()) {
                return None;
            }
            (literal.clone(), false, literal.clone())
        }
    };
    Some(FailPatternHit {
        pattern: source,
        is_regex,
        matched_text,
    })
}
// Unit tests for the output buffer and its helper functions.
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::time::Instant;
use super::*;
use crate::observe::progress;
use regex::RegexBuilder;
// Builds an `OutputBuffer` attached to a `StructuredLogBuilder` so tests can
// inspect the buffer events it records. The builder and the progress receiver
// are returned alongside the buffer.
fn wired_buffer() -> (
OutputBuffer,
StructuredLogBuilder,
tokio::sync::mpsc::UnboundedReceiver<crate::observe::progress::ProgressEvent>,
) {
let (tx, rx) = progress::channel();
let sources = relux_core::table::SharedTable::new();
let builder = StructuredLogBuilder::new(
tx,
Instant::now(),
sources,
Arc::from(PathBuf::from("/project").as_path()),
);
let buf = OutputBuffer::new(builder.clone(), "sh".into(), "m".into());
(buf, builder, rx)
}
// Returns the `(before, matched, after)` payload of the most recent buffer
// event, provided that event is a `Matched` event.
fn last_matched(builder: &StructuredLogBuilder) -> Option<(String, String, String)> {
let events = builder.buffer_events_for_tests();
events.last().and_then(|ev| match &ev.kind {
BufferEventKind::Matched {
before,
matched,
after,
} => Some((before.clone(), matched.clone(), after.clone())),
_ => None,
})
}
// Returns the consumed text of the most recent buffer event, provided that
// event is a `Reset` event.
fn last_reset(builder: &StructuredLogBuilder) -> Option<String> {
let events = builder.buffer_events_for_tests();
events.last().and_then(|ev| match &ev.kind {
BufferEventKind::Reset { consumed } => Some(consumed.clone()),
_ => None,
})
}
// Collects the payloads of all `Grew` events, in recording order.
fn all_grew(builder: &StructuredLogBuilder) -> Vec<String> {
builder
.buffer_events_for_tests()
.iter()
.filter_map(|ev| match &ev.kind {
BufferEventKind::Grew { data } => Some(data.clone()),
_ => None,
})
.collect()
}
// --- truncate_before ---
#[test]
fn truncate_before_short_string_unchanged() {
assert_eq!(truncate_before("hello", 10), "hello");
}
#[test]
fn truncate_before_exact_length_unchanged() {
assert_eq!(truncate_before("hello", 5), "hello");
}
#[test]
fn truncate_before_keeps_last_n_chars() {
assert_eq!(truncate_before("hello world", 5), "...world");
}
#[test]
fn truncate_before_empty_string() {
assert_eq!(truncate_before("", 5), "");
}
#[test]
fn truncate_before_max_zero() {
// Nothing of the input survives; only the ellipsis prefix is left.
assert_eq!(truncate_before("hello", 0), "...");
}
// --- append / remaining ---
#[tokio::test]
async fn output_buffer_append_and_remaining() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello").await;
assert_eq!(buf.remaining().await, b"hello");
}
#[tokio::test]
async fn output_buffer_append_empty_bytes() {
let buf = OutputBuffer::for_tests();
buf.append(b"").await;
assert!(buf.remaining().await.is_empty());
}
// --- consume_literal ---
#[tokio::test]
async fn consume_literal_basic() {
let (buf, builder, _rx) = wired_buffer();
buf.append(b"hello world").await;
let (m, _buffer_seq) = buf.consume_literal("hello").await.unwrap();
assert_eq!(m.start, 0);
assert_eq!(m.end, 5);
assert_eq!(m.consumed, 5);
assert_eq!(m.value.0, "hello");
let (before, matched, after) = last_matched(&builder).expect("matched event");
assert_eq!(before, "");
assert_eq!(matched, "hello");
assert_eq!(after, " world");
assert_eq!(buf.remaining().await, b" world");
}
#[tokio::test]
async fn consume_literal_drains_up_to_match_end() {
let buf = OutputBuffer::for_tests();
buf.append(b"prefix MATCH suffix").await;
let (m, _) = buf.consume_literal("MATCH").await.unwrap();
assert_eq!(m.start, 7);
assert_eq!(m.end, 12);
// The text ahead of the match is consumed together with the match.
assert_eq!(m.consumed, 12);
assert_eq!(buf.remaining().await, b" suffix");
}
#[tokio::test]
async fn consume_literal_not_found() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello world").await;
assert!(buf.consume_literal("xyz").await.is_none());
assert_eq!(buf.remaining().await, b"hello world");
}
#[tokio::test]
async fn consume_literal_absolute_offsets_after_drain() {
let buf = OutputBuffer::for_tests();
buf.append(b"aaa bbb ccc").await;
let (m1, _) = buf.consume_literal("aaa").await.unwrap();
assert_eq!(m1.start, 0);
assert_eq!(m1.end, 3);
// Offsets keep counting from the start of all output, not from the
// start of what is left in the buffer.
let (m2, _) = buf.consume_literal("bbb").await.unwrap();
assert_eq!(m2.start, 4);
assert_eq!(m2.end, 7);
assert_eq!(buf.remaining().await, b" ccc");
}
#[tokio::test]
async fn consume_literal_context_carries_full_before_and_after() {
let (buf, builder, _rx) = wired_buffer();
let huge_prefix = "x".repeat(500);
let huge_suffix = "y".repeat(500);
buf.append(format!("{huge_prefix}MATCH{huge_suffix}").as_bytes())
.await;
let _ = buf.consume_literal("MATCH").await.unwrap();
// The logged context is not truncated.
let (before, matched, after) = last_matched(&builder).expect("matched event");
assert_eq!(before, huge_prefix);
assert_eq!(matched, "MATCH");
assert_eq!(after, huge_suffix);
}
#[tokio::test]
async fn consume_literal_handles_invalid_utf8_in_buffer() {
let buf = OutputBuffer::for_tests();
let mut bytes = b"prefix".to_vec();
// 0xFF can never appear in valid UTF-8.
bytes.push(0xFF);
bytes.extend_from_slice(b"MATCH suffix");
buf.append(&bytes).await;
let (m, _) = buf.consume_literal("MATCH").await.expect("found");
assert_eq!(m.value.0, "MATCH");
assert_eq!(buf.remaining().await, " suffix".as_bytes());
}
// --- consume_regex ---
#[tokio::test]
async fn consume_regex_basic() {
let buf = OutputBuffer::for_tests();
buf.append(b"abc 123 def").await;
let re = Regex::new(r"\d+").unwrap();
let (m, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m.start, 4);
assert_eq!(m.end, 7);
assert_eq!(m.value.0.get("0").unwrap(), "123");
assert_eq!(buf.remaining().await, b" def");
}
#[tokio::test]
async fn consume_regex_with_captures() {
let buf = OutputBuffer::for_tests();
buf.append(b"name: Alice age: 30\n").await;
let re = Regex::new(r"name: (\w+) age: (\d+)").unwrap();
let (m, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m.start, 0);
assert_eq!(m.end, 19);
// Capture groups are keyed by their index as a string.
assert_eq!(m.value.0.get("0").unwrap(), "name: Alice age: 30");
assert_eq!(m.value.0.get("1").unwrap(), "Alice");
assert_eq!(m.value.0.get("2").unwrap(), "30");
}
#[tokio::test]
async fn consume_regex_not_found() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello world").await;
let re = Regex::new(r"\d+").unwrap();
assert!(buf.consume_regex(&re).await.is_none());
assert_eq!(buf.remaining().await, b"hello world");
}
#[tokio::test]
async fn consume_regex_absolute_offsets_after_drain() {
let buf = OutputBuffer::for_tests();
buf.append(b"aaa 123 bbb 456\n").await;
let re = Regex::new(r"\d+").unwrap();
let (m1, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m1.start, 4);
assert_eq!(m1.end, 7);
let (m2, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m2.start, 12);
assert_eq!(m2.end, 15);
}
#[tokio::test]
async fn consume_regex_defers_partial_line() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello wor").await;
let re = RegexBuilder::new(r"^(.+)$")
.multi_line(true)
.build()
.unwrap();
// The `$`-anchored match ends at the end of an unterminated line, so it
// is held back until the newline arrives.
assert!(buf.consume_regex(&re).await.is_none());
assert_eq!(buf.remaining().await, b"hello wor");
buf.append(b"ld\n").await;
let (m, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m.value.0.get("0").unwrap(), "hello world");
}
#[tokio::test]
async fn consume_regex_allows_match_before_partial_tail() {
let buf = OutputBuffer::for_tests();
buf.append(b"first line\nsecond li").await;
let re = RegexBuilder::new(r"^(.+)$")
.multi_line(true)
.build()
.unwrap();
// The first line is complete, so it matches despite the partial tail.
let (m, _) = buf.consume_regex(&re).await.unwrap();
assert_eq!(m.value.0.get("1").unwrap(), "first line");
}
#[tokio::test]
async fn fail_check_consume_regex_defers_partial_line() {
let buf = OutputBuffer::for_tests();
buf.append(b"partial data").await;
let re = RegexBuilder::new(r"^(.+)$")
.multi_line(true)
.build()
.unwrap();
let result = buf.fail_check_consume_regex(&re, None).await;
assert!(result.unwrap().is_none());
buf.append(b"\n").await;
let result = buf.fail_check_consume_regex(&re, None).await;
let (m, _) = result.unwrap().unwrap();
assert_eq!(m.value.0.get("0").unwrap(), "partial data");
}
#[tokio::test]
async fn consume_regex_handles_invalid_utf8_in_buffer() {
let buf = OutputBuffer::for_tests();
let mut bytes = b"abc".to_vec();
// 0xFF can never appear in valid UTF-8.
bytes.push(0xFF);
bytes.extend_from_slice(b" 123 def");
buf.append(&bytes).await;
let re = Regex::new(r"\d+").unwrap();
let (m, _) = buf.consume_regex(&re).await.expect("found");
assert_eq!(m.value.0.get("0").unwrap(), "123");
assert_eq!(buf.remaining().await, " def".as_bytes());
}
#[tokio::test]
async fn consume_regex_does_not_defer_on_escaped_trailing_dollar() {
let buf = OutputBuffer::for_tests();
buf.append(b"price: $9").await;
let re = Regex::new(r"price: \$\d+").unwrap();
let (m, _) = buf
.consume_regex(&re)
.await
.expect("escaped trailing dollar must not defer");
assert_eq!(m.value.0.get("0").unwrap(), "price: $9");
}
// --- has_trailing_anchor ---
#[test]
fn has_trailing_anchor_unescaped() {
assert!(super::has_trailing_anchor("foo$"));
assert!(super::has_trailing_anchor(r"^(.+)$"));
// An escaped backslash followed by a real `$` anchor.
assert!(super::has_trailing_anchor(r"foo\\$"));
}
#[test]
fn has_trailing_anchor_escaped() {
assert!(!super::has_trailing_anchor(r"price: \$"));
// An escaped backslash followed by an escaped `$`.
assert!(!super::has_trailing_anchor(r"foo\\\$"));
}
#[test]
fn has_trailing_anchor_no_dollar() {
assert!(!super::has_trailing_anchor("foo"));
assert!(!super::has_trailing_anchor(""));
}
// --- clear ---
#[tokio::test]
async fn clear_empties_buffer_and_returns_consumed() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello world").await;
let consumed = buf.clear().await;
assert_eq!(consumed, "hello world");
assert!(buf.remaining().await.is_empty());
}
#[tokio::test]
async fn clear_advances_base_correctly() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello world").await;
let _ = buf.clear().await;
buf.append(b"abc 123\n").await;
let re = Regex::new(r"\d+").unwrap();
let (m, _) = buf.consume_regex(&re).await.unwrap();
// The 11 cleared bytes shift the match at 4..7 to 15..18.
assert_eq!(m.start, 15);
assert_eq!(m.end, 18);
}
#[tokio::test]
async fn clear_drops_incomplete_utf8_trailing_sequence() {
let (buf, builder, _rx) = wired_buffer();
buf.append(b"ok").await;
// First two bytes of a four-byte UTF-8 sequence.
buf.append(&[0xF0, 0x9F]).await;
let _ = buf.clear().await;
let consumed = last_reset(&builder).expect("reset event");
assert_eq!(consumed, "ok");
}
#[tokio::test]
async fn clear_consumed_equals_sum_of_grew_payloads() {
let (buf, builder, _rx) = wired_buffer();
buf.append(b"alpha ").await;
buf.append(b"beta ").await;
buf.append("gamma\n".as_bytes()).await;
let grew_sum: String = all_grew(&builder).concat();
let _ = buf.clear().await;
let consumed = last_reset(&builder).expect("reset event");
assert_eq!(consumed, grew_sum);
assert_eq!(consumed, "alpha beta gamma\n");
}
#[tokio::test]
async fn clear_preserves_partial_utf8_in_buffer() {
let (buf, builder, _rx) = wired_buffer();
// First half of U+1F389 encoded as UTF-8.
buf.append(&[0xF0, 0x9F]).await;
let _ = buf.clear().await;
let consumed = last_reset(&builder).expect("reset event");
assert_eq!(consumed, "");
// The second half completes the character even though `clear` ran in
// between.
buf.append(&[0x8E, 0x89]).await;
let grew: Vec<String> = all_grew(&builder);
assert_eq!(grew.last().map(String::as_str), Some("\u{1F389}"));
}
// --- snapshot_tail ---
#[tokio::test]
async fn snapshot_tail_returns_truncated_tail() {
let buf = OutputBuffer::for_tests();
buf.append(b"hello world").await;
let tail = buf.snapshot_tail(5).await;
assert_eq!(tail, "...world");
}
#[tokio::test]
async fn snapshot_tail_full_content_when_short() {
let buf = OutputBuffer::for_tests();
buf.append(b"hi").await;
let tail = buf.snapshot_tail(80).await;
assert_eq!(tail, "hi");
}
// --- check_fail_in_buffer ---
#[test]
fn check_fail_in_buffer_regex_match() {
let fp = FailPattern::Regex(Regex::new(r"ERROR").unwrap());
let hit = check_fail_in_buffer("some ERROR here", &fp).unwrap();
assert_eq!(hit.pattern, "ERROR");
assert_eq!(hit.matched_text, "ERROR");
}
#[test]
fn check_fail_in_buffer_regex_no_match() {
let fp = FailPattern::Regex(Regex::new(r"ERROR").unwrap());
assert!(check_fail_in_buffer("all good", &fp).is_none());
}
#[test]
fn check_fail_in_buffer_literal_match() {
let fp = FailPattern::Literal("FATAL".to_string());
let hit = check_fail_in_buffer("got FATAL crash", &fp).unwrap();
assert_eq!(hit.pattern, "FATAL");
assert_eq!(hit.matched_text, "FATAL");
}
#[test]
fn check_fail_in_buffer_literal_no_match() {
let fp = FailPattern::Literal("FATAL".to_string());
assert!(check_fail_in_buffer("all good", &fp).is_none());
}
}