Skip to main content

rust_expect/session/
handle.rs

1//! Session handle for interacting with spawned processes.
2//!
3//! This module provides the main `Session` type that users interact with
4//! to control spawned processes, send input, and expect output.
5
6use std::sync::Arc;
7#[cfg(feature = "screen")]
8use std::sync::{Mutex as StdMutex, MutexGuard};
9use std::time::Duration;
10
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::sync::Mutex;
13
14#[cfg(unix)]
15use crate::backend::{AsyncPty, PtyConfig, PtySpawner};
16#[cfg(windows)]
17use crate::backend::{PtyConfig, PtySpawner, WindowsAsyncPty};
18use crate::config::SessionConfig;
19use crate::dialog::{Dialog, DialogExecutor, DialogResult};
20use crate::error::{ExpectError, Result};
21use crate::expect::{ExpectState, MatchResult, Matcher, Pattern, PatternManager, PatternSet};
22use crate::interact::InteractBuilder;
23#[cfg(feature = "screen")]
24use crate::screen::Screen;
25use crate::types::{ControlChar, Dimensions, Match, ProcessExitStatus, SessionId, SessionState};
26
/// Observer callback run on each chunk of bytes read from the transport.
///
/// A tap receives the raw bytes exactly as they were read. It is called
/// after the chunk has been appended to the matcher buffer and before any
/// pattern matching runs on it. The attached screen is fed through this
/// hook, and transcript recorders or other stream observers can use it the
/// same way.
pub type OutputTap = Arc<dyn Fn(&[u8]) + Send + Sync>;
34
/// Opaque handle identifying one registered output tap.
///
/// Handed out by [`Session::add_output_tap`] and accepted by
/// [`Session::remove_output_tap`].
///
/// Wraps a `u64` drawn from a per-session counter that only counts upwards,
/// so in practice a session never hands out the same id twice; the id space
/// is far too large to be exhausted by realistic use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TapId(u64);

/// Renders the id as `tap#<n>`, the form that appears in log output.
impl std::fmt::Display for TapId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "tap#{raw}")
    }
}
51
/// Acquire the screen lock, shrugging off mutex poisoning.
///
/// The mutex becomes poisoned when some holder panics while the guard is
/// alive (for example a panic inside `Screen::process`). Giving up at that
/// point would make every screen-aware expect behave as if nothing ever
/// matched, which is hard to diagnose. Instead the poison error is unwrapped
/// with `into_inner`, a warning is logged, and the caller keeps working
/// against whatever state the screen currently holds.
#[cfg(feature = "screen")]
fn lock_screen(screen: &Arc<StdMutex<Screen>>) -> MutexGuard<'_, Screen> {
    screen.lock().unwrap_or_else(|poisoned| {
        tracing::warn!("screen mutex was poisoned; recovering inner state");
        poisoned.into_inner()
    })
}
70
71/// A session handle for interacting with a spawned process.
72///
73/// The session provides methods to send input, expect patterns in output,
74/// and manage the lifecycle of the process.
75pub struct Session<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> {
76    /// The underlying transport (PTY, SSH channel, etc.).
77    transport: Arc<Mutex<T>>,
78    /// Session configuration.
79    config: SessionConfig,
80    /// Pattern matcher.
81    matcher: Matcher,
82    /// Pattern manager for before/after patterns.
83    pattern_manager: PatternManager,
84    /// Current session state.
85    state: SessionState,
86    /// Unique session identifier.
87    id: SessionId,
88    /// EOF flag.
89    eof: bool,
90    /// Output taps invoked on every chunk of bytes read from the transport,
91    /// stored as (id, callback) so they can be removed individually.
92    output_taps: Vec<(TapId, OutputTap)>,
93    /// Monotonic counter for assigning new `TapId`s.
94    next_tap_id: u64,
95    /// Attached virtual terminal screen, fed from an output tap.
96    #[cfg(feature = "screen")]
97    screen: Option<Arc<StdMutex<Screen>>>,
98    /// Tap id used to feed the attached screen, so `detach_screen` can
99    /// remove only that tap and leave user-registered taps in place.
100    #[cfg(feature = "screen")]
101    screen_tap_id: Option<TapId>,
102    /// Poll interval used by the screen-aware expect helpers
103    /// (`expect_screen_contains`, `wait_screen_not_contains`,
104    /// `wait_screen_stable`). 50 ms by default.
105    #[cfg(feature = "screen")]
106    screen_poll_interval: Duration,
107}
108
109impl<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> Session<T> {
110    /// Create a new session with the given transport.
111    pub fn new(transport: T, config: SessionConfig) -> Self {
112        let buffer_size = config.buffer.max_size;
113        let mut matcher = Matcher::new(buffer_size);
114        matcher.set_default_timeout(config.timeout.default);
115        Self {
116            transport: Arc::new(Mutex::new(transport)),
117            config,
118            matcher,
119            pattern_manager: PatternManager::new(),
120            state: SessionState::Starting,
121            id: SessionId::new(),
122            eof: false,
123            output_taps: Vec::new(),
124            next_tap_id: 0,
125            #[cfg(feature = "screen")]
126            screen: None,
127            #[cfg(feature = "screen")]
128            screen_tap_id: None,
129            #[cfg(feature = "screen")]
130            screen_poll_interval: Duration::from_millis(50),
131        }
132    }
133
134    /// Set the polling interval used by the screen-aware expect helpers.
135    ///
136    /// Affects `expect_screen_contains`, `wait_screen_not_contains`, and
137    /// `wait_screen_stable`. Smaller values reduce match latency at the
138    /// cost of CPU; larger values do the opposite. Default is 50 ms.
139    ///
140    /// Available with the `screen` feature.
141    #[cfg(feature = "screen")]
142    pub const fn set_screen_poll_interval(&mut self, interval: Duration) {
143        self.screen_poll_interval = interval;
144    }
145
146    /// Get the current screen-poll interval. Default 50 ms.
147    ///
148    /// Available with the `screen` feature.
149    #[cfg(feature = "screen")]
150    #[must_use]
151    pub const fn screen_poll_interval(&self) -> Duration {
152        self.screen_poll_interval
153    }
154
155    /// Register a callback that will be invoked with every chunk of bytes
156    /// read from the transport.
157    ///
158    /// Taps observe the raw byte stream as it arrives — they receive bytes
159    /// in the same form the underlying process produced them, including any
160    /// ANSI escape sequences. Taps are invoked synchronously inside the read
161    /// loop after the bytes are appended to the matcher buffer; they should
162    /// be cheap and non-blocking. Use a channel if expensive work is required.
163    ///
164    /// Multiple taps may be registered; they are invoked in registration
165    /// order. Taps are dropped when the session is dropped.
166    ///
167    /// # Example
168    ///
169    /// ```ignore
170    /// use std::sync::Arc;
171    /// use std::sync::Mutex;
172    /// let captured: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
173    /// let buf = captured.clone();
174    /// session.add_output_tap(move |chunk| {
175    ///     buf.lock().unwrap().extend_from_slice(chunk);
176    /// });
177    /// ```
178    pub fn add_output_tap<F>(&mut self, f: F) -> TapId
179    where
180        F: Fn(&[u8]) + Send + Sync + 'static,
181    {
182        let id = TapId(self.next_tap_id);
183        // Plain addition (not wrapping_add): on the astronomically unlikely
184        // event of u64 exhaustion on a single session, we'd rather panic
185        // loudly than silently issue a colliding id.
186        self.next_tap_id += 1;
187        self.output_taps.push((id, Arc::new(f)));
188        id
189    }
190
191    /// Remove a previously registered output tap by its [`TapId`]. Returns
192    /// `true` if a tap was removed, `false` if the id was not registered
193    /// (already removed, or never existed).
194    pub fn remove_output_tap(&mut self, id: TapId) -> bool {
195        let len_before = self.output_taps.len();
196        self.output_taps.retain(|(existing, _)| *existing != id);
197        self.output_taps.len() != len_before
198    }
199
200    /// Iterate the callbacks for all currently registered output taps.
201    ///
202    /// Exposed for instrumentation and inspection only — the read loops in
203    /// [`expect`](Self::expect) and [`interact`](Self::interact) invoke
204    /// these themselves. Returns the callback `Arc`s in registration
205    /// order; ids are intentionally omitted (use
206    /// [`add_output_tap`](Self::add_output_tap)'s return value if you
207    /// need the id).
208    pub fn output_tap_callbacks(&self) -> impl Iterator<Item = &OutputTap> {
209        self.output_taps.iter().map(|(_, cb)| cb)
210    }
211
212    /// Attach a virtual terminal screen to this session.
213    ///
214    /// Creates a [`Screen`](crate::screen::Screen) with the session's
215    /// configured dimensions and registers an output tap that feeds every
216    /// chunk of output into the screen's ANSI parser. The screen is then
217    /// accessible via [`screen()`](Self::screen) and is automatically updated
218    /// whenever output is read from the transport (i.e. inside `expect_*`,
219    /// `wait`, or `wait_screen_stable`).
220    ///
221    /// Repeated calls replace the previous screen.
222    ///
223    /// Available with the `screen` feature.
224    #[cfg(feature = "screen")]
225    pub fn attach_screen(&mut self) {
226        let (cols, rows) = self.config.dimensions;
227        self.attach_screen_with_dims(rows, cols);
228    }
229
230    /// Attach a screen with custom dimensions.
231    ///
232    /// `rows` and `cols` are the screen size in cells. Note that this does
233    /// not resize the PTY itself — use [`resize_pty`](Self::resize_pty) for
234    /// that. The two should normally match, but it can be useful to set a
235    /// larger virtual screen for transcript capture.
236    ///
237    /// Available with the `screen` feature.
238    #[cfg(feature = "screen")]
239    pub fn attach_screen_with_dims(&mut self, rows: u16, cols: u16) {
240        // Replace any previous screen + its tap so we don't leak callbacks.
241        self.detach_screen();
242        let screen = Arc::new(StdMutex::new(Screen::new(rows as usize, cols as usize)));
243        let screen_for_tap = screen.clone();
244        let id = self.add_output_tap(move |chunk| {
245            // Reuse the shared poison-recovery helper so the tap-side and
246            // read-side recovery logic stays in lockstep.
247            lock_screen(&screen_for_tap).process(chunk);
248        });
249        self.screen = Some(screen);
250        self.screen_tap_id = Some(id);
251    }
252
253    /// Detach the currently attached screen, also removing its output tap.
254    /// No-op if no screen is attached. Returns `true` if a screen was
255    /// detached.
256    ///
257    /// Available with the `screen` feature.
258    #[cfg(feature = "screen")]
259    pub fn detach_screen(&mut self) -> bool {
260        if let Some(id) = self.screen_tap_id.take() {
261            self.remove_output_tap(id);
262        }
263        self.screen.take().is_some()
264    }
265
266    /// Get the attached virtual terminal screen, if any.
267    ///
268    /// Returns a shared handle protected by a [`std::sync::Mutex`]. Lock it
269    /// briefly to read screen state — the lock is also taken by the output
270    /// tap on every read, so holding it for long stretches blocks the read
271    /// loop.
272    ///
273    /// Available with the `screen` feature.
274    #[cfg(feature = "screen")]
275    #[must_use]
276    pub const fn screen(&self) -> Option<&Arc<StdMutex<Screen>>> {
277        self.screen.as_ref()
278    }
279
280    /// Get the session ID.
281    #[must_use]
282    pub const fn id(&self) -> &SessionId {
283        &self.id
284    }
285
286    /// Get the current session state.
287    #[must_use]
288    pub const fn state(&self) -> SessionState {
289        self.state
290    }
291
292    /// Get the session configuration.
293    #[must_use]
294    pub const fn config(&self) -> &SessionConfig {
295        &self.config
296    }
297
298    /// Check if EOF has been detected.
299    #[must_use]
300    pub const fn is_eof(&self) -> bool {
301        self.eof
302    }
303
304    /// Get the current buffer contents.
305    #[must_use]
306    pub fn buffer(&mut self) -> String {
307        self.matcher.buffer_str()
308    }
309
310    /// Clear the buffer.
311    pub fn clear_buffer(&mut self) {
312        self.matcher.clear();
313    }
314
315    /// Get the pattern manager for before/after patterns.
316    #[must_use]
317    pub const fn pattern_manager(&self) -> &PatternManager {
318        &self.pattern_manager
319    }
320
321    /// Get mutable access to the pattern manager.
322    pub const fn pattern_manager_mut(&mut self) -> &mut PatternManager {
323        &mut self.pattern_manager
324    }
325
326    /// Set the session state.
327    pub const fn set_state(&mut self, state: SessionState) {
328        self.state = state;
329    }
330
331    /// Send bytes to the process.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the write fails.
336    #[allow(clippy::significant_drop_tightening)]
337    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
338        if matches!(self.state, SessionState::Closed | SessionState::Exited(_)) {
339            return Err(ExpectError::SessionClosed);
340        }
341
342        let mut transport = self.transport.lock().await;
343        transport
344            .write_all(data)
345            .await
346            .map_err(|e| ExpectError::io_context("writing to process", e))?;
347        transport
348            .flush()
349            .await
350            .map_err(|e| ExpectError::io_context("flushing process output", e))?;
351        Ok(())
352    }
353
354    /// Send a string to the process.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if the write fails.
359    pub async fn send_str(&mut self, s: &str) -> Result<()> {
360        self.send(s.as_bytes()).await
361    }
362
363    /// Send a line to the process (appends newline based on config).
364    ///
365    /// # Errors
366    ///
367    /// Returns an error if the write fails.
368    pub async fn send_line(&mut self, line: &str) -> Result<()> {
369        let line_ending = self.config.line_ending.as_str();
370        let data = format!("{line}{line_ending}");
371        self.send(data.as_bytes()).await
372    }
373
374    /// Send a control character to the process.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the write fails.
379    pub async fn send_control(&mut self, ctrl: ControlChar) -> Result<()> {
380        self.send(&[ctrl.as_byte()]).await
381    }
382
383    /// Send a Shift+Tab keystroke.
384    ///
385    /// Sends the xterm "back tab" sequence `\x1b[Z` (CSI Z). Most TUIs use
386    /// this to cycle a focused-element ring backwards or, in Claude Code's
387    /// case, to cycle permission modes. Compatible with both plain xterm
388    /// and the kitty keyboard protocol's CSI-u fallback mode.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the write fails.
393    pub async fn send_shift_tab(&mut self) -> Result<()> {
394        self.send(b"\x1b[Z").await
395    }
396
397    /// Send text using bracketed paste mode (DECSET 2004).
398    ///
399    /// Wraps the content in `\x1b[200~` and `\x1b[201~` markers. Applications
400    /// that have enabled bracketed paste treat the enclosed content as
401    /// pasted input rather than typed input — this suppresses autocomplete,
402    /// command-history scanning, and per-character interpretation such as a
403    /// leading `/` triggering a slash-command popup. Safe to call even when
404    /// the receiver hasn't enabled bracketed paste: most terminals ignore
405    /// the markers and deliver the inner text as-is.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the write fails or if `text` contains the
410    /// closing paste marker `\x1b[201~`, which would let the receiver drop
411    /// out of paste mode mid-payload. Callers that want to send such bytes
412    /// should write them through the regular [`send`](Self::send) path.
413    pub async fn send_paste(&mut self, text: &str) -> Result<()> {
414        if memchr::memmem::find(text.as_bytes(), b"\x1b[201~").is_some() {
415            return Err(ExpectError::InvalidInput {
416                api: "send_paste".to_string(),
417                reason:
418                    "input contains the bracketed-paste end marker (\\x1b[201~); use send() for raw bytes that include this sequence"
419                        .to_string(),
420            });
421        }
422        self.send(b"\x1b[200~").await?;
423        self.send(text.as_bytes()).await?;
424        self.send(b"\x1b[201~").await
425    }
426
427    /// Expect a pattern in the output.
428    ///
429    /// Blocks until the pattern is matched, EOF is detected, or timeout occurs.
430    ///
431    /// # Errors
432    ///
433    /// Returns an error on timeout, EOF (if not expected), or I/O error.
434    pub async fn expect(&mut self, pattern: impl Into<Pattern>) -> Result<Match> {
435        let patterns = PatternSet::from_patterns(vec![pattern.into()]);
436        self.expect_any(&patterns).await
437    }
438
439    /// Expect any of the given patterns.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error on timeout, EOF (if not expected), or I/O error.
444    pub async fn expect_any(&mut self, patterns: &PatternSet) -> Result<Match> {
445        let timeout = self.matcher.get_timeout(patterns);
446        let state = ExpectState::new(patterns.clone(), timeout);
447
448        loop {
449            // Check before patterns first
450            if let Some((_, action)) = self
451                .pattern_manager
452                .check_before(&self.matcher.buffer_str())
453            {
454                match action {
455                    crate::expect::HandlerAction::Continue => {}
456                    crate::expect::HandlerAction::Return(s) => {
457                        return Ok(Match::new(0, s, String::new(), self.matcher.buffer_str()));
458                    }
459                    crate::expect::HandlerAction::Abort(msg) => {
460                        return Err(ExpectError::PatternNotFound {
461                            pattern: msg,
462                            buffer: self.matcher.buffer_str(),
463                        });
464                    }
465                    crate::expect::HandlerAction::Respond(s) => {
466                        self.send_str(&s).await?;
467                    }
468                }
469            }
470
471            // Check for pattern match
472            if let Some(result) = self.matcher.try_match_any(patterns) {
473                return Ok(self.matcher.consume_match(&result));
474            }
475
476            // Check for timeout
477            if state.is_timed_out() {
478                return Err(ExpectError::Timeout {
479                    duration: timeout,
480                    pattern: patterns
481                        .iter()
482                        .next()
483                        .map(|p| p.pattern.as_str().to_string())
484                        .unwrap_or_default(),
485                    buffer: self.matcher.buffer_str(),
486                });
487            }
488
489            // Check for EOF
490            if self.eof {
491                if state.expects_eof() {
492                    return Ok(Match::new(
493                        0,
494                        String::new(),
495                        self.matcher.buffer_str(),
496                        String::new(),
497                    ));
498                }
499                return Err(ExpectError::Eof {
500                    buffer: self.matcher.buffer_str(),
501                });
502            }
503
504            // Read more data
505            self.read_with_timeout(state.remaining_time()).await?;
506        }
507    }
508
509    /// Expect with a specific timeout.
510    ///
511    /// # Errors
512    ///
513    /// Returns an error on timeout, EOF, or I/O error.
514    pub async fn expect_timeout(
515        &mut self,
516        pattern: impl Into<Pattern>,
517        timeout: Duration,
518    ) -> Result<Match> {
519        let pattern = pattern.into();
520        let mut patterns = PatternSet::new();
521        patterns.add(pattern).add(Pattern::timeout(timeout));
522        self.expect_any(&patterns).await
523    }
524
525    /// Wait until the attached screen contains the given substring.
526    ///
527    /// Drives reads from the transport in short increments, checking the
528    /// rendered screen text after each. Returns successfully as soon as
529    /// `needle` appears in the screen text, or with [`ExpectError::Timeout`]
530    /// when `timeout` elapses without a match. Returns [`ExpectError::Eof`]
531    /// if the process exits before the substring appears.
532    ///
533    /// This is the screen-aware counterpart to [`expect`](Self::expect): use
534    /// it when the byte stream is full of ANSI escape sequences (e.g. when
535    /// driving a TUI), where literal substring matching on the byte stream
536    /// would fail because of interleaved cursor positioning and SGR codes.
537    ///
538    /// Requires an attached screen — call [`attach_screen`](Self::attach_screen)
539    /// first.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if no screen is attached, the timeout expires, EOF
544    /// is reached, or an I/O error occurs.
545    ///
546    /// Available with the `screen` feature.
547    #[cfg(feature = "screen")]
548    pub async fn expect_screen_contains(&mut self, needle: &str, timeout: Duration) -> Result<()> {
549        let Some(screen) = self.screen.clone() else {
550            return Err(ExpectError::ScreenNotAttached);
551        };
552
553        let start = tokio::time::Instant::now();
554        let poll = self.screen_poll_interval;
555
556        loop {
557            if lock_screen(&screen).query().contains(needle) {
558                return Ok(());
559            }
560            if self.eof {
561                return Err(ExpectError::Eof {
562                    buffer: lock_screen(&screen).text(),
563                });
564            }
565            let elapsed = start.elapsed();
566            if elapsed >= timeout {
567                return Err(ExpectError::Timeout {
568                    duration: timeout,
569                    pattern: needle.to_string(),
570                    buffer: lock_screen(&screen).text(),
571                });
572            }
573            let remaining = timeout.saturating_sub(elapsed);
574            self.read_with_timeout(poll.min(remaining)).await?;
575        }
576    }
577
578    /// Wait until the attached screen no longer contains the given substring.
579    ///
580    /// The inverse of [`expect_screen_contains`](Self::expect_screen_contains).
581    /// Returns successfully as soon as `needle` is absent from the rendered
582    /// screen, or with [`ExpectError::Timeout`] when `timeout` elapses with
583    /// the substring still present. EOF is treated as "absent" (the screen
584    /// state is frozen at the final paint).
585    ///
586    /// Useful for anchoring on the *disappearance* of an indicator —
587    /// e.g. waiting for a "request in flight" status to clear, a spinner
588    /// glyph to stop, or a modal to close.
589    ///
590    /// Requires an attached screen.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if no screen is attached, the timeout expires while
595    /// the substring is still visible, or an I/O error occurs.
596    ///
597    /// Available with the `screen` feature.
598    #[cfg(feature = "screen")]
599    pub async fn wait_screen_not_contains(
600        &mut self,
601        needle: &str,
602        timeout: Duration,
603    ) -> Result<()> {
604        let Some(screen) = self.screen.clone() else {
605            return Err(ExpectError::ScreenNotAttached);
606        };
607
608        let start = tokio::time::Instant::now();
609        let poll = self.screen_poll_interval;
610
611        loop {
612            if !lock_screen(&screen).query().contains(needle) {
613                return Ok(());
614            }
615            if self.eof {
616                return Ok(());
617            }
618            let elapsed = start.elapsed();
619            if elapsed >= timeout {
620                return Err(ExpectError::Timeout {
621                    duration: timeout,
622                    pattern: format!("!{needle}"),
623                    buffer: lock_screen(&screen).text(),
624                });
625            }
626            let remaining = timeout.saturating_sub(elapsed);
627            self.read_with_timeout(poll.min(remaining)).await?;
628        }
629    }
630
631    /// Wait until the attached screen has been unchanged for `quiet_period`.
632    ///
633    /// Drives reads in short increments and tracks whether the rendered
634    /// screen text changes between reads. Returns successfully when the
635    /// screen has been quiescent for `quiet_period`, or with
636    /// [`ExpectError::Timeout`] if `max_wait` elapses first.
637    ///
638    /// Useful as a generic "wait for the TUI to finish drawing" primitive
639    /// when no specific anchor is available — for example, after submitting
640    /// a prompt and before reading the response.
641    ///
642    /// A small `quiet_period` (e.g. 100-300 ms) catches paint completion;
643    /// a larger one (1-2 s) waits out streaming responses with mid-stream
644    /// pauses. Tune to the specific application.
645    ///
646    /// Requires an attached screen.
647    ///
648    /// # Errors
649    ///
650    /// Returns an error if no screen is attached, `max_wait` elapses, or an
651    /// I/O error occurs. EOF is **not** an error — if the process exits, the
652    /// final screen state is considered stable and the method returns Ok.
653    ///
654    /// Available with the `screen` feature.
655    #[cfg(feature = "screen")]
656    pub async fn wait_screen_stable(
657        &mut self,
658        quiet_period: Duration,
659        max_wait: Duration,
660    ) -> Result<()> {
661        let Some(screen) = self.screen.clone() else {
662            return Err(ExpectError::ScreenNotAttached);
663        };
664
665        let start = tokio::time::Instant::now();
666        let poll = self.screen_poll_interval;
667        let mut last_revision = lock_screen(&screen).revision();
668        let mut last_change = tokio::time::Instant::now();
669
670        loop {
671            if last_change.elapsed() >= quiet_period {
672                return Ok(());
673            }
674            if self.eof {
675                return Ok(());
676            }
677            if start.elapsed() >= max_wait {
678                return Err(ExpectError::Timeout {
679                    duration: max_wait,
680                    pattern: "<screen stability>".to_string(),
681                    buffer: lock_screen(&screen).text(),
682                });
683            }
684            self.read_with_timeout(poll).await?;
685            let current_revision = lock_screen(&screen).revision();
686            if current_revision != last_revision {
687                last_revision = current_revision;
688                last_change = tokio::time::Instant::now();
689            }
690        }
691    }
692
693    /// Read data from the transport with timeout.
694    async fn read_with_timeout(&mut self, timeout: Duration) -> Result<usize> {
695        let mut buf = [0u8; 4096];
696        let mut transport = self.transport.lock().await;
697
698        match tokio::time::timeout(timeout, transport.read(&mut buf)).await {
699            Ok(Ok(0)) => {
700                self.eof = true;
701                Ok(0)
702            }
703            Ok(Ok(n)) => {
704                self.matcher.append(&buf[..n]);
705                // Run taps in catch_unwind so a panicking user callback can't
706                // unwind across our await boundary or poison subsequent taps.
707                // We log and continue rather than propagate — taps are
708                // observers, not error sources.
709                for (id, tap) in &self.output_taps {
710                    let tap_clone = tap.clone();
711                    let chunk = &buf[..n];
712                    let result =
713                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| tap_clone(chunk)));
714                    if result.is_err() {
715                        tracing::warn!(
716                            %id,
717                            "output tap panicked; the panic was caught and other taps continue"
718                        );
719                    }
720                }
721                Ok(n)
722            }
723            Ok(Err(e)) => {
724                // On Linux, reading from PTY master returns EIO when the slave is closed
725                // (i.e., the child process has terminated). Treat this as EOF.
726                // See: https://bugs.python.org/issue5380
727                if is_pty_eof_error(&e) {
728                    self.eof = true;
729                    Ok(0)
730                } else {
731                    Err(ExpectError::io_context("reading from process", e))
732                }
733            }
734            Err(_) => {
735                // Timeout, but not an error - caller will handle
736                Ok(0)
737            }
738        }
739    }
740
741    /// Wait for the process to exit.
742    ///
743    /// This method blocks until EOF is detected on the session, which typically
744    /// happens when the child process terminates.
745    ///
746    /// # Warning
747    ///
748    /// This method has no timeout and may block indefinitely if the process
749    /// does not exit. Consider using [`wait_timeout`](Self::wait_timeout) or
750    /// [`expect_eof_timeout`](Self::expect_eof_timeout) for bounded waits.
751    ///
752    /// # Errors
753    ///
754    /// Returns an error if waiting fails due to I/O error.
755    pub async fn wait(&mut self) -> Result<ProcessExitStatus> {
756        // Read until EOF
757        while !self.eof {
758            if self.read_with_timeout(Duration::from_millis(100)).await? == 0 && !self.eof {
759                tokio::time::sleep(Duration::from_millis(10)).await;
760            }
761        }
762
763        // Return unknown status - actual status depends on backend
764        self.state = SessionState::Exited(ProcessExitStatus::Unknown);
765        Ok(ProcessExitStatus::Unknown)
766    }
767
768    /// Wait for the process to exit with a timeout.
769    ///
770    /// Like [`wait`](Self::wait), but with a maximum duration to wait.
771    ///
772    /// # Errors
773    ///
774    /// Returns an error if:
775    /// - The timeout expires before the process exits
776    /// - An I/O error occurs while waiting
777    pub async fn wait_timeout(&mut self, timeout: Duration) -> Result<ProcessExitStatus> {
778        let deadline = tokio::time::Instant::now() + timeout;
779
780        while !self.eof {
781            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
782            if remaining.is_zero() {
783                return Err(ExpectError::timeout(
784                    timeout,
785                    "<EOF>",
786                    self.matcher.buffer_str(),
787                ));
788            }
789
790            // Use smaller of remaining time or 100ms for polling
791            let poll_timeout = remaining.min(Duration::from_millis(100));
792            if self.read_with_timeout(poll_timeout).await? == 0 && !self.eof {
793                tokio::time::sleep(Duration::from_millis(10)).await;
794            }
795        }
796
797        self.state = SessionState::Exited(ProcessExitStatus::Unknown);
798        Ok(ProcessExitStatus::Unknown)
799    }
800
801    /// Check if a pattern matches immediately without blocking.
802    #[must_use]
803    pub fn check(&mut self, pattern: &Pattern) -> Option<MatchResult> {
804        self.matcher.try_match(pattern)
805    }
806
807    /// Get the underlying transport.
808    ///
809    /// Use with caution as direct access bypasses session management.
810    #[must_use]
811    pub const fn transport(&self) -> &Arc<Mutex<T>> {
812        &self.transport
813    }
814
815    /// Start an interactive session with pattern hooks.
816    ///
817    /// This returns a builder that allows you to configure pattern-based
818    /// callbacks that fire when patterns match in the output or input.
819    ///
820    /// # Example
821    ///
822    /// ```no_run
823    /// use rust_expect::{Session, InteractAction};
824    ///
825    /// #[tokio::main]
826    /// async fn main() -> Result<(), rust_expect::ExpectError> {
827    ///     let mut session = Session::spawn("/bin/bash", &[]).await?;
828    ///
829    ///     session.interact()
830    ///         .on_output("password:", |ctx| {
831    ///             ctx.send("my_password\n")
832    ///         })
833    ///         .on_output("logout", |_| {
834    ///             InteractAction::Stop
835    ///         })
836    ///         .start()
837    ///         .await?;
838    ///
839    ///     Ok(())
840    /// }
841    /// ```
842    #[must_use]
843    pub fn interact(&self) -> InteractBuilder<'_, T>
844    where
845        T: 'static,
846    {
847        // Snapshot the currently registered output taps so the interact
848        // read loop can fire them on every chunk. Without this, attached
849        // screens and transcript recorders would silently freeze for the
850        // duration of interact().
851        let taps: Vec<OutputTap> = self
852            .output_taps
853            .iter()
854            .map(|(_, tap)| tap.clone())
855            .collect();
856        InteractBuilder::new(&self.transport, taps)
857    }
858
859    /// Run a dialog on this session.
860    ///
861    /// A dialog is a predefined sequence of expect/send operations.
862    /// This method executes the dialog and returns the result.
863    ///
864    /// # Example
865    ///
866    /// ```no_run
867    /// use rust_expect::{Session, Dialog, DialogStep};
868    ///
869    /// #[tokio::main]
870    /// async fn main() -> Result<(), rust_expect::ExpectError> {
871    ///     let mut session = Session::spawn("/bin/bash", &[]).await?;
872    ///
873    ///     let dialog = Dialog::named("shell_test")
874    ///         .step(DialogStep::new("prompt")
875    ///             .with_expect("$")
876    ///             .with_send("echo hello\n"))
877    ///         .step(DialogStep::new("verify")
878    ///             .with_expect("hello"));
879    ///
880    ///     let result = session.run_dialog(&dialog).await?;
881    ///     assert!(result.success);
882    ///     Ok(())
883    /// }
884    /// ```
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if I/O fails. Step-level timeouts are reported
889    /// in the `DialogResult` rather than as errors.
890    pub async fn run_dialog(&mut self, dialog: &Dialog) -> Result<DialogResult> {
891        let executor = DialogExecutor::default();
892        executor.execute(self, dialog).await
893    }
894
895    /// Run a dialog with a custom executor.
896    ///
897    /// This allows customizing the executor settings (max steps, default timeout).
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if I/O fails.
902    pub async fn run_dialog_with(
903        &mut self,
904        dialog: &Dialog,
905        executor: &DialogExecutor,
906    ) -> Result<DialogResult> {
907        executor.execute(self, dialog).await
908    }
909
910    /// Expect end-of-file (process termination).
911    ///
912    /// This is a convenience method for waiting until the process terminates
913    /// and closes its output stream.
914    ///
915    /// # Example
916    ///
917    /// ```no_run
918    /// use rust_expect::Session;
919    ///
920    /// #[tokio::main]
921    /// async fn main() -> Result<(), rust_expect::ExpectError> {
922    ///     let mut session = Session::spawn("echo", &["hello"]).await?;
923    ///     session.expect("hello").await?;
924    ///     session.expect_eof().await?;
925    ///     Ok(())
926    /// }
927    /// ```
928    ///
929    /// # Errors
930    ///
931    /// Returns an error if the session times out before EOF or an I/O error occurs.
932    pub async fn expect_eof(&mut self) -> Result<Match> {
933        self.expect(Pattern::eof()).await
934    }
935
936    /// Expect end-of-file with a specific timeout.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the session times out before EOF or an I/O error occurs.
941    pub async fn expect_eof_timeout(&mut self, timeout: Duration) -> Result<Match> {
942        let mut patterns = PatternSet::new();
943        patterns.add(Pattern::eof()).add(Pattern::timeout(timeout));
944        self.expect_any(&patterns).await
945    }
946
947    /// Run a batch of commands, waiting for the prompt after each.
948    ///
949    /// This is a convenience method for executing multiple shell commands
950    /// in sequence. For each command, it sends the command line and waits
951    /// for the prompt pattern to appear.
952    ///
953    /// # Example
954    ///
955    /// ```no_run
956    /// use rust_expect::{Session, Pattern};
957    ///
958    /// #[tokio::main]
959    /// async fn main() -> Result<(), rust_expect::ExpectError> {
960    ///     let mut session = Session::spawn("/bin/bash", &[]).await?;
961    ///     session.expect(Pattern::shell_prompt()).await?;
962    ///
963    ///     // Run a batch of commands
964    ///     let results = session.run_script(
965    ///         &["pwd", "whoami", "date"],
966    ///         Pattern::shell_prompt(),
967    ///     ).await?;
968    ///
969    ///     for result in &results {
970    ///         println!("Output: {}", result.before.trim());
971    ///     }
972    ///
973    ///     Ok(())
974    /// }
975    /// ```
976    ///
977    /// # Errors
978    ///
979    /// Returns an error if any command times out or I/O fails.
980    /// On error, partial results are lost; consider using [`Self::run_script_with_results`]
981    /// if you need to capture partial results on failure.
982    pub async fn run_script<I, S>(&mut self, commands: I, prompt: Pattern) -> Result<Vec<Match>>
983    where
984        I: IntoIterator<Item = S>,
985        S: AsRef<str>,
986    {
987        let mut results = Vec::new();
988
989        for cmd in commands {
990            self.send_line(cmd.as_ref()).await?;
991            let result = self.expect(prompt.clone()).await?;
992            results.push(result);
993        }
994
995        Ok(results)
996    }
997
998    /// Run a batch of commands with a specific timeout per command.
999    ///
1000    /// Like [`run_script`](Self::run_script), but applies the given timeout
1001    /// to each command individually.
1002    ///
1003    /// # Errors
1004    ///
1005    /// Returns an error if any command times out or I/O fails.
1006    pub async fn run_script_timeout<I, S>(
1007        &mut self,
1008        commands: I,
1009        prompt: Pattern,
1010        timeout: Duration,
1011    ) -> Result<Vec<Match>>
1012    where
1013        I: IntoIterator<Item = S>,
1014        S: AsRef<str>,
1015    {
1016        let mut results = Vec::new();
1017
1018        for cmd in commands {
1019            self.send_line(cmd.as_ref()).await?;
1020            let result = self.expect_timeout(prompt.clone(), timeout).await?;
1021            results.push(result);
1022        }
1023
1024        Ok(results)
1025    }
1026
1027    /// Run a batch of commands, collecting results even on failure.
1028    ///
1029    /// Unlike [`run_script`](Self::run_script), this method continues
1030    /// collecting results and returns them along with any error that occurred.
1031    ///
1032    /// # Returns
1033    ///
1034    /// A tuple of `(results, error)` where:
1035    /// - `results` contains the matches for successfully completed commands
1036    /// - `error` is `Some(err)` if an error occurred, `None` if all commands succeeded
1037    ///
1038    /// # Example
1039    ///
1040    /// ```no_run
1041    /// use rust_expect::{Session, Pattern};
1042    ///
1043    /// #[tokio::main]
1044    /// async fn main() -> Result<(), rust_expect::ExpectError> {
1045    ///     let mut session = Session::spawn("/bin/bash", &[]).await?;
1046    ///     session.expect(Pattern::shell_prompt()).await?;
1047    ///
1048    ///     let (results, error) = session.run_script_with_results(
1049    ///         &["pwd", "bad_command", "date"],
1050    ///         Pattern::shell_prompt(),
1051    ///     ).await;
1052    ///
1053    ///     println!("Completed {} commands", results.len());
1054    ///     if let Some(e) = error {
1055    ///         eprintln!("Script failed at command {}: {}", results.len(), e);
1056    ///     }
1057    ///
1058    ///     Ok(())
1059    /// }
1060    /// ```
1061    pub async fn run_script_with_results<I, S>(
1062        &mut self,
1063        commands: I,
1064        prompt: Pattern,
1065    ) -> (Vec<Match>, Option<ExpectError>)
1066    where
1067        I: IntoIterator<Item = S>,
1068        S: AsRef<str>,
1069    {
1070        let mut results = Vec::new();
1071
1072        for cmd in commands {
1073            match self.send_line(cmd.as_ref()).await {
1074                Ok(()) => {}
1075                Err(e) => return (results, Some(e)),
1076            }
1077
1078            match self.expect(prompt.clone()).await {
1079                Ok(result) => results.push(result),
1080                Err(e) => return (results, Some(e)),
1081            }
1082        }
1083
1084        (results, None)
1085    }
1086}
1087
1088impl<T: AsyncReadExt + AsyncWriteExt + Unpin + Send> std::fmt::Debug for Session<T> {
1089    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1090        f.debug_struct("Session")
1091            .field("id", &self.id)
1092            .field("state", &self.state)
1093            .field("eof", &self.eof)
1094            .finish_non_exhaustive()
1095    }
1096}
1097
1098// Unix-specific spawn implementation
1099#[cfg(unix)]
1100impl Session<AsyncPty> {
1101    /// Spawn a new process with the given command.
1102    ///
1103    /// This creates a new PTY, forks a child process, and returns a Session
1104    /// connected to the child's terminal.
1105    ///
1106    /// # Example
1107    ///
1108    /// ```no_run
1109    /// use rust_expect::Session;
1110    ///
1111    /// #[tokio::main]
1112    /// async fn main() -> Result<(), rust_expect::ExpectError> {
1113    ///     let mut session = Session::spawn("/bin/bash", &[]).await?;
1114    ///     session.expect("$").await?;
1115    ///     session.send_line("echo hello").await?;
1116    ///     session.expect("hello").await?;
1117    ///     Ok(())
1118    /// }
1119    /// ```
1120    ///
1121    /// # Errors
1122    ///
1123    /// Returns an error if:
1124    /// - The command contains null bytes
1125    /// - PTY allocation fails
1126    /// - Fork fails
1127    /// - The command cannot be executed
1128    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
1129        Self::spawn_with_config(command, args, SessionConfig::default()).await
1130    }
1131
1132    /// Spawn a new process with custom configuration.
1133    ///
1134    /// # Errors
1135    ///
1136    /// Returns an error if spawning fails.
1137    pub async fn spawn_with_config(
1138        command: &str,
1139        args: &[&str],
1140        config: SessionConfig,
1141    ) -> Result<Self> {
1142        let pty_config = PtyConfig::from(&config);
1143        let spawner = PtySpawner::with_config(pty_config);
1144
1145        // Convert &[&str] to Vec<String> for the spawner
1146        let args_owned: Vec<String> = args.iter().map(|s| (*s).to_string()).collect();
1147
1148        // Spawn the process
1149        let handle = spawner.spawn(command, &args_owned).await?;
1150
1151        // Wrap in AsyncPty for async I/O
1152        let async_pty = AsyncPty::from_handle(handle)
1153            .map_err(|e| ExpectError::io_context("creating async PTY wrapper", e))?;
1154
1155        // Create the session
1156        let mut session = Self::new(async_pty, config);
1157        session.state = SessionState::Running;
1158
1159        Ok(session)
1160    }
1161
1162    /// Get the child process ID.
1163    #[must_use]
1164    pub fn pid(&self) -> u32 {
1165        // We need to access the inner transport's pid
1166        // For now, use the blocking lock since we know it's not contended
1167        // during a sync call like this
1168        if let Ok(transport) = self.transport.try_lock() {
1169            transport.pid()
1170        } else {
1171            0
1172        }
1173    }
1174
1175    /// Resize the terminal.
1176    ///
1177    /// Also resizes the attached screen (if any) so it stays consistent
1178    /// with the PTY. Without this, screen-aware assertions would drift
1179    /// after a resize.
1180    ///
1181    /// # Errors
1182    ///
1183    /// Returns an error if the resize ioctl fails.
1184    pub async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<()> {
1185        {
1186            let mut transport = self.transport.lock().await;
1187            transport.resize(cols, rows)?;
1188        }
1189        self.config.dimensions = (cols, rows);
1190        #[cfg(feature = "screen")]
1191        if let Some(screen) = self.screen.as_ref()
1192            && let Ok(mut s) = screen.lock()
1193        {
1194            s.resize(rows as usize, cols as usize);
1195        }
1196        Ok(())
1197    }
1198
1199    /// Send a signal to the child process.
1200    ///
1201    /// # Errors
1202    ///
1203    /// Returns an error if sending the signal fails.
1204    pub fn signal(&self, signal: i32) -> Result<()> {
1205        if let Ok(transport) = self.transport.try_lock() {
1206            transport.signal(signal)
1207        } else {
1208            Err(ExpectError::io_context(
1209                "sending signal to process",
1210                std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1211            ))
1212        }
1213    }
1214
1215    /// Kill the child process.
1216    ///
1217    /// # Errors
1218    ///
1219    /// Returns an error if killing the process fails.
1220    pub fn kill(&self) -> Result<()> {
1221        if let Ok(transport) = self.transport.try_lock() {
1222            transport.kill()
1223        } else {
1224            Err(ExpectError::io_context(
1225                "killing process",
1226                std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1227            ))
1228        }
1229    }
1230}
1231
1232// Windows-specific spawn implementation
1233#[cfg(windows)]
1234impl Session<WindowsAsyncPty> {
1235    /// Spawn a new process with the given command.
1236    ///
1237    /// This creates a new PTY using Windows ConPTY, spawns a child process,
1238    /// and returns a Session connected to the child's terminal.
1239    ///
1240    /// # Example
1241    ///
1242    /// ```no_run
1243    /// use rust_expect::Session;
1244    ///
1245    /// #[tokio::main]
1246    /// async fn main() -> Result<(), rust_expect::ExpectError> {
1247    ///     let mut session = Session::spawn("cmd.exe", &[]).await?;
1248    ///     session.expect(">").await?;
1249    ///     session.send_line("echo hello").await?;
1250    ///     session.expect("hello").await?;
1251    ///     Ok(())
1252    /// }
1253    /// ```
1254    ///
1255    /// # Errors
1256    ///
1257    /// Returns an error if:
1258    /// - ConPTY is not available (Windows version too old)
1259    /// - PTY allocation fails
1260    /// - The command cannot be executed
1261    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
1262        Self::spawn_with_config(command, args, SessionConfig::default()).await
1263    }
1264
1265    /// Spawn a new process with custom configuration.
1266    ///
1267    /// # Errors
1268    ///
1269    /// Returns an error if spawning fails.
1270    pub async fn spawn_with_config(
1271        command: &str,
1272        args: &[&str],
1273        config: SessionConfig,
1274    ) -> Result<Self> {
1275        let pty_config = PtyConfig::from(&config);
1276        let spawner = PtySpawner::with_config(pty_config);
1277
1278        // Convert &[&str] to Vec<String> for the spawner
1279        let args_owned: Vec<String> = args.iter().map(|s| s.to_string()).collect();
1280
1281        // Spawn the process
1282        let handle = spawner.spawn(command, &args_owned).await?;
1283
1284        // Wrap in WindowsAsyncPty for async I/O
1285        let async_pty = WindowsAsyncPty::from_handle(handle);
1286
1287        // Create the session
1288        let mut session = Session::new(async_pty, config);
1289        session.state = SessionState::Running;
1290
1291        Ok(session)
1292    }
1293
1294    /// Get the child process ID.
1295    #[must_use]
1296    pub fn pid(&self) -> u32 {
1297        if let Ok(transport) = self.transport.try_lock() {
1298            transport.pid()
1299        } else {
1300            0
1301        }
1302    }
1303
1304    /// Resize the terminal.
1305    ///
1306    /// # Errors
1307    ///
1308    /// Returns an error if the resize operation fails.
1309    pub async fn resize_pty(&mut self, cols: u16, rows: u16) -> Result<()> {
1310        let mut transport = self.transport.lock().await;
1311        transport.resize(cols, rows)
1312    }
1313
1314    /// Check if the child process is still running.
1315    #[must_use]
1316    pub fn is_running(&self) -> bool {
1317        if let Ok(transport) = self.transport.try_lock() {
1318            transport.is_running()
1319        } else {
1320            true // Assume running if we can't check
1321        }
1322    }
1323
1324    /// Kill the child process.
1325    ///
1326    /// # Errors
1327    ///
1328    /// Returns an error if killing the process fails.
1329    pub fn kill(&self) -> Result<()> {
1330        if let Ok(mut transport) = self.transport.try_lock() {
1331            transport.kill()
1332        } else {
1333            Err(ExpectError::io_context(
1334                "killing process",
1335                std::io::Error::new(std::io::ErrorKind::WouldBlock, "transport is locked"),
1336            ))
1337        }
1338    }
1339}
1340
1341/// Extension trait for session operations.
1342pub trait SessionExt {
1343    /// Send and expect in one call.
1344    fn send_expect(
1345        &mut self,
1346        send: &str,
1347        expect: impl Into<Pattern>,
1348    ) -> impl std::future::Future<Output = Result<Match>> + Send;
1349
1350    /// Resize the terminal.
1351    fn resize(
1352        &mut self,
1353        dimensions: Dimensions,
1354    ) -> impl std::future::Future<Output = Result<()>> + Send;
1355}
1356
1357/// Check if an I/O error indicates PTY EOF.
1358///
1359/// On Linux, reading from the PTY master returns EIO when the slave side
1360/// has been closed (i.e., the child process has terminated). This is different
1361/// from the standard EOF behavior where `read()` returns 0 bytes.
1362///
1363/// This function returns true for errors that should be treated as EOF:
1364/// - EIO (errno 5) on Unix systems
1365/// - `BrokenPipe` on any platform
1366fn is_pty_eof_error(e: &std::io::Error) -> bool {
1367    use std::io::ErrorKind;
1368
1369    // BrokenPipe indicates the other end has closed
1370    if e.kind() == ErrorKind::BrokenPipe {
1371        return true;
1372    }
1373
1374    // On Unix, check for EIO which indicates slave PTY closed
1375    #[cfg(unix)]
1376    {
1377        if let Some(errno) = e.raw_os_error() {
1378            // EIO is 5 on Linux/macOS/BSD
1379            if errno == libc::EIO {
1380                return true;
1381            }
1382        }
1383    }
1384
1385    false
1386}