use std::time::{Duration, Instant};
use k8s_openapi::api::core::v1::Pod;
use kube::api::{Api, DeleteParams, LogParams, PostParams};
use kube::runtime::wait::await_condition;
use tokio::time;
use tracing::{debug, error, warn};
use crate::error::AgentError;
use crate::provider::{AgentConfig, AgentProvider, InvokeFuture};
use crate::providers::claude::common as claude_common;
use crate::providers::claude::common::DEFAULT_TIMEOUT;
use super::common::{
ImagePullPolicy, K8sClusterConfig, K8sResources, PodConfig, build_credentials_prefix,
build_pod_spec, create_client, generate_pod_name,
};
/// Builds a wait condition that holds once a pod reports a terminal phase.
///
/// The returned closure yields `true` only when the observed pod has a status
/// whose `phase` is exactly `"Succeeded"` or `"Failed"`. A missing pod, a
/// missing status, or any other phase yields `false`.
fn is_pod_completed() -> impl kube::runtime::wait::Condition<Pod> {
    |obj: Option<&Pod>| {
        let phase = obj
            .and_then(|pod| pod.status.as_ref())
            .and_then(|status| status.phase.as_deref());
        matches!(phase, Some("Succeeded" | "Failed"))
    }
}
/// Agent provider that runs each invocation in its own Kubernetes pod.
///
/// Values are set through the builder-style methods on this type; `invoke`
/// reads them to build the pod spec, wait for the pod to finish, collect its
/// logs and delete it.
#[derive(Clone)]
pub struct K8sEphemeralProvider {
/// Container image handed to the pod spec.
image: String,
/// Namespace the pod is created in (`"default"` unless overridden).
namespace: String,
/// Executable used as the command when building the shell command line
/// (`"claude"` unless overridden).
claude_path: String,
/// Provider-level working directory; a working directory set on the
/// per-invocation config takes precedence over this one.
working_dir: Option<String>,
/// Resource settings forwarded to the pod spec.
resources: K8sResources,
/// Optional service account name forwarded to the pod spec.
service_account: Option<String>,
/// Image pull policy forwarded to the pod spec.
image_pull_policy: ImagePullPolicy,
/// `(key, value)` pairs forwarded to the pod spec as environment variables.
env_vars: Vec<(String, String)>,
/// Names of image pull secrets forwarded to the pod spec.
image_pull_secrets: Vec<String>,
/// Optional credentials JSON passed to `build_credentials_prefix`; the
/// resulting prefix is prepended to the shell command run in the pod.
oauth_credentials: Option<String>,
/// Cluster connection settings passed to `create_client`.
cluster_config: K8sClusterConfig,
/// Maximum time to wait for the pod to reach a terminal phase
/// (`DEFAULT_TIMEOUT` unless overridden).
timeout: Duration,
}
impl K8sEphemeralProvider {
    /// Creates a provider that launches pods from `image`.
    ///
    /// Every other setting starts at its default: namespace `"default"`,
    /// executable `"claude"`, no working directory, no service account, no
    /// extra environment variables or pull secrets, no OAuth credentials,
    /// default resources / pull policy / cluster config, and
    /// `DEFAULT_TIMEOUT` as the wait limit.
    pub fn new(image: &str) -> Self {
        Self {
            image: image.to_owned(),
            namespace: String::from("default"),
            claude_path: String::from("claude"),
            working_dir: None,
            resources: K8sResources::default(),
            service_account: None,
            image_pull_policy: ImagePullPolicy::default(),
            env_vars: Vec::new(),
            image_pull_secrets: Vec::new(),
            oauth_credentials: None,
            cluster_config: K8sClusterConfig::default(),
            timeout: DEFAULT_TIMEOUT,
        }
    }

    /// Sets the namespace the pod is created in.
    pub fn namespace(self, ns: &str) -> Self {
        Self {
            namespace: ns.to_owned(),
            ..self
        }
    }

    /// Sets the executable used to build the command run inside the pod.
    pub fn claude_path(self, path: &str) -> Self {
        Self {
            claude_path: path.to_owned(),
            ..self
        }
    }

    /// Sets the provider-level working directory.
    pub fn working_dir(self, dir: &str) -> Self {
        Self {
            working_dir: Some(dir.to_owned()),
            ..self
        }
    }

    /// Replaces the resource settings forwarded to the pod spec.
    pub fn resources(self, resources: K8sResources) -> Self {
        Self { resources, ..self }
    }

    /// Sets the service account name forwarded to the pod spec.
    pub fn service_account(self, sa: &str) -> Self {
        Self {
            service_account: Some(sa.to_owned()),
            ..self
        }
    }

    /// Replaces the image pull policy forwarded to the pod spec.
    pub fn image_pull_policy(self, policy: ImagePullPolicy) -> Self {
        Self {
            image_pull_policy: policy,
            ..self
        }
    }

    /// Stores the credentials JSON later passed to `build_credentials_prefix`.
    pub fn oauth_credentials(self, json: &str) -> Self {
        Self {
            oauth_credentials: Some(json.to_owned()),
            ..self
        }
    }

    /// Appends one image pull secret name; may be called repeatedly.
    pub fn image_pull_secret(mut self, secret_name: &str) -> Self {
        self.image_pull_secrets.push(String::from(secret_name));
        self
    }

    /// Appends one `(key, value)` environment variable; may be called repeatedly.
    pub fn env(mut self, key: &str, value: &str) -> Self {
        let pair = (String::from(key), String::from(value));
        self.env_vars.push(pair);
        self
    }

    /// Replaces the cluster connection settings passed to `create_client`.
    pub fn cluster_config(self, config: K8sClusterConfig) -> Self {
        Self {
            cluster_config: config,
            ..self
        }
    }

    /// Sets the maximum time to wait for the pod to finish.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }
}
impl AgentProvider for K8sEphemeralProvider {
fn invoke<'a>(&'a self, config: &'a AgentConfig) -> InvokeFuture<'a> {
Box::pin(async move {
claude_common::validate_prompt_size(config)?;
let args = claude_common::build_args(config)?;
let claude_cmd = claude_common::build_shell_command(&self.claude_path, &args);
let creds_prefix = build_credentials_prefix(self.oauth_credentials.as_deref());
let full_cmd = match (&self.working_dir, &config.working_dir) {
(_, Some(dir)) | (Some(dir), None) => {
format!(
"{creds_prefix}cd {} && {}",
claude_common::build_shell_command(dir, &[]),
claude_cmd
)
}
(None, None) => format!("{creds_prefix}{claude_cmd}"),
};
let pod_name = generate_pod_name("claude-code");
debug!(
pod_name = %pod_name,
namespace = %self.namespace,
image = %self.image,
model = %config.model,
"creating ephemeral K8s pod"
);
let start = Instant::now();
let client = create_client(&self.cluster_config).await?;
let pods: Api<Pod> = Api::namespaced(client, &self.namespace);
let pod_spec = build_pod_spec(&PodConfig {
name: &pod_name,
image: &self.image,
command: vec!["sh".to_string(), "-c".to_string(), full_cmd],
namespace: &self.namespace,
resources: &self.resources,
service_account: self.service_account.as_deref(),
restart_policy: "Never",
image_pull_policy: &self.image_pull_policy,
env_vars: &self.env_vars,
image_pull_secrets: &self.image_pull_secrets,
})?;
pods.create(&PostParams::default(), &pod_spec)
.await
.map_err(|e| AgentError::ProcessFailed {
exit_code: -1,
stderr: format!("failed to create K8s pod: {e}"),
})?;
let wait_result = time::timeout(
self.timeout,
await_condition(pods.clone(), &pod_name, is_pod_completed()),
)
.await;
let timed_out = wait_result.is_err();
let pod_phase = if timed_out {
"TimedOut".to_string()
} else {
let condition_result =
wait_result.expect("timeout already handled").map_err(|e| {
AgentError::ProcessFailed {
exit_code: -1,
stderr: format!("failed waiting for pod completion: {e}"),
}
})?;
condition_result
.and_then(|p| p.status)
.and_then(|s| s.phase)
.unwrap_or_else(|| "Unknown".to_string())
};
let logs = pods
.logs(&pod_name, &LogParams::default())
.await
.unwrap_or_default();
let _ = pods.delete(&pod_name, &DeleteParams::default()).await;
if timed_out {
warn!(timeout = ?self.timeout, pod = %pod_name, "K8s pod timed out");
return Err(AgentError::Timeout {
limit: self.timeout,
});
}
let duration_ms = start.elapsed().as_millis() as u64;
let exit_code = if pod_phase == "Succeeded" { 0 } else { 1 };
if exit_code != 0 {
error!(pod = %pod_name, phase = %pod_phase, "ephemeral claude pod failed");
return Err(AgentError::ProcessFailed {
exit_code,
stderr: if logs.is_empty() {
"(no logs captured)".to_string()
} else {
logs
},
});
}
debug!(stdout_len = logs.len(), "ephemeral claude pod completed");
claude_common::parse_output(&logs, config, duration_ms)
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed provider carries the documented defaults.
    #[test]
    fn ephemeral_provider_defaults() {
        let provider = K8sEphemeralProvider::new("my-image:v1");

        assert_eq!(provider.image.as_str(), "my-image:v1");
        assert_eq!(provider.namespace.as_str(), "default");
        assert_eq!(provider.claude_path.as_str(), "claude");
        assert_eq!(provider.working_dir, None);
        assert_eq!(provider.service_account, None);
        assert_eq!(provider.timeout, DEFAULT_TIMEOUT);
    }

    /// Chained builder calls each land in the matching field.
    #[test]
    fn ephemeral_provider_builder_chain() {
        let limits = K8sResources {
            cpu_limit: Some("1".to_string()),
            memory_limit: Some("2Gi".to_string()),
        };
        let ten_minutes = Duration::from_secs(600);

        let provider = K8sEphemeralProvider::new("img:v2")
            .namespace("ci")
            .claude_path("/usr/bin/claude")
            .working_dir("/workspace")
            .service_account("claude-sa")
            .resources(limits)
            .timeout(ten_minutes);

        assert_eq!(provider.namespace.as_str(), "ci");
        assert_eq!(provider.claude_path.as_str(), "/usr/bin/claude");
        assert_eq!(provider.working_dir.as_deref(), Some("/workspace"));
        assert_eq!(provider.service_account.as_deref(), Some("claude-sa"));
        assert_eq!(provider.resources.cpu_limit.as_deref(), Some("1"));
        assert_eq!(provider.resources.memory_limit.as_deref(), Some("2Gi"));
        assert_eq!(provider.timeout, ten_minutes);
    }

    /// Pull secrets accumulate in call order.
    #[test]
    fn ephemeral_provider_image_pull_secrets() {
        let provider = K8sEphemeralProvider::new("registry.gitlab.com/org/img:v1")
            .image_pull_secret("gitlab-registry")
            .image_pull_secret("dockerhub");

        assert_eq!(provider.image_pull_secrets, ["gitlab-registry", "dockerhub"]);
    }

    /// Cloning preserves configured values.
    #[test]
    fn ephemeral_provider_clone() {
        let original = K8sEphemeralProvider::new("img")
            .namespace("ns")
            .timeout(Duration::from_secs(42));

        let cloned = original.clone();

        assert_eq!(cloned.namespace.as_str(), "ns");
        assert_eq!(cloned.timeout, Duration::from_secs(42));
    }
}