1use std::process::Stdio;
21use std::sync::Arc;
22
23use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
24use tokio::process::{Child, ChildStdin, ChildStdout, Command};
25
26use crate::services::process_hidden::HideWindow;
27use crate::services::remote::channel::AgentChannel;
28use crate::services::remote::protocol::AgentResponse;
29use crate::services::remote::AGENT_SOURCE;
30
31#[derive(Debug, Clone)]
38pub struct KubeTarget {
39 pub context: Option<String>,
41 pub namespace: String,
42 pub pod: String,
43 pub container: Option<String>,
45 pub workspace: Option<String>,
47}
48
49impl KubeTarget {
50 pub fn display(&self) -> String {
52 let ctx = self.context.as_deref().unwrap_or("-");
53 match &self.container {
54 Some(c) => format!("k8s:{ctx}/{}/{}/{c}", self.namespace, self.pod),
55 None => format!("k8s:{ctx}/{}/{}", self.namespace, self.pod),
56 }
57 }
58}
59
60pub(crate) fn kubectl_exec_argv(
71 target: &KubeTarget,
72 flags: &[&str],
73 command: &str,
74 args: &[String],
75) -> Vec<String> {
76 let mut a: Vec<String> = Vec::with_capacity(args.len() + flags.len() + 9);
77 if let Some(ctx) = target.context.as_ref() {
78 a.push("--context".into());
79 a.push(ctx.clone());
80 }
81 a.push("exec".into());
82 for f in flags {
83 a.push((*f).into());
84 }
85 a.push("-n".into());
86 a.push(target.namespace.clone());
87 if let Some(c) = target.container.as_ref() {
88 a.push("-c".into());
89 a.push(c.clone());
90 }
91 a.push(target.pod.clone());
92 a.push("--".into());
93 a.push(command.into());
94 a.extend(args.iter().cloned());
95 a
96}
97
98pub(crate) fn agent_bootstrap_pycode() -> String {
103 format!("import sys;exec(sys.stdin.read({}))", AGENT_SOURCE.len())
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub enum StderrMode {
109 Inherit,
112 Null,
114}
115
116pub trait RemoteTransport: Send + Sync {
121 fn build_command(&self, stderr: StderrMode) -> Command;
124 fn display(&self) -> String;
126}
127
128pub struct KubectlExecTransport {
130 target: KubeTarget,
131}
132
133impl KubectlExecTransport {
134 pub fn new(target: KubeTarget) -> Self {
135 Self { target }
136 }
137
138 pub fn target(&self) -> &KubeTarget {
139 &self.target
140 }
141}
142
143impl RemoteTransport for KubectlExecTransport {
144 fn build_command(&self, stderr: StderrMode) -> Command {
145 let pycode = agent_bootstrap_pycode();
146 let argv = kubectl_exec_argv(
147 &self.target,
148 &["-i"],
149 "python3",
150 &["-u".to_string(), "-c".to_string(), pycode],
151 );
152 let mut cmd = Command::new("kubectl");
153 cmd.args(&argv);
154 cmd.stdin(Stdio::piped());
155 cmd.stdout(Stdio::piped());
156 match stderr {
157 StderrMode::Inherit => {
158 cmd.stderr(Stdio::inherit());
159 }
160 StderrMode::Null => {
161 cmd.stderr(Stdio::null());
162 }
163 }
164 cmd.hide_window();
165 cmd
166 }
167
168 fn display(&self) -> String {
169 self.target.display()
170 }
171}
172
173#[derive(Debug, thiserror::Error)]
175pub enum TransportError {
176 #[error("failed to spawn carrier process: {0}")]
177 Spawn(#[from] std::io::Error),
178
179 #[error("agent failed to start: {0}")]
180 AgentStartFailed(String),
181
182 #[error("protocol version mismatch: expected {expected}, got {got}")]
183 VersionMismatch { expected: u32, got: u32 },
184}
185
186pub async fn bootstrap_agent(
192 transport: &dyn RemoteTransport,
193 stderr: StderrMode,
194) -> Result<(BufReader<ChildStdout>, ChildStdin, Child), TransportError> {
195 let mut cmd = transport.build_command(stderr);
196 let mut child = cmd.spawn()?;
197
198 let mut stdin = child
199 .stdin
200 .take()
201 .ok_or_else(|| TransportError::AgentStartFailed("failed to get stdin".to_string()))?;
202 let stdout = child
203 .stdout
204 .take()
205 .ok_or_else(|| TransportError::AgentStartFailed("failed to get stdout".to_string()))?;
206
207 stdin.write_all(AGENT_SOURCE.as_bytes()).await?;
209 stdin.flush().await?;
210
211 let mut reader = BufReader::new(stdout);
212 let mut ready_line = String::new();
213 match reader.read_line(&mut ready_line).await {
214 Ok(0) => {
215 return Err(TransportError::AgentStartFailed(format!(
216 "{} closed the connection before the agent was ready \
217 (is python3 present in the pod, and the context/namespace/pod correct?)",
218 transport.display()
219 )));
220 }
221 Ok(_) => {}
222 Err(e) => {
223 return Err(TransportError::AgentStartFailed(format!("read error: {e}")));
224 }
225 }
226
227 let ready: AgentResponse = serde_json::from_str(&ready_line).map_err(|e| {
228 TransportError::AgentStartFailed(format!(
229 "invalid ready message '{}': {e}",
230 ready_line.trim()
231 ))
232 })?;
233 if !ready.is_ready() {
234 return Err(TransportError::AgentStartFailed(
235 "agent did not send ready message".to_string(),
236 ));
237 }
238
239 let version = ready.version.unwrap_or(0);
240 if version != crate::services::remote::protocol::PROTOCOL_VERSION {
241 return Err(TransportError::VersionMismatch {
242 expected: crate::services::remote::protocol::PROTOCOL_VERSION,
243 got: version,
244 });
245 }
246
247 Ok((reader, stdin, child))
248}
249
250pub struct KubeConnection {
256 process: Child,
257 channel: Arc<AgentChannel>,
258 display: String,
259 heartbeat: tokio::task::JoinHandle<()>,
262}
263
264impl KubeConnection {
265 pub async fn connect(target: KubeTarget) -> Result<Self, TransportError> {
267 let transport = KubectlExecTransport::new(target);
268 let (reader, writer, child) = bootstrap_agent(&transport, StderrMode::Null).await?;
274 let channel = Arc::new(AgentChannel::new(reader, writer));
275 let heartbeat = crate::services::remote::spawn_heartbeat_task(
276 &channel,
277 crate::services::remote::DEFAULT_HEARTBEAT_INTERVAL,
278 );
279 Ok(Self {
280 process: child,
281 channel,
282 display: transport.display(),
283 heartbeat,
284 })
285 }
286
287 pub fn channel(&self) -> Arc<AgentChannel> {
289 self.channel.clone()
290 }
291
292 pub fn is_connected(&self) -> bool {
293 self.channel.is_connected()
294 }
295
296 pub fn connection_string(&self) -> &str {
297 &self.display
298 }
299}
300
301pub fn spawn_kube_reconnect_task(
313 channel: &Arc<AgentChannel>,
314 target: KubeTarget,
315) -> tokio::task::JoinHandle<()> {
316 let connect_fn = move || {
317 let target = target.clone();
318 async move {
319 let transport = KubectlExecTransport::new(target);
320 let (reader, writer, _child) = bootstrap_agent(&transport, StderrMode::Null)
322 .await
323 .map_err(|e| crate::services::remote::SshError::AgentStartFailed(e.to_string()))?;
324 let reader: Box<dyn AsyncBufRead + Unpin + Send> = Box::new(reader);
325 let writer: Box<dyn AsyncWrite + Unpin + Send> = Box::new(writer);
326 Ok::<_, crate::services::remote::SshError>((reader, writer))
327 }
328 };
329 crate::services::remote::spawn_reconnect_task_with(
330 Arc::clone(channel),
331 connect_fn,
332 crate::services::remote::ReconnectConfig::default(),
333 "K8s remote",
334 )
335}
336
337impl Drop for KubeConnection {
338 fn drop(&mut self) {
339 self.heartbeat.abort();
341 if let Ok(()) = self.process.start_kill() {}
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351
352 fn target() -> KubeTarget {
353 KubeTarget {
354 context: Some("k3d-dev".to_string()),
355 namespace: "dev".to_string(),
356 pod: "fresh-7c9f".to_string(),
357 container: None,
358 workspace: Some("/workspace".to_string()),
359 }
360 }
361
362 #[test]
363 fn argv_orders_flags_namespace_pod_then_command() {
364 let argv = kubectl_exec_argv(&target(), &["-i"], "python3", &["-u".into()]);
365 assert_eq!(
366 argv,
367 vec![
368 "--context",
369 "k3d-dev",
370 "exec",
371 "-i",
372 "-n",
373 "dev",
374 "fresh-7c9f",
375 "--",
376 "python3",
377 "-u",
378 ]
379 );
380 }
381
382 #[test]
383 fn argv_includes_container_when_set() {
384 let mut t = target();
385 t.container = Some("app".to_string());
386 let argv = kubectl_exec_argv(&t, &["-it"], "sh", &[]);
387 let c = argv.iter().position(|a| a == "-c").expect("-c present");
389 let pod = argv.iter().position(|a| a == "fresh-7c9f").unwrap();
390 let sep = argv.iter().position(|a| a == "--").unwrap();
391 assert_eq!(argv[c + 1], "app");
392 assert!(c < pod, "-c precedes pod");
393 assert!(pod < sep, "pod precedes --");
394 }
395
396 #[test]
397 fn argv_omits_context_when_none() {
398 let mut t = target();
399 t.context = None;
400 let argv = kubectl_exec_argv(&t, &["-i"], "python3", &[]);
401 assert!(!argv.iter().any(|a| a == "--context"));
402 assert_eq!(argv[0], "exec");
403 }
404
405 #[test]
406 fn bootstrap_pycode_reads_exact_agent_length() {
407 let code = agent_bootstrap_pycode();
408 assert_eq!(
409 code,
410 format!("import sys;exec(sys.stdin.read({}))", AGENT_SOURCE.len())
411 );
412 assert!(!code.contains('\''));
414 }
415
416 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
417 async fn kube_reconnect_task_spawns_and_aborts_cleanly() {
418 let channel = crate::services::remote::spawn_local_agent()
425 .await
426 .expect("spawn local agent");
427 let handle = spawn_kube_reconnect_task(&channel, target());
428 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
429 assert!(channel.is_connected(), "channel healthy; reconnect idles");
430 handle.abort();
431 let joined = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
432 assert!(joined.is_ok(), "aborted reconnect task joins promptly");
433 }
434
435 #[test]
436 fn build_command_pipes_stdio_and_targets_kubectl() {
437 let t = target();
440 let pycode = agent_bootstrap_pycode();
441 let argv = kubectl_exec_argv(
442 &t,
443 &["-i"],
444 "python3",
445 &["-u".to_string(), "-c".to_string(), pycode.clone()],
446 );
447 assert_eq!(argv.last().unwrap(), &pycode);
448 assert!(argv.contains(&"-i".to_string()));
449 assert_eq!(
450 KubectlExecTransport::new(t).display(),
451 "k8s:k3d-dev/dev/fresh-7c9f"
452 );
453 }
454}