Skip to main content

fresh/services/remote/
connection.rs

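//! SSH connection management
//!
//! Handles spawning the SSH process, bootstrapping the Python agent over its
//! stdin, and keeping the resulting agent channel alive (reconnect and
//! heartbeat tasks).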
4
5use crate::services::process_hidden::HideWindow;
6use crate::services::remote::channel::AgentChannel;
7use crate::services::remote::protocol::AgentResponse;
8use crate::services::remote::AGENT_SOURCE;
9use std::path::PathBuf;
10use std::process::Stdio;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::process::{Child, ChildStderr, Command};
13
14/// Error type for SSH connection
15#[derive(Debug, thiserror::Error)]
16pub enum SshError {
17    #[error("Failed to spawn SSH process ({0}). Is the `ssh` command installed and in your PATH?")]
18    SpawnFailed(#[from] std::io::Error),
19
20    #[error("Agent failed to start: {0}")]
21    AgentStartFailed(String),
22
23    #[error("Protocol version mismatch: expected {expected}, got {got}")]
24    VersionMismatch { expected: u32, got: u32 },
25
26    #[error("Connection closed")]
27    ConnectionClosed,
28
29    #[error("Authentication failed")]
30    AuthenticationFailed,
31}
32
/// SSH connection parameters
#[derive(Debug, Clone)]
pub struct ConnectionParams {
    /// SSH login user. `None` lets ssh pick the user (its config / the current
    /// local user), so `host` and `ssh://host` work without a `user@`.
    pub user: Option<String>,
    /// Remote host name or address. IPv6 literals are stored without their
    /// surrounding brackets (e.g. `::1`).
    pub host: String,
    /// Explicit port passed as `ssh -p`; `None` keeps ssh's own default.
    pub port: Option<u16>,
    /// Identity file passed as `ssh -i`, if any.
    pub identity_file: Option<PathBuf>,
    /// Extra `ssh` arguments inserted verbatim before the target on every ssh
    /// invocation (agent connect, reconnect, interactive terminal, LSP/probe
    /// spawns), so options like `-J jump` or `-o ProxyCommand=…` apply end to
    /// end rather than only to the initial connect.
    pub extra_args: Vec<String>,
}

impl ConnectionParams {
    /// Parse a connection string like `host`, `user@host`, or `user@host:port`
    /// (a leading `ssh://` is tolerated). The user is optional.
    ///
    /// Like `ssh` itself, the user is split off at the *last* `@`, so user
    /// names that contain `@` (e.g. `first.last@corp@host`) are preserved.
    /// IPv6 literals are accepted bare (`fe80::1`, no port possible) or
    /// bracketed (`[fe80::1]`, `[fe80::1]:2222`); brackets are stripped.
    ///
    /// Returns `None` when an explicit user is empty (`@host`) or the host is
    /// empty (`user@`, `:22`). A non-numeric or out-of-range port suffix is
    /// kept as part of the host, as before.
    pub fn parse(s: &str) -> Option<Self> {
        let s = s.strip_prefix("ssh://").unwrap_or(s);

        let (user, host_port) = match s.rsplit_once('@') {
            Some((user, rest)) => (Some(user), rest),
            None => (None, s),
        };
        let (host, port) = Self::split_host_port(host_port);

        if host.is_empty() || user == Some("") {
            return None;
        }

        Some(Self {
            user: user.map(str::to_string),
            host: host.to_string(),
            port,
            identity_file: None,
            extra_args: Vec::new(),
        })
    }

    /// Split `host[:port]`, honouring `[ipv6]` / `[ipv6]:port` brackets.
    ///
    /// An unbracketed host with more than one `:` is treated as a bare IPv6
    /// literal without a port, because its last `:` group is part of the
    /// address, not a port separator.
    fn split_host_port(s: &str) -> (&str, Option<u16>) {
        if let Some(inner) = s.strip_prefix('[') {
            if let Some((host, rest)) = inner.split_once(']') {
                if rest.is_empty() {
                    return (host, None);
                }
                if let Some(port) = rest.strip_prefix(':').and_then(|p| p.parse::<u16>().ok()) {
                    return (host, Some(port));
                }
            }
            // Malformed bracket syntax: keep the text verbatim as the host.
            return (s, None);
        }

        match s.split_once(':') {
            Some((host, port)) if !port.contains(':') => match port.parse::<u16>() {
                Ok(port) => (host, Some(port)),
                Err(_) => (s, None),
            },
            _ => (s, None),
        }
    }

    /// The ssh target argument: `user@host` when a user is set, else bare
    /// `host` (ssh then resolves the user itself).
    pub fn ssh_target(&self) -> String {
        match &self.user {
            Some(user) if !user.is_empty() => format!("{user}@{}", self.host),
            _ => self.host.clone(),
        }
    }
}

impl std::fmt::Display for ConnectionParams {
    /// Formats as `[user@]host[:port]`. When a port follows an IPv6 host, the
    /// host is bracketed (`user@[::1]:22`) so the port stays unambiguous and
    /// the text parses back via `ConnectionParams::parse`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.port {
            None => write!(f, "{}", self.ssh_target()),
            Some(port) if self.host.contains(':') => match self.user.as_deref() {
                Some(user) if !user.is_empty() => write!(f, "{user}@[{}]:{port}", self.host),
                _ => write!(f, "[{}]:{port}", self.host),
            },
            Some(port) => write!(f, "{}:{}", self.ssh_target(), port),
        }
    }
}
99
/// Active SSH connection with bootstrapped agent
///
/// Created by `SshConnection::connect`. Owns the `ssh` carrier process and
/// hands out the shared agent channel via `channel()`. Dropping the value
/// asks the carrier to exit (see the `Drop` impl).
pub struct SshConnection {
    /// SSH child process (spawned by `connect` with `kill_on_drop(true)`)
    process: Child,
    /// Communication channel with agent (wrapped in Arc for sharing)
    channel: std::sync::Arc<AgentChannel>,
    /// Connection parameters
    params: ConnectionParams,
}
109
impl SshConnection {
    /// Establish a new SSH connection and bootstrap the agent.
    ///
    /// Spawns `ssh` with the configured port / identity / extra args, pipes
    /// `AGENT_SOURCE` into a remote `python3` bootstrap, waits for the agent's
    /// ready line and checks its protocol version. Unlike the reconnect path
    /// (`establish_ssh_transport`), `BatchMode` is not set here.
    ///
    /// # Errors
    /// - `SshError::SpawnFailed` if the local `ssh` process cannot be spawned.
    /// - `SshError::AgentStartFailed` if ssh closes stdout before the ready
    ///   line (message built from its exit code and stderr by
    ///   `ssh_eof_error`), the ready line cannot be read or parsed, or it is
    ///   not a ready message.
    /// - `SshError::VersionMismatch` if the agent's protocol version differs.
    ///
    /// On every error after spawning, the ssh child is dropped and therefore
    /// killed (`kill_on_drop(true)`).
    pub async fn connect(params: ConnectionParams) -> Result<Self, SshError> {
        let mut cmd = Command::new("ssh");

        // `accept-new` trusts (and records) keys of hosts not yet in
        // known_hosts, but still refuses a *changed* key for a known host —
        // see ssh_config(5), StrictHostKeyChecking.
        cmd.arg("-o").arg("StrictHostKeyChecking=accept-new");

        if let Some(port) = params.port {
            cmd.arg("-p").arg(port.to_string());
        }

        if let Some(ref identity) = params.identity_file {
            cmd.arg("-i").arg(identity);
        }

        // User-supplied options go after ours and before the target.
        cmd.args(&params.extra_args);
        cmd.arg(params.ssh_target());

        // Bootstrap the agent using Python itself to read the exact byte count.
        // This avoids requiring bash or other shell utilities on the remote.
        // Python reads exactly N bytes (the agent code), execs it, and the agent
        // then continues reading from stdin for protocol messages.
        //
        // Note: SSH passes the remote command through a shell, so we need to
        // properly quote the Python code. We use double quotes for the outer
        // shell and avoid problematic characters in the Python code.
        let agent_len = AGENT_SOURCE.len();
        let bootstrap = format!(
            "python3 -u -c \"import sys;exec(sys.stdin.read({}))\"",
            agent_len
        );
        cmd.arg(bootstrap);

        cmd.stdin(Stdio::piped());
        cmd.stdout(Stdio::piped());
        // Capture ssh's stderr instead of inheriting it. The editor runs a
        // full-screen ratatui UI on the alternate screen; an inherited stderr
        // lets ssh scribble its diagnostics ("Could not resolve hostname …")
        // straight over the rendered UI. ratatui has no idea those cells
        // changed, so the garbage persists until the next full repaint — the
        // "corrupted window" users see after a bad host. We pipe stderr and
        // fold its message into the connection error instead (see
        // `ssh_eof_error`), so a failed connect becomes a clean status line.
        cmd.stderr(Stdio::piped());
        // Kill the ssh process if this connect future is dropped before it
        // finishes (e.g. the New-Session dialog's Cancel aborts the connect
        // task while the handshake is still hanging). Without this a hung
        // connect would orphan the ssh child until it timed out on its own.
        // For an established carrier `SshConnection`'s Drop also kills it; this
        // covers the window before the connection object exists.
        cmd.kill_on_drop(true);
        cmd.hide_window();

        let mut child = cmd.spawn()?;

        // Get handles. stderr is taken now so it can be read on failure or
        // drained after a successful handshake.
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| SshError::AgentStartFailed("failed to get stdin".to_string()))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| SshError::AgentStartFailed("failed to get stdout".to_string()))?;
        let stderr = child.stderr.take();

        // Send the agent code (exact byte count). If the carrier already died
        // (a failed connect — e.g. the host was unreachable), this write/flush
        // races the child's exit and can fail with a broken pipe. That pipe
        // error isn't the actionable reason; the carrier's own stderr is. Fall
        // through to the same EOF path so we surface "ssh: …" rather than a bare
        // `SpawnFailed`, regardless of which side loses the race.
        if stdin.write_all(AGENT_SOURCE.as_bytes()).await.is_err() || stdin.flush().await.is_err() {
            return Err(ssh_eof_error(&mut child, &params, stderr).await);
        }

        // Create buffered reader for stdout. The same reader (with anything it
        // has already buffered) is handed to the channel below.
        let mut reader = BufReader::new(stdout);

        // Wait for ready message from agent
        // No timeout needed - all failure modes (auth failure, network issues, etc.)
        // result in SSH exiting and us getting EOF. User can Ctrl+C if needed.
        let mut ready_line = String::new();
        match reader.read_line(&mut ready_line).await {
            Ok(0) => {
                return Err(ssh_eof_error(&mut child, &params, stderr).await);
            }
            Ok(_) => {}
            Err(e) => return Err(SshError::AgentStartFailed(format!("read error: {}", e))),
        }

        // Connected. Drain ssh's stderr for the life of the connection so the
        // occasional later diagnostic (host-key warnings, etc.) is discarded
        // rather than filling the pipe or — if we'd inherited it — landing on
        // the editor's screen.
        if let Some(mut stderr) = stderr {
            tokio::spawn(async move {
                let mut sink = tokio::io::sink();
                // Best-effort drain; the byte count / EOF error is irrelevant
                // since we're discarding ssh's stderr for the session.
                #[allow(clippy::let_underscore_must_use)]
                let _ = tokio::io::copy(&mut stderr, &mut sink).await;
            });
        }

        let ready: AgentResponse = serde_json::from_str(&ready_line).map_err(|e| {
            SshError::AgentStartFailed(format!(
                "invalid ready message '{}': {}",
                ready_line.trim(),
                e
            ))
        })?;

        if !ready.is_ready() {
            return Err(SshError::AgentStartFailed(
                "agent did not send ready message".to_string(),
            ));
        }

        // Check protocol version (a missing version is treated as 0)
        let version = ready.version.unwrap_or(0);
        if version != crate::services::remote::protocol::PROTOCOL_VERSION {
            return Err(SshError::VersionMismatch {
                expected: crate::services::remote::protocol::PROTOCOL_VERSION,
                got: version,
            });
        }

        // Create channel (takes ownership of stdin for writing)
        let channel = std::sync::Arc::new(AgentChannel::new(reader, stdin));

        Ok(Self {
            process: child,
            channel,
            params,
        })
    }

    /// Get the communication channel as an Arc for sharing
    pub fn channel(&self) -> std::sync::Arc<AgentChannel> {
        self.channel.clone()
    }

    /// Get connection parameters
    pub fn params(&self) -> &ConnectionParams {
        &self.params
    }

    /// Check if the connection is still alive (delegates to the channel)
    pub fn is_connected(&self) -> bool {
        self.channel.is_connected()
    }

    /// Get the connection string for display (`ConnectionParams`'s `Display`)
    pub fn connection_string(&self) -> String {
        self.params.to_string()
    }
}
269
270impl Drop for SshConnection {
271    fn drop(&mut self) {
272        // Best-effort kill of the SSH process during cleanup.
273        // If it fails (process already exited, permission error, etc.)
274        // there's nothing we can do in a Drop impl — the OS will clean
275        // up the zombie when our process exits.
276        if let Ok(()) = self.process.start_kill() {}
277    }
278}
279
/// Default interval between reconnection attempts.
const DEFAULT_RECONNECT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);

/// Configuration for the reconnect task.
///
/// Plain-data and cheap to copy, so it derives the usual value traits
/// (`Debug` for logging, `Clone`/`Copy`, equality for tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReconnectConfig {
    /// How long to wait between reconnection attempts. The reconnect task
    /// also uses it as the polling period while the channel is connected.
    pub interval: std::time::Duration,
}

impl Default for ReconnectConfig {
    /// Uses `DEFAULT_RECONNECT_INTERVAL` (5 seconds).
    fn default() -> Self {
        Self {
            interval: DEFAULT_RECONNECT_INTERVAL,
        }
    }
}
296
297/// Spawn a background task that automatically reconnects when the channel
298/// disconnects.
299///
300/// The task monitors `channel.is_connected()` and, when false, attempts to
301/// establish a new SSH connection using the given `params`. On success, it
302/// calls `channel.replace_transport()` to hot-swap the underlying reader/writer.
303///
304/// The task runs until the channel is dropped (write_tx closed) or the
305/// returned `tokio::task::JoinHandle` is aborted.
306pub fn spawn_reconnect_task(
307    channel: std::sync::Arc<AgentChannel>,
308    params: ConnectionParams,
309) -> tokio::task::JoinHandle<()> {
310    let connect_fn = move || {
311        let params = params.clone();
312        async move {
313            let (reader, writer, _child) = establish_ssh_transport(&params).await?;
314            // Box the reader/writer so they have a uniform type
315            let reader: Box<dyn tokio::io::AsyncBufRead + Unpin + Send> = Box::new(reader);
316            let writer: Box<dyn tokio::io::AsyncWrite + Unpin + Send> = Box::new(writer);
317            Ok::<_, SshError>((reader, writer))
318        }
319    };
320
321    spawn_reconnect_task_with(
322        channel,
323        connect_fn,
324        ReconnectConfig::default(),
325        "SSH remote",
326    )
327}
328
329/// Spawn a reconnect task with a custom connection factory.
330///
331/// This is the generic version used by both production (via `spawn_reconnect_task`)
332/// and tests (with a fake connection factory). The `connect_fn` is called each
333/// time a reconnection attempt is made. It should return a `(reader, writer)` pair
334/// on success.
335pub fn spawn_reconnect_task_with<F, Fut>(
336    channel: std::sync::Arc<AgentChannel>,
337    connect_fn: F,
338    config: ReconnectConfig,
339    label: &'static str,
340) -> tokio::task::JoinHandle<()>
341where
342    F: Fn() -> Fut + Send + 'static,
343    Fut: std::future::Future<
344            Output = Result<
345                (
346                    Box<dyn tokio::io::AsyncBufRead + Unpin + Send>,
347                    Box<dyn tokio::io::AsyncWrite + Unpin + Send>,
348                ),
349                SshError,
350            >,
351        > + Send,
352{
353    tokio::spawn(async move {
354        loop {
355            // Wait until disconnected
356            while channel.is_connected() {
357                tokio::time::sleep(config.interval).await;
358            }
359
360            tracing::info!("{label}: connection lost, attempting reconnection...");
361
362            // Retry loop
363            loop {
364                tokio::time::sleep(config.interval).await;
365
366                // Check if channel was dropped (write_tx gone)
367                if !channel.is_connected() {
368                    // Still disconnected — try to reconnect
369                } else {
370                    // Something else reconnected us (e.g., manual replace_transport)
371                    break;
372                }
373
374                match (connect_fn)().await {
375                    Ok((reader, writer)) => {
376                        tracing::info!("{label}: reconnected successfully");
377                        channel.replace_transport(reader, writer).await;
378                        break;
379                    }
380                    Err(e) => {
381                        tracing::debug!("{label}: reconnection attempt failed: {e}");
382                    }
383                }
384            }
385        }
386    })
387}
388
389/// Default heartbeat interval. Comfortably under the smallest common
390/// load-balancer / NAT idle timeout (~5 min) so an otherwise-idle agent
391/// stream keeps generating traffic and isn't silently dropped.
392pub const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
393
394/// Spawn a background task that pings the agent periodically so an idle
395/// connection's stream keeps producing traffic.
396///
397/// Long-lived agent streams that sit idle (no edits, no LSP chatter) get
398/// silently dropped by ELB / NAT idle timers after a few minutes — the
399/// client never sees a FIN, so the *next* request just hangs until it
400/// times out and the UI appears frozen. A cheap periodic `info` request
401/// keeps the NAT state-table entry warm. Shared by every agent transport
402/// (SSH and `kubectl exec` alike); `info` is already handled by every
403/// agent version, so no protocol bump is needed.
404///
405/// Holds only a `Weak` reference, so the task terminates on its own once
406/// the last owner of the channel is dropped — no JoinHandle bookkeeping
407/// is required to avoid a leak (callers may still `abort()` it to stop
408/// pinging immediately when the carrier dies). Pinging while disconnected
409/// is skipped; the reconnect task owns re-establishment.
410pub fn spawn_heartbeat_task(
411    channel: &std::sync::Arc<AgentChannel>,
412    interval: std::time::Duration,
413) -> tokio::task::JoinHandle<()> {
414    let weak = std::sync::Arc::downgrade(channel);
415    tokio::spawn(async move {
416        loop {
417            tokio::time::sleep(interval).await;
418            let Some(channel) = weak.upgrade() else {
419                break;
420            };
421            if channel.is_connected() {
422                // Outcome ignored on purpose: a failed/timed-out ping
423                // already marks the channel disconnected (see `request`),
424                // and the reconnect task owns recovery from there. Bound
425                // to a named `_` to satisfy `deny(let_underscore_must_use)`.
426                let _ping = channel.request("info", serde_json::json!({})).await;
427            }
428        }
429    })
430}
431
432/// Establish a new SSH connection and return the raw transport + child process.
433///
434/// Build a descriptive error when the SSH process closes stdout (EOF) without
435/// sending a ready message. We wait for the SSH process to exit and inspect its
436/// exit code to give the user a more actionable message than a generic
437/// "connection closed".
438async fn ssh_eof_error(
439    child: &mut Child,
440    params: &ConnectionParams,
441    stderr: Option<ChildStderr>,
442) -> SshError {
443    // Give SSH a moment to finish so we can read its exit code.
444    let status = tokio::time::timeout(std::time::Duration::from_secs(5), child.wait()).await;
445
446    let hint = match status {
447        Ok(Ok(status)) => {
448            match status.code() {
449                // 255 is SSH's conventional exit code for connection errors
450                // (host unreachable, connection refused, DNS failure, auth
451                // failure, etc.).
452                Some(255) => format!(
453                    "SSH could not connect to {}. Check that the host is \
454                     reachable, the hostname is correct, and your SSH \
455                     credentials are valid (exit code 255)",
456                    params
457                ),
458                // 127 is the shell's "command not found" — for our bootstrap
459                // that means `python3` is missing on the remote. Fresh's remote
460                // backend (agent + the integrated terminal's env launcher) runs
461                // on python3, so name the requirement and the fix plainly.
462                Some(127) => format!(
463                    "Python 3 was not found on the remote host {}. \
464                     Fresh's remote support requires python3 on the remote — \
465                     install it there, then reconnect",
466                    params
467                ),
468                Some(code) => format!(
469                    "SSH process exited with code {} while connecting to {}",
470                    code, params
471                ),
472                None => format!(
473                    "SSH process was killed by a signal while connecting to {}",
474                    params
475                ),
476            }
477        }
478        Ok(Err(e)) => format!("failed to get SSH exit status: {}", e),
479        Err(_) => {
480            // Timed out waiting for exit — kill it so we don't leak.
481            if let Err(e) = child.start_kill() {
482                tracing::warn!("Failed to kill timed-out SSH process: {}", e);
483            }
484            format!(
485                "SSH process did not exit in time while connecting to {}",
486                params
487            )
488        }
489    };
490
491    // ssh writes the actionable reason ("Could not resolve hostname",
492    // "Permission denied", "Connection refused", …) to stderr. We piped it
493    // (rather than letting it corrupt the editor's screen), so fold the most
494    // specific line into the error for the status bar.
495    match read_ssh_stderr(stderr).await {
496        Some(detail) => SshError::AgentStartFailed(format!("{hint}: {detail}")),
497        None => SshError::AgentStartFailed(hint),
498    }
499}
500
501/// Read whatever a failed ssh process wrote to stderr and return its most
502/// specific (last non-empty) line. ssh has closed stdout by the time we call
503/// this and is exiting, so the read is bounded; we still cap the wait so a
504/// wedged pipe can't hang the error path.
505async fn read_ssh_stderr(stderr: Option<ChildStderr>) -> Option<String> {
506    let mut stderr = stderr?;
507    let mut buf = String::new();
508    #[allow(clippy::let_underscore_must_use)]
509    let _ = tokio::time::timeout(
510        std::time::Duration::from_secs(2),
511        stderr.read_to_string(&mut buf),
512    )
513    .await;
514    buf.trim()
515        .lines()
516        .map(str::trim)
517        .filter(|line| !line.is_empty())
518        .next_back()
519        .map(str::to_string)
520}
521
522/// This is the lower-level function used by both `SshConnection::connect` and
523/// the reconnect task. It spawns an SSH process, bootstraps the Python agent,
524/// and returns the reader/writer pair ready for use with `AgentChannel`.
525async fn establish_ssh_transport(
526    params: &ConnectionParams,
527) -> Result<
528    (
529        BufReader<tokio::process::ChildStdout>,
530        tokio::process::ChildStdin,
531        Child,
532    ),
533    SshError,
534> {
535    let mut cmd = Command::new("ssh");
536
537    cmd.arg("-o").arg("StrictHostKeyChecking=accept-new");
538    // Disable password prompts for reconnection (non-interactive)
539    cmd.arg("-o").arg("BatchMode=yes");
540
541    if let Some(port) = params.port {
542        cmd.arg("-p").arg(port.to_string());
543    }
544
545    if let Some(ref identity) = params.identity_file {
546        cmd.arg("-i").arg(identity);
547    }
548
549    cmd.args(&params.extra_args);
550    cmd.arg(params.ssh_target());
551
552    let agent_len = AGENT_SOURCE.len();
553    let bootstrap = format!(
554        "python3 -u -c \"import sys;exec(sys.stdin.read({}))\"",
555        agent_len
556    );
557    cmd.arg(bootstrap);
558
559    cmd.stdin(Stdio::piped());
560    cmd.stdout(Stdio::piped());
561    cmd.stderr(Stdio::null()); // No terminal for reconnection
562    cmd.hide_window();
563
564    let mut child = cmd.spawn()?;
565
566    let mut stdin = child
567        .stdin
568        .take()
569        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdin".to_string()))?;
570    let stdout = child
571        .stdout
572        .take()
573        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdout".to_string()))?;
574
575    // Send the agent code
576    stdin.write_all(AGENT_SOURCE.as_bytes()).await?;
577    stdin.flush().await?;
578
579    let mut reader = BufReader::new(stdout);
580
581    // Wait for ready message
582    let mut ready_line = String::new();
583    match reader.read_line(&mut ready_line).await {
584        Ok(0) => {
585            // Reconnect spawns with `stderr(Stdio::null())`, so there is no
586            // captured stderr to attach here.
587            return Err(ssh_eof_error(&mut child, params, None).await);
588        }
589        Ok(_) => {}
590        Err(e) => return Err(SshError::AgentStartFailed(format!("read error: {}", e))),
591    }
592
593    let ready: AgentResponse = serde_json::from_str(&ready_line).map_err(|e| {
594        SshError::AgentStartFailed(format!(
595            "invalid ready message '{}': {}",
596            ready_line.trim(),
597            e
598        ))
599    })?;
600
601    if !ready.is_ready() {
602        return Err(SshError::AgentStartFailed(
603            "agent did not send ready message".to_string(),
604        ));
605    }
606
607    let version = ready.version.unwrap_or(0);
608    if version != crate::services::remote::protocol::PROTOCOL_VERSION {
609        return Err(SshError::VersionMismatch {
610            expected: crate::services::remote::protocol::PROTOCOL_VERSION,
611            got: version,
612        });
613    }
614
615    Ok((reader, stdin, child))
616}
617
618/// Spawn a local agent process for testing (no SSH)
619///
620/// This is used by integration tests to test the full stack without SSH.
621/// Not intended for production use.
622#[doc(hidden)]
623pub async fn spawn_local_agent() -> Result<std::sync::Arc<AgentChannel>, SshError> {
624    use tokio::process::Command as TokioCommand;
625
626    let mut child = TokioCommand::new("python3")
627        .arg("-u")
628        .arg("-c")
629        .arg(AGENT_SOURCE)
630        .stdin(Stdio::piped())
631        .stdout(Stdio::piped())
632        .stderr(Stdio::piped())
633        .hide_window()
634        .spawn()?;
635
636    let stdin = child
637        .stdin
638        .take()
639        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdin".to_string()))?;
640    let stdout = child
641        .stdout
642        .take()
643        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdout".to_string()))?;
644
645    let mut reader = BufReader::new(stdout);
646
647    // Wait for ready message
648    let mut ready_line = String::new();
649    reader.read_line(&mut ready_line).await?;
650
651    let ready: AgentResponse = serde_json::from_str(&ready_line)
652        .map_err(|e| SshError::AgentStartFailed(format!("invalid ready message: {}", e)))?;
653
654    if !ready.is_ready() {
655        return Err(SshError::AgentStartFailed(
656            "agent did not send ready message".to_string(),
657        ));
658    }
659
660    Ok(std::sync::Arc::new(AgentChannel::new(reader, stdin)))
661}
662
663/// Spawn a local Python agent with a custom data channel capacity.
664///
665/// Same as `spawn_local_agent` but allows overriding the channel capacity
666/// for stress-testing backpressure handling.
667#[doc(hidden)]
668pub async fn spawn_local_agent_with_capacity(
669    data_channel_capacity: usize,
670) -> Result<std::sync::Arc<AgentChannel>, SshError> {
671    use tokio::process::Command as TokioCommand;
672
673    let mut child = TokioCommand::new("python3")
674        .arg("-u")
675        .arg("-c")
676        .arg(AGENT_SOURCE)
677        .stdin(Stdio::piped())
678        .stdout(Stdio::piped())
679        .stderr(Stdio::piped())
680        .hide_window()
681        .spawn()?;
682
683    let stdin = child
684        .stdin
685        .take()
686        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdin".to_string()))?;
687    let stdout = child
688        .stdout
689        .take()
690        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdout".to_string()))?;
691
692    let mut reader = BufReader::new(stdout);
693
694    // Wait for ready message
695    let mut ready_line = String::new();
696    reader.read_line(&mut ready_line).await?;
697
698    let ready: AgentResponse = serde_json::from_str(&ready_line)
699        .map_err(|e| SshError::AgentStartFailed(format!("invalid ready message: {}", e)))?;
700
701    if !ready.is_ready() {
702        return Err(SshError::AgentStartFailed(
703            "agent did not send ready message".to_string(),
704        ));
705    }
706
707    Ok(std::sync::Arc::new(AgentChannel::with_capacity(
708        reader,
709        stdin,
710        data_channel_capacity,
711    )))
712}
713
714/// Spawn a local Python agent and return the raw reader/writer transport.
715///
716/// Unlike `spawn_local_agent`, this does NOT create an `AgentChannel`. It
717/// returns the ready-to-use reader and writer so callers can feed them to
718/// `AgentChannel::replace_transport()` for reconnection testing.
719#[doc(hidden)]
720pub async fn spawn_local_agent_transport() -> Result<
721    (
722        tokio::io::BufReader<tokio::process::ChildStdout>,
723        tokio::process::ChildStdin,
724    ),
725    SshError,
726> {
727    use tokio::process::Command as TokioCommand;
728
729    let mut child = TokioCommand::new("python3")
730        .arg("-u")
731        .arg("-c")
732        .arg(AGENT_SOURCE)
733        .stdin(Stdio::piped())
734        .stdout(Stdio::piped())
735        .stderr(Stdio::piped())
736        .hide_window()
737        .spawn()?;
738
739    let stdin = child
740        .stdin
741        .take()
742        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdin".to_string()))?;
743    let stdout = child
744        .stdout
745        .take()
746        .ok_or_else(|| SshError::AgentStartFailed("failed to get stdout".to_string()))?;
747
748    let mut reader = BufReader::new(stdout);
749
750    // Wait for ready message
751    let mut ready_line = String::new();
752    reader.read_line(&mut ready_line).await?;
753
754    let ready: AgentResponse = serde_json::from_str(&ready_line)
755        .map_err(|e| SshError::AgentStartFailed(format!("invalid ready message: {}", e)))?;
756
757    if !ready.is_ready() {
758        return Err(SshError::AgentStartFailed(
759            "agent did not send ready message".to_string(),
760        ));
761    }
762
763    Ok((reader, stdin))
764}
765
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_connection_params() {
        // (input, expected user, expected host, expected port)
        let accepted: [(&str, Option<&str>, &str, Option<u16>); 4] = [
            ("user@host", Some("user"), "host", None),
            ("user@host:22", Some("user"), "host", Some(22)),
            // User is optional: bare host and ssh:// both parse, user = None.
            ("hostonly", None, "hostonly", None),
            ("ssh://example.com:2222", None, "example.com", Some(2222)),
        ];
        for (input, user, host, port) in accepted {
            let parsed = ConnectionParams::parse(input)
                .unwrap_or_else(|| panic!("{input:?} should parse"));
            assert_eq!(parsed.user.as_deref(), user, "user of {input:?}");
            assert_eq!(parsed.host, host, "host of {input:?}");
            assert_eq!(parsed.port, port, "port of {input:?}");
        }
        assert_eq!(
            ConnectionParams::parse("hostonly").unwrap().ssh_target(),
            "hostonly"
        );

        // Empty user / empty host are still rejected.
        for rejected in ["@host", "user@"] {
            assert!(
                ConnectionParams::parse(rejected).is_none(),
                "{rejected:?} should be rejected"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn heartbeat_keeps_channel_warm_and_exits_on_drop() {
        // Real agent over local stdio — no SSH/kubectl, same channel.
        let agent = spawn_local_agent().await.expect("spawn local agent");
        let heartbeat = spawn_heartbeat_task(&agent, std::time::Duration::from_millis(30));

        // Let several heartbeats fire; the channel must stay healthy.
        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
        assert!(
            agent.is_connected(),
            "channel stays connected while heartbeat pings"
        );
        let reply = agent.request("info", serde_json::json!({})).await;
        assert!(reply.is_ok(), "agent still answers after heartbeats");

        // Dropping the last strong ref lets the Weak-based task terminate
        // on its own — proving it can't leak past the connection's life.
        drop(agent);
        let joined = tokio::time::timeout(std::time::Duration::from_secs(3), heartbeat)
            .await
            .expect("heartbeat task exits after the channel is dropped");
        joined.expect("heartbeat task did not panic");
    }

    #[test]
    fn test_connection_string() {
        let make = |user: Option<&str>, host: &str, port: Option<u16>| ConnectionParams {
            user: user.map(str::to_string),
            host: host.to_string(),
            port,
            identity_file: None,
            extra_args: Vec::new(),
        };

        assert_eq!(
            make(Some("alice"), "example.com", None).to_string(),
            "alice@example.com"
        );
        assert_eq!(
            make(Some("bob"), "server.local", Some(2222)).to_string(),
            "bob@server.local:2222"
        );

        // No user: the target (and display) is the bare host.
        let anonymous = make(None, "server.local", Some(2222));
        assert_eq!(anonymous.to_string(), "server.local:2222");
        assert_eq!(anonymous.ssh_target(), "server.local");
    }
}