bssh 2.1.2

Parallel SSH command execution tool for cluster management
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
// Copyright 2025 Lablup Inc. and Jeongkyu Shin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! SSH channel operations including command execution and PTY management.
//!
//! This module provides methods for:
//! - Opening SSH channels
//! - Executing commands
//! - Managing interactive shells and PTY sessions
//! - Port forwarding channels

use bytes::Bytes;
use russh::Channel;
use russh::client::Msg;
use std::io;
use std::net::SocketAddr;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use tokio::task::JoinHandle;

use super::ToSocketAddrsWithHostname;
use super::connection::Client;
use crate::security::{SudoPassword, contains_sudo_failure, contains_sudo_prompt};

// Buffer size constants for SSH operations.
//
// Buffer sizing rationale for the output-collection buffers in this module:
// - Initial allocations start small and grow dynamically based on actual output
// - This avoids wasting memory for commands with minimal output
// - Buffers aim to grow by at least 1.5x to amortize reallocation costs

/// Capacity (in events) of the bounded channel used to stream command output.
///
/// - 100 events provides good buffering without excessive memory
/// - Balances between latency and throughput
/// - With typical SSH packet sizes (32KB), this can buffer ~3.2MB of data
/// - If the channel fills, backpressure is applied to prevent memory exhaustion:
///   producers wait for the consumer instead of queueing without bound
/// - Commands producing more than 3.2MB/sec may experience throttling
const OUTPUT_EVENTS_CHANNEL_SIZE: usize = 100;

/// Maximum size, in bytes, of the text buffer scanned for sudo prompts (64KB).
///
/// This prevents unbounded memory growth when detecting sudo prompts in command output.
/// Sudo prompts are typically very short (<1KB), so 64KB is more than sufficient.
/// If the accumulated output exceeds this size without a sudo prompt being handled,
/// only the most recent portion is kept so that a prompt at the end of the output
/// can still be detected.
const MAX_SUDO_PROMPT_BUFFER_SIZE: usize = 64 * 1024;

/// Maximum number of times the sudo password is sent within a single command execution.
///
/// This allows handling multiple sudo commands (e.g., `sudo cmd1 && sudo cmd2`)
/// while preventing infinite loops if authentication fails: once this many
/// passwords have been sent, further prompts are no longer answered.
/// Set to 10 to support reasonable multi-sudo command chains.
const MAX_SUDO_PASSWORD_SENDS: u32 = 10;

/// A single event produced while streaming the output of a remote command.
///
/// Values of this type are sent through a `tokio::sync::mpsc` channel by the
/// execution methods on `Client` and consumed by whoever holds the receiver.
#[derive(Debug, Clone)]
pub enum CommandOutput {
    /// A chunk of standard output data.
    StdOut(Bytes),
    /// A chunk of standard error data.
    StdErr(Bytes),
    /// Exit code of the command.
    ///
    /// NOTE(review): within this module the variant is only emitted by
    /// `execute_with_sudo` when sudo authentication fails; the regular exit
    /// status is returned from the execution methods instead. Other producers
    /// may exist elsewhere in the crate.
    ExitCode(u32),
}

/// Buffer for collecting streaming command output.
///
/// Pairs the sending half of an output channel with the background task that
/// drains the receiving half and accumulates everything it sees.
pub(crate) struct CommandOutputBuffer {
    /// Sending half of the output channel; clone it to feed output into the buffer.
    /// The collector task only finishes once every sender (including this one)
    /// has been dropped.
    pub(crate) sender: Sender<CommandOutput>,
    /// Handle of the collector task; resolves to `(stdout_bytes, stderr_bytes)`.
    pub(crate) receiver_task: JoinHandle<(Vec<u8>, Vec<u8>)>,
}

impl CommandOutputBuffer {
    /// Create a new command output buffer with a background task to collect output.
    ///
    /// A bounded channel of [`OUTPUT_EVENTS_CHANNEL_SIZE`] events is created and a
    /// Tokio task is spawned that drains it until every sender has been dropped.
    /// `StdOut` and `StdErr` chunks are appended to separate byte buffers;
    /// `ExitCode` events are ignored by the collector.
    ///
    /// # Returns
    /// A buffer whose `sender` feeds the collector and whose `receiver_task`
    /// resolves to `(stdout_bytes, stderr_bytes)` once the channel is closed.
    ///
    /// # Panics
    /// `tokio::task::spawn` requires a running Tokio runtime, so this must be
    /// called from within one.
    pub(crate) fn new() -> Self {
        /// Append `chunk` to `buf`, growing the allocation to at least 1.5x the
        /// current capacity whenever the chunk does not fit.
        fn append_chunk(buf: &mut Vec<u8>, chunk: &[u8]) {
            let required = buf.len() + chunk.len();
            if buf.capacity() < required {
                let target = required.max(buf.capacity() + buf.capacity() / 2);
                // `Vec::reserve` takes the headroom wanted beyond `len()`, not
                // beyond `capacity()`, so the target is expressed relative to
                // the current length. `target >= required >= len`, hence the
                // subtraction cannot underflow.
                buf.reserve(target - buf.len());
            }
            buf.extend_from_slice(chunk);
        }

        let (sender, mut receiver): (Sender<CommandOutput>, Receiver<CommandOutput>) =
            channel(OUTPUT_EVENTS_CHANNEL_SIZE);

        let receiver_task = tokio::task::spawn(async move {
            // Start with small initial capacities and grow as needed.
            // This avoids wasting memory for commands with minimal output.
            let mut stdout = Vec::with_capacity(1024); // Start with 1KB
            let mut stderr = Vec::with_capacity(256); // Start with 256B for stderr

            // `recv()` yields `None` only after all senders are dropped, which
            // is the signal that no more output will arrive.
            while let Some(output) = receiver.recv().await {
                match output {
                    CommandOutput::StdOut(buffer) => append_chunk(&mut stdout, &buffer),
                    CommandOutput::StdErr(buffer) => append_chunk(&mut stderr, &buffer),
                    CommandOutput::ExitCode(_) => {
                        // Exit code is handled by the stream manager, not collected here
                    }
                }
            }

            (stdout, stderr)
        });

        Self {
            sender,
            receiver_task,
        }
    }
}

/// Result of a command execution: the fully collected output plus exit status.
///
/// Produced by `Client::execute`, which decodes the collected bytes with
/// `String::from_utf8_lossy`, so invalid UTF-8 sequences appear as U+FFFD.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CommandExecutedResult {
    /// The stdout output of the command.
    pub stdout: String,
    /// The stderr output of the command.
    pub stderr: String,
    /// The unix exit status (`$?` in bash).
    pub exit_status: u32,
}

impl Client {
    /// Get a new SSH channel for communication.
    pub async fn get_channel(&self) -> Result<Channel<Msg>, super::Error> {
        self.connection_handle
            .channel_open_session()
            .await
            .map_err(super::Error::SshError)
    }

    /// Open a TCP/IP forwarding channel.
    ///
    /// This opens a `direct-tcpip` channel to the given target.
    pub async fn open_direct_tcpip_channel<
        T: ToSocketAddrsWithHostname,
        S: Into<Option<SocketAddr>>,
    >(
        &self,
        target: T,
        src: S,
    ) -> Result<Channel<Msg>, super::Error> {
        let targets = target
            .to_socket_addrs()
            .map_err(super::Error::AddressInvalid)?;
        let src = src
            .into()
            .map(|src| (src.ip().to_string(), src.port().into()))
            .unwrap_or_else(|| ("127.0.0.1".to_string(), 22));

        let mut connect_err = super::Error::AddressInvalid(io::Error::new(
            io::ErrorKind::InvalidInput,
            "could not resolve to any addresses",
        ));
        for target in targets {
            match self
                .connection_handle
                .channel_open_direct_tcpip(
                    target.ip().to_string(),
                    target.port().into(),
                    src.0.clone(),
                    src.1,
                )
                .await
            {
                Ok(channel) => return Ok(channel),
                Err(err) => connect_err = super::Error::SshError(err),
            }
        }

        Err(connect_err)
    }

    /// Execute a remote command via the ssh connection with streaming output.
    ///
    /// This method sends command output in real-time to the provided sender channel.
    /// Output is sent as `CommandOutput::StdOut` or `CommandOutput::StdErr` variants.
    ///
    /// Returns only the exit status of the command. Stdout and stderr are streamed
    /// through the sender channel.
    ///
    /// Make sure your commands don't read from stdin and exit after bounded time.
    ///
    /// Can be called multiple times, but every invocation is a new shell context.
    /// Thus `cd`, setting variables and alike have no effect on future invocations.
    ///
    /// # Backpressure Handling
    /// If the channel fills up (receiver is slower than output production), this method
    /// will apply backpressure by blocking until space is available. This prevents
    /// unbounded memory growth but may slow down command execution for high-throughput
    /// commands.
    ///
    /// # Error Handling
    /// - If the receiver drops the channel, this method will stop processing output
    ///   and return the last known exit status.
    /// - Command sanitization errors are propagated as `CommandValidationFailed`.
    ///
    /// # Arguments
    /// * `command` - The command to execute
    /// * `sender` - Channel sender for streaming output
    ///
    /// # Returns
    /// The exit status of the command
    pub async fn execute_streaming(
        &self,
        command: &str,
        sender: Sender<CommandOutput>,
    ) -> Result<u32, super::Error> {
        // Sanitize command to prevent injection attacks
        let sanitized_command = crate::utils::sanitize_command(command)
            .map_err(|e| super::Error::CommandValidationFailed(e.to_string()))?;

        let mut channel = self.connection_handle.channel_open_session().await?;
        channel.exec(true, sanitized_command.as_str()).await?;

        let mut result: Option<u32> = None;

        // While the channel has messages...
        while let Some(msg) = channel.wait().await {
            match msg {
                // If we get data, send it to the streaming channel
                // Note: We must clone the data here because russh owns it and will reuse the buffer
                russh::ChannelMsg::Data { ref data } => {
                    // Try non-blocking send first for better performance
                    match sender.try_send(CommandOutput::StdOut(data.clone())) {
                        Ok(_) => {}
                        Err(tokio::sync::mpsc::error::TrySendError::Full(output)) => {
                            // Channel is full - apply backpressure by waiting
                            // This prevents memory exhaustion on high-throughput commands
                            tracing::trace!("Channel full, applying backpressure for stdout");
                            if sender.send(output).await.is_err() {
                                // Receiver dropped - stop processing
                                tracing::debug!("Receiver dropped, stopping stdout processing");
                                break;
                            }
                        }
                        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                            // Receiver dropped - stop processing
                            tracing::debug!("Channel closed, stopping stdout processing");
                            break;
                        }
                    }
                }
                russh::ChannelMsg::ExtendedData { ref data, ext } => {
                    if ext == 1 {
                        // Handle backpressure for stderr as well
                        match sender.try_send(CommandOutput::StdErr(data.clone())) {
                            Ok(_) => {}
                            Err(tokio::sync::mpsc::error::TrySendError::Full(output)) => {
                                // Channel is full - apply backpressure by waiting
                                tracing::trace!("Channel full, applying backpressure for stderr");
                                if sender.send(output).await.is_err() {
                                    // Receiver dropped - stop processing
                                    tracing::debug!("Receiver dropped, stopping stderr processing");
                                    break;
                                }
                            }
                            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                                // Receiver dropped - stop processing
                                tracing::debug!("Channel closed, stopping stderr processing");
                                break;
                            }
                        }
                    }
                }

                // If we get an exit code report, store it, but crucially don't
                // assume this message means end of communications. The data might
                // not be finished yet!
                russh::ChannelMsg::ExitStatus { exit_status } => result = Some(exit_status),

                // We SHOULD get this EOF message, but 4254 sec 5.3 also permits
                // the channel to close without it being sent. And sometimes this
                // message can even precede the Data message, so don't handle it
                // russh::ChannelMsg::Eof => break,
                _ => {}
            }
        }

        // Drop sender to signal completion to receiver
        // This is critical: dropping the sender causes receiver.recv() to return None,
        // allowing the background task to finish collecting any remaining buffered data
        drop(sender);

        // If we received an exit code, report it back
        if let Some(result) = result {
            Ok(result)
        // Otherwise, report an error
        } else {
            Err(super::Error::CommandDidntExit)
        }
    }

    /// Execute a remote command with sudo password support.
    ///
    /// This method handles automatic sudo password injection when sudo prompts are detected
    /// in the command output. It monitors both stdout and stderr for sudo password prompts
    /// and automatically sends the password when detected.
    ///
    /// # Arguments
    /// * `command` - The command to execute (typically starts with `sudo`)
    /// * `sender` - Channel sender for streaming output
    /// * `sudo_password` - The sudo password to inject when prompted
    ///
    /// # Returns
    /// The exit status of the command
    ///
    /// # Security
    /// - Password is only sent when a sudo prompt is detected
    /// - Password is never logged or included in error messages
    /// - Detects sudo authentication failures and reports them appropriately
    pub async fn execute_with_sudo(
        &self,
        command: &str,
        sender: Sender<CommandOutput>,
        sudo_password: &SudoPassword,
    ) -> Result<u32, super::Error> {
        // Sanitize command to prevent injection attacks
        let sanitized_command = crate::utils::sanitize_command(command)
            .map_err(|e| super::Error::CommandValidationFailed(e.to_string()))?;

        // Request a PTY for sudo to properly interact with
        // Sudo requires a PTY to prompt for password
        let mut channel = self.connection_handle.channel_open_session().await?;

        // Request PTY with reasonable defaults for sudo
        channel
            .request_pty(
                true,    // want reply
                "xterm", // term type
                80,      // columns
                24,      // rows
                0,       // pixel width
                0,       // pixel height
                &[],     // terminal modes (empty for defaults)
            )
            .await?;

        channel.exec(true, sanitized_command.as_str()).await?;

        let mut result: Option<u32> = None;
        let mut password_send_count: u32 = 0;
        let mut accumulated_output = String::new();

        // While the channel has messages...
        while let Some(msg) = channel.wait().await {
            match msg {
                russh::ChannelMsg::Data { ref data } => {
                    // Check for sudo prompt before sending to output
                    let text = String::from_utf8_lossy(data);
                    accumulated_output.push_str(&text);

                    // Enforce buffer size limit to prevent unbounded memory growth
                    if accumulated_output.len() > MAX_SUDO_PROMPT_BUFFER_SIZE {
                        // Keep only the last MAX_SUDO_PROMPT_BUFFER_SIZE bytes
                        // This ensures we can still detect sudo prompts at the end
                        let truncate_at = accumulated_output.len() - MAX_SUDO_PROMPT_BUFFER_SIZE;
                        accumulated_output = accumulated_output[truncate_at..].to_string();
                        tracing::debug!(
                            "Sudo prompt buffer exceeded limit, truncated to {} bytes",
                            MAX_SUDO_PROMPT_BUFFER_SIZE
                        );
                    }

                    // Send output to streaming channel
                    match sender.try_send(CommandOutput::StdOut(data.clone())) {
                        Ok(_) => {}
                        Err(tokio::sync::mpsc::error::TrySendError::Full(output)) => {
                            tracing::trace!("Channel full, applying backpressure for stdout");
                            if sender.send(output).await.is_err() {
                                tracing::debug!("Receiver dropped, stopping stdout processing");
                                break;
                            }
                        }
                        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                            tracing::debug!("Channel closed, stopping stdout processing");
                            break;
                        }
                    }

                    // Check if we need to send the password (supports multiple sudo prompts)
                    if password_send_count < MAX_SUDO_PASSWORD_SENDS
                        && contains_sudo_prompt(&accumulated_output)
                    {
                        password_send_count += 1;
                        tracing::debug!(
                            "Sudo prompt detected, sending password (attempt {}/{})",
                            password_send_count,
                            MAX_SUDO_PASSWORD_SENDS
                        );
                        // Send the password with newline
                        let password_data = sudo_password.with_newline();
                        if let Err(e) = channel.data(&password_data[..]).await {
                            tracing::error!("Failed to send sudo password: {}", e);
                            return Err(super::Error::SshError(e));
                        }
                        // Clear accumulated output after sending password to detect next prompt
                        accumulated_output.clear();
                    }

                    // Check for sudo failure after password was sent
                    if password_send_count > 0 && contains_sudo_failure(&accumulated_output) {
                        tracing::debug!(
                            "Sudo authentication failed after {} attempt(s), closing channel",
                            password_send_count
                        );
                        // Send error message to stderr so user can see why it failed
                        let error_msg = format!(
                            "\n[bssh] Sudo authentication failed after {} attempt(s). \
                             Please verify your sudo password is correct.\n",
                            password_send_count
                        );
                        let _ = sender
                            .send(CommandOutput::StdErr(Bytes::from(error_msg.into_bytes())))
                            .await;
                        // Send exit code 1 to indicate failure to the stream
                        let _ = sender.send(CommandOutput::ExitCode(1)).await;
                        // Close the channel and return failure exit code
                        let _ = channel.eof().await;
                        let _ = channel.close().await;
                        drop(sender);
                        // Return exit code 1 to indicate sudo authentication failure
                        return Ok(1);
                    }
                }
                russh::ChannelMsg::ExtendedData { ref data, ext } => {
                    if ext == 1 {
                        // Stderr - also check for sudo prompts
                        let text = String::from_utf8_lossy(data);
                        accumulated_output.push_str(&text);

                        // Enforce buffer size limit to prevent unbounded memory growth
                        if accumulated_output.len() > MAX_SUDO_PROMPT_BUFFER_SIZE {
                            // Keep only the last MAX_SUDO_PROMPT_BUFFER_SIZE bytes
                            let truncate_at =
                                accumulated_output.len() - MAX_SUDO_PROMPT_BUFFER_SIZE;
                            accumulated_output = accumulated_output[truncate_at..].to_string();
                            tracing::debug!(
                                "Sudo prompt buffer exceeded limit (stderr), truncated to {} bytes",
                                MAX_SUDO_PROMPT_BUFFER_SIZE
                            );
                        }

                        match sender.try_send(CommandOutput::StdErr(data.clone())) {
                            Ok(_) => {}
                            Err(tokio::sync::mpsc::error::TrySendError::Full(output)) => {
                                tracing::trace!("Channel full, applying backpressure for stderr");
                                if sender.send(output).await.is_err() {
                                    tracing::debug!("Receiver dropped, stopping stderr processing");
                                    break;
                                }
                            }
                            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
                                tracing::debug!("Channel closed, stopping stderr processing");
                                break;
                            }
                        }

                        // Check if we need to send the password (sudo can prompt on stderr)
                        if password_send_count < MAX_SUDO_PASSWORD_SENDS
                            && contains_sudo_prompt(&accumulated_output)
                        {
                            password_send_count += 1;
                            tracing::debug!(
                                "Sudo prompt detected on stderr, sending password (attempt {}/{})",
                                password_send_count,
                                MAX_SUDO_PASSWORD_SENDS
                            );
                            let password_data = sudo_password.with_newline();
                            if let Err(e) = channel.data(&password_data[..]).await {
                                tracing::error!("Failed to send sudo password: {}", e);
                                return Err(super::Error::SshError(e));
                            }
                            accumulated_output.clear();
                        }

                        // Check for sudo failure
                        if password_send_count > 0 && contains_sudo_failure(&accumulated_output) {
                            tracing::debug!(
                                "Sudo authentication failed on stderr after {} attempt(s), closing channel",
                                password_send_count
                            );
                            // Send error message to stderr so user can see why it failed
                            let error_msg = format!(
                                "\n[bssh] Sudo authentication failed after {} attempt(s). \
                                 Please verify your sudo password is correct.\n",
                                password_send_count
                            );
                            let _ = sender
                                .send(CommandOutput::StdErr(Bytes::from(error_msg.into_bytes())))
                                .await;
                            // Send exit code 1 to indicate failure to the stream
                            let _ = sender.send(CommandOutput::ExitCode(1)).await;
                            // Close the channel and return failure exit code
                            let _ = channel.eof().await;
                            let _ = channel.close().await;
                            drop(sender);
                            return Ok(1);
                        }
                    }
                }
                russh::ChannelMsg::ExitStatus { exit_status } => result = Some(exit_status),
                _ => {}
            }
        }

        drop(sender);

        if let Some(result) = result {
            Ok(result)
        } else {
            Err(super::Error::CommandDidntExit)
        }
    }

    /// Execute a remote command via the ssh connection.
    ///
    /// Returns stdout, stderr and the exit code of the command,
    /// packaged in a [`CommandExecutedResult`] struct.
    /// If you need the stderr output interleaved within stdout, you should postfix the command with a redirection,
    /// e.g. `echo foo 2>&1`.
    /// If you dont want any output at all, use something like `echo foo >/dev/null 2>&1`.
    ///
    /// Make sure your commands don't read from stdin and exit after bounded time.
    ///
    /// Can be called multiple times, but every invocation is a new shell context.
    /// Thus `cd`, setting variables and alike have no effect on future invocations.
    pub async fn execute(&self, command: &str) -> Result<CommandExecutedResult, super::Error> {
        // Use streaming internally but collect all output
        let output_buffer = CommandOutputBuffer::new();
        let sender = output_buffer.sender.clone();

        // Execute with streaming
        let exit_status = self.execute_streaming(command, sender).await?;

        // CRITICAL: Drop the original sender to signal completion to the receiver task
        // execute_streaming() only drops the clone, but the receiver task waits for
        // ALL senders to be dropped before finishing. Without this, receiver.recv()
        // will hang forever waiting for more data.
        drop(output_buffer.sender);

        // Wait for all output to be collected
        // Handle both JoinError (task panic) and potential collection errors
        let (stdout_bytes, stderr_bytes) = output_buffer.receiver_task.await.map_err(|e| {
            // JoinError occurs if the task panicked or was cancelled
            // Convert to a more informative error
            super::Error::JoinError(e)
        })?;

        Ok(CommandExecutedResult {
            stdout: String::from_utf8_lossy(&stdout_bytes).to_string(),
            stderr: String::from_utf8_lossy(&stderr_bytes).to_string(),
            exit_status,
        })
    }

    /// Request an interactive shell channel.
    ///
    /// This method opens a new SSH channel suitable for interactive shell sessions.
    /// Note: This method no longer requests PTY directly. The PTY should be requested
    /// by the caller (e.g., PtySession) with appropriate terminal modes.
    ///
    /// # Arguments
    /// * `_term_type` - Terminal type (unused, kept for API compatibility)
    /// * `_width` - Terminal width (unused, kept for API compatibility)
    /// * `_height` - Terminal height (unused, kept for API compatibility)
    ///
    /// # Returns
    /// A `Channel` that can be used for bidirectional communication with the remote shell.
    ///
    /// # Note
    /// The caller is responsible for:
    /// 1. Requesting PTY with proper terminal modes via `channel.request_pty()`
    /// 2. Requesting shell via `channel.request_shell()`
    ///
    /// This change fixes issue #40: PTY should be requested once with proper terminal
    /// modes by PtySession::initialize() rather than twice with empty modes.
    pub async fn request_interactive_shell(
        &self,
        _term_type: &str,
        _width: u32,
        _height: u32,
    ) -> Result<Channel<Msg>, super::Error> {
        // Open a session channel - PTY and shell will be requested by the caller
        // (e.g., PtySession::initialize() with proper terminal modes)
        let channel = self.connection_handle.channel_open_session().await?;
        Ok(channel)
    }

    /// Request window size change for an existing PTY channel.
    ///
    /// This should be called when the local terminal is resized to update
    /// the remote PTY dimensions.
    pub async fn resize_pty(
        &self,
        channel: &mut Channel<Msg>,
        width: u32,
        height: u32,
    ) -> Result<(), super::Error> {
        channel
            .window_change(width, height, 0, 0)
            .await
            .map_err(super::Error::SshError)
    }
}