1use anyhow::{Context, Result};
2use bollard::{
3 Docker,
4 container::{Config as ContainerConfig, CreateContainerOptions, StartContainerOptions},
5 image::CreateImageOptions,
6 service::{HostConfig, PortBinding, RestartPolicy, RestartPolicyNameEnum, Mount, MountTypeEnum},
7};
8use futures::StreamExt;
9use minifly_core::models::{MachineConfig, GuestConfig, MountConfig};
10use std::collections::HashMap;
11use std::path::PathBuf;
12use tracing::{debug, error, info};
13
/// Wrapper around a bollard [`Docker`] handle used to manage the containers
/// that back minifly machines.
///
/// The type derives `Clone`, so a clone carries a clone of the underlying handle.
#[derive(Clone)]
pub struct DockerClient {
    /// bollard handle through which every Docker API call in this type is made.
    client: Docker,
}
18
19impl DockerClient {
20 pub fn new(docker_host: Option<&str>) -> Result<Self> {
21 let client = if let Some(host) = docker_host {
22 Docker::connect_with_socket(host, 120, bollard::API_DEFAULT_VERSION)?
23 } else {
24 Docker::connect_with_local_defaults()?
25 };
26
27 Ok(Self { client })
28 }
29
30 pub async fn create_container(
31 &self,
32 machine_id: &str,
33 app_name: &str,
34 config: &MachineConfig,
35 ) -> Result<String> {
36 info!("Creating container for machine {}", machine_id);
37
38 self.pull_image(&config.image).await?;
40
41 let container_config = self.build_container_config(machine_id, app_name, config)?;
43
44 let options = CreateContainerOptions {
46 name: format!("minifly-{}-{}", app_name, machine_id),
47 ..Default::default()
48 };
49
50 let response = self.client
51 .create_container(Some(options), container_config)
52 .await
53 .context("Failed to create container")?;
54
55 Ok(response.id)
56 }
57
58 pub async fn start_container(&self, container_id: &str) -> Result<()> {
59 info!("Starting container {}", container_id);
60
61 self.client
62 .start_container(container_id, None::<StartContainerOptions<String>>)
63 .await
64 .context("Failed to start container")?;
65
66 Ok(())
67 }
68
69 pub async fn stop_container(&self, container_id: &str, timeout: Option<i64>) -> Result<()> {
70 info!("Stopping container {}", container_id);
71
72 let options = bollard::container::StopContainerOptions {
73 t: timeout.unwrap_or(30),
74 };
75
76 self.client
77 .stop_container(container_id, Some(options))
78 .await
79 .context("Failed to stop container")?;
80
81 Ok(())
82 }
83
84 pub async fn remove_container(&self, container_id: &str) -> Result<()> {
85 info!("Removing container {}", container_id);
86
87 let options = bollard::container::RemoveContainerOptions {
88 force: true,
89 ..Default::default()
90 };
91
92 self.client
93 .remove_container(container_id, Some(options))
94 .await
95 .context("Failed to remove container")?;
96
97 Ok(())
98 }
99
100 pub async fn inspect_container(&self, container_id: &str) -> Result<bollard::models::ContainerInspectResponse> {
101 self.client
102 .inspect_container(container_id, None)
103 .await
104 .context("Failed to inspect container")
105 }
106
107 pub async fn get_container_ports(&self, container_id: &str) -> Result<Vec<u16>> {
119 let container_info = self.inspect_container(container_id).await?;
120 let mut ports = Vec::new();
121
122 if let Some(network_settings) = &container_info.network_settings {
123 if let Some(port_bindings) = &network_settings.ports {
124 for (_, bindings) in port_bindings {
125 if let Some(bindings) = bindings {
126 for binding in bindings {
127 if let Some(host_port) = &binding.host_port {
128 if let Ok(port) = host_port.parse::<u16>() {
129 ports.push(port);
130 }
131 }
132 }
133 }
134 }
135 }
136 }
137
138 ports.sort();
139 Ok(ports)
140 }
141
142 pub async fn version(&self) -> Result<bollard::system::Version> {
144 self.client
145 .version()
146 .await
147 .context("Failed to get Docker version")
148 }
149
150 pub async fn list_containers(&self, filters: Option<HashMap<String, Vec<String>>>) -> Result<Vec<bollard::models::ContainerSummary>> {
152 let options = bollard::container::ListContainersOptions {
153 all: true,
154 filters: filters.unwrap_or_default(),
155 ..Default::default()
156 };
157
158 self.client
159 .list_containers(Some(options))
160 .await
161 .context("Failed to list containers")
162 }
163
164 pub async fn stream_logs(
166 &self,
167 container_id: &str,
168 follow: bool,
169 tail: Option<String>,
170 timestamps: bool,
171 ) -> Result<impl futures::Stream<Item = Result<bollard::container::LogOutput, bollard::errors::Error>>> {
172 use bollard::container::LogsOptions;
173
174 let options = LogsOptions::<String> {
175 follow,
176 stdout: true,
177 stderr: true,
178 timestamps,
179 tail: tail.unwrap_or_default(),
180 ..Default::default()
181 };
182
183 Ok(self.client.logs(container_id, Some(options)))
184 }
185
186 pub async fn get_container_id_by_machine(&self, machine_id: &str) -> Result<Option<String>> {
188 let mut filters = HashMap::new();
189 filters.insert("label".to_string(), vec![format!("minifly.machine_id={}", machine_id)]);
190
191 let containers = self.list_containers(Some(filters)).await?;
192
193 Ok(containers.into_iter()
194 .next()
195 .and_then(|c| c.id))
196 }
197
198 async fn pull_image(&self, image: &str) -> Result<()> {
199 if image.contains("-local:") || image.ends_with("-local:latest") {
201 info!("Skipping pull for local image: {}", image);
202 return Ok(());
203 }
204
205 info!("Pulling image: {}", image);
206
207 let options = CreateImageOptions {
208 from_image: image,
209 ..Default::default()
210 };
211
212 let mut stream = self.client.create_image(Some(options), None, None);
213
214 while let Some(result) = stream.next().await {
215 match result {
216 Ok(info) => debug!("Pull progress: {:?}", info),
217 Err(e) => {
218 error!("Error pulling image: {}", e);
219 return Err(e.into());
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 fn build_container_config(
228 &self,
229 machine_id: &str,
230 app_name: &str,
231 config: &MachineConfig,
232 ) -> Result<ContainerConfig<String>> {
233 let mut labels = HashMap::new();
234 labels.insert("minifly.managed".to_string(), "true".to_string());
235 labels.insert("minifly.machine_id".to_string(), machine_id.to_string());
236 labels.insert("minifly.app_name".to_string(), app_name.to_string());
237 labels.insert("minifly.region".to_string(), "local".to_string());
238
239 let mut container_config = ContainerConfig::<String> {
240 image: Some(config.image.clone()),
241 hostname: Some(format!("{}.vm.{}.internal", machine_id, app_name)),
242 labels: Some(labels),
243 ..Default::default()
244 };
245
246 let mut env_vars = config.env.clone().unwrap_or_default();
248 self.translate_fly_env_vars(&mut env_vars, app_name, machine_id);
249
250 let env_vec: Vec<String> = env_vars.iter()
251 .map(|(k, v)| format!("{}={}", k, v))
252 .collect();
253 container_config.env = Some(env_vec);
254
255 if let Some(init) = &config.init {
257 if let Some(exec) = &init.exec {
258 container_config.cmd = Some(exec.clone());
259 } else if let Some(entrypoint) = &init.entrypoint {
260 container_config.entrypoint = Some(entrypoint.clone());
261 if let Some(cmd) = &init.cmd {
262 container_config.cmd = Some(cmd.clone());
263 }
264 }
265 }
266
267 let mut host_config = HostConfig::default();
269
270 self.set_resource_limits(&mut host_config, &config.guest);
272
273 if let Some(restart) = &config.restart {
275 host_config.restart_policy = Some(RestartPolicy {
276 name: Some(match restart.policy.as_str() {
277 "always" => RestartPolicyNameEnum::ALWAYS,
278 "on-failure" => RestartPolicyNameEnum::ON_FAILURE,
279 "unless-stopped" => RestartPolicyNameEnum::UNLESS_STOPPED,
280 _ => RestartPolicyNameEnum::NO,
281 }),
282 maximum_retry_count: restart.max_retries.map(|n| n as i64),
283 });
284 }
285
286 if let Some(services) = &config.services {
289 let mut port_bindings = HashMap::new();
290
291 for service in services {
292 let internal_port = format!("{}/tcp", service.internal_port);
293
294 let binding = PortBinding {
298 host_ip: Some("0.0.0.0".to_string()),
299 host_port: Some("0".to_string()), };
301
302 port_bindings.insert(internal_port, Some(vec![binding]));
303 }
304
305 host_config.port_bindings = Some(port_bindings);
306 }
307
308 if let Some(mounts) = &config.mounts {
310 host_config.mounts = Some(self.map_fly_volumes(mounts, app_name)?);
311 }
312
313 container_config.host_config = Some(host_config);
314
315 Ok(container_config)
316 }
317
318 fn set_resource_limits(&self, host_config: &mut HostConfig, guest: &GuestConfig) {
319 match guest.cpu_kind.as_str() {
321 "shared" => {
322 host_config.cpu_shares = Some((guest.cpus * 1024) as i64);
324 }
325 "performance" => {
326 host_config.cpu_period = Some(100000);
328 host_config.cpu_quota = Some((guest.cpus as i64) * 100000);
329 }
330 _ => {}
331 }
332
333 host_config.memory = Some((guest.memory_mb as i64) * 1024 * 1024);
335 }
336
337 fn translate_fly_env_vars(&self, env: &mut HashMap<String, String>, app_name: &str, machine_id: &str) {
339 env.insert("FLY_APP_NAME".to_string(), app_name.to_string());
341 env.insert("FLY_MACHINE_ID".to_string(), machine_id.to_string());
342 env.insert("FLY_REGION".to_string(), "local".to_string());
343 env.insert("FLY_PUBLIC_IP".to_string(), "127.0.0.1".to_string());
344
345 let machine_suffix = machine_id.chars()
347 .filter(|c| c.is_numeric())
348 .take(3)
349 .collect::<String>()
350 .parse::<u8>()
351 .unwrap_or(2);
352 env.insert("FLY_PRIVATE_IP".to_string(), format!("172.19.0.{}", machine_suffix));
353
354 env.insert("FLY_CONSUL_URL".to_string(), "http://localhost:8500".to_string());
356 env.insert("PRIMARY_REGION".to_string(), "local".to_string());
357
358 if env.contains_key("TIGRIS_ENDPOINT") || env.contains_key("AWS_ENDPOINT_URL") {
360 env.insert("TIGRIS_ENDPOINT".to_string(), "http://localhost:9000".to_string());
361 env.insert("AWS_ENDPOINT_URL".to_string(), "http://localhost:9000".to_string());
362 env.insert("AWS_ENDPOINT_URL_S3".to_string(), "http://localhost:9000".to_string());
363 }
364
365 if !env.contains_key("NODE_ENV") && !env.contains_key("RAILS_ENV") {
367 env.insert("NODE_ENV".to_string(), "development".to_string());
368 }
369 }
370
371 fn map_fly_volumes(&self, mounts: &[MountConfig], app_name: &str) -> Result<Vec<Mount>> {
373 mounts.iter().map(|mount| {
374 let base_path = if let Ok(data_dir) = std::env::var("MINIFLY_DATA_DIR") {
376 PathBuf::from(data_dir)
377 } else {
378 PathBuf::from("/tmp")
380 };
381
382 let local_path = base_path.join("minifly-data").join(app_name).join("volumes").join(&mount.volume);
384
385 std::fs::create_dir_all(&local_path)
387 .context(format!("Failed to create volume directory: {:?}", local_path))?;
388
389 if mount.path == "/litefs" || mount.path.contains("data") {
391 let db_file = local_path.join("app.db");
392 if !db_file.exists() {
393 std::fs::File::create(&db_file)
394 .context(format!("Failed to create database file: {:?}", db_file))?;
395 info!("Created database file: {:?}", db_file);
396 }
397 }
398
399 Ok(Mount {
400 target: Some(mount.path.clone()),
401 source: Some(local_path.to_string_lossy().to_string()),
402 typ: Some(MountTypeEnum::BIND),
403 read_only: Some(false),
404 consistency: Some("consistent".to_string()),
405 ..Default::default()
406 })
407 }).collect()
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use minifly_core::models::{ServiceConfig, PortConfig};

    /// Builds a `DockerClient` from bollard's local defaults; panics if the
    /// handle cannot be created.
    fn local_client() -> DockerClient {
        DockerClient {
            client: Docker::connect_with_local_defaults().unwrap(),
        }
    }

    /// Runs `translate_fly_env_vars` for app `test-app` / machine `d123456789`
    /// over an environment pre-populated with `seed`.
    fn translated_env(seed: &[(&str, &str)]) -> HashMap<String, String> {
        let mut env: HashMap<String, String> = seed
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
        local_client().translate_fly_env_vars(&mut env, "test-app", "d123456789");
        env
    }

    #[test]
    fn test_translate_fly_env_vars() {
        let env = translated_env(&[]);

        let expected = [
            ("FLY_APP_NAME", "test-app"),
            ("FLY_MACHINE_ID", "d123456789"),
            ("FLY_REGION", "local"),
            ("FLY_PUBLIC_IP", "127.0.0.1"),
            ("FLY_CONSUL_URL", "http://localhost:8500"),
            ("PRIMARY_REGION", "local"),
            ("NODE_ENV", "development"),
        ];
        for (key, value) in expected {
            assert_eq!(env.get(key).unwrap(), value);
        }
        assert!(env.contains_key("FLY_PRIVATE_IP"));
    }

    #[test]
    fn test_translate_fly_env_vars_with_tigris() {
        let env = translated_env(&[("TIGRIS_ENDPOINT", "https://fly.storage.tigris.dev")]);

        for key in ["TIGRIS_ENDPOINT", "AWS_ENDPOINT_URL", "AWS_ENDPOINT_URL_S3"] {
            assert_eq!(env.get(key).unwrap(), "http://localhost:9000");
        }
    }

    #[test]
    fn test_translate_fly_env_vars_preserves_existing_node_env() {
        let env = translated_env(&[("NODE_ENV", "production")]);

        assert_eq!(env.get("NODE_ENV").unwrap(), "production");
    }

    #[test]
    fn test_build_container_config_uses_automatic_port_allocation() {
        let guest = GuestConfig {
            cpu_kind: "shared".to_string(),
            cpus: 1,
            memory_mb: 256,
            gpu_kind: None,
            gpus: None,
            kernel_args: None,
        };

        let web_service = ServiceConfig {
            ports: vec![
                PortConfig {
                    port: 80,
                    handlers: vec!["http".to_string()],
                    force_https: Some(false),
                    tls_options: None,
                },
                PortConfig {
                    port: 443,
                    handlers: vec!["tls".to_string(), "http".to_string()],
                    force_https: Some(false),
                    tls_options: None,
                },
            ],
            protocol: "tcp".to_string(),
            internal_port: 80,
            autostop: None,
            autostart: None,
            force_instance_description: None,
        };

        let config = MachineConfig {
            image: "nginx:alpine".to_string(),
            guest,
            env: None,
            services: Some(vec![web_service]),
            checks: None,
            restart: None,
            auto_destroy: None,
            dns: None,
            processes: None,
            files: None,
            init: None,
            mounts: None,
            containers: None,
        };

        let container_config = local_client()
            .build_container_config("test-machine", "test-app", &config)
            .unwrap();

        let port_bindings = container_config
            .host_config
            .unwrap()
            .port_bindings
            .unwrap();

        // The internal port must be published exactly once...
        assert!(port_bindings.contains_key("80/tcp"));
        let bindings = port_bindings["80/tcp"].as_ref().unwrap();
        assert_eq!(bindings.len(), 1);

        // ...on all interfaces, with the host port left for Docker to choose.
        let binding = &bindings[0];
        assert_eq!(binding.host_ip.as_deref(), Some("0.0.0.0"));
        assert_eq!(binding.host_port.as_deref(), Some("0"));
    }
}