use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::bootstrap::ironclaw_base_dir;
use crate::error::OrchestratorError;
use crate::orchestrator::auth::{CredentialGrant, TokenStore};
use crate::sandbox::connect_docker;
/// Execution mode for a containerized job: a plain worker loop or a
/// Claude Code bridge session.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobMode {
    Worker,
    ClaudeCode,
}

impl JobMode {
    /// Stable, lowercase identifier for this mode (used in logs and config).
    pub fn as_str(&self) -> &'static str {
        match *self {
            JobMode::Worker => "worker",
            JobMode::ClaudeCode => "claude_code",
        }
    }
}

impl std::fmt::Display for JobMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Configuration for containers launched by [`ContainerJobManager`].
#[derive(Debug, Clone)]
pub struct ContainerJobConfig {
    // Docker image used for every job container.
    pub image: String,
    // Memory cap (MiB) for plain worker jobs.
    pub memory_limit_mb: u64,
    // Relative CPU weight passed to Docker as cpu_shares.
    pub cpu_shares: u32,
    // Host port the orchestrator listens on; containers call back here.
    pub orchestrator_port: u16,
    // Anthropic API key for Claude Code jobs; takes precedence over the
    // OAuth token when both are set (see create_job_inner).
    pub claude_code_api_key: Option<String>,
    // OAuth token fallback, used only when no API key is configured.
    pub claude_code_oauth_token: Option<String>,
    // Model name passed to the claude-bridge entrypoint.
    pub claude_code_model: String,
    // Max conversation turns for a Claude Code session.
    pub claude_code_max_turns: u32,
    // Memory cap (MiB) for Claude Code jobs (typically larger than workers).
    pub claude_code_memory_limit_mb: u64,
    // Tool allowlist exported as CLAUDE_CODE_ALLOWED_TOOLS (comma-joined).
    pub claude_code_allowed_tools: Vec<String>,
}
impl Default for ContainerJobConfig {
fn default() -> Self {
Self {
image: "ironclaw-worker:latest".to_string(),
memory_limit_mb: 2048,
cpu_shares: 1024,
orchestrator_port: 50051,
claude_code_api_key: None,
claude_code_oauth_token: None,
claude_code_model: "sonnet".to_string(),
claude_code_max_turns: 50,
claude_code_memory_limit_mb: 4096,
claude_code_allowed_tools: crate::config::ClaudeCodeConfig::default().allowed_tools,
}
}
}
/// Lifecycle state of a tracked job container.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContainerState {
    Creating,
    Running,
    Stopped,
    Failed,
}

impl std::fmt::Display for ContainerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ContainerState::Creating => "creating",
            ContainerState::Running => "running",
            ContainerState::Stopped => "stopped",
            ContainerState::Failed => "failed",
        };
        f.write_str(label)
    }
}
/// In-memory tracking record for one job's container.
///
/// Inserted with an empty `container_id` and `Creating` state before the
/// Docker container exists; both are filled in once creation succeeds.
#[derive(Debug, Clone)]
pub struct ContainerHandle {
    pub job_id: Uuid,
    // Docker container ID; empty string while the container is still
    // being created.
    pub container_id: String,
    pub state: ContainerState,
    pub mode: JobMode,
    pub created_at: DateTime<Utc>,
    // Host directory bind-mounted at /workspace, if any.
    pub project_dir: Option<PathBuf>,
    pub task_description: String,
    // Most recent status message reported by the worker, if any.
    pub last_worker_status: Option<String>,
    // Iteration counter reported alongside worker status updates.
    pub worker_iteration: u32,
    // Set when the job reports completion via complete_job.
    pub completion_result: Option<CompletionResult>,
}
/// Outcome reported by a job when it finishes.
#[derive(Debug, Clone)]
pub struct CompletionResult {
    // Whether the job considers itself successful.
    pub success: bool,
    // Optional human-readable summary or error detail.
    pub message: Option<String>,
}
/// Canonicalize `dir` and verify it resolves under `<base>/projects` before
/// it is used as a Docker bind mount.
///
/// Returns the canonical path on success. Fails with
/// `ContainerCreationFailed` when the path does not exist / cannot be
/// canonicalized, when the base directory is not absolute, or when the
/// resolved path escapes the allowed base. Canonicalizing both sides before
/// the prefix check defeats `..` and symlink escapes.
fn validate_bind_mount_path(
    dir: &std::path::Path,
    job_id: Uuid,
) -> Result<PathBuf, OrchestratorError> {
    // canonicalize() fails for nonexistent paths, so this doubles as an
    // existence check on the project directory.
    let canonical = dir
        .canonicalize()
        .map_err(|e| OrchestratorError::ContainerCreationFailed {
            job_id,
            reason: format!(
                "failed to canonicalize project dir {}: {}",
                dir.display(),
                e
            ),
        })?;
    let projects_base = ironclaw_base_dir().join("projects");
    // A relative base would make the prefix check meaningless; refuse early.
    if !projects_base.is_absolute() {
        return Err(OrchestratorError::ContainerCreationFailed {
            job_id,
            reason: "base directory is not absolute; cannot safely validate bind mounts".into(),
        });
    }
    // Ensure the base exists so it can itself be canonicalized below.
    std::fs::create_dir_all(&projects_base).map_err(|e| {
        OrchestratorError::ContainerCreationFailed {
            job_id,
            reason: format!(
                "failed to create projects base {}: {}",
                projects_base.display(),
                e
            ),
        }
    })?;
    let canonical_base =
        projects_base
            .canonicalize()
            .map_err(|e| OrchestratorError::ContainerCreationFailed {
                job_id,
                reason: format!(
                    "failed to canonicalize projects base {}: {}",
                    projects_base.display(),
                    e
                ),
            })?;
    // Path::starts_with compares whole components, so "/base/projects-evil"
    // does not pass for base "/base/projects".
    if !canonical.starts_with(&canonical_base) {
        return Err(OrchestratorError::ContainerCreationFailed {
            job_id,
            reason: format!(
                "project directory {} is outside allowed base {}",
                canonical.display(),
                canonical_base.display()
            ),
        });
    }
    Ok(canonical)
}
/// Orchestrator-side manager for per-job Docker containers.
///
/// Owns the job-token store, the in-memory map of tracked containers, and a
/// lazily-established Docker client shared behind a lock.
pub struct ContainerJobManager {
    config: ContainerJobConfig,
    token_store: TokenStore,
    // job_id -> handle; pub(crate) so sibling modules (and tests) can seed
    // or inspect entries directly.
    pub(crate) containers: Arc<RwLock<HashMap<Uuid, ContainerHandle>>>,
    // None until the first Docker operation; see ContainerJobManager::docker.
    docker: Arc<RwLock<Option<bollard::Docker>>>,
}
impl ContainerJobManager {
    /// Create a manager with an empty container map and no Docker
    /// connection; the connection is established lazily on first use.
    pub fn new(config: ContainerJobConfig, token_store: TokenStore) -> Self {
        Self {
            config,
            token_store,
            containers: Arc::new(RwLock::new(HashMap::new())),
            docker: Arc::new(RwLock::new(None)),
        }
    }

    /// Return the cached Docker client, connecting on first call.
    ///
    /// NOTE(review): two concurrent first callers may both connect; the
    /// later write wins and the extra client is dropped — presumably
    /// harmless, but confirm connect_docker has no heavy side effects.
    async fn docker(&self) -> Result<bollard::Docker, OrchestratorError> {
        {
            // Fast path: reuse the cached client under a read lock only.
            let guard = self.docker.read().await;
            if let Some(ref d) = *guard {
                return Ok(d.clone());
            }
        }
        let docker = connect_docker()
            .await
            .map_err(|e| OrchestratorError::Docker {
                reason: e.to_string(),
            })?;
        *self.docker.write().await = Some(docker.clone());
        Ok(docker)
    }

    /// Register a job and launch its container.
    ///
    /// Issues a per-job auth token and stores credential grants before the
    /// container exists, so the worker can authenticate as soon as it
    /// starts. On any creation failure the token is revoked and the
    /// tracking entry removed, leaving no partial state behind.
    ///
    /// Returns the worker token on success.
    pub async fn create_job(
        &self,
        job_id: Uuid,
        task: &str,
        project_dir: Option<PathBuf>,
        mode: JobMode,
        credential_grants: Vec<CredentialGrant>,
    ) -> Result<String, OrchestratorError> {
        let token = self.token_store.create_token(job_id).await;
        self.token_store
            .store_grants(job_id, credential_grants)
            .await;
        // Insert a placeholder handle (empty container_id, Creating state)
        // first, so status queries can already see the job while Docker
        // creation is in flight.
        let handle = ContainerHandle {
            job_id,
            container_id: String::new(),
            state: ContainerState::Creating,
            mode,
            created_at: Utc::now(),
            project_dir: project_dir.clone(),
            task_description: task.to_string(),
            last_worker_status: None,
            worker_iteration: 0,
            completion_result: None,
        };
        self.containers.write().await.insert(job_id, handle);
        match self
            .create_job_inner(job_id, &token, project_dir, mode)
            .await
        {
            Ok(()) => Ok(token),
            Err(e) => {
                // Roll back token and handle so a failed launch leaves no
                // dangling credentials or phantom jobs.
                self.token_store.revoke(job_id).await;
                self.containers.write().await.remove(&job_id);
                Err(e)
            }
        }
    }

    /// Create and start the Docker container for `job_id`.
    ///
    /// Builds env, bind mounts, hardened HostConfig, and the mode-specific
    /// command line, then updates the tracked handle to Running.
    async fn create_job_inner(
        &self,
        job_id: Uuid,
        token: &str,
        project_dir: Option<PathBuf>,
        mode: JobMode,
    ) -> Result<(), OrchestratorError> {
        let docker = self.docker().await?;
        // Host address the container uses to reach the orchestrator.
        // 172.17.0.1 is Docker's default bridge gateway on Linux — assumes
        // the default bridge network; TODO confirm for custom networks.
        let orchestrator_host = if cfg!(target_os = "linux") {
            "172.17.0.1"
        } else {
            "host.docker.internal"
        };
        let orchestrator_url = format!(
            "http://{}:{}",
            orchestrator_host, self.config.orchestrator_port
        );
        // NOTE(review): the token (and below, API credentials) travel via
        // environment variables, which are visible in `docker inspect`.
        let mut env_vec = vec![
            format!("IRONCLAW_WORKER_TOKEN={}", token),
            format!("IRONCLAW_JOB_ID={}", job_id),
            format!("IRONCLAW_ORCHESTRATOR_URL={}", orchestrator_url),
        ];
        let mut binds = Vec::new();
        if let Some(ref dir) = project_dir {
            // Reject directories outside the allowed projects base before
            // they become a host bind mount.
            let canonical = validate_bind_mount_path(dir, job_id)?;
            binds.push(format!("{}:/workspace:rw", canonical.display()));
            env_vec.push("IRONCLAW_WORKSPACE=/workspace".to_string());
        }
        if mode == JobMode::ClaudeCode {
            // API key takes precedence over the OAuth token when both exist.
            if let Some(ref api_key) = self.config.claude_code_api_key {
                env_vec.push(format!("ANTHROPIC_API_KEY={}", api_key));
            } else if let Some(ref oauth_token) = self.config.claude_code_oauth_token {
                env_vec.push(format!("CLAUDE_CODE_OAUTH_TOKEN={}", oauth_token));
            }
            if !self.config.claude_code_allowed_tools.is_empty() {
                env_vec.push(format!(
                    "CLAUDE_CODE_ALLOWED_TOOLS={}",
                    self.config.claude_code_allowed_tools.join(",")
                ));
            }
        }
        // Claude Code jobs get their own (larger) memory budget.
        let memory_mb = match mode {
            JobMode::ClaudeCode => self.config.claude_code_memory_limit_mb,
            JobMode::Worker => self.config.memory_limit_mb,
        };
        use bollard::container::{Config, CreateContainerOptions};
        use bollard::models::HostConfig;
        // Hardened container config: all capabilities dropped except CHOWN,
        // no privilege escalation, tmpfs-backed /tmp.
        let host_config = HostConfig {
            binds: if binds.is_empty() { None } else { Some(binds) },
            memory: Some((memory_mb * 1024 * 1024) as i64),
            cpu_shares: Some(self.config.cpu_shares as i64),
            network_mode: Some("bridge".to_string()),
            // Maps host.docker.internal to the host gateway so the non-Linux
            // orchestrator_host above resolves inside the container.
            extra_hosts: Some(vec!["host.docker.internal:host-gateway".to_string()]),
            cap_drop: Some(vec!["ALL".to_string()]),
            cap_add: Some(vec!["CHOWN".to_string()]),
            security_opt: Some(vec!["no-new-privileges:true".to_string()]),
            tmpfs: Some(
                [("/tmp".to_string(), "size=512M".to_string())]
                    .into_iter()
                    .collect(),
            ),
            ..Default::default()
        };
        // Entry command differs per mode; both receive the job id and the
        // orchestrator callback URL.
        let cmd = match mode {
            JobMode::Worker => vec![
                "worker".to_string(),
                "--job-id".to_string(),
                job_id.to_string(),
                "--orchestrator-url".to_string(),
                orchestrator_url,
            ],
            JobMode::ClaudeCode => vec![
                "claude-bridge".to_string(),
                "--job-id".to_string(),
                job_id.to_string(),
                "--orchestrator-url".to_string(),
                orchestrator_url,
                "--max-turns".to_string(),
                self.config.claude_code_max_turns.to_string(),
                "--model".to_string(),
                self.config.claude_code_model.clone(),
            ],
        };
        // Labels let external tooling identify and age ironclaw containers.
        let mut labels = std::collections::HashMap::new();
        labels.insert("ironclaw.job_id".to_string(), job_id.to_string());
        labels.insert(
            "ironclaw.created_at".to_string(),
            chrono::Utc::now().to_rfc3339(),
        );
        let container_config = Config {
            image: Some(self.config.image.clone()),
            cmd: Some(cmd),
            env: Some(env_vec),
            host_config: Some(host_config),
            // Run as a fixed non-root uid:gid.
            user: Some("1000:1000".to_string()),
            // NOTE(review): working_dir is /workspace even when no bind was
            // mounted — presumably the image provides the directory; verify.
            working_dir: Some("/workspace".to_string()),
            labels: Some(labels),
            ..Default::default()
        };
        let container_name = match mode {
            JobMode::Worker => format!("ironclaw-worker-{}", job_id),
            JobMode::ClaudeCode => format!("ironclaw-claude-{}", job_id),
        };
        let options = CreateContainerOptions {
            name: container_name,
            ..Default::default()
        };
        let response = docker
            .create_container(Some(options), container_config)
            .await
            .map_err(|e| OrchestratorError::ContainerCreationFailed {
                job_id,
                reason: e.to_string(),
            })?;
        let container_id = response.id;
        docker
            .start_container::<String>(&container_id, None)
            .await
            .map_err(|e| OrchestratorError::ContainerCreationFailed {
                job_id,
                reason: format!("failed to start container: {}", e),
            })?;
        // Promote the placeholder handle now that the container is live.
        if let Some(handle) = self.containers.write().await.get_mut(&job_id) {
            handle.container_id = container_id;
            handle.state = ContainerState::Running;
        }
        tracing::info!(
            job_id = %job_id,
            "Created and started worker container"
        );
        Ok(())
    }

    /// Stop and remove a job's container, mark it Stopped, and revoke its
    /// token.
    ///
    /// Docker stop/remove failures are logged and tolerated (the container
    /// may already be gone); the handle stays in the map until
    /// [`Self::cleanup_job`] removes it.
    pub async fn stop_job(&self, job_id: Uuid) -> Result<(), OrchestratorError> {
        let container_id = {
            let containers = self.containers.read().await;
            containers
                .get(&job_id)
                .map(|h| h.container_id.clone())
                .ok_or(OrchestratorError::ContainerNotFound { job_id })?
        };
        // An empty id means creation hasn't finished; nothing to stop yet.
        if container_id.is_empty() {
            return Err(OrchestratorError::InvalidContainerState {
                job_id,
                state: "creating (no container ID yet)".to_string(),
            });
        }
        let docker = self.docker().await?;
        // Give the process 10s to exit gracefully before Docker kills it.
        if let Err(e) = docker
            .stop_container(
                &container_id,
                Some(bollard::container::StopContainerOptions { t: 10 }),
            )
            .await
        {
            tracing::warn!(job_id = %job_id, error = %e, "Failed to stop container (may already be stopped)");
        }
        if let Err(e) = docker
            .remove_container(
                &container_id,
                Some(bollard::container::RemoveContainerOptions {
                    force: true,
                    ..Default::default()
                }),
            )
            .await
        {
            tracing::warn!(job_id = %job_id, error = %e, "Failed to remove container (may require manual cleanup)");
        }
        if let Some(handle) = self.containers.write().await.get_mut(&job_id) {
            handle.state = ContainerState::Stopped;
        }
        self.token_store.revoke(job_id).await;
        tracing::info!(job_id = %job_id, "Stopped worker container");
        Ok(())
    }

    /// Record a job's completion result, then best-effort stop/remove its
    /// container and revoke its token.
    ///
    /// Unlike [`Self::stop_job`], a missing handle or Docker failure is not
    /// an error here — the result is recorded first and cleanup is advisory.
    pub async fn complete_job(
        &self,
        job_id: Uuid,
        result: CompletionResult,
    ) -> Result<(), OrchestratorError> {
        {
            // Record the result before any cleanup, so it is visible even if
            // Docker teardown fails below.
            let mut containers = self.containers.write().await;
            if let Some(handle) = containers.get_mut(&job_id) {
                handle.completion_result = Some(result);
                handle.state = ContainerState::Stopped;
            }
        }
        let container_id = {
            let containers = self.containers.read().await;
            containers.get(&job_id).map(|h| h.container_id.clone())
        };
        if let Some(cid) = container_id
            && !cid.is_empty()
        {
            match self.docker().await {
                Ok(docker) => {
                    // Shorter grace period (5s) than stop_job: the job
                    // already reported completion.
                    if let Err(e) = docker
                        .stop_container(
                            &cid,
                            Some(bollard::container::StopContainerOptions { t: 5 }),
                        )
                        .await
                    {
                        tracing::warn!(job_id = %job_id, error = %e, "Failed to stop completed container");
                    }
                    if let Err(e) = docker
                        .remove_container(
                            &cid,
                            Some(bollard::container::RemoveContainerOptions {
                                force: true,
                                ..Default::default()
                            }),
                        )
                        .await
                    {
                        tracing::warn!(job_id = %job_id, error = %e, "Failed to remove completed container");
                    }
                }
                Err(e) => {
                    tracing::warn!(job_id = %job_id, error = %e, "Failed to connect to Docker for container cleanup");
                }
            }
        }
        self.token_store.revoke(job_id).await;
        tracing::info!(job_id = %job_id, "Completed worker container");
        Ok(())
    }

    /// Drop the tracking entry for `job_id` (does not touch Docker).
    pub async fn cleanup_job(&self, job_id: Uuid) {
        self.containers.write().await.remove(&job_id);
    }

    /// Record the latest status message and iteration reported by a worker.
    /// Unknown job ids are silently ignored.
    pub async fn update_worker_status(
        &self,
        job_id: Uuid,
        message: Option<String>,
        iteration: u32,
    ) {
        if let Some(handle) = self.containers.write().await.get_mut(&job_id) {
            handle.last_worker_status = message;
            handle.worker_iteration = iteration;
        }
    }

    /// Snapshot of a single job's handle, if tracked.
    pub async fn get_handle(&self, job_id: Uuid) -> Option<ContainerHandle> {
        self.containers.read().await.get(&job_id).cloned()
    }

    /// Snapshot of all tracked job handles (unordered).
    pub async fn list_jobs(&self) -> Vec<ContainerHandle> {
        self.containers.read().await.values().cloned().collect()
    }

    /// Access the underlying token store.
    pub fn token_store(&self) -> &TokenStore {
        &self.token_store
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults should match the documented local-development values.
    #[test]
    fn test_container_job_config_default() {
        let config = ContainerJobConfig::default();
        assert_eq!(config.orchestrator_port, 50051);
        assert_eq!(config.memory_limit_mb, 2048);
    }

    #[test]
    fn test_container_state_display() {
        assert_eq!(ContainerState::Running.to_string(), "running");
        assert_eq!(ContainerState::Stopped.to_string(), "stopped");
    }

    // A directory created inside <base>/projects must validate and
    // canonicalize to a path under the canonical base.
    #[test]
    fn test_validate_bind_mount_valid_path() {
        let base = crate::bootstrap::compute_ironclaw_base_dir().join("projects");
        std::fs::create_dir_all(&base).unwrap();
        let test_dir = base.join("test_validate_bind");
        std::fs::create_dir_all(&test_dir).unwrap();
        let result = validate_bind_mount_path(&test_dir, Uuid::new_v4());
        assert!(result.is_ok());
        let canonical = result.unwrap();
        assert!(canonical.starts_with(base.canonicalize().unwrap()));
        let _ = std::fs::remove_dir_all(&test_dir);
    }

    // An existing directory outside the projects base must be rejected
    // with the "outside allowed base" reason.
    #[test]
    fn test_validate_bind_mount_rejects_outside_base() {
        let tmp = tempfile::tempdir().unwrap();
        let outside = tmp.path().to_path_buf();
        let result = validate_bind_mount_path(&outside, Uuid::new_v4());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("outside allowed base"),
            "expected 'outside allowed base', got: {}",
            err
        );
    }

    // A nonexistent path must fail at the canonicalize step.
    #[test]
    fn test_validate_bind_mount_rejects_nonexistent() {
        let nonexistent = PathBuf::from("/no/such/path/at/all");
        let result = validate_bind_mount_path(&nonexistent, Uuid::new_v4());
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("canonicalize"),
            "expected canonicalize error, got: {}",
            err
        );
    }

    // update_worker_status should mutate a seeded handle in place.
    #[tokio::test]
    async fn test_update_worker_status() {
        let store = TokenStore::new();
        let mgr = ContainerJobManager::new(ContainerJobConfig::default(), store);
        let job_id = Uuid::new_v4();
        {
            // Seed a Running handle directly via the pub(crate) map.
            let mut containers = mgr.containers.write().await;
            containers.insert(
                job_id,
                ContainerHandle {
                    job_id,
                    container_id: "test".to_string(),
                    state: ContainerState::Running,
                    mode: JobMode::Worker,
                    created_at: chrono::Utc::now(),
                    project_dir: None,
                    task_description: "test job".to_string(),
                    last_worker_status: None,
                    worker_iteration: 0,
                    completion_result: None,
                },
            );
        }
        mgr.update_worker_status(job_id, Some("Iteration 3".to_string()), 3)
            .await;
        let handle = mgr.get_handle(job_id).await.unwrap();
        assert_eq!(handle.worker_iteration, 3);
        assert_eq!(handle.last_worker_status.as_deref(), Some("Iteration 3"));
    }
}
}