1pub mod bifs;
2pub mod buffer;
3pub mod context;
4mod pty;
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9use std::time::Instant;
10
11use regex::Regex;
12use regex::RegexBuilder;
13use tokio::sync::Mutex;
14use tokio_util::sync::CancellationToken;
15
16use crate::RuntimeContext;
17use crate::observe::event_log::BufferSnapshot;
18use crate::observe::event_sink::EventSink;
19use crate::report::result::Failure;
20use crate::vm::buffer::BUFFER_TAIL_LEN;
21use crate::vm::buffer::FailPatternHit;
22use crate::vm::buffer::regex_error_summary;
23use crate::vm::context::Captures;
24use crate::vm::context::ExecutionContext;
25use crate::vm::context::FailPattern;
26use crate::vm::pty::PtyShell;
27use relux_core::diagnostics::IrSpan;
28use relux_ir::IrCallExpr;
29use relux_ir::IrExpr;
30use relux_ir::IrFn;
31use relux_ir::IrInterpolation;
32use relux_ir::IrPureFn;
33use relux_ir::IrShellStmt;
34use relux_ir::IrStringPart;
35use relux_ir::Tables;
36
37fn has_interpolation(expr: &IrInterpolation) -> bool {
40 expr.parts().iter().any(|p| {
41 matches!(
42 p,
43 IrStringPart::Var { .. } | IrStringPart::CaptureRef { .. }
44 )
45 })
46}
47
48fn interpolation_template(expr: &IrInterpolation) -> String {
49 expr.parts()
50 .iter()
51 .map(|p| match p {
52 IrStringPart::Literal { value, .. } => value.clone(),
53 IrStringPart::Var { name, .. } => format!("${{{name}}}"),
54 IrStringPart::EscapedDollar { .. } => "$".to_string(),
55 IrStringPart::CaptureRef { index, .. } => format!("${{{index}}}"),
56 })
57 .collect()
58}
59
60async fn interpolate_ir(expr: &IrInterpolation, ctx: &ExecutionContext) -> String {
61 let mut out = String::new();
62 for part in expr.parts() {
63 match part {
64 IrStringPart::Literal { value, .. } => out.push_str(value),
65 IrStringPart::Var { name, .. } => {
66 if let Some(v) = ctx.lookup(name).await {
67 out.push_str(&v);
68 }
69 }
70 IrStringPart::EscapedDollar { .. } => out.push('$'),
71 IrStringPart::CaptureRef { index, .. } => {
72 if let Some(v) = ctx.capture(*index) {
73 out.push_str(&v);
74 }
75 }
76 }
77 }
78 out
79}
80
81pub struct Vm {
84 pty: PtyShell,
85 ctx: ExecutionContext,
86 tables: Tables,
87 pub events: EventSink,
88 shell_prompt: String,
89 pub(crate) cancel: CancellationToken,
90 flaky_timeout_multiplier: f64,
91}
92
93impl Vm {
94 pub async fn new(
95 shell_name: String,
96 ctx: ExecutionContext,
97 rt_ctx: &RuntimeContext,
98 ) -> Result<Self, Failure> {
99 let shell_log = rt_ctx
100 .create_shell_logger(&shell_name)
101 .map_err(|e| Failure::Runtime {
102 message: format!("failed to create shell log: {e}"),
103 span: None,
104 shell: Some(shell_name.clone()),
105 })?;
106 let shell_log = Arc::new(Mutex::new(shell_log));
107
108 let shell_command = rt_ctx.shell.command.to_string();
109 let shell_prompt = rt_ctx.shell.prompt.to_string();
110
111 let pty = PtyShell::spawn(&shell_command, ctx.process_env(), shell_log).map_err(|e| {
112 Failure::Runtime {
113 message: format!("failed to spawn shell: {e}"),
114 span: None,
115 shell: Some(shell_name.clone()),
116 }
117 })?;
118
119 let events = rt_ctx.events.clone();
120 let cancel = rt_ctx.cancel.clone();
121
122 let mut vm = Self {
123 pty,
124 ctx,
125 tables: rt_ctx.tables.clone(),
126 events: events.clone(),
127 shell_prompt,
128 cancel,
129 flaky_timeout_multiplier: rt_ctx.flaky_timeout_multiplier,
130 };
131
132 events.emit_shell_spawn(&shell_name, &shell_command);
133
134 vm.pty
135 .init_prompt(
136 &vm.shell_prompt,
137 vm.ctx
138 .timeout()
139 .adjusted_duration_with_flaky(vm.flaky_timeout_multiplier),
140 )
141 .await
142 .map_err(|_| Failure::Runtime {
143 message: "shell did not produce prompt during init".to_string(),
144 span: None,
145 shell: Some(shell_name),
146 })?;
147
148 vm.events.emit_shell_ready(vm.ctx.current_name());
149
150 Ok(vm)
151 }
152
153 pub fn current_name(&self) -> String {
155 self.ctx.current_name()
156 }
157
158 pub fn reset_for_export(&mut self, new_scope: context::Scope) {
160 self.ctx.reset_for_export(new_scope);
161 }
162
163 pub fn shell_prompt(&self) -> &str {
164 &self.shell_prompt
165 }
166
167 pub async fn exec_stmts(&mut self, stmts: &[IrShellStmt]) -> Result<String, Failure> {
168 let mut last = String::new();
169 for stmt in stmts {
170 if self.cancel.is_cancelled() {
171 return Err(Failure::Cancelled {
172 span: None,
173 shell: Some(self.ctx.current_name().to_string()),
174 });
175 }
176 last = self.exec_stmt(stmt).await?;
177 }
178 self.drain_recv_event().await;
179 Ok(last)
180 }
181
182 async fn drain_recv_event(&mut self) {
183 if let Some(_data) = self.pty.output_buf.drain_recv().await {
184 }
186 }
187
188 async fn emit_interpolation(&mut self, expr: &IrInterpolation, result: &str) {
189 if has_interpolation(expr) {
190 let mut bindings = Vec::new();
191 for part in expr.parts() {
192 match part {
193 IrStringPart::Var { name, .. } => {
194 let value = self.ctx.lookup(name).await.unwrap_or_default();
195 bindings.push((name.clone(), value));
196 }
197 IrStringPart::CaptureRef { index, .. } => {
198 let name = index.to_string();
199 let value = self.ctx.capture(*index).unwrap_or_default();
200 bindings.push((name, value));
201 }
202 _ => {}
203 }
204 }
205 let shell = self.ctx.current_name();
206 self.events
207 .emit_interpolation(&shell, interpolation_template(expr), result, &bindings);
208 }
209 }
210
211 pub async fn exec_stmt(&mut self, stmt: &IrShellStmt) -> Result<String, Failure> {
212 use relux_ir::IrNode;
213 let span = stmt.span().clone();
214 self.drain_recv_event().await;
215 self.check_fail(span.clone()).await?;
216 match stmt {
217 IrShellStmt::Comment { .. } => Ok(String::new()),
218 IrShellStmt::FailRegex {
219 pattern,
220 span: ir_span,
221 } => {
222 let pat = interpolate_ir(pattern, &self.ctx).await;
223 self.emit_interpolation(pattern, &pat).await;
224 let shell = self.ctx.current_name();
225 self.events.emit_fail_pattern_set(&shell, &pat);
226 let re = RegexBuilder::new(&pat)
227 .multi_line(true)
228 .crlf(true)
229 .build()
230 .map_err(|e| Failure::Runtime {
231 message: format!("invalid fail regex: {}", regex_error_summary(&e)),
232 span: Some(ir_span.clone()),
233 shell: Some(self.ctx.current_name().to_string()),
234 })?;
235 let fp = Some(FailPattern::Regex(re));
236 self.ctx.set_fail_pattern(fp);
237 self.check_fail(span).await?;
238 Ok(String::new())
239 }
240 IrShellStmt::FailLiteral { pattern, .. } => {
241 let pat = interpolate_ir(pattern, &self.ctx).await;
242 self.emit_interpolation(pattern, &pat).await;
243 let shell = self.ctx.current_name();
244 self.events.emit_fail_pattern_set(&shell, &pat);
245 let fp = Some(FailPattern::Literal(pat));
246 self.ctx.set_fail_pattern(fp);
247 self.check_fail(span).await?;
248 Ok(String::new())
249 }
250 IrShellStmt::ClearFailPattern { .. } => {
251 let shell = self.ctx.current_name();
252 self.events.emit_fail_pattern_cleared(&shell);
253 self.ctx.set_fail_pattern(None);
254 Ok(String::new())
255 }
256 IrShellStmt::Timeout { timeout, .. } => {
257 let previous = format!("{:?}", self.ctx.timeout());
258 self.ctx.set_timeout(timeout.clone());
259 let new_timeout = format!("{:?}", self.ctx.timeout());
260 let shell = self.ctx.current_name();
261 self.events
262 .emit_timeout_set(&shell, &new_timeout, &previous);
263 Ok(String::new())
264 }
265 IrShellStmt::Let { stmt: let_stmt, .. } => {
266 let value = if let Some(expr) = let_stmt.value() {
267 self.eval_expr(expr).await?
268 } else {
269 String::new()
270 };
271 let shell = self.ctx.current_name();
272 self.events
273 .emit_var_let(&shell, let_stmt.name().name(), &value);
274 self.ctx
275 .let_insert(let_stmt.name().name().to_string(), value.clone());
276 Ok(value)
277 }
278 IrShellStmt::Assign { stmt: assign, .. } => {
279 let value = self.eval_expr(assign.value()).await?;
280 let found = self.ctx.assign(assign.name().name(), value.clone()).await;
281 if !found {
282 return Err(Failure::Runtime {
283 message: format!(
284 "assignment to undeclared variable `{}`",
285 assign.name().name()
286 ),
287 span: Some(assign.name().span().clone()),
288 shell: Some(self.ctx.current_name().to_string()),
289 });
290 }
291 let shell = self.ctx.current_name();
292 self.events
293 .emit_var_assign(&shell, assign.name().name(), &value);
294 Ok(value)
295 }
296 IrShellStmt::Expr { expr, .. } => self.eval_expr(expr).await,
297 IrShellStmt::Send { payload, .. } => {
298 let data = interpolate_ir(payload, &self.ctx).await;
299 self.emit_interpolation(payload, &data).await;
300 self.send_bytes(format!("{data}\n").as_bytes(), span.clone())
301 .await?;
302 let shell = self.ctx.current_name();
303 self.events.emit_send(&shell, &data);
304 Ok(data)
305 }
306 IrShellStmt::SendRaw { payload, .. } => {
307 let data = interpolate_ir(payload, &self.ctx).await;
308 self.emit_interpolation(payload, &data).await;
309 self.send_bytes(data.as_bytes(), span.clone()).await?;
310 let shell = self.ctx.current_name();
311 self.events.emit_send(&shell, &data);
312 Ok(data)
313 }
314 IrShellStmt::MatchLiteral { pattern, .. } => {
315 let timeout = self
316 .ctx
317 .timeout()
318 .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
319 let pat = interpolate_ir(pattern, &self.ctx).await;
320 self.emit_interpolation(pattern, &pat).await;
321 let shell = self.ctx.current_name();
322 self.events.emit_match_start(&shell, &pat, false);
323 let match_start = Instant::now();
324 let (mat, snapshot) = self.wait_consume_literal(&pat, timeout, span).await?;
325 let shell = self.ctx.current_name();
326 self.events.emit_match_done(
327 &shell,
328 &mat.value.0,
329 match_start.elapsed(),
330 snapshot,
331 None,
332 );
333 Ok(pat)
334 }
335 IrShellStmt::MatchRegex { pattern, .. } => {
336 let timeout = self
337 .ctx
338 .timeout()
339 .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
340 let pat = interpolate_ir(pattern, &self.ctx).await;
341 self.emit_interpolation(pattern, &pat).await;
342 let re = RegexBuilder::new(&pat)
343 .multi_line(true)
344 .crlf(true)
345 .build()
346 .map_err(|e| Failure::Runtime {
347 message: format!("invalid regex: {}", regex_error_summary(&e)),
348 span: Some(pattern.span().clone()),
349 shell: Some(self.ctx.current_name().to_string()),
350 })?;
351 let shell = self.ctx.current_name();
352 self.events.emit_match_start(&shell, &pat, true);
353 let match_start = Instant::now();
354 let (mat, snapshot) = self
355 .wait_consume_regex(&pat, &re, timeout, span.clone())
356 .await?;
357 let full = mat.value.0.get("0").cloned().unwrap_or_default();
358 let captures = mat.value.0.clone();
359 let shell = self.ctx.current_name();
360 self.events.emit_match_done(
361 &shell,
362 &full,
363 match_start.elapsed(),
364 snapshot,
365 Some(captures.clone()),
366 );
367 self.set_captures_from_map(captures);
368 Ok(full)
369 }
370 IrShellStmt::TimedMatchLiteral {
371 timeout, pattern, ..
372 } => {
373 let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
374 let pat = interpolate_ir(pattern, &self.ctx).await;
375 self.emit_interpolation(pattern, &pat).await;
376 let shell = self.ctx.current_name();
377 self.events.emit_match_start(&shell, &pat, false);
378 let match_start = Instant::now();
379 let (mat, snapshot) = self.wait_consume_literal(&pat, dur, span).await?;
380 let shell = self.ctx.current_name();
381 self.events.emit_match_done(
382 &shell,
383 &mat.value.0,
384 match_start.elapsed(),
385 snapshot,
386 None,
387 );
388 Ok(pat)
389 }
390 IrShellStmt::TimedMatchRegex {
391 timeout, pattern, ..
392 } => {
393 let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
394 let pat = interpolate_ir(pattern, &self.ctx).await;
395 self.emit_interpolation(pattern, &pat).await;
396 let re = RegexBuilder::new(&pat)
397 .multi_line(true)
398 .crlf(true)
399 .build()
400 .map_err(|e| Failure::Runtime {
401 message: format!("invalid regex: {}", regex_error_summary(&e)),
402 span: Some(pattern.span().clone()),
403 shell: Some(self.ctx.current_name().to_string()),
404 })?;
405 let shell = self.ctx.current_name();
406 self.events.emit_match_start(&shell, &pat, true);
407 let match_start = Instant::now();
408 let (mat, snapshot) = self
409 .wait_consume_regex(&pat, &re, dur, span.clone())
410 .await?;
411 let full = mat.value.0.get("0").cloned().unwrap_or_default();
412 let captures = mat.value.0.clone();
413 let shell = self.ctx.current_name();
414 self.events.emit_match_done(
415 &shell,
416 &full,
417 match_start.elapsed(),
418 snapshot,
419 Some(captures.clone()),
420 );
421 self.set_captures_from_map(captures);
422 Ok(full)
423 }
424 IrShellStmt::BufferReset { .. } => {
425 let snapshot = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
426 let shell = self.ctx.current_name();
427 self.events.emit_buffer_reset(&shell, snapshot);
428 self.pty.output_buf.clear().await;
429 Ok(String::new())
430 }
431 }
432 }
433
434 fn set_captures_from_map(&mut self, map: HashMap<String, String>) {
435 let mut caps = Captures::new();
436 for (k, v) in map {
437 caps.set(k, v);
438 }
439 self.ctx.set_captures(caps);
440 }
441
442 #[async_recursion::async_recursion]
443 async fn eval_expr(&mut self, expr: &IrExpr) -> Result<String, Failure> {
444 use relux_ir::IrNode;
445 let span = expr.span().clone();
446 self.check_fail(span.clone()).await?;
447 match expr {
448 IrExpr::String { value, .. } => {
449 let result = interpolate_ir(value, &self.ctx).await;
450 self.emit_interpolation(value, &result).await;
451 let shell = self.ctx.current_name();
452 self.events.emit_string_eval(&shell, &result);
453 Ok(result)
454 }
455 IrExpr::Var { name, .. } => Ok(self.ctx.lookup(name).await.unwrap_or_default()),
456 IrExpr::CaptureRef { index, .. } => Ok(self.ctx.capture(*index).unwrap_or_default()),
457 IrExpr::Call { call, .. } => self.eval_call(call, &span).await,
458 }
459 }
460
461 async fn eval_call(&mut self, call: &IrCallExpr, span: &IrSpan) -> Result<String, Failure> {
462 let fn_id = call.resolved().clone();
463 let fn_name = call.name().name().to_string();
464
465 let mut evaluated_args = Vec::with_capacity(call.args().len());
467 for arg in call.args() {
468 evaluated_args.push(self.eval_expr(arg).await?);
469 }
470
471 if let Some(result) = self.tables.fns.get(&fn_id) {
473 let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
474 message: format!("function resolution failed: {e:?}"),
475 span: Some(span.clone()),
476 shell: Some(self.ctx.current_name().to_string()),
477 })?;
478 match ir_fn {
479 IrFn::UserDefined { params, body, .. } => {
480 let params = params.clone();
481 let body = body.clone();
482 let named_args: Vec<(String, String)> = params
483 .iter()
484 .zip(evaluated_args.iter())
485 .map(|(p, v)| (p.name().to_string(), v.clone()))
486 .collect();
487 let shell = self.ctx.current_name();
488 self.events.emit_fn_enter(&shell, &fn_name, &named_args);
489 let pre_timeout = format!("{:?}", self.ctx.timeout());
490 let pre_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
491 self.ctx
492 .push_call(fn_name.clone(), named_args.into_iter().collect());
493 let mut last = String::new();
494 for stmt in &body {
495 match self.exec_stmt(stmt).await {
496 Ok(v) => last = v,
497 Err(e) => {
498 self.ctx.pop_call();
499 return Err(e);
500 }
501 }
502 }
503 self.ctx.pop_call();
504 let post_timeout = format!("{:?}", self.ctx.timeout());
505 let post_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
506 let shell = self.ctx.current_name();
507 self.events.emit_fn_exit(
508 &shell,
509 &fn_name,
510 &last,
511 if post_timeout != pre_timeout {
512 Some(&post_timeout)
513 } else {
514 None
515 },
516 if post_fail != pre_fail {
517 post_fail.as_deref()
518 } else {
519 None
520 },
521 );
522 return Ok(last);
523 }
524 IrFn::Builtin { name, arity } => {
525 if let Some(bif) = bifs::lookup_impure(name, *arity) {
527 let positional_args: Vec<(String, String)> = evaluated_args
528 .iter()
529 .enumerate()
530 .map(|(i, v)| (format!("${i}"), v.clone()))
531 .collect();
532 let shell = self.ctx.current_name();
533 self.events
534 .emit_fn_enter(&shell, &fn_name, &positional_args);
535 let result = bif.call(self, evaluated_args, span).await;
536 let return_value = result.clone().unwrap_or_default();
537 let shell = self.ctx.current_name();
538 self.events.emit_fn_exit(
539 &shell,
540 &fn_name,
541 &return_value,
542 None::<&str>,
543 None::<&str>,
544 );
545 return result;
546 }
547 }
548 }
549 }
550
551 if let Some(result) = self.tables.pure_fns.get(&fn_id) {
553 let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
554 message: format!("pure function resolution failed: {e:?}"),
555 span: Some(span.clone()),
556 shell: Some(self.ctx.current_name().to_string()),
557 })?;
558 let named_args: Vec<(String, String)> = match ir_fn {
559 IrPureFn::UserDefined { params, .. } => params
560 .iter()
561 .zip(evaluated_args.iter())
562 .map(|(p, v)| (p.name().to_string(), v.clone()))
563 .collect(),
564 IrPureFn::Builtin { .. } => evaluated_args
565 .iter()
566 .enumerate()
567 .map(|(i, v)| (format!("${i}"), v.clone()))
568 .collect(),
569 };
570 let shell = self.ctx.current_name();
571 self.events.emit_fn_enter(&shell, &fn_name, &named_args);
572 let return_value = relux_ir::evaluator::eval_pure_fn(
573 ir_fn,
574 evaluated_args,
575 &self.ctx.env,
576 &self.tables.pure_fns,
577 );
578 let shell = self.ctx.current_name();
579 self.events
580 .emit_fn_exit(&shell, &fn_name, &return_value, None::<&str>, None::<&str>);
581 return Ok(return_value);
582 }
583
584 Err(Failure::Runtime {
585 message: format!(
586 "undefined function `{}` with arity {}",
587 fn_name,
588 call.args().len()
589 ),
590 span: Some(span.clone()),
591 shell: Some(self.ctx.current_name().to_string()),
592 })
593 }
594
595 pub async fn match_literal(&mut self, pattern: &str, span: &IrSpan) -> Result<String, Failure> {
598 let shell = self.ctx.current_name();
599 self.events.emit_match_start(&shell, pattern, false);
600 let match_start = Instant::now();
601 let timeout = self
602 .ctx
603 .timeout()
604 .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
605 let (mat, snapshot) = self
606 .wait_consume_literal(pattern, timeout, span.clone())
607 .await?;
608 let shell = self.ctx.current_name();
609 self.events
610 .emit_match_done(&shell, &mat.value.0, match_start.elapsed(), snapshot, None);
611 Ok(pattern.to_string())
612 }
613
614 pub async fn send_line(&mut self, line: &str, span: &IrSpan) -> Result<(), Failure> {
615 self.send_bytes(format!("{line}\n").as_bytes(), span.clone())
616 .await?;
617 let shell = self.ctx.current_name();
618 self.events.emit_send(&shell, line);
619 Ok(())
620 }
621
622 pub async fn send_raw(&mut self, data: &[u8], span: &IrSpan) -> Result<(), Failure> {
623 self.send_bytes(data, span.clone()).await?;
624 let display = data
625 .iter()
626 .map(|b| format!("\\x{b:02x}"))
627 .collect::<String>();
628 let shell = self.ctx.current_name();
629 self.events.emit_send(&shell, &display);
630 Ok(())
631 }
632
633 async fn wait_consume_literal(
636 &self,
637 pattern: &str,
638 timeout: Duration,
639 span: IrSpan,
640 ) -> Result<(buffer::Match<buffer::LiteralMatch>, BufferSnapshot), Failure> {
641 let fut = async {
642 loop {
643 let notified = self.pty.output_buf.notify.notified();
644 let fail_pat = self.ctx.fail_pattern();
645 match self
646 .pty
647 .output_buf
648 .fail_check_consume_literal(pattern, fail_pat)
649 .await
650 {
651 Err(hit) => {
652 return Err(self.make_fail_pattern_error(hit, span.clone()).await);
653 }
654 Ok(Some(result)) => {
655 return Ok::<(buffer::Match<buffer::LiteralMatch>, BufferSnapshot), Failure>(
656 result,
657 );
658 }
659 Ok(None) => {}
660 }
661 tokio::select! {
662 _ = notified => {}
663 _ = self.cancel.cancelled() => {
664 return Err(Failure::Cancelled {
665 span: Some(span.clone()),
666 shell: Some(self.ctx.current_name().to_string()),
667 });
668 }
669 }
670 }
671 };
672
673 match tokio::time::timeout(timeout, fut).await {
674 Ok(result) => result,
675 Err(_) => {
676 let shell = self.ctx.current_name();
677 let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
678 self.events.emit_timeout(&shell, pattern, buffer);
679 Err(Failure::MatchTimeout {
680 pattern: pattern.to_string(),
681 span,
682 shell: self.ctx.current_name().to_string(),
683 })
684 }
685 }
686 }
687
688 async fn wait_consume_regex(
689 &self,
690 pattern: &str,
691 re: &Regex,
692 timeout: Duration,
693 span: IrSpan,
694 ) -> Result<(buffer::Match<buffer::RegexMatch>, BufferSnapshot), Failure> {
695 let fut = async {
696 loop {
697 let notified = self.pty.output_buf.notify.notified();
698 let fail_pat = self.ctx.fail_pattern();
699 match self
700 .pty
701 .output_buf
702 .fail_check_consume_regex(re, fail_pat)
703 .await
704 {
705 Err(hit) => {
706 return Err(self.make_fail_pattern_error(hit, span.clone()).await);
707 }
708 Ok(Some(result)) => {
709 return Ok::<(buffer::Match<buffer::RegexMatch>, BufferSnapshot), Failure>(
710 result,
711 );
712 }
713 Ok(None) => {}
714 }
715 tokio::select! {
716 _ = notified => {}
717 _ = self.cancel.cancelled() => {
718 return Err(Failure::Cancelled {
719 span: Some(span.clone()),
720 shell: Some(self.ctx.current_name().to_string()),
721 });
722 }
723 }
724 }
725 };
726
727 match tokio::time::timeout(timeout, fut).await {
728 Ok(result) => result,
729 Err(_) => {
730 let shell = self.ctx.current_name();
731 let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
732 self.events.emit_timeout(&shell, pattern, buffer);
733 Err(Failure::MatchTimeout {
734 pattern: pattern.to_string(),
735 span,
736 shell: self.ctx.current_name().to_string(),
737 })
738 }
739 }
740 }
741
742 async fn check_fail(&self, span: IrSpan) -> Result<(), Failure> {
743 let fail_pat = self.ctx.fail_pattern();
744 if let Some(hit) = self.pty.output_buf.check_fail_pattern(fail_pat).await {
745 return Err(self.make_fail_pattern_error(hit, span).await);
746 }
747 Ok(())
748 }
749
750 async fn make_fail_pattern_error(&self, hit: FailPatternHit, span: IrSpan) -> Failure {
751 let shell = self.ctx.current_name();
752 let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
753 self.events
754 .emit_fail_pattern_triggered(&shell, &hit.pattern, &hit.matched_text, buffer);
755 Failure::FailPatternMatched {
756 pattern: hit.pattern,
757 matched_line: hit.matched_text,
758 span,
759 shell: self.ctx.current_name().to_string(),
760 }
761 }
762
763 async fn send_bytes(&mut self, data: &[u8], span: IrSpan) -> Result<(), Failure> {
764 self.pty
765 .send_bytes(data)
766 .await
767 .map_err(|e| Failure::ShellExited {
768 shell: self.ctx.current_name().to_string(),
769 exit_code: e.raw_os_error(),
770 span,
771 })
772 }
773
774 pub async fn shutdown(&mut self) {
775 let shell = self.ctx.current_name();
776 self.events.emit_shell_terminate(&shell);
777 self.pty.shutdown().await;
778 }
779}