minifly_api/
docker.rs

1use anyhow::{Context, Result};
2use bollard::{
3    Docker,
4    container::{Config as ContainerConfig, CreateContainerOptions, StartContainerOptions},
5    image::CreateImageOptions,
6    service::{HostConfig, PortBinding, RestartPolicy, RestartPolicyNameEnum, Mount, MountTypeEnum},
7};
8use futures::StreamExt;
9use minifly_core::models::{MachineConfig, GuestConfig, MountConfig};
10use std::collections::HashMap;
11use std::path::PathBuf;
12use tracing::{debug, error, info};
13
14#[derive(Clone)]
15pub struct DockerClient {
16    client: Docker,
17}
18
19impl DockerClient {
20    pub fn new(docker_host: Option<&str>) -> Result<Self> {
21        let client = if let Some(host) = docker_host {
22            Docker::connect_with_socket(host, 120, bollard::API_DEFAULT_VERSION)?
23        } else {
24            Docker::connect_with_local_defaults()?
25        };
26        
27        Ok(Self { client })
28    }
29    
30    pub async fn create_container(
31        &self,
32        machine_id: &str,
33        app_name: &str,
34        config: &MachineConfig,
35    ) -> Result<String> {
36        info!("Creating container for machine {}", machine_id);
37        
38        // Pull image if needed
39        self.pull_image(&config.image).await?;
40        
41        // Build container configuration
42        let container_config = self.build_container_config(machine_id, app_name, config)?;
43        
44        // Create container
45        let options = CreateContainerOptions {
46            name: format!("minifly-{}-{}", app_name, machine_id),
47            ..Default::default()
48        };
49        
50        let response = self.client
51            .create_container(Some(options), container_config)
52            .await
53            .context("Failed to create container")?;
54        
55        Ok(response.id)
56    }
57    
58    pub async fn start_container(&self, container_id: &str) -> Result<()> {
59        info!("Starting container {}", container_id);
60        
61        self.client
62            .start_container(container_id, None::<StartContainerOptions<String>>)
63            .await
64            .context("Failed to start container")?;
65        
66        Ok(())
67    }
68    
69    pub async fn stop_container(&self, container_id: &str, timeout: Option<i64>) -> Result<()> {
70        info!("Stopping container {}", container_id);
71        
72        let options = bollard::container::StopContainerOptions {
73            t: timeout.unwrap_or(30),
74        };
75        
76        self.client
77            .stop_container(container_id, Some(options))
78            .await
79            .context("Failed to stop container")?;
80        
81        Ok(())
82    }
83    
84    pub async fn remove_container(&self, container_id: &str) -> Result<()> {
85        info!("Removing container {}", container_id);
86        
87        let options = bollard::container::RemoveContainerOptions {
88            force: true,
89            ..Default::default()
90        };
91        
92        self.client
93            .remove_container(container_id, Some(options))
94            .await
95            .context("Failed to remove container")?;
96        
97        Ok(())
98    }
99    
100    pub async fn inspect_container(&self, container_id: &str) -> Result<bollard::models::ContainerInspectResponse> {
101        self.client
102            .inspect_container(container_id, None)
103            .await
104            .context("Failed to inspect container")
105    }
106    
107    /// Get the assigned host ports for a container
108    /// 
109    /// Since we use automatic port allocation (port 0), Docker assigns ephemeral ports.
110    /// This function retrieves the actual assigned ports after container creation.
111    /// 
112    /// # Arguments
113    /// * `container_id` - The Docker container ID
114    /// 
115    /// # Returns
116    /// * `Ok(Vec<u16>)` - List of assigned host ports
117    /// * `Err(...)` - Failed to inspect container
118    pub async fn get_container_ports(&self, container_id: &str) -> Result<Vec<u16>> {
119        let container_info = self.inspect_container(container_id).await?;
120        let mut ports = Vec::new();
121        
122        if let Some(network_settings) = &container_info.network_settings {
123            if let Some(port_bindings) = &network_settings.ports {
124                for (_, bindings) in port_bindings {
125                    if let Some(bindings) = bindings {
126                        for binding in bindings {
127                            if let Some(host_port) = &binding.host_port {
128                                if let Ok(port) = host_port.parse::<u16>() {
129                                    ports.push(port);
130                                }
131                            }
132                        }
133                    }
134                }
135            }
136        }
137        
138        ports.sort();
139        Ok(ports)
140    }
141    
142    /// Get Docker daemon version information
143    pub async fn version(&self) -> Result<bollard::system::Version> {
144        self.client
145            .version()
146            .await
147            .context("Failed to get Docker version")
148    }
149    
150    /// List containers with optional filters
151    pub async fn list_containers(&self, filters: Option<HashMap<String, Vec<String>>>) -> Result<Vec<bollard::models::ContainerSummary>> {
152        let options = bollard::container::ListContainersOptions {
153            all: true,
154            filters: filters.unwrap_or_default(),
155            ..Default::default()
156        };
157        
158        self.client
159            .list_containers(Some(options))
160            .await
161            .context("Failed to list containers")
162    }
163    
164    /// Stream logs from a container
165    pub async fn stream_logs(
166        &self, 
167        container_id: &str,
168        follow: bool,
169        tail: Option<String>,
170        timestamps: bool,
171    ) -> Result<impl futures::Stream<Item = Result<bollard::container::LogOutput, bollard::errors::Error>>> {
172        use bollard::container::LogsOptions;
173        
174        let options = LogsOptions::<String> {
175            follow,
176            stdout: true,
177            stderr: true,
178            timestamps,
179            tail: tail.unwrap_or_default(),
180            ..Default::default()
181        };
182        
183        Ok(self.client.logs(container_id, Some(options)))
184    }
185    
186    /// Get container ID by machine ID
187    pub async fn get_container_id_by_machine(&self, machine_id: &str) -> Result<Option<String>> {
188        let mut filters = HashMap::new();
189        filters.insert("label".to_string(), vec![format!("minifly.machine_id={}", machine_id)]);
190        
191        let containers = self.list_containers(Some(filters)).await?;
192        
193        Ok(containers.into_iter()
194            .next()
195            .and_then(|c| c.id))
196    }
197    
198    async fn pull_image(&self, image: &str) -> Result<()> {
199        // Skip pulling for local images (those ending with :latest and containing 'local')
200        if image.contains("-local:") || image.ends_with("-local:latest") {
201            info!("Skipping pull for local image: {}", image);
202            return Ok(());
203        }
204        
205        info!("Pulling image: {}", image);
206        
207        let options = CreateImageOptions {
208            from_image: image,
209            ..Default::default()
210        };
211        
212        let mut stream = self.client.create_image(Some(options), None, None);
213        
214        while let Some(result) = stream.next().await {
215            match result {
216                Ok(info) => debug!("Pull progress: {:?}", info),
217                Err(e) => {
218                    error!("Error pulling image: {}", e);
219                    return Err(e.into());
220                }
221            }
222        }
223        
224        Ok(())
225    }
226    
227    fn build_container_config(
228        &self,
229        machine_id: &str,
230        app_name: &str,
231        config: &MachineConfig,
232    ) -> Result<ContainerConfig<String>> {
233        let mut labels = HashMap::new();
234        labels.insert("minifly.managed".to_string(), "true".to_string());
235        labels.insert("minifly.machine_id".to_string(), machine_id.to_string());
236        labels.insert("minifly.app_name".to_string(), app_name.to_string());
237        labels.insert("minifly.region".to_string(), "local".to_string());
238        
239        let mut container_config = ContainerConfig::<String> {
240            image: Some(config.image.clone()),
241            hostname: Some(format!("{}.vm.{}.internal", machine_id, app_name)),
242            labels: Some(labels),
243            ..Default::default()
244        };
245        
246        // Set environment variables with Fly.io translations
247        let mut env_vars = config.env.clone().unwrap_or_default();
248        self.translate_fly_env_vars(&mut env_vars, app_name, machine_id);
249        
250        let env_vec: Vec<String> = env_vars.iter()
251            .map(|(k, v)| format!("{}={}", k, v))
252            .collect();
253        container_config.env = Some(env_vec);
254        
255        // Set command
256        if let Some(init) = &config.init {
257            if let Some(exec) = &init.exec {
258                container_config.cmd = Some(exec.clone());
259            } else if let Some(entrypoint) = &init.entrypoint {
260                container_config.entrypoint = Some(entrypoint.clone());
261                if let Some(cmd) = &init.cmd {
262                    container_config.cmd = Some(cmd.clone());
263                }
264            }
265        }
266        
267        // Build host configuration
268        let mut host_config = HostConfig::default();
269        
270        // Set resource limits
271        self.set_resource_limits(&mut host_config, &config.guest);
272        
273        // Set restart policy
274        if let Some(restart) = &config.restart {
275            host_config.restart_policy = Some(RestartPolicy {
276                name: Some(match restart.policy.as_str() {
277                    "always" => RestartPolicyNameEnum::ALWAYS,
278                    "on-failure" => RestartPolicyNameEnum::ON_FAILURE,
279                    "unless-stopped" => RestartPolicyNameEnum::UNLESS_STOPPED,
280                    _ => RestartPolicyNameEnum::NO,
281                }),
282                maximum_retry_count: restart.max_retries.map(|n| n as i64),
283            });
284        }
285        
286        // Set port bindings with automatic port allocation for local development
287        // This prevents port conflicts when running multiple apps or when ports are already in use
288        if let Some(services) = &config.services {
289            let mut port_bindings = HashMap::new();
290            
291            for service in services {
292                let internal_port = format!("{}/tcp", service.internal_port);
293                
294                // For local development, use automatic port allocation (port 0)
295                // Docker will assign an available ephemeral port (typically 32768-65535)
296                // This avoids conflicts with other services like web servers on port 80/443
297                let binding = PortBinding {
298                    host_ip: Some("0.0.0.0".to_string()),
299                    host_port: Some("0".to_string()), // Let Docker assign an available port
300                };
301                
302                port_bindings.insert(internal_port, Some(vec![binding]));
303            }
304            
305            host_config.port_bindings = Some(port_bindings);
306        }
307        
308        // Set volume mounts
309        if let Some(mounts) = &config.mounts {
310            host_config.mounts = Some(self.map_fly_volumes(mounts, app_name)?);
311        }
312        
313        container_config.host_config = Some(host_config);
314        
315        Ok(container_config)
316    }
317    
318    fn set_resource_limits(&self, host_config: &mut HostConfig, guest: &GuestConfig) {
319        // Set CPU limits
320        match guest.cpu_kind.as_str() {
321            "shared" => {
322                // For shared CPUs, use CPU shares (relative weight)
323                host_config.cpu_shares = Some((guest.cpus * 1024) as i64);
324            }
325            "performance" => {
326                // For performance CPUs, use CPU quota
327                host_config.cpu_period = Some(100000);
328                host_config.cpu_quota = Some((guest.cpus as i64) * 100000);
329            }
330            _ => {}
331        }
332        
333        // Set memory limit
334        host_config.memory = Some((guest.memory_mb as i64) * 1024 * 1024);
335    }
336    
337    /// Translate Fly.io-specific environment variables to minifly equivalents
338    fn translate_fly_env_vars(&self, env: &mut HashMap<String, String>, app_name: &str, machine_id: &str) {
339        // Core Fly.io environment variables
340        env.insert("FLY_APP_NAME".to_string(), app_name.to_string());
341        env.insert("FLY_MACHINE_ID".to_string(), machine_id.to_string());
342        env.insert("FLY_REGION".to_string(), "local".to_string());
343        env.insert("FLY_PUBLIC_IP".to_string(), "127.0.0.1".to_string());
344        
345        // Generate a consistent private IP based on machine ID
346        let machine_suffix = machine_id.chars()
347            .filter(|c| c.is_numeric())
348            .take(3)
349            .collect::<String>()
350            .parse::<u8>()
351            .unwrap_or(2);
352        env.insert("FLY_PRIVATE_IP".to_string(), format!("172.19.0.{}", machine_suffix));
353        
354        // Simulate Fly's internal DNS and services
355        env.insert("FLY_CONSUL_URL".to_string(), "http://localhost:8500".to_string());
356        env.insert("PRIMARY_REGION".to_string(), "local".to_string());
357        
358        // If using Tigris/S3, point to local MinIO (if configured)
359        if env.contains_key("TIGRIS_ENDPOINT") || env.contains_key("AWS_ENDPOINT_URL") {
360            env.insert("TIGRIS_ENDPOINT".to_string(), "http://localhost:9000".to_string());
361            env.insert("AWS_ENDPOINT_URL".to_string(), "http://localhost:9000".to_string());
362            env.insert("AWS_ENDPOINT_URL_S3".to_string(), "http://localhost:9000".to_string());
363        }
364        
365        // Add helpful development overrides
366        if !env.contains_key("NODE_ENV") && !env.contains_key("RAILS_ENV") {
367            env.insert("NODE_ENV".to_string(), "development".to_string());
368        }
369    }
370    
371    /// Map Fly volumes to local directories
372    fn map_fly_volumes(&self, mounts: &[MountConfig], app_name: &str) -> Result<Vec<Mount>> {
373        mounts.iter().map(|mount| {
374            // Use absolute path based on data directory (reuse existing minifly-data structure)
375            let base_path = if let Ok(data_dir) = std::env::var("MINIFLY_DATA_DIR") {
376                PathBuf::from(data_dir)
377            } else {
378                // Default to /tmp for volumes if no data dir specified
379                PathBuf::from("/tmp")
380            };
381            
382            // Keep the existing minifly-data structure for compatibility
383            let local_path = base_path.join("minifly-data").join(app_name).join("volumes").join(&mount.volume);
384            
385            // Ensure directory exists
386            std::fs::create_dir_all(&local_path)
387                .context(format!("Failed to create volume directory: {:?}", local_path))?;
388            
389            // Create database file if it's a SQLite database path
390            if mount.path == "/litefs" || mount.path.contains("data") {
391                let db_file = local_path.join("app.db");
392                if !db_file.exists() {
393                    std::fs::File::create(&db_file)
394                        .context(format!("Failed to create database file: {:?}", db_file))?;
395                    info!("Created database file: {:?}", db_file);
396                }
397            }
398            
399            Ok(Mount {
400                target: Some(mount.path.clone()),
401                source: Some(local_path.to_string_lossy().to_string()),
402                typ: Some(MountTypeEnum::BIND),
403                read_only: Some(false),
404                consistency: Some("consistent".to_string()),
405                ..Default::default()
406            })
407        }).collect()
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414    use minifly_core::models::{ServiceConfig, PortConfig};
415    
416    #[test]
417    fn test_translate_fly_env_vars() {
418        let client = DockerClient { client: Docker::connect_with_local_defaults().unwrap() };
419        let mut env = HashMap::new();
420        
421        client.translate_fly_env_vars(&mut env, "test-app", "d123456789");
422        
423        assert_eq!(env.get("FLY_APP_NAME").unwrap(), "test-app");
424        assert_eq!(env.get("FLY_MACHINE_ID").unwrap(), "d123456789");
425        assert_eq!(env.get("FLY_REGION").unwrap(), "local");
426        assert_eq!(env.get("FLY_PUBLIC_IP").unwrap(), "127.0.0.1");
427        assert!(env.contains_key("FLY_PRIVATE_IP"));
428        assert_eq!(env.get("FLY_CONSUL_URL").unwrap(), "http://localhost:8500");
429        assert_eq!(env.get("PRIMARY_REGION").unwrap(), "local");
430        assert_eq!(env.get("NODE_ENV").unwrap(), "development");
431    }
432    
433    #[test]
434    fn test_translate_fly_env_vars_with_tigris() {
435        let client = DockerClient { client: Docker::connect_with_local_defaults().unwrap() };
436        let mut env = HashMap::new();
437        env.insert("TIGRIS_ENDPOINT".to_string(), "https://fly.storage.tigris.dev".to_string());
438        
439        client.translate_fly_env_vars(&mut env, "test-app", "d123456789");
440        
441        assert_eq!(env.get("TIGRIS_ENDPOINT").unwrap(), "http://localhost:9000");
442        assert_eq!(env.get("AWS_ENDPOINT_URL").unwrap(), "http://localhost:9000");
443        assert_eq!(env.get("AWS_ENDPOINT_URL_S3").unwrap(), "http://localhost:9000");
444    }
445    
446    #[test]
447    fn test_translate_fly_env_vars_preserves_existing_node_env() {
448        let client = DockerClient { client: Docker::connect_with_local_defaults().unwrap() };
449        let mut env = HashMap::new();
450        env.insert("NODE_ENV".to_string(), "production".to_string());
451        
452        client.translate_fly_env_vars(&mut env, "test-app", "d123456789");
453        
454        assert_eq!(env.get("NODE_ENV").unwrap(), "production");
455    }
456    
457    #[test]
458    fn test_build_container_config_uses_automatic_port_allocation() {
459        let client = DockerClient { client: Docker::connect_with_local_defaults().unwrap() };
460        
461        let config = MachineConfig {
462            image: "nginx:alpine".to_string(),
463            guest: GuestConfig {
464                cpu_kind: "shared".to_string(),
465                cpus: 1,
466                memory_mb: 256,
467                gpu_kind: None,
468                gpus: None,
469                kernel_args: None,
470            },
471            env: None,
472            services: Some(vec![ServiceConfig {
473                ports: vec![
474                    PortConfig {
475                        port: 80,
476                        handlers: vec!["http".to_string()],
477                        force_https: Some(false),
478                        tls_options: None,
479                    },
480                    PortConfig {
481                        port: 443,
482                        handlers: vec!["tls".to_string(), "http".to_string()],
483                        force_https: Some(false),
484                        tls_options: None,
485                    },
486                ],
487                protocol: "tcp".to_string(),
488                internal_port: 80,
489                autostop: None,
490                autostart: None,
491                force_instance_description: None,
492            }]),
493            checks: None,
494            restart: None,
495            auto_destroy: None,
496            dns: None,
497            processes: None,
498            files: None,
499            init: None,
500            mounts: None,
501            containers: None,
502        };
503        
504        let container_config = client.build_container_config("test-machine", "test-app", &config).unwrap();
505        
506        // Check that host config has port bindings
507        let host_config = container_config.host_config.unwrap();
508        let port_bindings = host_config.port_bindings.unwrap();
509        
510        // Check that port 80/tcp is mapped
511        assert!(port_bindings.contains_key("80/tcp"));
512        let bindings = port_bindings.get("80/tcp").unwrap().as_ref().unwrap();
513        
514        // Should have exactly one binding (not two like before)
515        assert_eq!(bindings.len(), 1);
516        
517        // Check that it uses automatic port allocation (port "0")
518        let binding = &bindings[0];
519        assert_eq!(binding.host_ip.as_ref().unwrap(), "0.0.0.0");
520        assert_eq!(binding.host_port.as_ref().unwrap(), "0");
521    }
522}