Skip to main content

relux_runtime/vm/
mod.rs

1pub mod bifs;
2pub mod buffer;
3pub mod context;
4mod pty;
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use std::time::Instant;
10
11use regex::Regex;
12use regex::RegexBuilder;
13use tokio::sync::Mutex;
14use tokio_util::sync::CancellationToken;
15
16use crate::RuntimeContext;
17use crate::observe::event_log::BufferSnapshot;
18use crate::observe::event_sink::EventSink;
19use crate::report::result::Failure;
20use crate::vm::buffer::BUFFER_TAIL_LEN;
21use crate::vm::buffer::FailPatternHit;
22use crate::vm::buffer::regex_error_summary;
23use crate::vm::context::Captures;
24use crate::vm::context::ExecutionContext;
25use crate::vm::context::FailPattern;
26use crate::vm::pty::PtyShell;
27use relux_core::diagnostics::IrSpan;
28use relux_ir::IrCallExpr;
29use relux_ir::IrExpr;
30use relux_ir::IrFn;
31use relux_ir::IrInterpolation;
32use relux_ir::IrPureFn;
33use relux_ir::IrShellStmt;
34use relux_ir::IrStringPart;
35use relux_ir::Tables;
36
37// ─── Interpolation helpers ──────────────────────────────────────
38
39fn has_interpolation(expr: &IrInterpolation) -> bool {
40    expr.parts().iter().any(|p| {
41        matches!(
42            p,
43            IrStringPart::Var { .. } | IrStringPart::CaptureRef { .. }
44        )
45    })
46}
47
48fn interpolation_template(expr: &IrInterpolation) -> String {
49    expr.parts()
50        .iter()
51        .map(|p| match p {
52            IrStringPart::Literal { value, .. } => value.clone(),
53            IrStringPart::Var { name, .. } => format!("${{{name}}}"),
54            IrStringPart::EscapedDollar { .. } => "$".to_string(),
55            IrStringPart::CaptureRef { index, .. } => format!("${{{index}}}"),
56        })
57        .collect()
58}
59
60async fn interpolate_ir(expr: &IrInterpolation, ctx: &ExecutionContext) -> String {
61    let mut out = String::new();
62    for part in expr.parts() {
63        match part {
64            IrStringPart::Literal { value, .. } => out.push_str(value),
65            IrStringPart::Var { name, .. } => {
66                if let Some(v) = ctx.lookup(name).await {
67                    out.push_str(&v);
68                }
69            }
70            IrStringPart::EscapedDollar { .. } => out.push('$'),
71            IrStringPart::CaptureRef { index, .. } => {
72                if let Some(v) = ctx.capture(*index) {
73                    out.push_str(&v);
74                }
75            }
76        }
77    }
78    out
79}
80
81// ─── Vm ─────────────────────────────────────────────────────────
82
83pub struct Vm {
84    pty: PtyShell,
85    ctx: ExecutionContext,
86    tables: Tables,
87    pub events: EventSink,
88    shell_prompt: String,
89    pub(crate) cancel: CancellationToken,
90    flaky_timeout_multiplier: f64,
91}
92
93impl Vm {
94    pub async fn new(
95        shell_name: String,
96        ctx: ExecutionContext,
97        rt_ctx: &RuntimeContext,
98    ) -> Result<Self, Failure> {
99        let shell_log = rt_ctx
100            .create_shell_logger(&shell_name)
101            .map_err(|e| Failure::Runtime {
102                message: format!("failed to create shell log: {e}"),
103                span: None,
104                shell: Some(shell_name.clone()),
105            })?;
106        let shell_log = Arc::new(Mutex::new(shell_log));
107
108        let shell_command = rt_ctx.shell.command.to_string();
109        let shell_prompt = rt_ctx.shell.prompt.to_string();
110
111        let pty = PtyShell::spawn(&shell_command, ctx.process_env(), shell_log).map_err(|e| {
112            Failure::Runtime {
113                message: format!("failed to spawn shell: {e}"),
114                span: None,
115                shell: Some(shell_name.clone()),
116            }
117        })?;
118
119        let events = rt_ctx.events.clone();
120        let cancel = rt_ctx.cancel.clone();
121
122        let mut vm = Self {
123            pty,
124            ctx,
125            tables: rt_ctx.tables.clone(),
126            events: events.clone(),
127            shell_prompt,
128            cancel,
129            flaky_timeout_multiplier: rt_ctx.flaky_timeout_multiplier,
130        };
131
132        events.emit_shell_spawn(&shell_name, &shell_command);
133
134        vm.pty
135            .init_prompt(
136                &vm.shell_prompt,
137                vm.ctx
138                    .timeout()
139                    .adjusted_duration_with_flaky(vm.flaky_timeout_multiplier),
140            )
141            .await
142            .map_err(|_| Failure::Runtime {
143                message: "shell did not produce prompt during init".to_string(),
144                span: None,
145                shell: Some(shell_name),
146            })?;
147
148        vm.events.emit_shell_ready(vm.ctx.current_name());
149
150        Ok(vm)
151    }
152
153    /// Current display name for logging (resolves effect chain + alias).
154    pub fn current_name(&self) -> String {
155        self.ctx.current_name()
156    }
157
158    /// Reset the execution context for shell export (effect → test/parent effect).
159    pub fn reset_for_export(&mut self, new_scope: context::Scope) {
160        self.ctx.reset_for_export(new_scope);
161    }
162
163    pub fn shell_prompt(&self) -> &str {
164        &self.shell_prompt
165    }
166
167    pub async fn exec_stmts(&mut self, stmts: &[IrShellStmt]) -> Result<String, Failure> {
168        let mut last = String::new();
169        for stmt in stmts {
170            if self.cancel.is_cancelled() {
171                return Err(Failure::Cancelled {
172                    span: None,
173                    shell: Some(self.ctx.current_name().to_string()),
174                });
175            }
176            last = self.exec_stmt(stmt).await?;
177        }
178        self.drain_recv_event().await;
179        Ok(last)
180    }
181
182    async fn drain_recv_event(&mut self) {
183        if let Some(_data) = self.pty.output_buf.drain_recv().await {
184            // self.events.emit_recv(&self.ctx.current_name(), &data);
185        }
186    }
187
188    async fn emit_interpolation(&mut self, expr: &IrInterpolation, result: &str) {
189        if has_interpolation(expr) {
190            let mut bindings = Vec::new();
191            for part in expr.parts() {
192                match part {
193                    IrStringPart::Var { name, .. } => {
194                        let value = self.ctx.lookup(name).await.unwrap_or_default();
195                        bindings.push((name.clone(), value));
196                    }
197                    IrStringPart::CaptureRef { index, .. } => {
198                        let name = index.to_string();
199                        let value = self.ctx.capture(*index).unwrap_or_default();
200                        bindings.push((name, value));
201                    }
202                    _ => {}
203                }
204            }
205            let shell = self.ctx.current_name();
206            self.events
207                .emit_interpolation(&shell, interpolation_template(expr), result, &bindings);
208        }
209    }
210
211    pub async fn exec_stmt(&mut self, stmt: &IrShellStmt) -> Result<String, Failure> {
212        use relux_ir::IrNode;
213        let span = stmt.span().clone();
214        self.drain_recv_event().await;
215        self.check_fail(span.clone()).await?;
216        match stmt {
217            IrShellStmt::Comment { .. } => Ok(String::new()),
218            IrShellStmt::FailRegex {
219                pattern,
220                span: ir_span,
221            } => {
222                let pat = interpolate_ir(pattern, &self.ctx).await;
223                self.emit_interpolation(pattern, &pat).await;
224                let shell = self.ctx.current_name();
225                self.events.emit_fail_pattern_set(&shell, &pat);
226                let re = RegexBuilder::new(&pat)
227                    .multi_line(true)
228                    .crlf(true)
229                    .build()
230                    .map_err(|e| Failure::Runtime {
231                        message: format!("invalid fail regex: {}", regex_error_summary(&e)),
232                        span: Some(ir_span.clone()),
233                        shell: Some(self.ctx.current_name().to_string()),
234                    })?;
235                let fp = Some(FailPattern::Regex(re));
236                self.ctx.set_fail_pattern(fp);
237                self.check_fail(span).await?;
238                Ok(String::new())
239            }
240            IrShellStmt::FailLiteral { pattern, .. } => {
241                let pat = interpolate_ir(pattern, &self.ctx).await;
242                self.emit_interpolation(pattern, &pat).await;
243                let shell = self.ctx.current_name();
244                self.events.emit_fail_pattern_set(&shell, &pat);
245                let fp = Some(FailPattern::Literal(pat));
246                self.ctx.set_fail_pattern(fp);
247                self.check_fail(span).await?;
248                Ok(String::new())
249            }
250            IrShellStmt::ClearFailPattern { .. } => {
251                let shell = self.ctx.current_name();
252                self.events.emit_fail_pattern_cleared(&shell);
253                self.ctx.set_fail_pattern(None);
254                Ok(String::new())
255            }
256            IrShellStmt::Timeout { timeout, .. } => {
257                let previous = format!("{:?}", self.ctx.timeout());
258                self.ctx.set_timeout(timeout.clone());
259                let new_timeout = format!("{:?}", self.ctx.timeout());
260                let shell = self.ctx.current_name();
261                self.events
262                    .emit_timeout_set(&shell, &new_timeout, &previous);
263                Ok(String::new())
264            }
265            IrShellStmt::Let { stmt: let_stmt, .. } => {
266                let value = if let Some(expr) = let_stmt.value() {
267                    self.eval_expr(expr).await?
268                } else {
269                    String::new()
270                };
271                let shell = self.ctx.current_name();
272                self.events
273                    .emit_var_let(&shell, let_stmt.name().name(), &value);
274                self.ctx
275                    .let_insert(let_stmt.name().name().to_string(), value.clone());
276                Ok(value)
277            }
278            IrShellStmt::Assign { stmt: assign, .. } => {
279                let value = self.eval_expr(assign.value()).await?;
280                let found = self.ctx.assign(assign.name().name(), value.clone()).await;
281                if !found {
282                    return Err(Failure::Runtime {
283                        message: format!(
284                            "assignment to undeclared variable `{}`",
285                            assign.name().name()
286                        ),
287                        span: Some(assign.name().span().clone()),
288                        shell: Some(self.ctx.current_name().to_string()),
289                    });
290                }
291                let shell = self.ctx.current_name();
292                self.events
293                    .emit_var_assign(&shell, assign.name().name(), &value);
294                Ok(value)
295            }
296            IrShellStmt::Expr { expr, .. } => self.eval_expr(expr).await,
297            IrShellStmt::Send { payload, .. } => {
298                let data = interpolate_ir(payload, &self.ctx).await;
299                self.emit_interpolation(payload, &data).await;
300                self.send_bytes(format!("{data}\n").as_bytes(), span.clone())
301                    .await?;
302                let shell = self.ctx.current_name();
303                self.events.emit_send(&shell, &data);
304                Ok(data)
305            }
306            IrShellStmt::SendRaw { payload, .. } => {
307                let data = interpolate_ir(payload, &self.ctx).await;
308                self.emit_interpolation(payload, &data).await;
309                self.send_bytes(data.as_bytes(), span.clone()).await?;
310                let shell = self.ctx.current_name();
311                self.events.emit_send(&shell, &data);
312                Ok(data)
313            }
314            IrShellStmt::MatchLiteral { pattern, .. } => {
315                let timeout = self
316                    .ctx
317                    .timeout()
318                    .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
319                let pat = interpolate_ir(pattern, &self.ctx).await;
320                self.emit_interpolation(pattern, &pat).await;
321                let shell = self.ctx.current_name();
322                self.events.emit_match_start(&shell, &pat, false);
323                let match_start = Instant::now();
324                let (mat, snapshot) = self.wait_consume_literal(&pat, timeout, span).await?;
325                let shell = self.ctx.current_name();
326                self.events.emit_match_done(
327                    &shell,
328                    &mat.value.0,
329                    match_start.elapsed(),
330                    snapshot,
331                    None,
332                );
333                Ok(pat)
334            }
335            IrShellStmt::MatchRegex { pattern, .. } => {
336                let timeout = self
337                    .ctx
338                    .timeout()
339                    .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
340                let pat = interpolate_ir(pattern, &self.ctx).await;
341                self.emit_interpolation(pattern, &pat).await;
342                let re = RegexBuilder::new(&pat)
343                    .multi_line(true)
344                    .crlf(true)
345                    .build()
346                    .map_err(|e| Failure::Runtime {
347                        message: format!("invalid regex: {}", regex_error_summary(&e)),
348                        span: Some(pattern.span().clone()),
349                        shell: Some(self.ctx.current_name().to_string()),
350                    })?;
351                let shell = self.ctx.current_name();
352                self.events.emit_match_start(&shell, &pat, true);
353                let match_start = Instant::now();
354                let (mat, snapshot) = self
355                    .wait_consume_regex(&pat, &re, timeout, span.clone())
356                    .await?;
357                let full = mat.value.0.get("0").cloned().unwrap_or_default();
358                let captures = mat.value.0.clone();
359                let shell = self.ctx.current_name();
360                self.events.emit_match_done(
361                    &shell,
362                    &full,
363                    match_start.elapsed(),
364                    snapshot,
365                    Some(captures.clone()),
366                );
367                self.set_captures_from_map(captures);
368                Ok(full)
369            }
370            IrShellStmt::TimedMatchLiteral {
371                timeout, pattern, ..
372            } => {
373                let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
374                let pat = interpolate_ir(pattern, &self.ctx).await;
375                self.emit_interpolation(pattern, &pat).await;
376                let shell = self.ctx.current_name();
377                self.events.emit_match_start(&shell, &pat, false);
378                let match_start = Instant::now();
379                let (mat, snapshot) = self.wait_consume_literal(&pat, dur, span).await?;
380                let shell = self.ctx.current_name();
381                self.events.emit_match_done(
382                    &shell,
383                    &mat.value.0,
384                    match_start.elapsed(),
385                    snapshot,
386                    None,
387                );
388                Ok(pat)
389            }
390            IrShellStmt::TimedMatchRegex {
391                timeout, pattern, ..
392            } => {
393                let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
394                let pat = interpolate_ir(pattern, &self.ctx).await;
395                self.emit_interpolation(pattern, &pat).await;
396                let re = RegexBuilder::new(&pat)
397                    .multi_line(true)
398                    .crlf(true)
399                    .build()
400                    .map_err(|e| Failure::Runtime {
401                        message: format!("invalid regex: {}", regex_error_summary(&e)),
402                        span: Some(pattern.span().clone()),
403                        shell: Some(self.ctx.current_name().to_string()),
404                    })?;
405                let shell = self.ctx.current_name();
406                self.events.emit_match_start(&shell, &pat, true);
407                let match_start = Instant::now();
408                let (mat, snapshot) = self
409                    .wait_consume_regex(&pat, &re, dur, span.clone())
410                    .await?;
411                let full = mat.value.0.get("0").cloned().unwrap_or_default();
412                let captures = mat.value.0.clone();
413                let shell = self.ctx.current_name();
414                self.events.emit_match_done(
415                    &shell,
416                    &full,
417                    match_start.elapsed(),
418                    snapshot,
419                    Some(captures.clone()),
420                );
421                self.set_captures_from_map(captures);
422                Ok(full)
423            }
424            IrShellStmt::BufferReset { .. } => {
425                let snapshot = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
426                let shell = self.ctx.current_name();
427                self.events.emit_buffer_reset(&shell, snapshot);
428                self.pty.output_buf.clear().await;
429                Ok(String::new())
430            }
431        }
432    }
433
434    fn set_captures_from_map(&mut self, map: HashMap<String, String>) {
435        let mut caps = Captures::new();
436        for (k, v) in map {
437            caps.set(k, v);
438        }
439        self.ctx.set_captures(caps);
440    }
441
442    #[async_recursion::async_recursion]
443    async fn eval_expr(&mut self, expr: &IrExpr) -> Result<String, Failure> {
444        use relux_ir::IrNode;
445        let span = expr.span().clone();
446        self.check_fail(span.clone()).await?;
447        match expr {
448            IrExpr::String { value, .. } => {
449                let result = interpolate_ir(value, &self.ctx).await;
450                self.emit_interpolation(value, &result).await;
451                let shell = self.ctx.current_name();
452                self.events.emit_string_eval(&shell, &result);
453                Ok(result)
454            }
455            IrExpr::Var { name, .. } => Ok(self.ctx.lookup(name).await.unwrap_or_default()),
456            IrExpr::CaptureRef { index, .. } => Ok(self.ctx.capture(*index).unwrap_or_default()),
457            IrExpr::Call { call, .. } => self.eval_call(call, &span).await,
458        }
459    }
460
461    async fn eval_call(&mut self, call: &IrCallExpr, span: &IrSpan) -> Result<String, Failure> {
462        let fn_id = call.resolved().clone();
463        let fn_name = call.name().name().to_string();
464
465        // Evaluate args first
466        let mut evaluated_args = Vec::with_capacity(call.args().len());
467        for arg in call.args() {
468            evaluated_args.push(self.eval_expr(arg).await?);
469        }
470
471        // Try user-defined function
472        if let Some(result) = self.tables.fns.get(&fn_id) {
473            let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
474                message: format!("function resolution failed: {e:?}"),
475                span: Some(span.clone()),
476                shell: Some(self.ctx.current_name().to_string()),
477            })?;
478            match ir_fn {
479                IrFn::UserDefined { params, body, .. } => {
480                    let params = params.clone();
481                    let body = body.clone();
482                    let named_args: Vec<(String, String)> = params
483                        .iter()
484                        .zip(evaluated_args.iter())
485                        .map(|(p, v)| (p.name().to_string(), v.clone()))
486                        .collect();
487                    let shell = self.ctx.current_name();
488                    self.events.emit_fn_enter(&shell, &fn_name, &named_args);
489                    let pre_timeout = format!("{:?}", self.ctx.timeout());
490                    let pre_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
491                    self.ctx
492                        .push_call(fn_name.clone(), named_args.into_iter().collect());
493                    let mut last = String::new();
494                    for stmt in &body {
495                        match self.exec_stmt(stmt).await {
496                            Ok(v) => last = v,
497                            Err(e) => {
498                                self.ctx.pop_call();
499                                return Err(e);
500                            }
501                        }
502                    }
503                    self.ctx.pop_call();
504                    let post_timeout = format!("{:?}", self.ctx.timeout());
505                    let post_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
506                    let shell = self.ctx.current_name();
507                    self.events.emit_fn_exit(
508                        &shell,
509                        &fn_name,
510                        &last,
511                        if post_timeout != pre_timeout {
512                            Some(&post_timeout)
513                        } else {
514                            None
515                        },
516                        if post_fail != pre_fail {
517                            post_fail.as_deref()
518                        } else {
519                            None
520                        },
521                    );
522                    return Ok(last);
523                }
524                IrFn::Builtin { name, arity } => {
525                    // Impure builtin
526                    if let Some(bif) = bifs::lookup_impure(name, *arity) {
527                        let positional_args: Vec<(String, String)> = evaluated_args
528                            .iter()
529                            .enumerate()
530                            .map(|(i, v)| (format!("${i}"), v.clone()))
531                            .collect();
532                        let shell = self.ctx.current_name();
533                        self.events
534                            .emit_fn_enter(&shell, &fn_name, &positional_args);
535                        let result = bif.call(self, evaluated_args, span).await;
536                        let return_value = result.clone().unwrap_or_default();
537                        let shell = self.ctx.current_name();
538                        self.events.emit_fn_exit(
539                            &shell,
540                            &fn_name,
541                            &return_value,
542                            None::<&str>,
543                            None::<&str>,
544                        );
545                        return result;
546                    }
547                }
548            }
549        }
550
551        // Try pure function
552        if let Some(result) = self.tables.pure_fns.get(&fn_id) {
553            let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
554                message: format!("pure function resolution failed: {e:?}"),
555                span: Some(span.clone()),
556                shell: Some(self.ctx.current_name().to_string()),
557            })?;
558            let named_args: Vec<(String, String)> = match ir_fn {
559                IrPureFn::UserDefined { params, .. } => params
560                    .iter()
561                    .zip(evaluated_args.iter())
562                    .map(|(p, v)| (p.name().to_string(), v.clone()))
563                    .collect(),
564                IrPureFn::Builtin { .. } => evaluated_args
565                    .iter()
566                    .enumerate()
567                    .map(|(i, v)| (format!("${i}"), v.clone()))
568                    .collect(),
569            };
570            let shell = self.ctx.current_name();
571            self.events.emit_fn_enter(&shell, &fn_name, &named_args);
572            let return_value = relux_ir::evaluator::eval_pure_fn(
573                ir_fn,
574                evaluated_args,
575                &self.ctx.env,
576                &self.tables.pure_fns,
577            );
578            let shell = self.ctx.current_name();
579            self.events
580                .emit_fn_exit(&shell, &fn_name, &return_value, None::<&str>, None::<&str>);
581            return Ok(return_value);
582        }
583
584        Err(Failure::Runtime {
585            message: format!(
586                "undefined function `{}` with arity {}",
587                fn_name,
588                call.args().len()
589            ),
590            span: Some(span.clone()),
591            shell: Some(self.ctx.current_name().to_string()),
592        })
593    }
594
595    // ─── Public methods for BIFs ────────────────────────────────
596
597    pub async fn match_literal(&mut self, pattern: &str, span: &IrSpan) -> Result<String, Failure> {
598        let shell = self.ctx.current_name();
599        self.events.emit_match_start(&shell, pattern, false);
600        let match_start = Instant::now();
601        let timeout = self
602            .ctx
603            .timeout()
604            .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
605        let (mat, snapshot) = self
606            .wait_consume_literal(pattern, timeout, span.clone())
607            .await?;
608        let shell = self.ctx.current_name();
609        self.events
610            .emit_match_done(&shell, &mat.value.0, match_start.elapsed(), snapshot, None);
611        Ok(pattern.to_string())
612    }
613
614    pub async fn send_line(&mut self, line: &str, span: &IrSpan) -> Result<(), Failure> {
615        self.send_bytes(format!("{line}\n").as_bytes(), span.clone())
616            .await?;
617        let shell = self.ctx.current_name();
618        self.events.emit_send(&shell, line);
619        Ok(())
620    }
621
622    pub async fn send_raw(&mut self, data: &[u8], span: &IrSpan) -> Result<(), Failure> {
623        self.send_bytes(data, span.clone()).await?;
624        let display = data
625            .iter()
626            .map(|b| format!("\\x{b:02x}"))
627            .collect::<String>();
628        let shell = self.ctx.current_name();
629        self.events.emit_send(&shell, &display);
630        Ok(())
631    }
632
633    // ─── Wait + consume/peek helpers ────────────────────────────
634
635    async fn wait_consume_literal(
636        &self,
637        pattern: &str,
638        timeout: Duration,
639        span: IrSpan,
640    ) -> Result<(buffer::Match<buffer::LiteralMatch>, BufferSnapshot), Failure> {
641        let fut = async {
642            loop {
643                let notified = self.pty.output_buf.notify.notified();
644                let fail_pat = self.ctx.fail_pattern();
645                match self
646                    .pty
647                    .output_buf
648                    .fail_check_consume_literal(pattern, fail_pat)
649                    .await
650                {
651                    Err(hit) => {
652                        return Err(self.make_fail_pattern_error(hit, span.clone()).await);
653                    }
654                    Ok(Some(result)) => {
655                        return Ok::<(buffer::Match<buffer::LiteralMatch>, BufferSnapshot), Failure>(
656                            result,
657                        );
658                    }
659                    Ok(None) => {}
660                }
661                tokio::select! {
662                    _ = notified => {}
663                    _ = self.cancel.cancelled() => {
664                        return Err(Failure::Cancelled {
665                            span: Some(span.clone()),
666                            shell: Some(self.ctx.current_name().to_string()),
667                        });
668                    }
669                }
670            }
671        };
672
673        match tokio::time::timeout(timeout, fut).await {
674            Ok(result) => result,
675            Err(_) => {
676                let shell = self.ctx.current_name();
677                let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
678                self.events.emit_timeout(&shell, pattern, buffer);
679                Err(Failure::MatchTimeout {
680                    pattern: pattern.to_string(),
681                    span,
682                    shell: self.ctx.current_name().to_string(),
683                })
684            }
685        }
686    }
687
688    async fn wait_consume_regex(
689        &self,
690        pattern: &str,
691        re: &Regex,
692        timeout: Duration,
693        span: IrSpan,
694    ) -> Result<(buffer::Match<buffer::RegexMatch>, BufferSnapshot), Failure> {
695        let fut = async {
696            loop {
697                let notified = self.pty.output_buf.notify.notified();
698                let fail_pat = self.ctx.fail_pattern();
699                match self
700                    .pty
701                    .output_buf
702                    .fail_check_consume_regex(re, fail_pat)
703                    .await
704                {
705                    Err(hit) => {
706                        return Err(self.make_fail_pattern_error(hit, span.clone()).await);
707                    }
708                    Ok(Some(result)) => {
709                        return Ok::<(buffer::Match<buffer::RegexMatch>, BufferSnapshot), Failure>(
710                            result,
711                        );
712                    }
713                    Ok(None) => {}
714                }
715                tokio::select! {
716                    _ = notified => {}
717                    _ = self.cancel.cancelled() => {
718                        return Err(Failure::Cancelled {
719                            span: Some(span.clone()),
720                            shell: Some(self.ctx.current_name().to_string()),
721                        });
722                    }
723                }
724            }
725        };
726
727        match tokio::time::timeout(timeout, fut).await {
728            Ok(result) => result,
729            Err(_) => {
730                let shell = self.ctx.current_name();
731                let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
732                self.events.emit_timeout(&shell, pattern, buffer);
733                Err(Failure::MatchTimeout {
734                    pattern: pattern.to_string(),
735                    span,
736                    shell: self.ctx.current_name().to_string(),
737                })
738            }
739        }
740    }
741
742    async fn check_fail(&self, span: IrSpan) -> Result<(), Failure> {
743        let fail_pat = self.ctx.fail_pattern();
744        if let Some(hit) = self.pty.output_buf.check_fail_pattern(fail_pat).await {
745            return Err(self.make_fail_pattern_error(hit, span).await);
746        }
747        Ok(())
748    }
749
750    async fn make_fail_pattern_error(&self, hit: FailPatternHit, span: IrSpan) -> Failure {
751        let shell = self.ctx.current_name();
752        let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
753        self.events
754            .emit_fail_pattern_triggered(&shell, &hit.pattern, &hit.matched_text, buffer);
755        Failure::FailPatternMatched {
756            pattern: hit.pattern,
757            matched_line: hit.matched_text,
758            span,
759            shell: self.ctx.current_name().to_string(),
760        }
761    }
762
763    async fn send_bytes(&mut self, data: &[u8], span: IrSpan) -> Result<(), Failure> {
764        self.pty
765            .send_bytes(data)
766            .await
767            .map_err(|e| Failure::ShellExited {
768                shell: self.ctx.current_name().to_string(),
769                exit_code: e.raw_os_error(),
770                span,
771            })
772    }
773
774    pub async fn shutdown(&mut self) {
775        let shell = self.ctx.current_name();
776        self.events.emit_shell_terminate(&shell);
777        self.pty.shutdown().await;
778    }
779}