use std::collections::BTreeMap;
use std::fmt::Write as _;
use bmux_ipc::Request;
use bmux_recording_protocol::{
RecordingEventEnvelope as ProtocolRecordingEventEnvelope, RecordingEventKind,
RecordingPayload as ProtocolRecordingPayload,
};
use uuid::Uuid;
type RecordingPayload = ProtocolRecordingPayload<bmux_ipc::Event, bmux_ipc::ErrorCode>;
type RecordingEventEnvelope = ProtocolRecordingEventEnvelope<bmux_ipc::Event, bmux_ipc::ErrorCode>;
const SLEEP_THRESHOLD_NS: u64 = 200_000_000;
const INPUT_COALESCE_NS: u64 = 100_000_000;
const OUTPUT_QUIESCENT_NS: u64 = 300_000_000;
struct RecordingStateTracker {
pane_uuid_to_index: BTreeMap<Uuid, u32>,
focused_pane_id: Option<Uuid>,
viewport: (u16, u16),
#[cfg(test)]
next_pane_index: u32,
}
impl RecordingStateTracker {
const fn new() -> Self {
Self {
pane_uuid_to_index: BTreeMap::new(),
focused_pane_id: None,
viewport: (80, 24),
#[cfg(test)]
next_pane_index: 0,
}
}
#[cfg(test)]
fn add_pane(&mut self, pane_id: Uuid) {
if !self.pane_uuid_to_index.contains_key(&pane_id) {
self.pane_uuid_to_index
.insert(pane_id, self.next_pane_index);
self.next_pane_index += 1;
}
}
#[cfg(test)]
const fn set_focus(&mut self, pane_id: Uuid) {
self.focused_pane_id = Some(pane_id);
}
#[cfg(test)]
fn remove_pane(&mut self, pane_id: &Uuid) {
self.pane_uuid_to_index.remove(pane_id);
}
fn pane_index(&self, pane_id: &Uuid) -> Option<u32> {
self.pane_uuid_to_index.get(pane_id).copied()
}
fn focused_pane_index(&self) -> Option<u32> {
self.focused_pane_id
.as_ref()
.and_then(|id| self.pane_index(id))
}
}
struct PaneOutputAccumulator {
pane_id: Uuid,
bytes: Vec<u8>,
}
#[must_use]
#[allow(clippy::too_many_lines, clippy::similar_names)]
pub fn events_to_playbook(events: &[RecordingEventEnvelope]) -> String {
let mut lines: Vec<String> = Vec::new();
lines.push("# Auto-generated from recording".to_string());
lines.push(String::new());
let state = RecordingStateTracker::new();
let mut last_mono_ns: u64 = 0;
let mut has_session = false;
let mut pending_input: Vec<u8> = Vec::new();
let mut pending_input_pane: Option<Uuid> = None;
let mut last_input_mono_ns: u64 = 0;
let mut pending_input_is_command = false;
let mut output_accum: Vec<PaneOutputAccumulator> = Vec::new();
let mut last_output_mono_ns: u64 = 0;
for event in events {
let is_input_event = matches!(
(&event.kind, &event.payload),
(
RecordingEventKind::RequestStart,
RecordingPayload::RequestStart { request_kind, .. }
) if request_kind == "attach_input" || request_kind == "pane_direct_input"
);
let time_gap = if last_input_mono_ns > 0 && event.mono_ns > last_input_mono_ns {
event.mono_ns - last_input_mono_ns
} else {
0
};
let output_quiescent = !output_accum.is_empty()
&& last_output_mono_ns > 0
&& event.mono_ns.saturating_sub(last_output_mono_ns) > OUTPUT_QUIESCENT_NS;
let should_flush_input =
!pending_input.is_empty() && (!is_input_event || time_gap > INPUT_COALESCE_NS);
if should_flush_input || (output_quiescent && pending_input.is_empty()) {
if !pending_input.is_empty() {
flush_input(&mut lines, &pending_input, pending_input_pane, &state);
let was_command = pending_input_is_command;
pending_input.clear();
pending_input_pane = None;
pending_input_is_command = false;
if was_command && !output_accum.is_empty() {
generate_assertions_from_output(&mut lines, &output_accum, &state);
}
output_accum.clear();
} else if output_quiescent {
generate_assertions_from_output(&mut lines, &output_accum, &state);
output_accum.clear();
}
}
if last_mono_ns > 0 && event.mono_ns > last_mono_ns {
let gap_ns = event.mono_ns - last_mono_ns;
if gap_ns >= SLEEP_THRESHOLD_NS {
let gap_ms = gap_ns / 1_000_000;
lines.push(format!("sleep ms={gap_ms}"));
}
}
last_mono_ns = event.mono_ns;
if let (RecordingEventKind::PaneOutputRaw, RecordingPayload::Bytes { data }) =
(&event.kind, &event.payload)
{
let pane_id = event.pane_id.unwrap_or_default();
if let Some(existing) = output_accum.iter_mut().find(|a| a.pane_id == pane_id) {
existing.bytes.extend_from_slice(data);
} else {
output_accum.push(PaneOutputAccumulator {
pane_id,
bytes: data.clone(),
});
}
last_output_mono_ns = event.mono_ns;
}
if let (
RecordingEventKind::RequestStart,
RecordingPayload::RequestStart {
request_data,
request_kind,
..
},
) = (&event.kind, &event.payload)
{
if request_data.is_empty() {
continue;
}
if let Ok(request) = bmux_ipc::decode::<Request>(request_data) {
match request_to_dsl(&request, &mut has_session, request_kind, &state, event) {
RequestDslResult::Line(line) => lines.push(line),
RequestDslResult::CoalesceInput(data, pane_id) => {
if data.contains(&b'\r') {
pending_input_is_command = true;
}
if pending_input_pane.is_none() {
pending_input_pane = pane_id;
}
pending_input.extend_from_slice(&data);
last_input_mono_ns = event.mono_ns;
}
RequestDslResult::Skip => {}
}
}
}
}
if !pending_input.is_empty() {
flush_input(&mut lines, &pending_input, pending_input_pane, &state);
if pending_input_is_command && !output_accum.is_empty() {
generate_assertions_from_output(&mut lines, &output_accum, &state);
}
} else if !output_accum.is_empty() {
generate_assertions_from_output(&mut lines, &output_accum, &state);
}
lines.push(String::new());
lines.join("\n")
}
fn flush_input(
lines: &mut Vec<String>,
data: &[u8],
pane_id: Option<Uuid>,
state: &RecordingStateTracker,
) {
let escaped = bytes_to_c_escaped(data);
let pane_arg = pane_id.and_then(|id| {
let target_idx = state.pane_index(&id)?;
let focused_idx = state.focused_pane_index();
if state.pane_uuid_to_index.len() > 1 && (focused_idx != Some(target_idx)) {
Some(target_idx)
} else {
None
}
});
match pane_arg {
Some(idx) => lines.push(format!("send-keys keys='{escaped}' pane={idx}")),
None => lines.push(format!("send-keys keys='{escaped}'")),
}
}
fn generate_assertions_from_output(
lines: &mut Vec<String>,
output_accum: &[PaneOutputAccumulator],
state: &RecordingStateTracker,
) {
let (cols, rows) = state.viewport;
for accum in output_accum {
if accum.bytes.is_empty() {
continue;
}
let pane_index = state.pane_index(&accum.pane_id);
let mut stream = bmux_terminal_grid::TerminalGridStream::new(
cols.max(1),
rows.max(1),
bmux_terminal_grid::GridLimits::default(),
)
.expect("recording playbook grid dimensions are valid");
stream.process(&accum.bytes);
let text_lines =
bmux_terminal_grid::visible_text_lines(stream.grid(), 0, usize::from(rows.max(1)));
let last_nonempty = text_lines.iter().rposition(|l| !l.trim().is_empty());
let Some(last_idx) = last_nonempty else {
continue; };
let prompt_line = text_lines[last_idx].trim();
let pattern = make_robust_pattern(prompt_line);
if !pattern.is_empty() {
let pane_suffix = pane_index
.filter(|_| state.pane_uuid_to_index.len() > 1)
.map_or(String::new(), |idx| format!(" pane={idx}"));
lines.push(format!("wait-for pattern='{pattern}'{pane_suffix}"));
}
let content_lines = &text_lines[..last_idx];
let mut assertions_added = 0;
for content_line in content_lines.iter().rev() {
let trimmed = content_line.trim();
if trimmed.is_empty() || trimmed.len() < 3 {
continue;
}
if trimmed
.chars()
.all(|c| c.is_ascii_digit() || c == '.' || c == ':' || c == ' ')
{
continue;
}
let escaped_content = escape_single_quote(trimmed);
let pane_suffix = pane_index
.filter(|_| state.pane_uuid_to_index.len() > 1)
.map_or(String::new(), |idx| format!(" pane={idx}"));
lines.push(format!(
"assert-screen contains='{escaped_content}'{pane_suffix}"
));
assertions_added += 1;
if assertions_added >= 3 {
break; }
}
}
}
fn make_robust_pattern(line: &str) -> String {
let mut result = String::new();
let mut chars = line.chars().peekable();
while let Some(ch) = chars.next() {
if ch.is_ascii_digit() {
while chars.peek().is_some_and(char::is_ascii_digit) {
chars.next();
}
result.push_str("\\d+");
} else if is_regex_meta(ch) {
result.push('\\');
result.push(ch);
} else {
result.push(ch);
}
}
result
}
const fn is_regex_meta(ch: char) -> bool {
matches!(
ch,
'.' | '*' | '+' | '?' | '(' | ')' | '[' | ']' | '{' | '}' | '|' | '^' | '$' | '\\'
)
}
pub(super) fn escape_single_quote(s: &str) -> String {
s.replace('\'', "\\'")
}
enum RequestDslResult {
Line(String),
CoalesceInput(Vec<u8>, Option<Uuid>),
Skip,
}
#[allow(clippy::too_many_lines)] fn request_to_dsl(
request: &Request,
has_session: &mut bool,
request_kind: &str,
state: &RecordingStateTracker,
event: &RecordingEventEnvelope,
) -> RequestDslResult {
match request {
Request::InvokeService {
interface_id,
operation,
payload,
..
} => {
if interface_id == "sessions-commands" && operation == "new-session" {
#[derive(serde::Deserialize)]
struct NewSessionArgs {
name: Option<String>,
}
if let Ok(args) = bmux_codec::from_bytes::<NewSessionArgs>(payload) {
*has_session = true;
return RequestDslResult::Line(args.name.map_or_else(
|| "new-session".to_string(),
|n| format!("new-session name='{n}'"),
));
}
}
if interface_id == "sessions-commands" && operation == "kill-session" {
#[derive(serde::Deserialize)]
struct KillSessionArgs {
selector: SelectorSlim,
}
#[derive(serde::Deserialize)]
struct SelectorSlim {
#[serde(default)]
name: Option<String>,
}
if let Ok(args) = bmux_codec::from_bytes::<KillSessionArgs>(payload)
&& let Some(name) = args.selector.name
{
return RequestDslResult::Line(format!("kill-session name='{name}'"));
}
}
if interface_id == "attach-runtime-commands" {
match operation.as_str() {
"attach-set-viewport" => {
#[derive(serde::Deserialize)]
struct ViewportArgs {
#[serde(rename = "session_id")]
_session_id: Uuid,
cols: u16,
rows: u16,
#[serde(rename = "status_top_inset")]
_status_top_inset: u16,
#[serde(rename = "status_bottom_inset")]
_status_bottom_inset: u16,
#[serde(rename = "cell_pixel_w")]
_cell_pixel_w: u16,
#[serde(rename = "cell_pixel_h")]
_cell_pixel_h: u16,
}
if let Ok(args) = bmux_codec::from_bytes::<ViewportArgs>(payload) {
return RequestDslResult::Line(format!(
"resize-viewport cols={} rows={}",
args.cols, args.rows
));
}
}
"attach-input" => {
#[derive(serde::Deserialize)]
struct InputArgs {
#[serde(rename = "session_id")]
_session_id: Uuid,
data: Vec<u8>,
}
if let Ok(args) = bmux_codec::from_bytes::<InputArgs>(payload) {
if args.data.is_empty() || !*has_session {
return RequestDslResult::Skip;
}
let pane_id = event.pane_id.or(state.focused_pane_id);
return RequestDslResult::CoalesceInput(args.data, pane_id);
}
}
"attach-session"
| "attach-context"
| "attach-open"
| "attach-output"
| "detach"
| "set-client-attach-policy" => return RequestDslResult::Skip,
_ => {}
}
}
if interface_id == "attach-runtime-state" {
return RequestDslResult::Skip;
}
RequestDslResult::Line(format!(
"# unhandled invoke-service {interface_id}:{operation}"
))
}
Request::Ping
| Request::Hello { .. }
| Request::InvokeServicePipeline { .. }
| Request::WhoAmIPrincipal
| Request::SubscribeEvents
| Request::PollEvents { .. } => RequestDslResult::Skip,
_ => RequestDslResult::Line(format!("# unhandled request: {request_kind}")),
}
}
pub(super) fn bytes_to_c_escaped(data: &[u8]) -> String {
let mut result = String::new();
for &byte in data {
match byte {
b'\r' => result.push_str("\\r"),
b'\n' => result.push_str("\\n"),
b'\t' => result.push_str("\\t"),
b'\\' => result.push_str("\\\\"),
b'\'' => result.push_str("\\'"),
0x1b => result.push_str("\\e"),
0x01..=0x1a => {
write!(result, "\\x{byte:02x}").unwrap();
}
0x7f => result.push_str("\\x7f"),
0x20..=0x7e => result.push(byte as char),
_ => {
write!(result, "\\x{byte:02x}").unwrap();
}
}
}
result
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn bytes_to_c_escaped_basic() {
assert_eq!(bytes_to_c_escaped(b"hello\r\n"), "hello\\r\\n");
}
#[test]
fn bytes_to_c_escaped_ctrl() {
assert_eq!(bytes_to_c_escaped(&[0x01]), "\\x01"); assert_eq!(bytes_to_c_escaped(&[0x1b]), "\\e"); }
#[test]
fn bytes_to_c_escaped_mixed() {
assert_eq!(bytes_to_c_escaped(b"echo hello\r"), "echo hello\\r");
}
#[test]
fn make_robust_pattern_digits() {
assert_eq!(make_robust_pattern("pid: 12345"), "pid: \\d+");
assert_eq!(make_robust_pattern("line 42: error"), "line \\d+: error");
}
#[test]
fn make_robust_pattern_escapes_meta() {
assert_eq!(make_robust_pattern("file.txt"), "file\\.txt");
assert_eq!(make_robust_pattern("a+b"), "a\\+b");
assert_eq!(make_robust_pattern("user@host:~$"), "user@host:~\\$");
}
#[test]
fn make_robust_pattern_preserves_text() {
assert_eq!(make_robust_pattern("hello world"), "hello world");
}
#[test]
fn escape_single_quote_basic() {
assert_eq!(escape_single_quote("it's"), "it\\'s");
assert_eq!(escape_single_quote("a\\b"), "a\\b");
}
#[test]
fn tracker_pane_lifecycle() {
let mut tracker = RecordingStateTracker::new();
let pane1 = Uuid::nil();
let pane2 = Uuid::from_u128(1);
tracker.add_pane(pane1);
tracker.set_focus(pane1);
assert_eq!(tracker.pane_index(&pane1), Some(0));
assert_eq!(tracker.focused_pane_index(), Some(0));
tracker.add_pane(pane2);
tracker.set_focus(pane2);
assert_eq!(tracker.pane_index(&pane2), Some(1));
assert_eq!(tracker.focused_pane_index(), Some(1));
tracker.remove_pane(&pane1);
assert_eq!(tracker.pane_index(&pane1), None);
assert_eq!(tracker.pane_index(&pane2), Some(1)); }
#[test]
fn generate_assertions_basic() {
let mut state = RecordingStateTracker::new();
let pane_id = Uuid::nil();
state.add_pane(pane_id);
state.set_focus(pane_id);
state.viewport = (40, 10);
let output = b"hello world\r\nuser@host:~$ ";
let accum = vec![PaneOutputAccumulator {
pane_id,
bytes: output.to_vec(),
}];
let mut lines = Vec::new();
generate_assertions_from_output(&mut lines, &accum, &state);
assert!(
lines.iter().any(|l| l.starts_with("wait-for")),
"expected wait-for, got: {lines:?}"
);
let waitfor = lines.iter().find(|l| l.starts_with("wait-for")).unwrap();
assert!(
waitfor.contains("user@host"),
"wait-for should match prompt: {waitfor}"
);
}
}