rust_expect/session/handle.rs
1//! Session handle for interacting with spawned processes.
2//!
3//! This module provides the main `Session` type that users interact with
4//! to control spawned processes, send input, and expect output.
5
6use std::sync::Arc;
7#[cfg(feature = "screen")]
8use std::sync::{Mutex as StdMutex, MutexGuard};
9use std::time::Duration;
10
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::Mutex;
13
14#[cfg(unix)]
15use crate::backend::{AsyncPty, PtyConfig, PtySpawner};
16#[cfg(windows)]
17use crate::backend::{PtyConfig, PtySpawner, WindowsAsyncPty};
18use crate::config::SessionConfig;
19use crate::dialog::{Dialog, DialogExecutor, DialogResult};
20use crate::error::{ExpectError, Result};
21use crate::expect::{ExpectState, MatchResult, Matcher, Pattern, PatternManager, PatternSet};
22use crate::interact::InteractBuilder;
23#[cfg(feature = "screen")]
24use crate::screen::Screen;
25use crate::types::{ControlChar, Dimensions, Match, ProcessExitStatus, SessionId, SessionState};
26
/// Callback invoked for every chunk of bytes read from the transport.
///
/// Taps observe the raw byte stream as it arrives, after it is appended to the
/// matcher buffer but before any pattern matching is performed. They are the
/// foundation for screen emulation, transcript recording, and other features
/// that need to see output as it happens.
///
/// The callback receives the chunk as a borrowed `&[u8]` and returns nothing.
/// It is stored behind an [`Arc`] and must be `Send + Sync`.
pub type OutputTap = Arc<dyn Fn(&[u8]) + Send + Sync>;
34
/// Opaque identifier for a registered output tap.
///
/// Handed out by [`Session::add_output_tap`] and accepted by
/// [`Session::remove_output_tap`]. The value wraps a `u64`; the id space is
/// large enough that exhausting it on a single session is not reachable in
/// practice.
///
/// Formats as `tap#<n>` via [`Display`](std::fmt::Display).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TapId(u64);

impl std::fmt::Display for TapId {
    /// Render the id as `tap#` followed by the raw numeric value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("tap#{raw}"))
    }
}
51
/// Acquire the screen mutex, tolerating a poisoned lock.
///
/// A panic while the lock is held (for example inside a tap callback or
/// `Screen::process`) poisons the mutex. Rather than handing callers a
/// default value — which would make screen-aware expects appear to never
/// match — the guard is recovered with `into_inner` and a warning is
/// logged, so the call proceeds against the actual screen state.
#[cfg(feature = "screen")]
fn lock_screen(screen: &Arc<StdMutex<Screen>>) -> MutexGuard<'_, Screen> {
    screen.lock().unwrap_or_else(|poisoned| {
        tracing::warn!("screen mutex was poisoned; recovering inner state");
        poisoned.into_inner()
    })
}
70
/// A session handle for interacting with a spawned process.
///
/// The session provides methods to send input, expect patterns in output,
/// and manage the lifecycle of the process.
///
/// `T` is the transport the session reads from and writes to; it is held
/// behind an `Arc<tokio::sync::Mutex<_>>` so it can be shared with helpers
/// such as the interact builder.
pub struct Session<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> {
    /// The underlying transport (PTY, SSH channel, etc.).
    transport: Arc<Mutex<T>>,
    /// Session configuration.
    config: SessionConfig,
    /// Pattern matcher; also owns the buffer that read output is appended to.
    matcher: Matcher,
    /// Pattern manager for before/after patterns.
    pattern_manager: PatternManager,
    /// Current session state.
    state: SessionState,
    /// Unique session identifier.
    id: SessionId,
    /// EOF flag; set once a read returns zero bytes or a PTY-EOF error.
    eof: bool,
    /// Output taps invoked on every chunk of bytes read from the transport,
    /// stored as (id, callback) so they can be removed individually.
    output_taps: Vec<(TapId, OutputTap)>,
    /// Monotonic counter for assigning new `TapId`s.
    next_tap_id: u64,
    /// Attached virtual terminal screen, fed from an output tap.
    #[cfg(feature = "screen")]
    screen: Option<Arc<StdMutex<Screen>>>,
    /// Tap id used to feed the attached screen, so `detach_screen` can
    /// remove only that tap and leave user-registered taps in place.
    #[cfg(feature = "screen")]
    screen_tap_id: Option<TapId>,
    /// Poll interval used by the screen-aware expect helpers
    /// (`expect_screen_contains`, `wait_screen_not_contains`,
    /// `wait_screen_stable`). 50 ms by default.
    #[cfg(feature = "screen")]
    screen_poll_interval: Duration,
}
108
109impl<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> Session<T> {
110 /// Create a new session with the given transport.
111 pub fn new(transport: T, config: SessionConfig) -> Self {
112 let buffer_size = config.buffer.max_size;
113 let mut matcher = Matcher::new(buffer_size);
114 matcher.set_default_timeout(config.timeout.default);
115 Self {
116 transport: Arc::new(Mutex::new(transport)),
117 config,
118 matcher,
119 pattern_manager: PatternManager::new(),
120 state: SessionState::Starting,
121 id: SessionId::new(),
122 eof: false,
123 output_taps: Vec::new(),
124 next_tap_id: 0,
125 #[cfg(feature = "screen")]
126 screen: None,
127 #[cfg(feature = "screen")]
128 screen_tap_id: None,
129 #[cfg(feature = "screen")]
130 screen_poll_interval: Duration::from_millis(50),
131 }
132 }
133
/// Set the polling interval used by the screen-aware expect helpers.
///
/// Affects `expect_screen_contains`, `wait_screen_not_contains`, and
/// `wait_screen_stable`. Smaller values reduce match latency at the
/// cost of CPU; larger values do the opposite. Default is 50 ms.
///
/// The value is stored as-is; no validation or clamping is applied here.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub const fn set_screen_poll_interval(&mut self, interval: Duration) {
    self.screen_poll_interval = interval;
}
145
/// Get the current screen-poll interval. Default 50 ms.
///
/// Returns the value last stored by
/// [`set_screen_poll_interval`](Self::set_screen_poll_interval), or the
/// default if it was never changed.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
#[must_use]
pub const fn screen_poll_interval(&self) -> Duration {
    self.screen_poll_interval
}
154
155 /// Register a callback that will be invoked with every chunk of bytes
156 /// read from the transport.
157 ///
158 /// Taps observe the raw byte stream as it arrives — they receive bytes
159 /// in the same form the underlying process produced them, including any
160 /// ANSI escape sequences. Taps are invoked synchronously inside the read
161 /// loop after the bytes are appended to the matcher buffer; they should
162 /// be cheap and non-blocking. Use a channel if expensive work is required.
163 ///
164 /// Multiple taps may be registered; they are invoked in registration
165 /// order. Taps are dropped when the session is dropped.
166 ///
167 /// # Example
168 ///
169 /// ```ignore
170 /// use std::sync::Arc;
171 /// use std::sync::Mutex;
172 /// let captured: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
173 /// let buf = captured.clone();
174 /// session.add_output_tap(move |chunk| {
175 /// buf.lock().unwrap().extend_from_slice(chunk);
176 /// });
177 /// ```
178 pub fn add_output_tap<F>(&mut self, f: F) -> TapId
179 where
180 F: Fn(&[u8]) + Send + Sync + 'static,
181 {
182 let id = TapId(self.next_tap_id);
183 // Plain addition (not wrapping_add): on the astronomically unlikely
184 // event of u64 exhaustion on a single session, we'd rather panic
185 // loudly than silently issue a colliding id.
186 self.next_tap_id += 1;
187 self.output_taps.push((id, Arc::new(f)));
188 id
189 }
190
191 /// Remove a previously registered output tap by its [`TapId`]. Returns
192 /// `true` if a tap was removed, `false` if the id was not registered
193 /// (already removed, or never existed).
194 pub fn remove_output_tap(&mut self, id: TapId) -> bool {
195 let len_before = self.output_taps.len();
196 self.output_taps.retain(|(existing, _)| *existing != id);
197 self.output_taps.len() != len_before
198 }
199
200 /// Iterate the callbacks for all currently registered output taps.
201 ///
202 /// Exposed for instrumentation and inspection only — the read loops in
203 /// [`expect`](Self::expect) and [`interact`](Self::interact) invoke
204 /// these themselves. Returns the callback `Arc`s in registration
205 /// order; ids are intentionally omitted (use
206 /// [`add_output_tap`](Self::add_output_tap)'s return value if you
207 /// need the id).
208 pub fn output_tap_callbacks(&self) -> impl Iterator<Item = &OutputTap> {
209 self.output_taps.iter().map(|(_, cb)| cb)
210 }
211
/// Attach a virtual terminal screen sized from the session configuration.
///
/// Reads `config.dimensions` and forwards to
/// [`attach_screen_with_dims`](Self::attach_screen_with_dims), which creates
/// the [`Screen`](crate::screen::Screen) and registers the output tap that
/// feeds every chunk of output into its ANSI parser. The screen is then
/// reachable through [`screen()`](Self::screen) and is updated whenever
/// output is read from the transport (i.e. inside `expect_*`, `wait`, or
/// `wait_screen_stable`).
///
/// Calling this again replaces the previously attached screen.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub fn attach_screen(&mut self) {
    // The configured tuple is destructured as (cols, rows); the callee
    // takes (rows, cols), hence the swapped order.
    let dims = self.config.dimensions;
    self.attach_screen_with_dims(dims.1, dims.0);
}
229
/// Attach a screen of `rows` x `cols` cells.
///
/// This sizes only the virtual screen; the PTY itself is not resized (use
/// [`resize_pty`](Self::resize_pty) for that). The two normally match, but
/// a larger virtual screen can be handy for transcript capture.
///
/// Any previously attached screen and its feeding tap are removed first so
/// callbacks do not accumulate.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub fn attach_screen_with_dims(&mut self, rows: u16, cols: u16) {
    // Drop the old screen and its tap before installing the new pair.
    self.detach_screen();

    let shared = Arc::new(StdMutex::new(Screen::new(
        usize::from(rows),
        usize::from(cols),
    )));
    let feed = Arc::clone(&shared);

    // The tap goes through the same poison-recovering lock helper as the
    // read side, so both paths recover identically.
    let tap_id = self.add_output_tap(move |chunk| {
        lock_screen(&feed).process(chunk);
    });

    self.screen_tap_id = Some(tap_id);
    self.screen = Some(shared);
}
252
/// Detach the attached screen and unregister the tap that feeds it.
///
/// User-registered taps are left untouched. Returns `true` if a screen was
/// attached and has now been removed, `false` if there was nothing to
/// detach.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub fn detach_screen(&mut self) -> bool {
    let feeding_tap = self.screen_tap_id.take();
    if let Some(tap_id) = feeding_tap {
        self.remove_output_tap(tap_id);
    }

    let previous = self.screen.take();
    previous.is_some()
}
265
/// Get the attached virtual terminal screen, if any.
///
/// Returns a shared handle protected by a [`std::sync::Mutex`]. Lock it
/// briefly to read screen state — the lock is also taken by the output
/// tap on every read, so holding it for long stretches blocks the read
/// loop.
///
/// Returns `None` when no screen is currently attached.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
#[must_use]
pub const fn screen(&self) -> Option<&Arc<StdMutex<Screen>>> {
    self.screen.as_ref()
}
279
/// Get the session ID.
///
/// The id is assigned once in [`new`](Self::new) via `SessionId::new()`.
#[must_use]
pub const fn id(&self) -> &SessionId {
    &self.id
}
285
/// Get the current session state.
///
/// Returns the state by value (a copy of the stored field).
#[must_use]
pub const fn state(&self) -> SessionState {
    self.state
}
291
/// Get the session configuration.
///
/// Returns a shared reference to the configuration passed to
/// [`new`](Self::new).
#[must_use]
pub const fn config(&self) -> &SessionConfig {
    &self.config
}
297
/// Check if EOF has been detected.
///
/// The flag is set by the internal read path when a read returns zero
/// bytes or fails with a PTY end-of-file error.
#[must_use]
pub const fn is_eof(&self) -> bool {
    self.eof
}
303
/// Get the current buffer contents.
///
/// Returns the matcher's buffer as an owned `String`, as produced by
/// `Matcher::buffer_str`.
#[must_use]
pub fn buffer(&mut self) -> String {
    self.matcher.buffer_str()
}
309
/// Clear the buffer.
///
/// Delegates to `Matcher::clear`; output taps and any attached screen are
/// not touched by this call.
pub fn clear_buffer(&mut self) {
    self.matcher.clear();
}
314
/// Get the pattern manager for before/after patterns.
///
/// Read-only access; use
/// [`pattern_manager_mut`](Self::pattern_manager_mut) to modify it.
#[must_use]
pub const fn pattern_manager(&self) -> &PatternManager {
    &self.pattern_manager
}
320
/// Get mutable access to the pattern manager.
///
/// The "before" patterns it holds are consulted by
/// [`expect_any`](Self::expect_any) on every loop iteration.
pub const fn pattern_manager_mut(&mut self) -> &mut PatternManager {
    &mut self.pattern_manager
}
325
/// Set the session state.
///
/// Overwrites the stored state unconditionally. Note that
/// [`send`](Self::send) refuses to write once the state is `Closed` or
/// `Exited(_)`.
pub const fn set_state(&mut self, state: SessionState) {
    self.state = state;
}
330
331 /// Send bytes to the process.
332 ///
333 /// # Errors
334 ///
335 /// Returns an error if the write fails.
336 #[allow(clippy::significant_drop_tightening)]
337 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
338 if matches!(self.state, SessionState::Closed | SessionState::Exited(_)) {
339 return Err(ExpectError::SessionClosed);
340 }
341
342 let mut transport = self.transport.lock().await;
343 transport
344 .write_all(data)
345 .await
346 .map_err(|e| ExpectError::io_context("writing to process", e))?;
347 transport
348 .flush()
349 .await
350 .map_err(|e| ExpectError::io_context("flushing process output", e))?;
351 Ok(())
352 }
353
/// Send a string to the process.
///
/// The string's UTF-8 bytes are forwarded to [`send`](Self::send) unchanged;
/// no line ending is appended.
///
/// # Errors
///
/// Returns an error if the write fails.
pub async fn send_str(&mut self, s: &str) -> Result<()> {
    self.send(s.as_bytes()).await
}
362
363 /// Send a line to the process (appends newline based on config).
364 ///
365 /// # Errors
366 ///
367 /// Returns an error if the write fails.
368 pub async fn send_line(&mut self, line: &str) -> Result<()> {
369 let line_ending = self.config.line_ending.as_str();
370 let data = format!("{line}{line_ending}");
371 self.send(data.as_bytes()).await
372 }
373
/// Send a control character to the process.
///
/// Writes the single byte returned by `ControlChar::as_byte` through
/// [`send`](Self::send).
///
/// # Errors
///
/// Returns an error if the write fails.
pub async fn send_control(&mut self, ctrl: ControlChar) -> Result<()> {
    self.send(&[ctrl.as_byte()]).await
}
382
/// Send a Shift+Tab keystroke.
///
/// Sends the xterm "back tab" sequence `\x1b[Z` (CSI Z). Most TUIs use
/// this to cycle a focused-element ring backwards or, in Claude Code's
/// case, to cycle permission modes. Compatible with both plain xterm
/// and the kitty keyboard protocol's CSI-u fallback mode.
///
/// The three bytes are written with one [`send`](Self::send) call.
///
/// # Errors
///
/// Returns an error if the write fails.
pub async fn send_shift_tab(&mut self) -> Result<()> {
    self.send(b"\x1b[Z").await
}
396
397 /// Send text using bracketed paste mode (DECSET 2004).
398 ///
399 /// Wraps the content in `\x1b[200~` and `\x1b[201~` markers. Applications
400 /// that have enabled bracketed paste treat the enclosed content as
401 /// pasted input rather than typed input — this suppresses autocomplete,
402 /// command-history scanning, and per-character interpretation such as a
403 /// leading `/` triggering a slash-command popup. Safe to call even when
404 /// the receiver hasn't enabled bracketed paste: most terminals ignore
405 /// the markers and deliver the inner text as-is.
406 ///
407 /// # Errors
408 ///
409 /// Returns an error if the write fails or if `text` contains the
410 /// closing paste marker `\x1b[201~`, which would let the receiver drop
411 /// out of paste mode mid-payload. Callers that want to send such bytes
412 /// should write them through the regular [`send`](Self::send) path.
413 pub async fn send_paste(&mut self, text: &str) -> Result<()> {
414 if memchr::memmem::find(text.as_bytes(), b"\x1b[201~").is_some() {
415 return Err(ExpectError::InvalidInput {
416 api: "send_paste".to_string(),
417 reason:
418 "input contains the bracketed-paste end marker (\\x1b[201~); use send() for raw bytes that include this sequence"
419 .to_string(),
420 });
421 }
422 self.send(b"\x1b[200~").await?;
423 self.send(text.as_bytes()).await?;
424 self.send(b"\x1b[201~").await
425 }
426
427 /// Expect a pattern in the output.
428 ///
429 /// Blocks until the pattern is matched, EOF is detected, or timeout occurs.
430 ///
431 /// # Errors
432 ///
433 /// Returns an error on timeout, EOF (if not expected), or I/O error.
434 pub async fn expect(&mut self, pattern: impl Into<Pattern>) -> Result<Match> {
435 let patterns = PatternSet::from_patterns(vec![pattern.into()]);
436 self.expect_any(&patterns).await
437 }
438
439 /// Expect any of the given patterns.
440 ///
441 /// # Errors
442 ///
443 /// Returns an error on timeout, EOF (if not expected), or I/O error.
444 pub async fn expect_any(&mut self, patterns: &PatternSet) -> Result<Match> {
445 let timeout = self.matcher.get_timeout(patterns);
446 let state = ExpectState::new(patterns.clone(), timeout);
447
448 loop {
449 // Check before patterns first
450 if let Some((_, action)) = self
451 .pattern_manager
452 .check_before(&self.matcher.buffer_str())
453 {
454 match action {
455 crate::expect::HandlerAction::Continue => {}
456 crate::expect::HandlerAction::Return(s) => {
457 return Ok(Match::new(0, s, String::new(), self.matcher.buffer_str()));
458 }
459 crate::expect::HandlerAction::Abort(msg) => {
460 return Err(ExpectError::PatternNotFound {
461 pattern: msg,
462 buffer: self.matcher.buffer_str(),
463 });
464 }
465 crate::expect::HandlerAction::Respond(s) => {
466 self.send_str(&s).await?;
467 }
468 }
469 }
470
471 // Check for pattern match
472 if let Some(result) = self.matcher.try_match_any(patterns) {
473 return Ok(self.matcher.consume_match(&result));
474 }
475
476 // Check for timeout
477 if state.is_timed_out() {
478 return Err(ExpectError::Timeout {
479 duration: timeout,
480 pattern: patterns
481 .iter()
482 .next()
483 .map(|p| p.pattern.as_str().to_string())
484 .unwrap_or_default(),
485 buffer: self.matcher.buffer_str(),
486 });
487 }
488
489 // Check for EOF
490 if self.eof {
491 if state.expects_eof() {
492 return Ok(Match::new(
493 0,
494 String::new(),
495 self.matcher.buffer_str(),
496 String::new(),
497 ));
498 }
499 return Err(ExpectError::Eof {
500 buffer: self.matcher.buffer_str(),
501 });
502 }
503
504 // Read more data
505 self.read_with_timeout(state.remaining_time()).await?;
506 }
507 }
508
509 /// Expect with a specific timeout.
510 ///
511 /// # Errors
512 ///
513 /// Returns an error on timeout, EOF, or I/O error.
514 pub async fn expect_timeout(
515 &mut self,
516 pattern: impl Into<Pattern>,
517 timeout: Duration,
518 ) -> Result<Match> {
519 let pattern = pattern.into();
520 let mut patterns = PatternSet::new();
521 patterns.add(pattern).add(Pattern::timeout(timeout));
522 self.expect_any(&patterns).await
523 }
524
/// Block until the rendered screen contains `needle`.
///
/// Reads from the transport in slices of at most the screen poll interval
/// and inspects the attached screen before each read. Succeeds as soon as
/// `needle` is found; fails with [`ExpectError::Timeout`] once `timeout`
/// has elapsed, or with [`ExpectError::Eof`] if the stream ends first.
///
/// This is the screen-aware counterpart to [`expect`](Self::expect): prefer
/// it when the byte stream is dense with ANSI escape sequences (e.g. when
/// driving a TUI), where literal substring matching on raw bytes breaks on
/// interleaved cursor positioning and SGR codes.
///
/// A screen must be attached first — see
/// [`attach_screen`](Self::attach_screen).
///
/// # Errors
///
/// Returns an error if no screen is attached, the timeout expires, EOF
/// is reached, or an I/O error occurs.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub async fn expect_screen_contains(&mut self, needle: &str, timeout: Duration) -> Result<()> {
    let screen = self
        .screen
        .clone()
        .ok_or(ExpectError::ScreenNotAttached)?;

    let poll = self.screen_poll_interval;
    let started = tokio::time::Instant::now();

    loop {
        // The screen guard lives only for this statement, never across an await.
        let visible = lock_screen(&screen).query().contains(needle);
        if visible {
            return Ok(());
        }

        if self.eof {
            let buffer = lock_screen(&screen).text();
            return Err(ExpectError::Eof { buffer });
        }

        let elapsed = started.elapsed();
        if elapsed >= timeout {
            let buffer = lock_screen(&screen).text();
            return Err(ExpectError::Timeout {
                duration: timeout,
                pattern: needle.to_string(),
                buffer,
            });
        }

        // Never read past the caller's deadline.
        let slice = poll.min(timeout.saturating_sub(elapsed));
        self.read_with_timeout(slice).await?;
    }
}
577
/// Block until the rendered screen no longer contains `needle`.
///
/// Mirror image of [`expect_screen_contains`](Self::expect_screen_contains):
/// succeeds as soon as `needle` is absent, and fails with
/// [`ExpectError::Timeout`] if it is still visible when `timeout` elapses.
/// EOF counts as success (the screen is frozen at its final paint).
///
/// Handy for anchoring on the *disappearance* of an indicator — an
/// "in flight" status clearing, a spinner stopping, a modal closing.
///
/// Requires an attached screen.
///
/// # Errors
///
/// Returns an error if no screen is attached, the timeout expires while
/// the substring is still visible, or an I/O error occurs.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub async fn wait_screen_not_contains(
    &mut self,
    needle: &str,
    timeout: Duration,
) -> Result<()> {
    let screen = self
        .screen
        .clone()
        .ok_or(ExpectError::ScreenNotAttached)?;

    let poll = self.screen_poll_interval;
    let started = tokio::time::Instant::now();

    loop {
        // Gone from the screen, or the stream ended: both count as done.
        let still_visible = lock_screen(&screen).query().contains(needle);
        if !still_visible || self.eof {
            return Ok(());
        }

        let elapsed = started.elapsed();
        if elapsed >= timeout {
            let buffer = lock_screen(&screen).text();
            return Err(ExpectError::Timeout {
                duration: timeout,
                pattern: format!("!{needle}"),
                buffer,
            });
        }

        // Never read past the caller's deadline.
        let slice = poll.min(timeout.saturating_sub(elapsed));
        self.read_with_timeout(slice).await?;
    }
}
630
/// Wait until the attached screen has been unchanged for `quiet_period`.
///
/// Drives reads in short increments and tracks whether the screen's
/// revision counter changes between reads. Returns successfully when the
/// screen has been quiescent for `quiet_period`, or with
/// [`ExpectError::Timeout`] if `max_wait` elapses first.
///
/// Each read is bounded by the smallest of the screen poll interval, the
/// time left until the quiet period completes, and the time left until
/// `max_wait`, so neither limit is overshot by a full poll interval.
///
/// Useful as a generic "wait for the TUI to finish drawing" primitive
/// when no specific anchor is available — for example, after submitting
/// a prompt and before reading the response.
///
/// A small `quiet_period` (e.g. 100-300 ms) catches paint completion;
/// a larger one (1-2 s) waits out streaming responses with mid-stream
/// pauses. Tune to the specific application.
///
/// Requires an attached screen.
///
/// # Errors
///
/// Returns an error if no screen is attached, `max_wait` elapses, or an
/// I/O error occurs. EOF is **not** an error — if the process exits, the
/// final screen state is considered stable and the method returns Ok.
///
/// Available with the `screen` feature.
#[cfg(feature = "screen")]
pub async fn wait_screen_stable(
    &mut self,
    quiet_period: Duration,
    max_wait: Duration,
) -> Result<()> {
    let Some(screen) = self.screen.clone() else {
        return Err(ExpectError::ScreenNotAttached);
    };

    let start = tokio::time::Instant::now();
    let poll = self.screen_poll_interval;
    let mut last_revision = lock_screen(&screen).revision();
    let mut last_change = tokio::time::Instant::now();

    loop {
        let quiet_for = last_change.elapsed();
        if quiet_for >= quiet_period {
            return Ok(());
        }
        if self.eof {
            return Ok(());
        }
        let waited = start.elapsed();
        if waited >= max_wait {
            return Err(ExpectError::Timeout {
                duration: max_wait,
                pattern: "<screen stability>".to_string(),
                buffer: lock_screen(&screen).text(),
            });
        }

        // Clamp the read like the sibling screen helpers do: a poll
        // interval larger than the remaining budget must not delay the
        // success or timeout result.
        let until_stable = quiet_period.saturating_sub(quiet_for);
        let until_deadline = max_wait.saturating_sub(waited);
        let slice = poll.min(until_stable).min(until_deadline);
        self.read_with_timeout(slice).await?;

        let current_revision = lock_screen(&screen).revision();
        if current_revision != last_revision {
            last_revision = current_revision;
            last_change = tokio::time::Instant::now();
        }
    }
}
692
693 /// Read data from the transport with timeout.
694 async fn read_with_timeout(&mut self, timeout: Duration) -> Result<usize> {
695 let mut buf = [0u8; 4096];
696 let mut transport = self.transport.lock().await;
697
698 match tokio::time::timeout(timeout, transport.read(&mut buf)).await {
699 Ok(Ok(0)) => {
700 self.eof = true;
701 Ok(0)
702 }
703 Ok(Ok(n)) => {
704 self.matcher.append(&buf[..n]);
705 // Run taps in catch_unwind so a panicking user callback can't
706 // unwind across our await boundary or poison subsequent taps.
707 // We log and continue rather than propagate — taps are
708 // observers, not error sources.
709 for (id, tap) in &self.output_taps {
710 let tap_clone = tap.clone();
711 let chunk = &buf[..n];
712 let result =
713 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| tap_clone(chunk)));
714 if result.is_err() {
715 tracing::warn!(
716 %id,
717 "output tap panicked; the panic was caught and other taps continue"
718 );
719 }
720 }
721 Ok(n)
722 }
723 Ok(Err(e)) => {
724 // On Linux, reading from PTY master returns EIO when the slave is closed
725 // (i.e., the child process has terminated). Treat this as EOF.
726 // See: https://bugs.python.org/issue5380
727 if is_pty_eof_error(&e) {
728 self.eof = true;
729 Ok(0)
730 } else {
731 Err(ExpectError::io_context("reading from process", e))
732 }
733 }
734 Err(_) => {
735 // Timeout, but not an error - caller will handle
736 Ok(0)
737 }
738 }
739 }
740
741 /// Wait for the process to exit.
742 ///
743 /// This method blocks until EOF is detected on the session, which typically
744 /// happens when the child process terminates.
745 ///
746 /// # Warning
747 ///
748 /// This method has no timeout and may block indefinitely if the process
749 /// does not exit. Consider using [`wait_timeout`](Self::wait_timeout) or
750 /// [`expect_eof_timeout`](Self::expect_eof_timeout) for bounded waits.
751 ///
752 /// # Errors
753 ///
754 /// Returns an error if waiting fails due to I/O error.
755 pub async fn wait(&mut self) -> Result<ProcessExitStatus> {
756 // Read until EOF
757 while !self.eof {
758 if self.read_with_timeout(Duration::from_millis(100)).await? == 0 && !self.eof {
759 tokio::time::sleep(Duration::from_millis(10)).await;
760 }
761 }
762
763 // Return unknown status - actual status depends on backend
764 self.state = SessionState::Exited(ProcessExitStatus::Unknown);
765 Ok(ProcessExitStatus::Unknown)
766 }
767
768 /// Wait for the process to exit with a timeout.
769 ///
770 /// Like [`wait`](Self::wait), but with a maximum duration to wait.
771 ///
772 /// # Errors
773 ///
774 /// Returns an error if:
775 /// - The timeout expires before the process exits
776 /// - An I/O error occurs while waiting
777 pub async fn wait_timeout(&mut self, timeout: Duration) -> Result<ProcessExitStatus> {
778 let deadline = tokio::time::Instant::now() + timeout;
779
780 while !self.eof {
781 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
782 if remaining.is_zero() {
783 return Err(ExpectError::timeout(
784 timeout,
785 "<EOF>",
786 self.matcher.buffer_str(),
787 ));
788 }
789
790 // Use smaller of remaining time or 100ms for polling
791 let poll_timeout = remaining.min(Duration::from_millis(100));
792 if self.read_with_timeout(poll_timeout).await? == 0 && !self.eof {
793 tokio::time::sleep(Duration::from_millis(10)).await;
794 }
795 }
796
797 self.state = SessionState::Exited(ProcessExitStatus::Unknown);
798 Ok(ProcessExitStatus::Unknown)
799 }
800
/// Check if a pattern matches immediately without blocking.
///
/// Delegates to `Matcher::try_match` on the data already buffered; no read
/// from the transport is performed. Returns `None` when there is no match.
#[must_use]
pub fn check(&mut self, pattern: &Pattern) -> Option<MatchResult> {
    self.matcher.try_match(pattern)
}
806
/// Get the underlying transport.
///
/// Use with caution as direct access bypasses session management.
///
/// The returned handle is the same `Arc<Mutex<T>>` the session locks for
/// its own reads and writes, so holding its lock stalls those operations.
#[must_use]
pub const fn transport(&self) -> &Arc<Mutex<T>> {
    &self.transport
}
814
815 /// Start an interactive session with pattern hooks.
816 ///
817 /// This returns a builder that allows you to configure pattern-based
818 /// callbacks that fire when patterns match in the output or input.
819 ///
820 /// # Example
821 ///
822 /// ```no_run
823 /// use rust_expect::{Session, InteractAction};
824 ///
825 /// #[tokio::main]
826 /// async fn main() -> Result<(), rust_expect::ExpectError> {
827 /// let mut session = Session::spawn("/bin/bash", &[]).await?;
828 ///
829 /// session.interact()
830 /// .on_output("password:", |ctx| {
831 /// ctx.send("my_password\n")
832 /// })
833 /// .on_output("logout", |_| {
834 /// InteractAction::Stop
835 /// })
836 /// .start()
837 /// .await?;
838 ///
839 /// Ok(())
840 /// }
841 /// ```
842 #[must_use]
843 pub fn interact(&self) -> InteractBuilder<'_, T>
844 where
845 T: 'static,
846 {
847 // Snapshot the currently registered output taps so the interact
848 // read loop can fire them on every chunk. Without this, attached
849 // screens and transcript recorders would silently freeze for the
850 // duration of interact().
851 let taps: Vec<OutputTap> = self
852 .output_taps
853 .iter()
854 .map(|(_, tap)| tap.clone())
855 .collect();
856 InteractBuilder::new(&self.transport, taps)
857 }
858
859 /// Run a dialog on this session.
860 ///
861 /// A dialog is a predefined sequence of expect/send operations.
862 /// This method executes the dialog and returns the result.
863 ///
864 /// # Example
865 ///
866 /// ```no_run
867 /// use rust_expect::{Session, Dialog, DialogStep};
868 ///
869 /// #[tokio::main]
870 /// async fn main() -> Result<(), rust_expect::ExpectError> {
871 /// let mut session = Session::spawn("/bin/bash", &[]).await?;
872 ///
873 /// let dialog = Dialog::named("shell_test")
874 /// .step(DialogStep::new("prompt")
875 /// .with_expect("$")
876 /// .with_send("echo hello\n"))
877 /// .step(DialogStep::new("verify")
878 /// .with_expect("hello"));
879 ///
880 /// let result = session.run_dialog(&dialog).await?;
881 /// assert!(result.success);
882 /// Ok(())
883 /// }
884 /// ```
885 ///
886 /// # Errors
887 ///
888 /// Returns an error if I/O fails. Step-level timeouts are reported
889 /// in the `DialogResult` rather than as errors.
890 pub async fn run_dialog(&mut self, dialog: &Dialog) -> Result<DialogResult> {
891 let executor = DialogExecutor::default();
892 executor.execute(self, dialog).await
893 }
894
/// Run a dialog with a custom executor.
///
/// This allows customizing the executor settings (max steps, default timeout).
/// The call is forwarded to `executor.execute` with this session and
/// `dialog`.
///
/// # Errors
///
/// Returns an error if I/O fails.
pub async fn run_dialog_with(
    &mut self,
    dialog: &Dialog,
    executor: &DialogExecutor,
) -> Result<DialogResult> {
    executor.execute(self, dialog).await
}
909
/// Expect end-of-file (process termination).
///
/// This is a convenience method for waiting until the process terminates
/// and closes its output stream. It is equivalent to calling
/// [`expect`](Self::expect) with `Pattern::eof()`.
///
/// # Example
///
/// ```no_run
/// use rust_expect::Session;
///
/// #[tokio::main]
/// async fn main() -> Result<(), rust_expect::ExpectError> {
///     let mut session = Session::spawn("echo", &["hello"]).await?;
///     session.expect("hello").await?;
///     session.expect_eof().await?;
///     Ok(())
/// }
/// ```
///
/// # Errors
///
/// Returns an error if the session times out before EOF or an I/O error occurs.
pub async fn expect_eof(&mut self) -> Result<Match> {
    self.expect(Pattern::eof()).await
}
935
936 /// Expect end-of-file with a specific timeout.
937 ///
938 /// # Errors
939 ///
940 /// Returns an error if the session times out before EOF or an I/O error occurs.
941 pub async fn expect_eof_timeout(&mut self, timeout: Duration) -> Result<Match> {
942 let mut patterns = PatternSet::new();
943 patterns.add(Pattern::eof()).add(Pattern::timeout(timeout));
944 self.expect_any(&patterns).await
945 }
946
947 /// Run a batch of commands, waiting for the prompt after each.
948 ///
949 /// This is a convenience method for executing multiple shell commands
950 /// in sequence. For each command, it sends the command line and waits
951 /// for the prompt pattern to appear.
952 ///
953 /// # Example
954 ///
955 /// ```no_run
956 /// use rust_expect::{Session, Pattern};
957 ///
958 /// #[tokio::main]
959 /// async fn main() -> Result<(), rust_expect::ExpectError> {
960 /// let mut session = Session::spawn("/bin/bash", &[]).await?;
961 /// session.expect(Pattern::shell_prompt()).await?;
962 ///
963 /// // Run a batch of commands
964 /// let results = session.run_script(
965 /// &["pwd", "whoami", "date"],
966 /// Pattern::shell_prompt(),
967 /// ).await?;
968 ///
969 /// for result in &results {
970 /// println!("Output: {}", result.before.trim());
971 /// }
972 ///
973 /// Ok(())
974 /// }
975 /// ```
976 ///
977 /// # Errors
978 ///
979 /// Returns an error if any command times out or I/O fails.
980 /// On error, partial results are lost; consider using [`Self::run_script_with_results`]
981 /// if you need to capture partial results on failure.
982 pub async fn run_script<I, S>(&mut self, commands: I, prompt: Pattern) -> Result<Vec<Match>>
983 where
984 I: IntoIterator<Item = S>,
985 S: AsRef<str>,
986 {
987 let mut results = Vec::new();
988
989 for cmd in commands {
990 self.send_line(cmd.as_ref()).await?;
991 let result = self.expect(prompt.clone()).await?;
992 results.push(result);
993 }
994
995 Ok(results)
996 }
997
998 /// Run a batch of commands with a specific timeout per command.
999 ///
1000 /// Like [`run_script`](Self::run_script), but applies the given timeout
1001 /// to each command individually.
1002 ///
1003 /// # Errors
1004 ///
1005 /// Returns an error if any command times out or I/O fails.
1006 pub async fn run_script_timeout<I, S>(
1007 &mut self,
1008 commands: I,
1009 prompt: Pattern,
1010 timeout: Duration,
1011 ) -> Result<Vec<Match>>
1012 where
1013 I: IntoIterator<Item = S>,
1014 S: AsRef<str>,
1015 {
1016 let mut results = Vec::new();
1017
1018 for cmd in commands {
1019 self.send_line(cmd.as_ref()).await?;
1020 let result = self.expect_timeout(prompt.clone(), timeout).await?;
1021 results.push(result);
1022 }
1023
1024 Ok(results)
1025 }
1026
1027 /// Run a batch of commands, collecting results even on failure.
1028 ///
1029 /// Unlike [`run_script`](Self::run_script), this method continues
1030 /// collecting results and returns them along with any error that occurred.
1031 ///
1032 /// # Returns
1033 ///
1034 /// A tuple of `(results, error)` where:
1035 /// - `results` contains the matches for successfully completed commands
1036 /// - `error` is `Some(err)` if an error occurred, `None` if all commands succeeded
1037 ///
1038 /// # Example
1039 ///
1040 /// ```no_run
1041 /// use rust_expect::{Session, Pattern};
1042 ///
1043 /// #[tokio::main]
1044 /// async fn main() -> Result<(), rust_expect::ExpectError> {
1045 /// let mut session = Session::spawn("/bin/bash", &[]).await?;
1046 /// session.expect(Pattern::shell_prompt()).await?;
1047 ///
1048 /// let (results, error) = session.run_script_with_results(
1049 /// &["pwd", "bad_command", "date"],
1050 /// Pattern::shell_prompt(),
1051 /// ).await;
1052 ///
1053 /// println!("Completed {} commands", results.len());
1054 /// if let Some(e) = error {
1055 /// eprintln!("Script failed at command {}: {}", results.len(), e);
1056 /// }
1057 ///
1058 /// Ok(())
1059 /// }
1060 /// ```
1061 pub async fn run_script_with_results<I, S>(
1062 &mut self,
1063 commands: I,
1064 prompt: Pattern,
1065 ) -> (Vec<Match>, Option<ExpectError>)
1066 where
1067 I: IntoIterator<Item = S>,
1068 S: AsRef<str>,
1069 {
1070 let mut results = Vec::new();
1071
1072 for cmd in commands {
1073 match self.send_line(cmd.as_ref()).await {
1074 Ok(()) => {}
1075 Err(e) => return (results, Some(e)),
1076 }
1077
1078 match self.expect(prompt.clone()).await {
1079 Ok(result) => results.push(result),
1080 Err(e) => return (results, Some(e)),
1081 }
1082 }
1083
1084 (results, None)
1085 }
1086}
1087
1088impl<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> std::fmt::Debug for Session<T> {
1089 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1090 f.debug_struct("Session")
1091 .field("id", &self.id)
1092 .field("state", &self.state)
1093 .field("eof", &self.eof)
1094 .finish_non_exhaustive()
1095 }
1096}
1097
1098// Unix-specific spawn implementation
1099#[cfg(unix)]
1100impl Session<AsyncPty> {
1101 /// Spawn a new process with the given command.
1102 ///
1103 /// This creates a new PTY, forks a child process, and returns a Session
1104 /// connected to the child's terminal.
1105 ///
1106 /// # Example
1107 ///
1108 /// ```no_run
1109 /// use rust_expect::Session;
1110 ///
1111 /// #[tokio::main]
1112 /// async fn main() -> Result<(), rust_expect::ExpectError> {
1113 /// let mut session = Session::spawn("/bin/bash", &[]).await?;
1114 /// session.expect("$").await?;
1115 /// session.send_line("echo hello").await?;
1116 /// session.expect("hello").await?;
1117 /// Ok(())
1118 /// }
1119 /// ```
1120 ///
1121 /// # Errors
1122 ///
1123 /// Returns an error if:
1124 /// - The command contains null bytes
1125 /// - PTY allocation fails
1126 /// - Fork fails
1127 /// - The command cannot be executed
1128 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
1129 Self::spawn_with_config(command, args, SessionConfig::default()).await
1130 }
1131
1132 /// Spawn a new process with custom configuration.
1133 ///
1134 /// # Errors
1135 ///
1136 /// Returns an error if spawning fails.
1137 pub async fn spawn_with_config(
1138 command: &str,
1139 args: &[&str],
1140 config: SessionConfig,
1141 ) -> Result<Self> {
1142 let pty_config = PtyConfig::from(&config);
1143 let spawner = PtySpawner::with_config(pty_config);
1144
1145 // Convert &[&str] to Vec<String> for the spawner
1146 let args_owned: Vec<String> = args.iter().map(|s| (*s).to_string()).collect();
1147
1148 // Spawn the process
1149 let handle = spawner.spawn(command, &args_owned).await?;
1150
1151 // Wrap in AsyncPty for async I/O
1152 let async_pty = AsyncPty::from_handle(handle)
1153 .map_err(|e| ExpectError::io_context("creating async PTY wrapper", e))?;
1154
1155 // Create the session
1156 let mut session = Self::new(async_pty, config);
1157 session.state = SessionState::Running;
1158
1159 Ok(session)
1160 }
1161
1162 /// Get the child process ID.
1163 #[must_use]
1164 pub fn pid(&self) -> u32 {
1165 // We need to access the inner transport's pid
1166 // For now, use the blocking lock since we know it's not contended
1167 // during a sync call like this
1168 if let Ok(transport) = self.transport.try_lock() {
1169 transport.pid()
1170 } else {
1171 0
1172 }
1173 }
1174
1175 /// Resize the terminal.
1176 ///
1177 /// Also resizes the attached screen (if any) so it stays consistent
1178 /// with the PTY. Without this, screen-aware assertions would drift
1179 /// after a resize.
1180 ///
1181 /// # Errors
1182 ///
1183 /// Returns an error if the resize ioctl fails.
1184 pub async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<()> {
1185 {
1186 let mut transport = self.transport.lock().await;
1187 transport.resize(cols, rows)?;
1188 }
1189 self.config.dimensions = (cols, rows);
1190 #[cfg(feature = "screen")]
1191 if let Some(screen) = self.screen.as_ref()
1192 && let Ok(mut s) = screen.lock()
1193 {
1194 s.resize(rows as usize, cols as usize);
1195 }
1196 Ok(())
1197 }
1198
1199 /// Send a signal to the child process.
1200 ///
1201 /// # Errors
1202 ///
1203 /// Returns an error if sending the signal fails.
1204 pub fn signal(&self, signal: i32) -> Result<()> {
1205 if let Ok(transport) = self.transport.try_lock() {
1206 transport.signal(signal)
1207 } else {
1208 Err(ExpectError::io_context(
1209 "sending signal to process",
1210 std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1211 ))
1212 }
1213 }
1214
1215 /// Kill the child process.
1216 ///
1217 /// # Errors
1218 ///
1219 /// Returns an error if killing the process fails.
1220 pub fn kill(&self) -> Result<()> {
1221 if let Ok(transport) = self.transport.try_lock() {
1222 transport.kill()
1223 } else {
1224 Err(ExpectError::io_context(
1225 "killing process",
1226 std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1227 ))
1228 }
1229 }
1230}
1231
1232// Windows-specific spawn implementation
1233#[cfg(windows)]
1234impl Session<WindowsAsyncPty> {
1235 /// Spawn a new process with the given command.
1236 ///
1237 /// This creates a new PTY using Windows ConPTY, spawns a child process,
1238 /// and returns a Session connected to the child's terminal.
1239 ///
1240 /// # Example
1241 ///
1242 /// ```no_run
1243 /// use rust_expect::Session;
1244 ///
1245 /// #[tokio::main]
1246 /// async fn main() -> Result<(), rust_expect::ExpectError> {
1247 /// let mut session = Session::spawn("cmd.exe", &[]).await?;
1248 /// session.expect(">").await?;
1249 /// session.send_line("echo hello").await?;
1250 /// session.expect("hello").await?;
1251 /// Ok(())
1252 /// }
1253 /// ```
1254 ///
1255 /// # Errors
1256 ///
1257 /// Returns an error if:
1258 /// - ConPTY is not available (Windows version too old)
1259 /// - PTY allocation fails
1260 /// - The command cannot be executed
1261 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
1262 Self::spawn_with_config(command, args, SessionConfig::default()).await
1263 }
1264
1265 /// Spawn a new process with custom configuration.
1266 ///
1267 /// # Errors
1268 ///
1269 /// Returns an error if spawning fails.
1270 pub async fn spawn_with_config(
1271 command: &str,
1272 args: &[&str],
1273 config: SessionConfig,
1274 ) -> Result<Self> {
1275 let pty_config = PtyConfig::from(&config);
1276 let spawner = PtySpawner::with_config(pty_config);
1277
1278 // Convert &[&str] to Vec<String> for the spawner
1279 let args_owned: Vec<String> = args.iter().map(|s| s.to_string()).collect();
1280
1281 // Spawn the process
1282 let handle = spawner.spawn(command, &args_owned).await?;
1283
1284 // Wrap in WindowsAsyncPty for async I/O
1285 let async_pty = WindowsAsyncPty::from_handle(handle);
1286
1287 // Create the session
1288 let mut session = Session::new(async_pty, config);
1289 session.state = SessionState::Running;
1290
1291 Ok(session)
1292 }
1293
1294 /// Get the child process ID.
1295 #[must_use]
1296 pub fn pid(&self) -> u32 {
1297 if let Ok(transport) = self.transport.try_lock() {
1298 transport.pid()
1299 } else {
1300 0
1301 }
1302 }
1303
1304 /// Resize the terminal.
1305 ///
1306 /// # Errors
1307 ///
1308 /// Returns an error if the resize operation fails.
1309 pub async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<()> {
1310 let mut transport = self.transport.lock().await;
1311 transport.resize(cols, rows)
1312 }
1313
1314 /// Check if the child process is still running.
1315 #[must_use]
1316 pub fn is_running(&self) -> bool {
1317 if let Ok(transport) = self.transport.try_lock() {
1318 transport.is_running()
1319 } else {
1320 true // Assume running if we can't check
1321 }
1322 }
1323
1324 /// Kill the child process.
1325 ///
1326 /// # Errors
1327 ///
1328 /// Returns an error if killing the process fails.
1329 pub fn kill(&self) -> Result<()> {
1330 if let Ok(mut transport) = self.transport.try_lock() {
1331 transport.kill()
1332 } else {
1333 Err(ExpectError::io_context(
1334 "killing process",
1335 std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1336 ))
1337 }
1338 }
1339}
1340
1341/// Extension trait for session operations.
1342pub trait SessionExt {
1343 /// Send and expect in one call.
1344 fn send_expect(
1345 &mut self,
1346 send: &str,
1347 expect: impl Into<Pattern>,
1348 ) -> impl std::future::Future<Output = Result<Match>> + Send;
1349
1350 /// Resize the terminal.
1351 fn resize(
1352 &mut self,
1353 dimensions: Dimensions,
1354 ) -> impl std::future::Future<Output = Result<()>> + Send;
1355}
1356
1357/// Check if an I/O error indicates PTY EOF.
1358///
1359/// On Linux, reading from the PTY master returns EIO when the slave side
1360/// has been closed (i.e., the child process has terminated). This is different
1361/// from the standard EOF behavior where `read()` returns 0 bytes.
1362///
1363/// This function returns true for errors that should be treated as EOF:
1364/// - EIO (errno 5) on Unix systems
1365/// - `BrokenPipe` on any platform
1366fn is_pty_eof_error(e: &std::io::Error) -> bool {
1367 use std::io::ErrorKind;
1368
1369 // BrokenPipe indicates the other end has closed
1370 if e.kind() == ErrorKind::BrokenPipe {
1371 return true;
1372 }
1373
1374 // On Unix, check for EIO which indicates slave PTY closed
1375 #[cfg(unix)]
1376 {
1377 if let Some(errno) = e.raw_os_error() {
1378 // EIO is 5 on Linux/macOS/BSD
1379 if errno == libc::EIO {
1380 return true;
1381 }
1382 }
1383 }
1384
1385 false
1386}