Skip to main content

fresh/services/remote/
transport.rs

//! Pluggable carriers for the remote agent.
//!
//! The agent bootstrap — stream `AGENT_SOURCE` into a `python3` process on
//! the far side, wait for its `ready` line, check the protocol version, then
//! hand the stdio pair to an [`AgentChannel`] — is identical regardless of
//! *how* we reach that far side. SSH spawns `ssh … python3 …`; K8s spawns
//! `kubectl exec … -- python3 …`. The only thing that differs is the carrier
//! command.
//!
//! A [`RemoteTransport`] supplies exactly that: a configured carrier
//! [`Command`] whose execution runs the python bootstrap remotely. Everything
//! above the channel ([`RemoteFileSystem`](super::RemoteFileSystem), the
//! remote spawners, the agent protocol, the reconnect task) is
//! transport-agnostic and reused verbatim.
//!
//! This module is additive: the existing SSH path in `connection.rs` is left
//! untouched, so SSH behaviour is unchanged. SSH can migrate onto this seam
//! later (see `docs/internal/K8S_AUTHORITY_DESIGN.md`).

20use std::process::Stdio;
21use std::sync::Arc;
22
23use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
24use tokio::process::{Child, ChildStdin, ChildStdout, Command};
25
26use crate::services::process_hidden::HideWindow;
27use crate::services::remote::channel::AgentChannel;
28use crate::services::remote::protocol::AgentResponse;
29use crate::services::remote::AGENT_SOURCE;
30
/// The pod (and, optionally, the container inside it) that a
/// Kubernetes-exec authority operates on, on a cluster reached through the
/// host's kubeconfig.
///
/// Only the integrated terminal consults `workspace`, to start in the right
/// directory. File and process operations carry their own absolute paths
/// and ignore it.
#[derive(Debug, Clone)]
pub struct KubeTarget {
    /// kubeconfig context passed as `--context`; `None` keeps the current one.
    pub context: Option<String>,
    /// Namespace the pod lives in (passed as `-n`).
    pub namespace: String,
    /// Name of the pod to exec into.
    pub pod: String,
    /// Container to address in a multi-container pod (`-c`); `None` lets
    /// kubectl pick the default container.
    pub container: Option<String>,
    /// Workspace root inside the pod, used for the terminal's `cd`;
    /// `None` means the home directory.
    pub workspace: Option<String>,
}
48
49impl KubeTarget {
50    /// Stable, human-readable identity, e.g. `k8s:prod/dev/pod-7c9f`.
51    pub fn display(&self) -> String {
52        let ctx = self.context.as_deref().unwrap_or("-");
53        match &self.container {
54            Some(c) => format!("k8s:{ctx}/{}/{}/{c}", self.namespace, self.pod),
55            None => format!("k8s:{ctx}/{}/{}", self.namespace, self.pod),
56        }
57    }
58}
59
60/// Compose a `kubectl exec` argv.
61///
62/// Shared by the agent transport (`-i`, running `python3 …`), the
63/// long-running LSP spawner (`-i`, running the server), and — via
64/// [`build_kube_terminal_args`](super::build_kube_terminal_args) — the
65/// integrated terminal (`-it`, running a login shell). Everything after `--`
66/// is exec'd directly by `kubectl` (no remote shell), so unlike the SSH path
67/// no shell-quoting of the command is required.
68///
69/// Layout: `[--context CTX] exec <flags…> -n NS [-c C] POD -- command args…`.
70pub(crate) fn kubectl_exec_argv(
71    target: &KubeTarget,
72    flags: &[&str],
73    command: &str,
74    args: &[String],
75) -> Vec<String> {
76    let mut a: Vec<String> = Vec::with_capacity(args.len() + flags.len() + 9);
77    if let Some(ctx) = target.context.as_ref() {
78        a.push("--context".into());
79        a.push(ctx.clone());
80    }
81    a.push("exec".into());
82    for f in flags {
83        a.push((*f).into());
84    }
85    a.push("-n".into());
86    a.push(target.namespace.clone());
87    if let Some(c) = target.container.as_ref() {
88        a.push("-c".into());
89        a.push(c.clone());
90    }
91    a.push(target.pod.clone());
92    a.push("--".into());
93    a.push(command.into());
94    a.extend(args.iter().cloned());
95    a
96}
97
98/// The python one-liner the carrier runs: read exactly `AGENT_SOURCE.len()`
99/// bytes from stdin and `exec` them, then the agent keeps reading stdin for
100/// protocol messages. Byte-count framing avoids any dependency on a remote
101/// shell or here-doc support — identical to the SSH bootstrap.
102pub(crate) fn agent_bootstrap_pycode() -> String {
103    format!("import sys;exec(sys.stdin.read({}))", AGENT_SOURCE.len())
104}
105
/// Selects where the carrier process's stderr goes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StderrMode {
    /// Share the editor's terminal with the carrier so it can prompt the user
    /// (an SSH password, an interactive `kubectl` auth helper) on the first
    /// connect.
    Inherit,
    /// Throw the output away; meant for non-interactive reconnection.
    Null,
}
115
116/// Builds the carrier command that runs the agent bootstrap on the far side.
117///
118/// Object-safe on purpose: callers hold `&dyn RemoteTransport` and the
119/// reconnect path re-invokes it to respawn after a drop.
120pub trait RemoteTransport: Send + Sync {
121    /// A configured [`Command`] with stdin/stdout piped and stderr per
122    /// `stderr`, whose execution launches `python3 … <bootstrap>` remotely.
123    fn build_command(&self, stderr: StderrMode) -> Command;
124    /// Human-readable identity for status/logging.
125    fn display(&self) -> String;
126}
127
128/// `kubectl exec` carrier — runs the agent inside a Kubernetes pod (any cluster: EKS/GKE/AKS/k3d).
129pub struct KubectlExecTransport {
130    target: KubeTarget,
131}
132
133impl KubectlExecTransport {
134    pub fn new(target: KubeTarget) -> Self {
135        Self { target }
136    }
137
138    pub fn target(&self) -> &KubeTarget {
139        &self.target
140    }
141}
142
143impl RemoteTransport for KubectlExecTransport {
144    fn build_command(&self, stderr: StderrMode) -> Command {
145        let pycode = agent_bootstrap_pycode();
146        let argv = kubectl_exec_argv(
147            &self.target,
148            &["-i"],
149            "python3",
150            &["-u".to_string(), "-c".to_string(), pycode],
151        );
152        let mut cmd = Command::new("kubectl");
153        cmd.args(&argv);
154        cmd.stdin(Stdio::piped());
155        cmd.stdout(Stdio::piped());
156        match stderr {
157            StderrMode::Inherit => {
158                cmd.stderr(Stdio::inherit());
159            }
160            StderrMode::Null => {
161                cmd.stderr(Stdio::null());
162            }
163        }
164        cmd.hide_window();
165        cmd
166    }
167
168    fn display(&self) -> String {
169        self.target.display()
170    }
171}
172
173/// Error establishing a transport-backed agent connection.
174#[derive(Debug, thiserror::Error)]
175pub enum TransportError {
176    #[error("failed to spawn carrier process: {0}")]
177    Spawn(#[from] std::io::Error),
178
179    #[error("agent failed to start: {0}")]
180    AgentStartFailed(String),
181
182    #[error("protocol version mismatch: expected {expected}, got {got}")]
183    VersionMismatch { expected: u32, got: u32 },
184}
185
186/// Spawn the carrier, stream the agent in, and wait for `ready`.
187///
188/// Returns the ready-to-use `(reader, writer, child)` — the same triple
189/// `establish_ssh_transport` yields for SSH, so callers can build an
190/// [`AgentChannel`] or hand the pair to `replace_transport` for reconnects.
191pub async fn bootstrap_agent(
192    transport: &dyn RemoteTransport,
193    stderr: StderrMode,
194) -> Result<(BufReader<ChildStdout>, ChildStdin, Child), TransportError> {
195    let mut cmd = transport.build_command(stderr);
196    let mut child = cmd.spawn()?;
197
198    let mut stdin = child
199        .stdin
200        .take()
201        .ok_or_else(|| TransportError::AgentStartFailed("failed to get stdin".to_string()))?;
202    let stdout = child
203        .stdout
204        .take()
205        .ok_or_else(|| TransportError::AgentStartFailed("failed to get stdout".to_string()))?;
206
207    // Stream the agent source, exact byte count (see `agent_bootstrap_pycode`).
208    stdin.write_all(AGENT_SOURCE.as_bytes()).await?;
209    stdin.flush().await?;
210
211    let mut reader = BufReader::new(stdout);
212    let mut ready_line = String::new();
213    match reader.read_line(&mut ready_line).await {
214        Ok(0) => {
215            return Err(TransportError::AgentStartFailed(format!(
216                "{} closed the connection before the agent was ready \
217                 (is python3 present in the pod, and the context/namespace/pod correct?)",
218                transport.display()
219            )));
220        }
221        Ok(_) => {}
222        Err(e) => {
223            return Err(TransportError::AgentStartFailed(format!("read error: {e}")));
224        }
225    }
226
227    let ready: AgentResponse = serde_json::from_str(&ready_line).map_err(|e| {
228        TransportError::AgentStartFailed(format!(
229            "invalid ready message '{}': {e}",
230            ready_line.trim()
231        ))
232    })?;
233    if !ready.is_ready() {
234        return Err(TransportError::AgentStartFailed(
235            "agent did not send ready message".to_string(),
236        ));
237    }
238
239    let version = ready.version.unwrap_or(0);
240    if version != crate::services::remote::protocol::PROTOCOL_VERSION {
241        return Err(TransportError::VersionMismatch {
242            expected: crate::services::remote::protocol::PROTOCOL_VERSION,
243            got: version,
244        });
245    }
246
247    Ok((reader, stdin, child))
248}
249
250/// Active agent connection over a [`RemoteTransport`].
251///
252/// The K8s analogue of [`SshConnection`](super::SshConnection): owns the
253/// carrier child and the [`AgentChannel`] the editor's remote filesystem and
254/// spawners ride on. Dropping it kills the carrier.
255pub struct KubeConnection {
256    process: Child,
257    channel: Arc<AgentChannel>,
258    display: String,
259    /// Keeps the idle `kubectl exec` stream warm against LB/NAT idle
260    /// timeouts. Aborted on drop so a dead carrier stops being pinged.
261    heartbeat: tokio::task::JoinHandle<()>,
262}
263
264impl KubeConnection {
265    /// Bootstrap the agent inside the pod named by `target`.
266    pub async fn connect(target: KubeTarget) -> Result<Self, TransportError> {
267        let transport = KubectlExecTransport::new(target);
268        // Capture (discard) the carrier's stderr rather than inheriting it: the
269        // editor renders a full-screen ratatui UI on the alternate screen, so
270        // an inherited stderr lets kubectl scribble diagnostics straight over
271        // the rendered UI and corrupt it (the same failure mode as the SSH
272        // path). The EOF error below already explains the likely causes.
273        let (reader, writer, child) = bootstrap_agent(&transport, StderrMode::Null).await?;
274        let channel = Arc::new(AgentChannel::new(reader, writer));
275        let heartbeat = crate::services::remote::spawn_heartbeat_task(
276            &channel,
277            crate::services::remote::DEFAULT_HEARTBEAT_INTERVAL,
278        );
279        Ok(Self {
280            process: child,
281            channel,
282            display: transport.display(),
283            heartbeat,
284        })
285    }
286
287    /// Share the channel for the filesystem / spawners.
288    pub fn channel(&self) -> Arc<AgentChannel> {
289        self.channel.clone()
290    }
291
292    pub fn is_connected(&self) -> bool {
293        self.channel.is_connected()
294    }
295
296    pub fn connection_string(&self) -> &str {
297        &self.display
298    }
299}
300
301/// Reconnect task for a K8s agent channel: when the channel drops, it
302/// re-bootstraps the agent by re-running `kubectl exec` against the pod and
303/// hot-swaps the transport via `replace_transport`. The K8s analogue of
304/// [`spawn_reconnect_task`](super::spawn_reconnect_task), reusing the generic
305/// [`spawn_reconnect_task_with`](super::spawn_reconnect_task_with).
306///
307/// Reconnects to the *same* `target`. A pod reschedule / eviction changes the
308/// pod name, which this does not yet re-resolve — the plugin "resolve current
309/// pod" callback (`AUTHORITY_DESIGN.md` open question 3) layers on later. A
310/// same-name reconnect still covers transient stream drops (the common idle /
311/// network-blip case).
312pub fn spawn_kube_reconnect_task(
313    channel: &Arc<AgentChannel>,
314    target: KubeTarget,
315) -> tokio::task::JoinHandle<()> {
316    let connect_fn = move || {
317        let target = target.clone();
318        async move {
319            let transport = KubectlExecTransport::new(target);
320            // Non-interactive on reconnect (no terminal to prompt on).
321            let (reader, writer, _child) = bootstrap_agent(&transport, StderrMode::Null)
322                .await
323                .map_err(|e| crate::services::remote::SshError::AgentStartFailed(e.to_string()))?;
324            let reader: Box<dyn AsyncBufRead + Unpin + Send> = Box::new(reader);
325            let writer: Box<dyn AsyncWrite + Unpin + Send> = Box::new(writer);
326            Ok::<_, crate::services::remote::SshError>((reader, writer))
327        }
328    };
329    crate::services::remote::spawn_reconnect_task_with(
330        Arc::clone(channel),
331        connect_fn,
332        crate::services::remote::ReconnectConfig::default(),
333        "K8s remote",
334    )
335}
336
337impl Drop for KubeConnection {
338    fn drop(&mut self) {
339        // Stop pinging a carrier we're about to kill.
340        self.heartbeat.abort();
341        // Best-effort kill; the OS reaps on our exit if this fails. Same
342        // shape as `SshConnection::Drop` (the crate denies
343        // `let_underscore_must_use`, so we can't `let _ =` the result).
344        if let Ok(()) = self.process.start_kill() {}
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    fn target() -> KubeTarget {
        KubeTarget {
            context: Some("k3d-dev".to_string()),
            namespace: "dev".to_string(),
            pod: "fresh-7c9f".to_string(),
            container: None,
            workspace: Some("/workspace".to_string()),
        }
    }

    #[test]
    fn argv_orders_flags_namespace_pod_then_command() {
        let argv = kubectl_exec_argv(&target(), &["-i"], "python3", &["-u".into()]);
        assert_eq!(
            argv,
            vec![
                "--context",
                "k3d-dev",
                "exec",
                "-i",
                "-n",
                "dev",
                "fresh-7c9f",
                "--",
                "python3",
                "-u",
            ]
        );
    }

    #[test]
    fn argv_includes_container_when_set() {
        let mut t = target();
        t.container = Some("app".to_string());
        let argv = kubectl_exec_argv(&t, &["-it"], "sh", &[]);
        // `-c app` must sit between the pod-scoping flags and the pod name.
        let c = argv.iter().position(|a| a == "-c").expect("-c present");
        let pod = argv.iter().position(|a| a == "fresh-7c9f").unwrap();
        let sep = argv.iter().position(|a| a == "--").unwrap();
        assert_eq!(argv[c + 1], "app");
        assert!(c < pod, "-c precedes pod");
        assert!(pod < sep, "pod precedes --");
    }

    #[test]
    fn argv_full_layout_with_context_container_and_args() {
        let mut t = target();
        t.container = Some("app".to_string());
        let argv = kubectl_exec_argv(&t, &["-i", "-t"], "sh", &["-l".to_string()]);
        assert_eq!(
            argv,
            vec![
                "--context",
                "k3d-dev",
                "exec",
                "-i",
                "-t",
                "-n",
                "dev",
                "-c",
                "app",
                "fresh-7c9f",
                "--",
                "sh",
                "-l",
            ]
        );
    }

    #[test]
    fn argv_omits_context_when_none() {
        let mut t = target();
        t.context = None;
        let argv = kubectl_exec_argv(&t, &["-i"], "python3", &[]);
        assert!(!argv.iter().any(|a| a == "--context"));
        assert_eq!(argv[0], "exec");
    }

    #[test]
    fn display_formats_context_placeholder_and_container() {
        let mut t = target();
        assert_eq!(t.display(), "k8s:k3d-dev/dev/fresh-7c9f");
        t.container = Some("app".to_string());
        assert_eq!(t.display(), "k8s:k3d-dev/dev/fresh-7c9f/app");
        t.context = None;
        assert_eq!(t.display(), "k8s:-/dev/fresh-7c9f/app");
    }

    #[test]
    fn bootstrap_pycode_reads_exact_agent_length() {
        let code = agent_bootstrap_pycode();
        assert_eq!(
            code,
            format!("import sys;exec(sys.stdin.read({}))", AGENT_SOURCE.len())
        );
        // No shell metacharacters that would need quoting under `kubectl --`.
        assert!(!code.contains('\''));
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn kube_reconnect_task_spawns_and_aborts_cleanly() {
        // We can't run a real `kubectl exec` here, but we can verify the
        // task's lifecycle over a live (local-agent) channel: while the
        // channel is connected the task idles (it only acts on disconnect),
        // and aborting it terminates promptly without panicking. The actual
        // reconnect path is exercised by the generic `spawn_reconnect_task_with`
        // tests; this guards the K8s wiring on top of it.
        let channel = crate::services::remote::spawn_local_agent()
            .await
            .expect("spawn local agent");
        let handle = spawn_kube_reconnect_task(&channel, target());
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(channel.is_connected(), "channel healthy; reconnect idles");
        handle.abort();
        let joined = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
        assert!(joined.is_ok(), "aborted reconnect task joins promptly");
    }

    #[test]
    fn build_command_pipes_stdio_and_targets_kubectl() {
        // tokio's `Command` exposes the underlying std command, whose program
        // and argv can be inspected without spawning anything (stdio wiring
        // is not observable this way).
        let t = target();
        let transport = KubectlExecTransport::new(t.clone());
        let cmd = transport.build_command(StderrMode::Null);
        let std_cmd = cmd.as_std();
        assert_eq!(std_cmd.get_program(), "kubectl");

        let args: Vec<String> = std_cmd
            .get_args()
            .map(|a| a.to_string_lossy().into_owned())
            .collect();
        let expected = kubectl_exec_argv(
            &t,
            &["-i"],
            "python3",
            &["-u".to_string(), "-c".to_string(), agent_bootstrap_pycode()],
        );
        assert_eq!(args, expected);
        assert_eq!(transport.display(), "k8s:k3d-dev/dev/fresh-7c9f");
    }
}