use std::collections::HashMap;
use once_cell::sync::Lazy;
use regex::{Regex, RegexSet};
mod builder;
mod config;
mod diagnostics;
mod execution;
mod runtime;
mod transitions;
pub use config::{
DeviceCommandExecutionConfig, DeviceHandlerConfig, DeviceInputRule, DevicePromptRule,
DevicePromptWithSysRule, DeviceShellFlavor, DeviceTransitionRule, input_rule, prompt_rule,
prompt_with_sys_rule, transition_rule,
};
pub use diagnostics::StateMachineDiagnostics;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CommandExecutionStrategy {
PromptDriven,
ShellExitStatus {
marker: String,
shell_flavor: DeviceShellFlavor,
},
}
pub struct DeviceHandler {
current_state_index: usize,
all_states: Vec<String>,
all_regex: RegexSet,
regex_index_map: HashMap<usize, usize>,
prompt_index: (usize, usize),
sys_prompt_index: (usize, usize),
input_map: HashMap<String, (bool, String, bool)>,
edges: Vec<(String, String, String, bool, bool)>,
ignore_errors: Option<RegexSet>,
pub dyn_param: HashMap<String, String>,
catch_map: HashMap<usize, (Regex, String)>,
sys: Option<String>,
current_prompt: Option<String>,
prompt_patterns: Vec<(String, String)>,
prompt_prefix_regex: Option<RegexSet>,
prompt_prefix_patterns: Vec<String>,
command_execution: CommandExecutionStrategy,
}
type ExitPath = Option<(String, Vec<(String, String)>)>;
pub(crate) const PRIVATE_USE_PLACEHOLDER: &str = "<PUA>";
pub(crate) fn is_private_use(ch: char) -> bool {
matches!(
ch as u32,
0xE000..=0xF8FF | 0xF0000..=0xFFFFD | 0x100000..=0x10FFFD
)
}
static PRE_STATE: Lazy<Vec<String>> = Lazy::new(|| {
vec![
"Output".to_string(),
"More".to_string(),
"Error".to_string(),
]
});
pub static IGNORE_START_LINE: Lazy<Regex> =
Lazy::new(
|| match Regex::new(r"^(\r+(\s+\r+)*)|(\u{8}+(\s+\u{8}+)*)") {
Ok(re) => re,
Err(err) => panic!("invalid IGNORE_START_LINE regex: {err}"),
},
);
pub static STRIP_OSC_ESCAPE: Lazy<Regex> =
Lazy::new(|| match Regex::new(r"\x1B\][^\x07\x1B]*(?:\x07|\x1B\\)") {
Ok(re) => re,
Err(err) => panic!("invalid STRIP_OSC_ESCAPE regex: {err}"),
});
pub static STRIP_DCS_ESCAPE: Lazy<Regex> = Lazy::new(|| match Regex::new(r"\x1BP(?s:.*?)\x1B\\") {
Ok(re) => re,
Err(err) => panic!("invalid STRIP_DCS_ESCAPE regex: {err}"),
});
pub static STRIP_CSI_ESCAPE: Lazy<Regex> =
Lazy::new(|| match Regex::new(r"\x1B\[[0-?]*[ -/]*[@-~]") {
Ok(re) => re,
Err(err) => panic!("invalid STRIP_CSI_ESCAPE regex: {err}"),
});
pub static STRIP_SIMPLE_ESCAPE: Lazy<Regex> =
Lazy::new(|| match Regex::new(r"\x1B(?:[@-Z\\-_]|[=>])") {
Ok(re) => re,
Err(err) => panic!("invalid STRIP_SIMPLE_ESCAPE regex: {err}"),
});
pub(crate) fn sanitize_terminal_text(line: &str) -> String {
let without_osc = STRIP_OSC_ESCAPE.replace_all(line, "");
let without_dcs = STRIP_DCS_ESCAPE.replace_all(without_osc.as_ref(), "");
let without_csi = STRIP_CSI_ESCAPE.replace_all(without_dcs.as_ref(), "");
let without_simple = STRIP_SIMPLE_ESCAPE.replace_all(without_csi.as_ref(), "");
let mut sanitized = String::with_capacity(without_simple.len());
let mut in_pua_run = false;
for ch in without_simple.chars() {
if ch.is_control() && !matches!(ch, '\n' | '\r' | '\t') {
continue;
}
if is_private_use(ch) {
if !in_pua_run {
sanitized.push_str(PRIVATE_USE_PLACEHOLDER);
in_pua_run = true;
}
continue;
}
in_pua_run = false;
sanitized.push(ch);
}
sanitized
}
pub(crate) fn latest_terminal_fragment(line: &str) -> &str {
line.rsplit(['\n', '\r'])
.find(|segment| !segment.is_empty())
.unwrap_or(line)
}
pub(crate) fn normalize_terminal_output(text: &str) -> String {
let mut normalized = String::with_capacity(text.len());
for chunk in text.split_inclusive('\n') {
let has_newline = chunk.ends_with('\n');
let body = if has_newline {
&chunk[..chunk.len().saturating_sub(1)]
} else {
chunk
};
let sanitized = sanitize_terminal_text(body);
let visible = latest_terminal_fragment(&sanitized).trim_end_matches('\r');
normalized.push_str(visible);
if has_newline {
normalized.push('\n');
}
}
normalized
}
pub(crate) fn normalize_terminal_fragment(line: &str) -> String {
let sanitized = sanitize_terminal_text(line);
latest_terminal_fragment(&sanitized)
.trim_end_matches('\r')
.to_string()
}
pub(crate) fn terminal_fragment_has_pua(line: &str) -> bool {
normalize_terminal_fragment(line).contains(PRIVATE_USE_PLACEHOLDER)
}
pub(crate) fn merge_terminal_prompt_fragments(
lines: &[String],
tail: Option<&str>,
) -> Option<String> {
let mut fragments = Vec::with_capacity(lines.len() + usize::from(tail.is_some()));
for line in lines {
let fragment = normalize_terminal_fragment(line);
let trimmed = fragment.trim();
if !trimmed.is_empty() {
fragments.push(trimmed.to_string());
}
}
if let Some(tail) = tail {
let fragment = normalize_terminal_fragment(tail);
let trimmed = fragment.trim();
if !trimmed.is_empty() {
fragments.push(trimmed.to_string());
}
}
if fragments.is_empty() {
None
} else {
Some(fragments.join(" "))
}
}
#[cfg(test)]
fn build_test_handler() -> DeviceHandler {
let mut dyn_param = HashMap::new();
dyn_param.insert("EnablePassword".to_string(), "secret\n".to_string());
DeviceHandler::new(DeviceHandlerConfig {
prompt: vec![
prompt_rule("Login", &[r"^dev>\s*$"]),
prompt_rule("Enable", &[r"^dev#\s*$"]),
prompt_rule("Config", &[r"^dev\(cfg\)#\s*$"]),
],
write: vec![
input_rule(
"EnablePassword",
true,
"EnablePassword",
true,
&[r"^Password:\s*$"],
),
input_rule("Confirm", false, "y", false, &[r"^\[y\/n\]\?\s*$"]),
],
more_regex: vec![r"^--More--$".to_string()],
error_regex: vec![r"^ERROR: .+$".to_string()],
edges: vec![
transition_rule("Login", "enable", "Enable", false, false),
transition_rule("Enable", "configure terminal", "Config", false, false),
transition_rule("Config", "exit", "Enable", true, false),
transition_rule("Enable", "exit", "Login", true, false),
],
ignore_errors: vec![r"^ERROR: benign$".to_string()],
dyn_param,
..Default::default()
})
.expect("test handler config should be valid")
}