1pub mod composer;
2pub mod image_manager;
3pub mod layers;
4pub mod template_manager;
5
6use crate::agent::{Agent, LogProcessor};
7use crate::context::docker_client::DockerClient;
8use crate::context::file_system::FileSystemOperations;
9use bollard::container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions};
10use futures_util::stream::StreamExt;
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14
15const CONTAINER_MEMORY_LIMIT: i64 = 4 * 1024 * 1024 * 1024; const CONTAINER_CPU_QUOTA: i64 = 400000; const CONTAINER_WORKING_DIR: &str = "/workspace";
19const CONTAINER_USER: &str = "agent";
20const TSK_NETWORK_NAME: &str = "tsk-network";
21const PROXY_CONTAINER_NAME: &str = "tsk-proxy";
22const PROXY_IMAGE: &str = "tsk/proxy";
23
24pub struct DockerManager {
25 client: Arc<dyn DockerClient>,
26 file_system: Arc<dyn FileSystemOperations>,
27}
28
29impl DockerManager {
30 pub fn new(client: Arc<dyn DockerClient>, file_system: Arc<dyn FileSystemOperations>) -> Self {
31 Self {
32 client,
33 file_system,
34 }
35 }
36
37 #[allow(dead_code)]
38 pub async fn stop_proxy(&self) -> Result<(), String> {
39 match self
41 .client
42 .remove_container(
43 PROXY_CONTAINER_NAME,
44 Some(RemoveContainerOptions {
45 force: true,
46 ..Default::default()
47 }),
48 )
49 .await
50 {
51 Ok(_) => Ok(()),
52 Err(e) if e.contains("No such container") => Ok(()),
53 Err(e) => Err(format!("Failed to stop proxy container: {e}")),
54 }
55 }
56
57 async fn ensure_network(&self) -> Result<(), String> {
58 if !self.client.network_exists(TSK_NETWORK_NAME).await? {
59 self.client.create_network(TSK_NETWORK_NAME).await?;
60 }
61 Ok(())
62 }
63
64 async fn ensure_proxy(&self) -> Result<(), String> {
65 let proxy_config = Config {
67 image: Some(PROXY_IMAGE.to_string()),
68 exposed_ports: Some(
69 vec![("3128/tcp".to_string(), HashMap::new())]
70 .into_iter()
71 .collect(),
72 ),
73 host_config: Some(bollard::service::HostConfig {
74 network_mode: Some(TSK_NETWORK_NAME.to_string()),
75 restart_policy: Some(bollard::service::RestartPolicy {
76 name: Some(bollard::service::RestartPolicyNameEnum::UNLESS_STOPPED),
77 maximum_retry_count: None,
78 }),
79 ..Default::default()
80 }),
81 ..Default::default()
82 };
83
84 let create_options = CreateContainerOptions {
85 name: PROXY_CONTAINER_NAME.to_string(),
86 platform: None,
87 };
88
89 match self
91 .client
92 .create_container(Some(create_options), proxy_config)
93 .await
94 {
95 Ok(_) => {
96 self.client.start_container(PROXY_CONTAINER_NAME).await?;
98 }
99 Err(e) => {
100 if e.contains("already in use") {
102 match self.client.start_container(PROXY_CONTAINER_NAME).await {
104 Ok(_) => (),
105 Err(e) if e.contains("already started") => (),
106 Err(e) => return Err(e),
107 }
108 } else {
109 return Err(e);
110 }
111 }
112 }
113
114 Ok(())
115 }
116
117 fn prepare_worktree_path(worktree_path: &Path) -> Result<PathBuf, String> {
118 let absolute_path = if worktree_path.is_relative() {
120 std::env::current_dir()
121 .map_err(|e| format!("Failed to get current directory: {e}"))?
122 .join(worktree_path)
123 } else {
124 worktree_path.to_path_buf()
125 };
126 Ok(absolute_path)
127 }
128
129 fn create_base_container_config(
130 image: &str,
131 worktree_path_str: &str,
132 command: Option<Vec<String>>,
133 interactive: bool,
134 instructions_file_path: Option<&PathBuf>,
135 agent: Option<&dyn Agent>,
136 ) -> Config<String> {
137 let mut binds = vec![format!("{worktree_path_str}:{CONTAINER_WORKING_DIR}")];
139
140 if let Some(agent) = agent {
142 for (host_path, container_path, options) in agent.volumes() {
143 let bind = if options.is_empty() {
144 format!("{host_path}:{container_path}")
145 } else {
146 format!("{host_path}:{container_path}:{options}")
147 };
148 binds.push(bind);
149 }
150 }
151
152 if let Some(inst_path) = instructions_file_path {
154 if let Some(parent) = inst_path.parent() {
155 let abs_parent = parent
157 .canonicalize()
158 .unwrap_or_else(|_| parent.to_path_buf());
159 binds.push(format!("{}:/instructions:ro", abs_parent.to_str().unwrap()));
160 }
161 }
162
163 let mut env_vars = vec![
165 "HTTP_PROXY=http://tsk-proxy:3128".to_string(),
167 "HTTPS_PROXY=http://tsk-proxy:3128".to_string(),
168 "http_proxy=http://tsk-proxy:3128".to_string(),
169 "https_proxy=http://tsk-proxy:3128".to_string(),
170 "NO_PROXY=localhost,127.0.0.1".to_string(),
172 "no_proxy=localhost,127.0.0.1".to_string(),
173 ];
174
175 if let Some(agent) = agent {
177 for (key, value) in agent.environment() {
178 env_vars.push(format!("{key}={value}"));
179 }
180 } else {
181 env_vars.push(format!("HOME=/home/{CONTAINER_USER}"));
183 env_vars.push(format!("USER={CONTAINER_USER}"));
184 }
185
186 Config {
187 image: Some(image.to_string()),
188 user: Some(CONTAINER_USER.to_string()),
190 cmd: command,
191 host_config: Some(bollard::service::HostConfig {
192 binds: Some(binds),
193 network_mode: Some(TSK_NETWORK_NAME.to_string()),
194 memory: Some(CONTAINER_MEMORY_LIMIT),
195 cpu_quota: Some(CONTAINER_CPU_QUOTA),
196 cap_drop: Some(vec![
198 "NET_ADMIN".to_string(),
199 "NET_RAW".to_string(),
200 "SETPCAP".to_string(),
201 "SYS_ADMIN".to_string(),
202 "SYS_PTRACE".to_string(),
203 "DAC_OVERRIDE".to_string(),
204 "AUDIT_WRITE".to_string(),
205 "SETUID".to_string(),
206 "SETGID".to_string(),
207 ]),
208 ..Default::default()
209 }),
210 working_dir: Some(CONTAINER_WORKING_DIR.to_string()),
211 env: Some(env_vars),
212 attach_stdin: Some(interactive),
213 attach_stdout: Some(interactive),
214 attach_stderr: Some(interactive),
215 tty: Some(interactive),
216 ..Default::default()
217 }
218 }
219
220 pub async fn stop_and_remove_container(&self, container_name: &str) -> Result<(), String> {
221 self.client
222 .remove_container(
223 container_name,
224 Some(RemoveContainerOptions {
225 force: true,
226 ..Default::default()
227 }),
228 )
229 .await
230 }
231
232 #[allow(clippy::too_many_arguments)]
247 pub async fn run_task_container(
248 &self,
249 image: &str,
250 worktree_path: &Path,
251 instructions_file_path: Option<&PathBuf>,
252 agent: &dyn Agent,
253 is_interactive: bool,
254 _task_name: &str,
255 log_file_path: Option<&Path>,
256 ) -> Result<(String, Option<crate::agent::TaskResult>), String> {
257 self.ensure_network().await?;
259 self.ensure_proxy().await?;
260
261 let absolute_worktree_path = Self::prepare_worktree_path(worktree_path)?;
262 let worktree_path_str = absolute_worktree_path
263 .to_str()
264 .ok_or_else(|| "Invalid worktree path".to_string())?;
265
266 let command = if is_interactive {
268 let agent_command = agent.build_command(
270 instructions_file_path
271 .and_then(|p| p.to_str())
272 .unwrap_or("instructions.md"),
273 );
274
275 let command_str = format!("{}; exec /bin/bash", agent_command.join(" "));
277 Some(vec!["sh".to_string(), "-c".to_string(), command_str])
278 } else {
279 let agent_command = agent.build_command(
281 instructions_file_path
282 .and_then(|p| p.to_str())
283 .unwrap_or("instructions.md"),
284 );
285 if agent_command.is_empty() {
286 None
287 } else {
288 Some(agent_command)
289 }
290 };
291
292 let config = Self::create_base_container_config(
293 image,
294 worktree_path_str,
295 command,
296 is_interactive,
297 instructions_file_path,
298 Some(agent),
299 );
300
301 let container_name = if is_interactive {
302 format!("tsk-interactive-{}", chrono::Utc::now().timestamp())
303 } else {
304 format!("tsk-{}", chrono::Utc::now().timestamp())
305 };
306
307 let options = CreateContainerOptions {
308 name: container_name.clone(),
309 platform: None,
310 };
311
312 let container_id = self.client.create_container(Some(options), config).await?;
313 self.client.start_container(&container_id).await?;
314
315 if is_interactive {
316 self.attach_to_container(&container_id, &container_name)
318 .await?;
319
320 self.client
322 .remove_container(
323 &container_id,
324 Some(RemoveContainerOptions {
325 force: true,
326 ..Default::default()
327 }),
328 )
329 .await?;
330
331 Ok((String::new(), None))
333 } else {
334 let mut log_processor = agent.create_log_processor(self.file_system.clone());
336 let output = self
337 .stream_container_logs(&container_id, &mut *log_processor)
338 .await?;
339
340 if let Some(log_path) = log_file_path {
342 if let Err(e) = log_processor.save_full_log(log_path).await {
343 eprintln!("Warning: Failed to save full log file: {e}");
344 } else {
345 println!("Full log saved to: {}", log_path.display());
346 }
347 }
348
349 let task_result = log_processor.get_final_result().cloned();
351
352 self.client
354 .remove_container(
355 &container_id,
356 Some(RemoveContainerOptions {
357 force: true,
358 ..Default::default()
359 }),
360 )
361 .await?;
362
363 Ok((output, task_result))
364 }
365 }
366
367 async fn attach_to_container(
370 &self,
371 _container_id: &str,
372 container_name: &str,
373 ) -> Result<(), String> {
374 println!("\nDocker container started successfully!");
375 println!("Container name: {container_name}");
376 println!("\nStarting interactive session...");
377
378 println!("\nAgent command is running, you'll be dropped into a shell when it completes...");
379
380 let status = std::process::Command::new("docker")
383 .args(["attach", container_name])
384 .status()
385 .map_err(|e| format!("Failed to execute docker attach: {e}"))?;
386
387 if !status.success() {
388 eprintln!("Interactive session exited with non-zero status");
389 }
390
391 println!("\nInteractive session ended");
392 Ok(())
393 }
394
395 async fn stream_container_logs(
397 &self,
398 container_id: &str,
399 log_processor: &mut dyn LogProcessor,
400 ) -> Result<String, String> {
401 let client_clone = Arc::clone(&self.client);
403 let container_id_clone = container_id.to_string();
404 let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
405
406 let log_task = tokio::spawn(async move {
407 let log_options = LogsOptions {
408 stdout: true,
409 stderr: true,
410 follow: true,
411 timestamps: false,
412 ..Default::default()
413 };
414
415 match client_clone
416 .logs_stream(&container_id_clone, Some(log_options))
417 .await
418 {
419 Ok(mut stream) => {
420 while let Some(result) = stream.next().await {
421 match result {
422 Ok(log_line) => {
423 if tx.send(log_line).await.is_err() {
424 break;
425 }
426 }
427 Err(e) => {
428 eprintln!("Error streaming logs: {e}");
429 break;
430 }
431 }
432 }
433 }
434 Err(e) => {
435 eprintln!("Failed to start log streaming: {e}");
436 }
437 }
438 });
439
440 let mut all_logs = String::new();
442
443 loop {
445 tokio::select! {
446 Some(log_line) = rx.recv() => {
447 all_logs.push_str(&log_line);
448 if let Some(formatted) = log_processor.process_line(&log_line) {
450 println!("{formatted}");
451 }
452 }
453 exit_code = self.client.wait_container(container_id) => {
454 let exit_code = exit_code?;
455
456 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
458
459 while let Ok(log_line) = rx.try_recv() {
461 all_logs.push_str(&log_line);
462 if let Some(formatted) = log_processor.process_line(&log_line) {
463 println!("{formatted}");
464 }
465 }
466
467 log_task.abort();
469
470 if exit_code == 0 {
471 return Ok(all_logs);
472 } else {
473 return Err(format!(
474 "Container exited with non-zero status: {exit_code}. Output:\n{all_logs}"
475 ));
476 }
477 }
478 }
479 }
480 }
481}
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486 use crate::context::file_system::tests::MockFileSystem;
487 use crate::test_utils::TrackedDockerClient;
488
489 #[tokio::test]
490 async fn test_run_task_container_success() {
491 let mock_client = Arc::new(TrackedDockerClient::default());
492 let fs = Arc::new(MockFileSystem::new());
493 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
494
495 let worktree_path = Path::new("/tmp/test-worktree");
496
497 let agent = crate::agent::ClaudeCodeAgent::new();
498 let result = manager
499 .run_task_container(
500 "tsk/base",
501 worktree_path,
502 None,
503 &agent,
504 false, "test-task",
506 None, )
508 .await;
509
510 assert!(result.is_ok());
511 let (output, task_result) = result.unwrap();
512 assert_eq!(output, "Container logs");
513 assert!(task_result.is_none());
514
515 let create_calls = mock_client.create_container_calls.lock().unwrap();
516 assert_eq!(create_calls.len(), 2); let task_container_config = &create_calls[1].1;
520 let actual_cmd = task_container_config.cmd.as_ref().unwrap();
521 assert_eq!(actual_cmd.len(), 3);
523 assert_eq!(actual_cmd[0], "sh");
524 assert_eq!(actual_cmd[1], "-c");
525 assert!(actual_cmd[2].contains("claude"));
526
527 assert_eq!(task_container_config.user, Some("agent".to_string()));
529
530 let env = task_container_config.env.as_ref().unwrap();
532 assert!(env.contains(&"HTTP_PROXY=http://tsk-proxy:3128".to_string()));
533 assert!(env.contains(&"HTTPS_PROXY=http://tsk-proxy:3128".to_string()));
534 drop(create_calls); let start_calls = mock_client.start_container_calls.lock().unwrap();
537 assert_eq!(start_calls.len(), 2); assert_eq!(start_calls[0], "tsk-proxy");
539 assert_eq!(start_calls[1], "test-container-id-1");
540
541 let wait_calls = mock_client.wait_container_calls.lock().unwrap();
542 assert_eq!(wait_calls.len(), 1);
543 assert_eq!(wait_calls[0], "test-container-id-1");
544
545 let remove_calls = mock_client.remove_container_calls.lock().unwrap();
546 assert_eq!(remove_calls.len(), 1);
547 assert_eq!(remove_calls[0].0, "test-container-id-1");
548 }
549
550 #[tokio::test]
551 #[ignore = "Interactive mode requires docker attach which doesn't work in test environment"]
552 async fn test_run_task_container_interactive() {
553 let mock_client = Arc::new(TrackedDockerClient::default());
554 let fs = Arc::new(MockFileSystem::new());
555 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
556
557 let worktree_path = Path::new("/tmp/test-worktree");
558
559 let agent = crate::agent::ClaudeCodeAgent::new();
560 let result = manager
561 .run_task_container(
562 "tsk/base",
563 worktree_path,
564 None,
565 &agent,
566 true, "test-task",
568 None, )
570 .await;
571
572 assert!(result.is_ok());
573 let (output, task_result) = result.unwrap();
574 assert_eq!(output, ""); assert!(task_result.is_none());
576
577 let create_calls = mock_client.create_container_calls.lock().unwrap();
579 assert_eq!(create_calls.len(), 2); let task_container_config = &create_calls[1].1;
581 let actual_cmd = task_container_config.cmd.as_ref().unwrap();
582 assert_eq!(actual_cmd[0], "sh");
583 assert_eq!(actual_cmd[1], "-c");
584 assert!(actual_cmd[2].contains("; exec /bin/bash"));
585
586 assert_eq!(task_container_config.user, Some("agent".to_string()));
588 }
589
590 #[tokio::test]
591 #[ignore = "Test needs to be updated for new behavior where agent commands handle exit codes"]
592 async fn test_run_task_container_non_zero_exit() {
593 let mock_client = Arc::new(TrackedDockerClient::default());
594 let fs = Arc::new(MockFileSystem::new());
595 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
596
597 let worktree_path = Path::new("/tmp/test-worktree");
598
599 let agent = crate::agent::ClaudeCodeAgent::new();
600 let result = manager
601 .run_task_container(
602 "tsk/base",
603 worktree_path,
604 None,
605 &agent,
606 false, "test-task",
608 None, )
610 .await;
611
612 assert!(result.is_err());
613 assert!(
614 result
615 .unwrap_err()
616 .contains("Container exited with non-zero status: 1")
617 );
618
619 let remove_calls = mock_client.remove_container_calls.lock().unwrap();
620 assert_eq!(remove_calls.len(), 1);
621 }
622
623 #[tokio::test]
624 async fn test_run_task_container_create_fails() {
625 let mut mock_client = TrackedDockerClient::default();
626 mock_client.network_exists = false;
627 mock_client.create_network_error = Some("Docker daemon not running".to_string());
628 let mock_client = Arc::new(mock_client);
629 let fs = Arc::new(MockFileSystem::new());
630 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
631
632 let worktree_path = Path::new("/tmp/test-worktree");
633
634 let agent = crate::agent::ClaudeCodeAgent::new();
635 let result = manager
636 .run_task_container(
637 "tsk/base",
638 worktree_path,
639 None,
640 &agent,
641 false, "test-task",
643 None, )
645 .await;
646
647 assert!(result.is_err());
648 assert_eq!(result.unwrap_err(), "Docker daemon not running");
649
650 let start_calls = mock_client.start_container_calls.lock().unwrap();
651 assert_eq!(start_calls.len(), 0);
652 }
653
654 #[tokio::test]
655 async fn test_container_configuration() {
656 let mock_client = Arc::new(TrackedDockerClient::default());
657 let fs = Arc::new(MockFileSystem::new());
658 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
659
660 let worktree_path = Path::new("/tmp/test-worktree");
661
662 let agent = crate::agent::ClaudeCodeAgent::new();
663 let _ = manager
664 .run_task_container(
665 "tsk/base",
666 worktree_path,
667 None,
668 &agent,
669 false, "test-task",
671 None, )
673 .await;
674
675 let create_calls = mock_client.create_container_calls.lock().unwrap();
676 assert_eq!(create_calls.len(), 2); let (proxy_options, proxy_config) = &create_calls[0];
680 assert_eq!(proxy_options.as_ref().unwrap().name, PROXY_CONTAINER_NAME);
681 assert_eq!(proxy_config.image, Some(PROXY_IMAGE.to_string()));
682
683 let (options, config) = &create_calls[1];
685
686 assert!(options.as_ref().unwrap().name.starts_with("tsk-"));
687 assert_eq!(config.image, Some("tsk/base".to_string()));
688 assert_eq!(config.working_dir, Some(CONTAINER_WORKING_DIR.to_string()));
689 assert_eq!(config.user, Some(CONTAINER_USER.to_string()));
691
692 let actual_cmd = config.cmd.as_ref().unwrap();
694 assert_eq!(actual_cmd.len(), 3);
695 assert_eq!(actual_cmd[0], "sh");
696 assert_eq!(actual_cmd[1], "-c");
697 assert!(actual_cmd[2].contains("claude"));
698
699 assert!(config.entrypoint.is_none());
701
702 let host_config = config.host_config.as_ref().unwrap();
703 assert_eq!(host_config.network_mode, Some(TSK_NETWORK_NAME.to_string()));
704 assert_eq!(host_config.memory, Some(CONTAINER_MEMORY_LIMIT));
705 assert_eq!(host_config.cpu_quota, Some(CONTAINER_CPU_QUOTA));
706
707 let binds = host_config.binds.as_ref().unwrap();
708 assert_eq!(binds.len(), 3);
709 assert!(binds[0].contains(&format!("/tmp/test-worktree:{CONTAINER_WORKING_DIR}")));
710 assert!(binds[1].ends_with("/.claude:/home/agent/.claude"));
711 assert!(binds[2].ends_with("/.claude.json:/home/agent/.claude.json"));
712
713 let env = config.env.as_ref().unwrap();
715 assert!(env.contains(&"HTTP_PROXY=http://tsk-proxy:3128".to_string()));
716 assert!(env.contains(&"HTTPS_PROXY=http://tsk-proxy:3128".to_string()));
717 }
718
719 #[tokio::test]
720 async fn test_run_task_container_with_instructions_file() {
721 let mock_client = Arc::new(TrackedDockerClient::default());
722 let fs = Arc::new(MockFileSystem::new());
723 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
724
725 let worktree_path = Path::new("/tmp/test-worktree");
726 let instructions_path = PathBuf::from("/tmp/tsk-test/instructions.txt");
727
728 let agent = crate::agent::ClaudeCodeAgent::new();
729 let result = manager
730 .run_task_container(
731 "tsk/base",
732 worktree_path,
733 Some(&instructions_path),
734 &agent,
735 false, "test-task",
737 None, )
739 .await;
740
741 assert!(result.is_ok());
742
743 let create_calls = mock_client.create_container_calls.lock().unwrap();
744 assert_eq!(create_calls.len(), 2); let task_container_config = &create_calls[1].1;
748 let host_config = task_container_config.host_config.as_ref().unwrap();
749 let binds = host_config.binds.as_ref().unwrap();
750 assert_eq!(binds.len(), 4); assert!(binds[3].contains("/tmp/tsk-test:/instructions:ro"));
752 }
753
754 #[tokio::test]
755 async fn test_relative_path_conversion() {
756 let mock_client = Arc::new(TrackedDockerClient::default());
757 let fs = Arc::new(MockFileSystem::new());
758 let manager = DockerManager::new(mock_client.clone() as Arc<dyn DockerClient>, fs);
759
760 let temp_dir = tempfile::TempDir::new().unwrap();
762 let absolute_path = temp_dir.path().join("test-worktree");
763
764 let agent = crate::agent::ClaudeCodeAgent::new();
765 let result = manager
766 .run_task_container(
767 "tsk/base",
768 &absolute_path,
769 None,
770 &agent,
771 false, "test-task",
773 None, )
775 .await;
776
777 assert!(result.is_ok());
778
779 let create_calls = mock_client.create_container_calls.lock().unwrap();
780 assert_eq!(create_calls.len(), 2); let (_, config) = &create_calls[1]; let host_config = config.host_config.as_ref().unwrap();
784 let binds = host_config.binds.as_ref().unwrap();
785 let worktree_bind = &binds[0];
786
787 assert!(worktree_bind.starts_with('/'));
789 assert!(worktree_bind.contains("test-worktree"));
790 assert!(worktree_bind.ends_with(&format!(":{CONTAINER_WORKING_DIR}")));
791
792 assert_eq!(binds.len(), 3);
794 assert!(binds[1].ends_with("/.claude:/home/agent/.claude"));
795 assert!(binds[2].ends_with("/.claude.json:/home/agent/.claude.json"));
796 }
797}