bssh 2.1.2

Parallel SSH command execution tool for cluster management
Documentation
# Upstream PR Proposal: Fix Handle::data() messages not processed from spawned tasks

## Issue Summary

When implementing an SSH server with PTY support, messages sent via `Handle::data()` from spawned tasks may not be delivered to the client. This occurs because the server session loop's `tokio::select!` may not wake up for messages sent through the mpsc channel from external tasks.

## Reproduction Scenario

```rust
// In Handler::shell_request()
fn shell_request(&mut self, channel: ChannelId, session: &mut Session) -> bool {
    let handle = session.handle();

    // Spawn a task to handle shell I/O
    tokio::spawn(async move {
        loop {
            // Read from PTY
            let data = pty.read().await;

            // Send to client - THIS MAY NOT BE DELIVERED
            handle.data(channel, data.into()).await;
        }
    });

    true
}
```

The `handle.data()` call sends a message through an mpsc channel to the session loop. However, the session loop's `select!` macro may be waiting on other futures (socket read, timers) and doesn't always wake up promptly for channel messages.

## Root Cause

In `server/session.rs`, the main loop uses `tokio::select!`:

```rust
while !self.common.disconnected {
    tokio::select! {
        r = &mut reading => { /* handle socket read */ }
        _ = &mut delay => { /* handle keepalive */ }
        msg = self.receiver.recv(), if !self.kex.active() => {
            // Handle messages from Handle
        }
    }
}
```

When the socket read future is pending and no keepalive is due, the `select!` should wake on `receiver.recv()`. However, in practice, messages can accumulate without being processed, especially under load or when the shell produces rapid output.

## Proposed Fix

Add a `try_recv()` loop before entering `select!` to drain pending messages, with a batch limit to ensure client input responsiveness:

```rust
const MAX_MESSAGES_PER_BATCH: usize = 64;

while !self.common.disconnected {
    // Process pending messages before entering select!
    // Limit batch size to ensure client input (e.g., Ctrl+C) is handled promptly
    let mut processed_count = 0usize;
    if !self.kex.active() {
        loop {
            if processed_count >= MAX_MESSAGES_PER_BATCH {
                break;  // Yield to select! to check for client input
            }
            match self.receiver.try_recv() {
                Ok(msg) => {
                    self.handle_msg(msg)?;
                    processed_count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => break,
            }
        }
        if processed_count > 0 {
            self.flush()?;
        }
    }

    tokio::select! {
        // ... existing select arms
    }
}
```

### Why batch limiting?

Without a limit, during high-throughput output (e.g., `yes` command), all pending messages would be processed before checking for client input. This could delay Ctrl+C handling significantly. The batch limit (64 messages) balances throughput with input responsiveness.

## Why This Fix is Safe

1. **No behavior change for existing code**: If there are no pending messages, `try_recv()` returns `Empty` immediately and proceeds to `select!` as before.

2. **Respects KEX state**: The fix only processes messages when `!self.kex.active()`, same as the existing `select!` arm condition.

3. **Maintains message ordering**: Messages are processed in FIFO order from the same channel.

4. **No performance impact**: `try_recv()` is non-blocking and O(1).

5. **Preserves input responsiveness**: The batch limit ensures client input (signals, keystrokes) is checked every 64 messages, preventing input starvation during high-throughput output.

## Use Case

This fix is essential for implementing SSH servers with:
- Interactive PTY sessions (shell, vim, etc.)
- High-throughput data streaming
- Any scenario where `Handle::data()` is called from spawned tasks

## Diff

```diff
--- a/russh/src/server/session.rs
+++ b/russh/src/server/session.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use log::debug;
 use negotiation::parse_kex_algo_list;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
-use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver, Sender};
 use tokio::sync::oneshot;

 // ... in Session::run() method, before the select! loop:
+
+            // Process pending messages before entering select!
+            // This ensures messages sent via Handle::data() from spawned tasks
+            // are processed even when select! doesn't wake up for them.
+            if !self.kex.active() {
+                loop {
+                    match self.receiver.try_recv() {
+                        Ok(Msg::Channel(id, ChannelMsg::Data { data })) => {
+                            self.data(id, data)?;
+                        }
+                        // ... handle other message types ...
+                        Err(TryRecvError::Empty) => break,
+                        Err(TryRecvError::Disconnected) => break,
+                    }
+                }
+                self.flush()?;
+            }
+
             tokio::select! {
```

## Testing

Tested with:
- Interactive shell sessions (bash, zsh)
- Rapid output commands (`yes`, `cat /dev/urandom | xxd`)
- Multiple concurrent PTY sessions
- Long-running sessions with intermittent output

## Related

This issue may also affect `client/session.rs` if similar patterns are used, though the client side typically doesn't have spawned tasks sending data in the same way.