use docker_api::opts::{
ContainerCreateOpts, ContainerListOpts, ContainerRestartOpts, ContainerStopOpts,
ExecCreateOpts, ExecStartOpts, ImageBuildOpts, ImagePushOpts, LogsOpts, NetworkCreateOpts,
PublishPort, PullOpts, RegistryAuth, VolumeCreateOpts,
};
use docker_api::{Containers, Images, Networks, Volumes};
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pythonize::pythonize;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use crate::Pyo3Docker;
#[pymodule]
pub fn compose(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Pyo3ComposeFile>()?;
m.add_class::<Pyo3ComposeProject>()?;
m.add_function(wrap_pyfunction!(parse_compose_string, m)?)?;
m.add_function(wrap_pyfunction!(parse_compose_file, m)?)?;
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[pyclass(name = "ComposeFile")]
pub struct Pyo3ComposeFile {
#[serde(default)]
pub version: Option<String>,
#[serde(default)]
pub services: HashMap<String, ComposeService>,
#[serde(default)]
pub networks: HashMap<String, Option<ComposeNetwork>>,
#[serde(default)]
pub volumes: HashMap<String, Option<ComposeVolume>>,
#[serde(default)]
pub configs: HashMap<String, Option<ComposeConfig>>,
#[serde(default)]
pub secrets: HashMap<String, Option<ComposeSecret>>,
#[serde(default)]
pub name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ComposeService {
#[serde(default)]
pub image: Option<String>,
#[serde(default)]
pub build: Option<ComposeBuild>,
#[serde(default)]
pub container_name: Option<String>,
#[serde(default)]
pub command: Option<StringOrList>,
#[serde(default)]
pub entrypoint: Option<StringOrList>,
#[serde(default)]
pub environment: Option<EnvironmentVars>,
#[serde(default)]
pub env_file: Option<StringOrList>,
#[serde(default)]
pub ports: Option<Vec<PortMapping>>,
#[serde(default)]
#[serde(alias = "volume")]
pub volumes: Option<Vec<VolumeMount>>,
#[serde(default)]
pub networks: Option<ServiceNetworks>,
#[serde(default)]
pub depends_on: Option<DependsOn>,
#[serde(default)]
pub restart: Option<String>,
#[serde(default)]
pub working_dir: Option<String>,
#[serde(default)]
pub user: Option<String>,
#[serde(default)]
pub labels: Option<Labels>,
#[serde(default)]
pub extra_hosts: Option<Vec<String>>,
#[serde(default)]
pub dns: Option<StringOrList>,
#[serde(default)]
pub hostname: Option<String>,
#[serde(default)]
pub domainname: Option<String>,
#[serde(default)]
pub privileged: Option<bool>,
#[serde(default)]
pub read_only: Option<bool>,
#[serde(default)]
pub stdin_open: Option<bool>,
#[serde(default)]
pub tty: Option<bool>,
#[serde(default)]
pub stop_signal: Option<String>,
#[serde(default)]
pub stop_grace_period: Option<String>,
#[serde(default)]
pub healthcheck: Option<HealthCheck>,
#[serde(default)]
pub logging: Option<LoggingConfig>,
#[serde(default)]
pub deploy: Option<DeployConfig>,
#[serde(default)]
pub secrets: Option<Vec<ServiceSecret>>,
#[serde(default)]
pub configs: Option<Vec<ServiceConfig>>,
#[serde(default)]
pub cap_add: Option<Vec<String>>,
#[serde(default)]
pub cap_drop: Option<Vec<String>>,
#[serde(default)]
pub devices: Option<Vec<String>>,
#[serde(default)]
pub expose: Option<Vec<StringOrInt>>,
#[serde(default)]
pub links: Option<Vec<String>>,
#[serde(default)]
pub network_mode: Option<String>,
#[serde(default)]
pub pid: Option<String>,
#[serde(default)]
pub ipc: Option<String>,
#[serde(default)]
pub security_opt: Option<Vec<String>>,
#[serde(default)]
pub sysctls: Option<HashMap<String, StringOrInt>>,
#[serde(default)]
pub ulimits: Option<HashMap<String, Ulimit>>,
#[serde(default)]
pub tmpfs: Option<StringOrList>,
#[serde(default)]
pub init: Option<bool>,
#[serde(default)]
pub profiles: Option<Vec<String>>,
#[serde(default)]
pub platform: Option<String>,
#[serde(default)]
pub pull_policy: Option<String>,
#[serde(default)]
pub scale: Option<i32>,
#[serde(default)]
pub mem_limit: Option<StringOrInt>,
#[serde(default)]
pub mem_reservation: Option<StringOrInt>,
#[serde(default)]
pub cpus: Option<f64>,
#[serde(default)]
pub shm_size: Option<StringOrInt>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ComposeBuild {
Simple(String),
Full(BuildConfig),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BuildConfig {
#[serde(default)]
pub context: Option<String>,
#[serde(default)]
pub dockerfile: Option<String>,
#[serde(default)]
pub args: Option<HashMap<String, Option<String>>>,
#[serde(default)]
pub target: Option<String>,
#[serde(default)]
pub cache_from: Option<Vec<String>>,
#[serde(default)]
pub extra_hosts: Option<Vec<String>>,
#[serde(default)]
pub network: Option<String>,
#[serde(default)]
pub ssh: Option<Vec<String>>,
#[serde(default)]
pub labels: Option<Labels>,
#[serde(default)]
pub platform: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ComposeNetwork {
#[serde(default)]
pub driver: Option<String>,
#[serde(default)]
pub driver_opts: Option<HashMap<String, String>>,
#[serde(default)]
pub external: Option<ExternalRef>,
#[serde(default)]
pub enable_ipv6: Option<bool>,
#[serde(default)]
pub ipam: Option<IpamConfig>,
#[serde(default)]
pub internal: Option<bool>,
#[serde(default)]
pub attachable: Option<bool>,
#[serde(default)]
pub labels: Option<Labels>,
#[serde(default)]
pub name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ComposeVolume {
#[serde(default)]
pub driver: Option<String>,
#[serde(default)]
pub driver_opts: Option<HashMap<String, String>>,
#[serde(default)]
pub external: Option<ExternalRef>,
#[serde(default)]
pub labels: Option<Labels>,
#[serde(default)]
pub name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ComposeConfig {
#[serde(default)]
pub file: Option<String>,
#[serde(default)]
pub external: Option<ExternalRef>,
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub template_driver: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ComposeSecret {
#[serde(default)]
pub file: Option<String>,
#[serde(default)]
pub environment: Option<String>,
#[serde(default)]
pub external: Option<ExternalRef>,
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub template_driver: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ExternalRef {
Bool(bool),
Named { name: Option<String> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ServiceNetworks {
List(Vec<String>),
Map(HashMap<String, Option<ServiceNetworkConfig>>),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ServiceNetworkConfig {
#[serde(default)]
pub aliases: Option<Vec<String>>,
#[serde(default)]
pub ipv4_address: Option<String>,
#[serde(default)]
pub ipv6_address: Option<String>,
#[serde(default)]
pub priority: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum DependsOn {
List(Vec<String>),
Map(HashMap<String, DependsOnCondition>),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DependsOnCondition {
#[serde(default)]
pub condition: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HealthCheck {
#[serde(default)]
pub test: Option<StringOrList>,
#[serde(default)]
pub interval: Option<String>,
#[serde(default)]
pub timeout: Option<String>,
#[serde(default)]
pub retries: Option<i32>,
#[serde(default)]
pub start_period: Option<String>,
#[serde(default)]
pub disable: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LoggingConfig {
#[serde(default)]
pub driver: Option<String>,
#[serde(default)]
pub options: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DeployConfig {
#[serde(default)]
pub mode: Option<String>,
#[serde(default)]
pub replicas: Option<i32>,
#[serde(default)]
pub endpoint_mode: Option<String>,
#[serde(default)]
pub placement: Option<PlacementConfig>,
#[serde(default)]
pub resources: Option<ResourceConfig>,
#[serde(default)]
pub restart_policy: Option<RestartPolicyConfig>,
#[serde(default)]
pub rollback_config: Option<UpdateConfig>,
#[serde(default)]
pub update_config: Option<UpdateConfig>,
#[serde(default)]
pub labels: Option<Labels>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PlacementConfig {
#[serde(default)]
pub constraints: Option<Vec<String>>,
#[serde(default)]
pub preferences: Option<Vec<HashMap<String, String>>>,
#[serde(default)]
pub max_replicas_per_node: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ResourceConfig {
#[serde(default)]
pub limits: Option<ResourceSpec>,
#[serde(default)]
pub reservations: Option<ResourceSpec>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ResourceSpec {
#[serde(default)]
pub cpus: Option<String>,
#[serde(default)]
pub memory: Option<String>,
#[serde(default)]
pub devices: Option<Vec<DeviceSpec>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DeviceSpec {
#[serde(default)]
pub capabilities: Option<Vec<String>>,
#[serde(default)]
pub driver: Option<String>,
#[serde(default)]
pub count: Option<StringOrInt>,
#[serde(default)]
pub device_ids: Option<Vec<String>>,
#[serde(default)]
pub options: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RestartPolicyConfig {
#[serde(default)]
pub condition: Option<String>,
#[serde(default)]
pub delay: Option<String>,
#[serde(default)]
pub max_attempts: Option<i32>,
#[serde(default)]
pub window: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct UpdateConfig {
#[serde(default)]
pub parallelism: Option<i32>,
#[serde(default)]
pub delay: Option<String>,
#[serde(default)]
pub failure_action: Option<String>,
#[serde(default)]
pub monitor: Option<String>,
#[serde(default)]
pub max_failure_ratio: Option<f64>,
#[serde(default)]
pub order: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ServiceSecret {
Simple(String),
Full(ServiceSecretConfig),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ServiceSecretConfig {
#[serde(default)]
pub source: Option<String>,
#[serde(default)]
pub target: Option<String>,
#[serde(default)]
pub uid: Option<String>,
#[serde(default)]
pub gid: Option<String>,
#[serde(default)]
pub mode: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ServiceConfig {
Simple(String),
Full(ServiceConfigConfig),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ServiceConfigConfig {
#[serde(default)]
pub source: Option<String>,
#[serde(default)]
pub target: Option<String>,
#[serde(default)]
pub uid: Option<String>,
#[serde(default)]
pub gid: Option<String>,
#[serde(default)]
pub mode: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IpamConfig {
#[serde(default)]
pub driver: Option<String>,
#[serde(default)]
pub config: Option<Vec<IpamPoolConfig>>,
#[serde(default)]
pub options: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct IpamPoolConfig {
#[serde(default)]
pub subnet: Option<String>,
#[serde(default)]
pub gateway: Option<String>,
#[serde(default)]
pub ip_range: Option<String>,
#[serde(default)]
pub aux_addresses: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Ulimit {
Simple(i64),
Full {
soft: Option<i64>,
hard: Option<i64>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum StringOrList {
String(String),
List(Vec<String>),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum StringOrInt {
String(String),
Int(i64),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum EnvironmentVars {
List(Vec<String>),
Map(HashMap<String, Option<StringOrInt>>),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Labels {
List(Vec<String>),
Map(HashMap<String, String>),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PortMapping {
Simple(String),
Int(i64),
Full(PortConfig),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PortConfig {
#[serde(default)]
pub target: Option<i32>,
#[serde(default)]
pub published: Option<StringOrInt>,
#[serde(default)]
pub protocol: Option<String>,
#[serde(default)]
pub mode: Option<String>,
#[serde(default)]
pub host_ip: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum VolumeMount {
Simple(String),
Full(VolumeMountConfig),
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct VolumeMountConfig {
#[serde(default, rename = "type")]
pub mount_type: Option<String>,
#[serde(default)]
pub source: Option<String>,
#[serde(default)]
pub target: Option<String>,
#[serde(default)]
pub read_only: Option<bool>,
#[serde(default)]
pub bind: Option<BindOptions>,
#[serde(default)]
pub volume: Option<VolumeOptions>,
#[serde(default)]
pub tmpfs: Option<TmpfsOptions>,
#[serde(default)]
pub consistency: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BindOptions {
#[serde(default)]
pub propagation: Option<String>,
#[serde(default)]
pub create_host_path: Option<bool>,
#[serde(default)]
pub selinux: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct VolumeOptions {
#[serde(default)]
pub nocopy: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TmpfsOptions {
#[serde(default)]
pub size: Option<StringOrInt>,
#[serde(default)]
pub mode: Option<i32>,
}
#[pymethods]
impl Pyo3ComposeFile {
#[getter]
pub fn get_version(&self) -> Option<String> {
self.version.clone()
}
#[getter]
pub fn get_name(&self) -> Option<String> {
self.name.clone()
}
pub fn service_names(&self) -> Vec<String> {
self.services.keys().cloned().collect()
}
pub fn network_names(&self) -> Vec<String> {
self.networks.keys().cloned().collect()
}
pub fn volume_names(&self) -> Vec<String> {
self.volumes.keys().cloned().collect()
}
pub fn config_names(&self) -> Vec<String> {
self.configs.keys().cloned().collect()
}
pub fn secret_names(&self) -> Vec<String> {
self.secrets.keys().cloned().collect()
}
pub fn to_dict(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
pythonize(py, self)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
pub fn get_service(&self, py: Python<'_>, name: &str) -> PyResult<Option<Py<PyAny>>> {
if let Some(service) = self.services.get(name) {
let result = pythonize(py, service)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))?;
Ok(Some(result))
} else {
Ok(None)
}
}
pub fn to_yaml(&self) -> PyResult<String> {
serde_yaml::to_string(self)
.map_err(|e| PyValueError::new_err(format!("YAML serialization error: {}", e)))
}
}
#[pyfunction]
pub fn parse_compose_string(content: &str) -> PyResult<Pyo3ComposeFile> {
serde_yaml::from_str(content)
.map_err(|e| PyValueError::new_err(format!("Failed to parse compose file: {}", e)))
}
#[pyfunction]
pub fn parse_compose_file(path: &str) -> PyResult<Pyo3ComposeFile> {
let path = Path::new(path);
let content = fs::read_to_string(path)
.map_err(|e| PyValueError::new_err(format!("Failed to read file: {}", e)))?;
parse_compose_string(&content)
}
#[derive(Debug)]
#[pyclass(name = "ComposeProject")]
pub struct Pyo3ComposeProject {
docker: docker_api::Docker,
compose: Pyo3ComposeFile,
project_name: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct ComposeUpResult {
pub networks: Vec<String>,
pub volumes: Vec<String>,
pub containers: Vec<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ComposeDownResult {
pub stopped_containers: Vec<String>,
pub removed_containers: Vec<String>,
pub removed_networks: Vec<String>,
pub removed_volumes: Vec<String>,
}
#[pymethods]
impl Pyo3ComposeProject {
#[new]
pub fn new(docker: Pyo3Docker, compose: Pyo3ComposeFile, project_name: &str) -> Self {
Pyo3ComposeProject {
docker: docker.0,
compose,
project_name: project_name.to_string(),
}
}
#[getter]
pub fn get_project_name(&self) -> String {
self.project_name.clone()
}
#[pyo3(signature = (detach=None))]
pub fn up(&self, py: Python<'_>, detach: Option<bool>) -> PyResult<Py<PyAny>> {
let _detach = detach.unwrap_or(true);
let result = __compose_up(&self.docker, &self.compose, &self.project_name)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
#[pyo3(signature = (remove_volumes=None, remove_networks=None, timeout=None))]
pub fn down(
&self,
py: Python<'_>,
remove_volumes: Option<bool>,
remove_networks: Option<bool>,
timeout: Option<u64>,
) -> PyResult<Py<PyAny>> {
let remove_volumes = remove_volumes.unwrap_or(false);
let remove_networks = remove_networks.unwrap_or(true);
let timeout = timeout.unwrap_or(10);
let result = __compose_down(
&self.docker,
&self.compose,
&self.project_name,
remove_volumes,
remove_networks,
timeout,
)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
pub fn ps(&self) -> PyResult<Vec<String>> {
__compose_ps(&self.docker, &self.project_name)
}
pub fn start(&self) -> PyResult<Vec<String>> {
__compose_start(&self.docker, &self.project_name)
}
#[pyo3(signature = (timeout=None))]
pub fn stop(&self, timeout: Option<u64>) -> PyResult<Vec<String>> {
let timeout = timeout.unwrap_or(10);
__compose_stop(&self.docker, &self.project_name, timeout)
}
#[pyo3(signature = (timeout=None))]
pub fn restart(&self, timeout: Option<u64>) -> PyResult<Vec<String>> {
let timeout = timeout.unwrap_or(10);
__compose_restart(&self.docker, &self.project_name, timeout)
}
pub fn pause(&self) -> PyResult<Vec<String>> {
__compose_pause(&self.docker, &self.project_name)
}
pub fn unpause(&self) -> PyResult<Vec<String>> {
__compose_unpause(&self.docker, &self.project_name)
}
pub fn pull(&self) -> PyResult<Vec<String>> {
__compose_pull(&self.docker, &self.compose)
}
#[pyo3(signature = (no_cache=None, pull=None))]
pub fn build(&self, no_cache: Option<bool>, pull: Option<bool>) -> PyResult<Vec<String>> {
let no_cache = no_cache.unwrap_or(false);
let pull = pull.unwrap_or(false);
__compose_build(
&self.docker,
&self.compose,
&self.project_name,
no_cache,
pull,
)
}
pub fn push(&self) -> PyResult<Vec<String>> {
__compose_push(&self.docker, &self.compose)
}
pub fn ps_detailed(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let result = __compose_ps_detailed(&self.docker, &self.project_name)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
#[pyo3(signature = (service=None, tail=None, timestamps=None))]
pub fn logs(
&self,
py: Python<'_>,
service: Option<&str>,
tail: Option<usize>,
timestamps: Option<bool>,
) -> PyResult<Py<PyAny>> {
let timestamps = timestamps.unwrap_or(false);
let result = __compose_logs(&self.docker, &self.project_name, service, tail, timestamps)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
pub fn config(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
pythonize(py, &self.compose)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
#[pyo3(signature = (ps_args=None))]
pub fn top(&self, py: Python<'_>, ps_args: Option<&str>) -> PyResult<Py<PyAny>> {
let result = __compose_top(&self.docker, &self.project_name, ps_args)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
#[pyo3(signature = (service, command, user=None, workdir=None, env=None, privileged=None, tty=None))]
pub fn exec(
&self,
service: &str,
command: Vec<String>,
user: Option<&str>,
workdir: Option<&str>,
env: Option<Vec<String>>,
privileged: Option<bool>,
tty: Option<bool>,
) -> PyResult<String> {
let privileged = privileged.unwrap_or(false);
let tty = tty.unwrap_or(false);
__compose_exec(
&self.docker,
&self.project_name,
service,
command,
user,
workdir,
env,
privileged,
tty,
)
}
#[pyo3(signature = (service, command=None, user=None, workdir=None, env=None, rm=None, detach=None))]
pub fn run(
&self,
py: Python<'_>,
service: &str,
command: Option<Vec<String>>,
user: Option<&str>,
workdir: Option<&str>,
env: Option<Vec<String>>,
rm: Option<bool>,
detach: Option<bool>,
) -> PyResult<Py<PyAny>> {
let rm = rm.unwrap_or(true);
let detach = detach.unwrap_or(false);
let result = __compose_run(
&self.docker,
&self.compose,
&self.project_name,
service,
command,
user,
workdir,
env,
rm,
detach,
)?;
pythonize(py, &result)
.map(|bound| bound.unbind())
.map_err(|e| PyValueError::new_err(format!("Serialization error: {}", e)))
}
}
fn resource_name(project_name: &str, resource: &str) -> String {
format!("{}_{}", project_name, resource)
}
fn env_to_vec(env: &Option<EnvironmentVars>) -> Vec<String> {
match env {
Some(EnvironmentVars::List(list)) => list.clone(),
Some(EnvironmentVars::Map(map)) => map
.iter()
.map(|(k, v)| match v {
Some(StringOrInt::String(s)) => format!("{}={}", k, s),
Some(StringOrInt::Int(i)) => format!("{}={}", k, i),
None => k.clone(),
})
.collect(),
None => vec![],
}
}
fn command_to_vec(cmd: &Option<StringOrList>) -> Option<Vec<String>> {
match cmd {
Some(StringOrList::String(s)) => {
Some(vec!["/bin/sh".to_string(), "-c".to_string(), s.clone()])
}
Some(StringOrList::List(list)) => Some(list.clone()),
None => None,
}
}
fn labels_to_map(
labels: &Option<Labels>,
project_name: &str,
service_name: &str,
) -> HashMap<String, String> {
let mut result = HashMap::new();
result.insert(
"com.docker.compose.project".to_string(),
project_name.to_string(),
);
result.insert(
"com.docker.compose.service".to_string(),
service_name.to_string(),
);
match labels {
Some(Labels::List(list)) => {
for item in list {
if let Some((k, v)) = item.split_once('=') {
result.insert(k.to_string(), v.to_string());
}
}
}
Some(Labels::Map(map)) => {
for (k, v) in map {
result.insert(k.clone(), v.clone());
}
}
None => {}
}
result
}
#[tokio::main]
async fn __compose_up(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
project_name: &str,
) -> PyResult<ComposeUpResult> {
let mut result = ComposeUpResult {
networks: vec![],
volumes: vec![],
containers: vec![],
};
let networks = Networks::new(docker.clone());
for (name, network_config) in &compose.networks {
let network_name = resource_name(project_name, name);
let existing: Vec<docker_api::models::Network> =
networks
.list(&Default::default())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list networks: {}", e)))?;
if existing
.iter()
.any(|n| n.name.as_ref() == Some(&network_name))
{
result.networks.push(network_name);
continue;
}
let mut opts = NetworkCreateOpts::builder(&network_name);
if let Some(Some(config)) = network_config.as_ref().map(|c| Some(c)) {
if let Some(driver) = &config.driver {
opts = opts.driver(driver.as_str());
}
if let Some(internal) = config.internal {
opts = opts.internal(internal);
}
if let Some(attachable) = config.attachable {
opts = opts.attachable(attachable);
}
}
opts = opts.labels([("com.docker.compose.project", project_name)]);
let network = networks.create(&opts.build()).await.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create network {}: {}", network_name, e))
})?;
result.networks.push(network.id().to_string());
}
if compose.networks.is_empty() {
let default_network_name = format!("{}_default", project_name);
let existing: Vec<docker_api::models::Network> =
networks
.list(&Default::default())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list networks: {}", e)))?;
if !existing
.iter()
.any(|n| n.name.as_ref() == Some(&default_network_name))
{
let opts = NetworkCreateOpts::builder(&default_network_name)
.labels([("com.docker.compose.project", project_name)])
.build();
let network = networks.create(&opts).await.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create default network: {}", e))
})?;
result.networks.push(network.id().to_string());
}
}
let volumes = Volumes::new(docker.clone());
for (name, volume_config) in &compose.volumes {
let volume_name = resource_name(project_name, name);
let existing = volumes
.list(&Default::default())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list volumes: {}", e)))?;
if let Some(vols) = existing.volumes {
if vols.iter().any(|v| v.name == volume_name) {
result.volumes.push(volume_name);
continue;
}
}
let mut opts = VolumeCreateOpts::builder();
opts = opts.name(&volume_name);
if let Some(Some(config)) = volume_config.as_ref().map(|c| Some(c)) {
if let Some(driver) = &config.driver {
opts = opts.driver(driver.as_str());
}
}
opts = opts.labels([("com.docker.compose.project", project_name)]);
volumes.create(&opts.build()).await.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create volume {}: {}", volume_name, e))
})?;
result.volumes.push(volume_name);
}
let containers = Containers::new(docker.clone());
let service_order = get_service_order(compose);
for service_name in &service_order {
if let Some(service) = compose.services.get(service_name) {
let image = match &service.image {
Some(img) => img.clone(),
None => {
if service.build.is_some() {
continue; }
continue;
}
};
let container_name = service
.container_name
.clone()
.unwrap_or_else(|| resource_name(project_name, service_name));
let existing: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to list containers: {}", e))
})?;
let existing_container = existing.iter().find(|c| {
c.names.as_ref().map_or(false, |names| {
names
.iter()
.any(|n| n.trim_start_matches('/') == container_name)
})
});
if let Some(existing) = existing_container {
if existing.state.as_ref() != Some(&"running".to_string()) {
let container = containers.get(existing.id.as_ref().unwrap());
container.start().await.map_err(|e| {
PyRuntimeError::new_err(format!(
"Failed to start container {}: {}",
container_name, e
))
})?;
}
result
.containers
.push(existing.id.clone().unwrap_or_default());
continue;
}
let mut opts = ContainerCreateOpts::builder()
.image(&image)
.name(&container_name);
let labels = labels_to_map(&service.labels, project_name, service_name);
let labels_ref: HashMap<&str, &str> = labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
opts = opts.labels(labels_ref);
let env = env_to_vec(&service.environment);
if !env.is_empty() {
let env_refs: Vec<&str> = env.iter().map(|s| s.as_str()).collect();
opts = opts.env(env_refs);
}
if let Some(cmd) = command_to_vec(&service.command) {
let cmd_refs: Vec<&str> = cmd.iter().map(|s| s.as_str()).collect();
opts = opts.command(cmd_refs);
}
if let Some(wd) = &service.working_dir {
opts = opts.working_dir(wd.as_str());
}
if let Some(user) = &service.user {
opts = opts.user(user.as_str());
}
if let Some(tty) = service.tty {
opts = opts.tty(tty);
}
if let Some(stdin) = service.stdin_open {
opts = opts.attach_stdin(stdin);
}
if let Some(priv_mode) = service.privileged {
opts = opts.privileged(priv_mode);
}
if let Some(hostname) = &service.hostname {
let _ = hostname;
}
let default_network = format!("{}_default", project_name);
opts = opts.network_mode(&default_network);
if let Some(ports) = &service.ports {
for port_mapping in ports {
match port_mapping {
PortMapping::Simple(s) => {
if let Some((host_port, container_port)) = s.split_once(':') {
let container_port =
container_port.split('/').next().unwrap_or(container_port);
if let (Ok(hp), Ok(cp)) =
(host_port.parse::<u32>(), container_port.parse::<u32>())
{
opts = opts.expose(PublishPort::tcp(cp), hp);
}
}
}
PortMapping::Int(p) => {
opts = opts.expose(PublishPort::tcp(*p as u32), *p as u32);
}
PortMapping::Full(config) => {
if let (Some(target), Some(published)) =
(&config.target, &config.published)
{
let host_port = match published {
StringOrInt::String(s) => {
s.parse::<u32>().unwrap_or(*target as u32)
}
StringOrInt::Int(i) => *i as u32,
};
let protocol = config.protocol.as_deref().unwrap_or("tcp");
let publish_port = match protocol {
"udp" => PublishPort::udp(*target as u32),
_ => PublishPort::tcp(*target as u32),
};
opts = opts.expose(publish_port, host_port);
}
}
}
}
}
if let Some(vol_mounts) = &service.volumes {
let mut volume_bindings: Vec<String> = vec![];
for mount in vol_mounts {
match mount {
VolumeMount::Simple(s) => {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() >= 2 {
let source = parts[0];
let target = parts[1];
let mode = parts.get(2).unwrap_or(&"rw");
if compose.volumes.contains_key(source) {
let vol_name = resource_name(project_name, source);
volume_bindings
.push(format!("{}:{}:{}", vol_name, target, mode));
} else {
volume_bindings.push(s.clone());
}
}
}
VolumeMount::Full(config) => {
if let (Some(source), Some(target)) = (&config.source, &config.target) {
let mode = if config.read_only.unwrap_or(false) {
"ro"
} else {
"rw"
};
if compose.volumes.contains_key(source) {
let vol_name = resource_name(project_name, source);
volume_bindings
.push(format!("{}:{}:{}", vol_name, target, mode));
} else {
volume_bindings.push(format!("{}:{}:{}", source, target, mode));
}
}
}
}
}
if !volume_bindings.is_empty() {
let vol_refs: Vec<&str> = volume_bindings.iter().map(|s| s.as_str()).collect();
opts = opts.volumes(vol_refs);
}
}
if let Some(restart) = &service.restart {
let (policy, retries) = match restart.as_str() {
"always" => ("always", 0u64),
"unless-stopped" => ("unless-stopped", 0u64),
"on-failure" => ("on-failure", 3u64),
_ => ("no", 0u64),
};
opts = opts.restart_policy(policy, retries);
}
let container = containers.create(&opts.build()).await.map_err(|e| {
PyRuntimeError::new_err(format!(
"Failed to create container {}: {}",
container_name, e
))
})?;
container.start().await.map_err(|e| {
PyRuntimeError::new_err(format!(
"Failed to start container {}: {}",
container_name, e
))
})?;
result.containers.push(container.id().to_string());
}
}
Ok(result)
}
#[tokio::main]
async fn __compose_down(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
project_name: &str,
remove_volumes: bool,
remove_networks: bool,
timeout: u64,
) -> PyResult<ComposeDownResult> {
let mut result = ComposeDownResult {
stopped_containers: vec![],
removed_containers: vec![],
removed_networks: vec![],
removed_volumes: vec![],
};
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.collect();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
if container_info.state.as_ref() == Some(&"running".to_string()) {
let stop_opts = ContainerStopOpts::builder()
.wait(std::time::Duration::from_secs(timeout))
.build();
if container.stop(&stop_opts).await.is_ok() {
result.stopped_containers.push(id.clone());
}
}
if container.delete().await.is_ok() {
result.removed_containers.push(id.clone());
}
}
}
if remove_networks {
let networks = Networks::new(docker.clone());
let network_list: Vec<docker_api::models::Network> = networks
.list(&Default::default())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list networks: {}", e)))?;
let project_networks: Vec<_> = network_list
.iter()
.filter(|n| {
n.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.collect();
for network_info in project_networks {
if let Some(id) = &network_info.id {
let network = networks.get(id);
if network.delete().await.is_ok() {
result.removed_networks.push(id.clone());
}
}
}
}
if remove_volumes {
let volumes = Volumes::new(docker.clone());
for name in compose.volumes.keys() {
let volume_name = resource_name(project_name, name);
let volume = volumes.get(&volume_name);
if volume.delete().await.is_ok() {
result.removed_volumes.push(volume_name);
}
}
}
Ok(result)
}
#[tokio::main]
async fn __compose_ps(docker: &docker_api::Docker, project_name: &str) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<String> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter_map(|c| c.id.clone())
.collect();
Ok(project_containers)
}
#[tokio::main]
async fn __compose_start(docker: &docker_api::Docker, project_name: &str) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter(|c| c.state.as_ref() != Some(&"running".to_string()))
.collect();
let mut started = Vec::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
if container.start().await.is_ok() {
started.push(id.clone());
}
}
}
Ok(started)
}
#[tokio::main]
async fn __compose_stop(
docker: &docker_api::Docker,
project_name: &str,
timeout: u64,
) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter(|c| c.state.as_ref() == Some(&"running".to_string()))
.collect();
let mut stopped = Vec::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
let stop_opts = ContainerStopOpts::builder()
.wait(std::time::Duration::from_secs(timeout))
.build();
if container.stop(&stop_opts).await.is_ok() {
stopped.push(id.clone());
}
}
}
Ok(stopped)
}
#[tokio::main]
async fn __compose_restart(
docker: &docker_api::Docker,
project_name: &str,
timeout: u64,
) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.collect();
let mut restarted = Vec::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
let restart_opts = ContainerRestartOpts::builder()
.wait(std::time::Duration::from_secs(timeout))
.build();
if container.restart(&restart_opts).await.is_ok() {
restarted.push(id.clone());
}
}
}
Ok(restarted)
}
#[tokio::main]
async fn __compose_pause(docker: &docker_api::Docker, project_name: &str) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter(|c| c.state.as_ref() == Some(&"running".to_string()))
.collect();
let mut paused = Vec::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
if container.pause().await.is_ok() {
paused.push(id.clone());
}
}
}
Ok(paused)
}
#[tokio::main]
async fn __compose_unpause(
docker: &docker_api::Docker,
project_name: &str,
) -> PyResult<Vec<String>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter(|c| c.state.as_ref() == Some(&"paused".to_string()))
.collect();
let mut unpaused = Vec::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
if container.unpause().await.is_ok() {
unpaused.push(id.clone());
}
}
}
Ok(unpaused)
}
#[tokio::main]
async fn __compose_pull(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
) -> PyResult<Vec<String>> {
use futures_util::StreamExt;
let images = Images::new(docker.clone());
let mut pulled = Vec::new();
for (_service_name, service) in &compose.services {
if let Some(image) = &service.image {
let pull_opts = PullOpts::builder()
.image(image.as_str())
.auth(RegistryAuth::builder().build())
.build();
let mut stream = images.pull(&pull_opts);
let mut success = true;
while let Some(result) = stream.next().await {
if result.is_err() {
success = false;
break;
}
}
if success {
pulled.push(image.clone());
}
}
}
Ok(pulled)
}
#[tokio::main]
async fn __compose_build(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
project_name: &str,
no_cache: bool,
pull: bool,
) -> PyResult<Vec<String>> {
use futures_util::StreamExt;
let images = Images::new(docker.clone());
let mut built = Vec::new();
for (service_name, service) in &compose.services {
if let Some(build_config) = &service.build {
let build_context = match build_config {
ComposeBuild::Simple(path) => path.clone(),
ComposeBuild::Full(config) => {
config.context.clone().unwrap_or_else(|| ".".to_string())
}
};
let dockerfile = match build_config {
ComposeBuild::Simple(_) => None,
ComposeBuild::Full(config) => config.dockerfile.clone(),
};
let tag = service
.image
.clone()
.unwrap_or_else(|| format!("{}_{}", project_name, service_name));
let mut build_opts = ImageBuildOpts::builder(&build_context);
build_opts = build_opts.tag(&tag);
if let Some(df) = &dockerfile {
build_opts = build_opts.dockerfile(df);
}
if no_cache {
build_opts = build_opts.nocahe(true);
}
if pull {
build_opts = build_opts.pull("true");
}
let mut stream = images.build(&build_opts.build());
let mut success = true;
while let Some(result) = stream.next().await {
if result.is_err() {
success = false;
break;
}
}
if success {
built.push(service_name.clone());
}
}
}
Ok(built)
}
#[tokio::main]
async fn __compose_push(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
) -> PyResult<Vec<String>> {
let images = Images::new(docker.clone());
let mut pushed = Vec::new();
for (_service_name, service) in &compose.services {
if let Some(image_name) = &service.image {
let image = images.get(image_name);
let push_opts = ImagePushOpts::builder()
.auth(RegistryAuth::builder().build())
.build();
if image.push(&push_opts).await.is_ok() {
pushed.push(image_name.clone());
}
}
}
Ok(pushed)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ContainerInfo {
id: String,
name: String,
service: String,
state: String,
status: String,
image: String,
}
#[tokio::main]
async fn __compose_ps_detailed(
docker: &docker_api::Docker,
project_name: &str,
) -> PyResult<Vec<ContainerInfo>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<ContainerInfo> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.map(|c| {
let service = c
.labels
.as_ref()
.and_then(|l| l.get("com.docker.compose.service"))
.cloned()
.unwrap_or_default();
let name = c
.names
.as_ref()
.and_then(|n| n.first())
.map(|n| n.trim_start_matches('/').to_string())
.unwrap_or_default();
ContainerInfo {
id: c.id.clone().unwrap_or_default(),
name,
service,
state: c.state.clone().unwrap_or_default(),
status: c.status.clone().unwrap_or_default(),
image: c.image.clone().unwrap_or_default(),
}
})
.collect();
Ok(project_containers)
}
#[tokio::main]
async fn __compose_logs(
docker: &docker_api::Docker,
project_name: &str,
service_filter: Option<&str>,
tail: Option<usize>,
timestamps: bool,
) -> PyResult<HashMap<String, String>> {
use futures_util::StreamExt;
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
let matches_project =
labels.get("com.docker.compose.project") == Some(&project_name.to_string());
let matches_service = service_filter.map_or(true, |svc| {
labels.get("com.docker.compose.service") == Some(&svc.to_string())
});
matches_project && matches_service
})
})
.collect();
let mut logs_map: HashMap<String, String> = HashMap::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
let mut log_opts = LogsOpts::builder();
log_opts = log_opts.stdout(true);
log_opts = log_opts.stderr(true);
if let Some(n) = tail {
log_opts = log_opts.n_lines(n);
}
if timestamps {
log_opts = log_opts.timestamps(true);
}
let log_stream = container.logs(&log_opts.build());
let log_chunks: Vec<Vec<u8>> = log_stream
.map(|chunk| match chunk {
Ok(chunk) => chunk.to_vec(),
Err(_) => vec![],
})
.collect()
.await;
let log_bytes: Vec<u8> = log_chunks.into_iter().flatten().collect();
let log_str = String::from_utf8_lossy(&log_bytes).to_string();
let name = container_info
.names
.as_ref()
.and_then(|n| n.first())
.map(|n| n.trim_start_matches('/').to_string())
.unwrap_or_else(|| id.clone());
logs_map.insert(name, log_str);
}
}
Ok(logs_map)
}
#[tokio::main]
async fn __compose_top(
docker: &docker_api::Docker,
project_name: &str,
ps_args: Option<&str>,
) -> PyResult<HashMap<String, serde_json::Value>> {
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let project_containers: Vec<_> = container_list
.iter()
.filter(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
})
})
.filter(|c| c.state.as_ref() == Some(&"running".to_string()))
.collect();
let mut top_map: HashMap<String, serde_json::Value> = HashMap::new();
for container_info in project_containers {
if let Some(id) = &container_info.id {
let container = containers.get(id);
if let Ok(top_result) = container.top(ps_args).await {
let name = container_info
.names
.as_ref()
.and_then(|n| n.first())
.map(|n| n.trim_start_matches('/').to_string())
.unwrap_or_else(|| id.clone());
let value = serde_json::json!({
"titles": top_result.titles,
"processes": top_result.processes
});
top_map.insert(name, value);
}
}
}
Ok(top_map)
}
#[derive(Debug, Clone, Serialize)]
pub struct ComposeRunResult {
pub container_id: String,
pub output: Option<String>,
pub exit_code: Option<i64>,
}
#[tokio::main]
async fn __compose_exec(
docker: &docker_api::Docker,
project_name: &str,
service: &str,
command: Vec<String>,
user: Option<&str>,
workdir: Option<&str>,
env: Option<Vec<String>>,
privileged: bool,
tty: bool,
) -> PyResult<String> {
use futures_util::StreamExt;
let containers = Containers::new(docker.clone());
let container_list: Vec<docker_api::models::ContainerSummary> = containers
.list(&ContainerListOpts::builder().all(true).build())
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to list containers: {}", e)))?;
let service_container = container_list.iter().find(|c| {
c.labels.as_ref().map_or(false, |labels| {
labels.get("com.docker.compose.project") == Some(&project_name.to_string())
&& labels.get("com.docker.compose.service") == Some(&service.to_string())
}) && c.state.as_ref() == Some(&"running".to_string())
});
let container_info = service_container.ok_or_else(|| {
PyRuntimeError::new_err(format!(
"No running container found for service '{}' in project '{}'",
service, project_name
))
})?;
let container_id = container_info
.id
.as_ref()
.ok_or_else(|| PyRuntimeError::new_err("Container ID not found".to_string()))?;
let container = containers.get(container_id);
let cmd_refs: Vec<&str> = command.iter().map(|s| s.as_str()).collect();
let mut exec_opts = ExecCreateOpts::builder()
.command(cmd_refs)
.attach_stdout(true)
.attach_stderr(true)
.privileged(privileged)
.tty(tty);
if let Some(u) = user {
exec_opts = exec_opts.user(u);
}
if let Some(wd) = workdir {
exec_opts = exec_opts.working_dir(wd);
}
if let Some(env_vars) = &env {
let env_refs: Vec<&str> = env_vars.iter().map(|s| s.as_str()).collect();
exec_opts = exec_opts.env(env_refs);
}
let start_opts = ExecStartOpts::builder().build();
let mut multiplexer = container
.exec(&exec_opts.build(), &start_opts)
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to exec command: {}", e)))?;
let mut output = Vec::new();
while let Some(chunk_result) = multiplexer.next().await {
match chunk_result {
Ok(chunk) => {
output.extend_from_slice(&chunk.to_vec());
}
Err(_) => break,
}
}
Ok(String::from_utf8_lossy(&output).to_string())
}
#[tokio::main]
async fn __compose_run(
docker: &docker_api::Docker,
compose: &Pyo3ComposeFile,
project_name: &str,
service: &str,
command: Option<Vec<String>>,
user: Option<&str>,
workdir: Option<&str>,
env: Option<Vec<String>>,
rm: bool,
detach: bool,
) -> PyResult<ComposeRunResult> {
use futures_util::StreamExt;
let service_config = compose.services.get(service).ok_or_else(|| {
PyRuntimeError::new_err(format!("Service '{}' not found in compose file", service))
})?;
let image = service_config.image.as_ref().ok_or_else(|| {
PyRuntimeError::new_err(format!(
"Service '{}' does not have an image specified",
service
))
})?;
let container_name = format!(
"{}_{}_run_{}",
project_name,
service,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
let containers = Containers::new(docker.clone());
let mut opts = ContainerCreateOpts::builder()
.image(image)
.name(&container_name)
.auto_remove(rm);
let mut labels = HashMap::new();
labels.insert(
"com.docker.compose.project".to_string(),
project_name.to_string(),
);
labels.insert(
"com.docker.compose.service".to_string(),
service.to_string(),
);
labels.insert("com.docker.compose.oneoff".to_string(), "True".to_string());
let labels_ref: HashMap<&str, &str> = labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
opts = opts.labels(labels_ref);
let cmd = command.or_else(|| command_to_vec(&service_config.command));
if let Some(c) = &cmd {
let cmd_refs: Vec<&str> = c.iter().map(|s| s.as_str()).collect();
opts = opts.command(cmd_refs);
}
let mut all_env = env_to_vec(&service_config.environment);
if let Some(additional_env) = &env {
all_env.extend(additional_env.iter().cloned());
}
if !all_env.is_empty() {
let env_refs: Vec<&str> = all_env.iter().map(|s| s.as_str()).collect();
opts = opts.env(env_refs);
}
if let Some(u) = user {
opts = opts.user(u);
} else if let Some(u) = &service_config.user {
opts = opts.user(u.as_str());
}
if let Some(wd) = workdir {
opts = opts.working_dir(wd);
} else if let Some(wd) = &service_config.working_dir {
opts = opts.working_dir(wd.as_str());
}
if let Some(tty) = service_config.tty {
opts = opts.tty(tty);
}
if let Some(stdin) = service_config.stdin_open {
opts = opts.attach_stdin(stdin);
}
let default_network = format!("{}_default", project_name);
opts = opts.network_mode(&default_network);
let container = containers.create(&opts.build()).await.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to create container for run: {}", e))
})?;
let container_id = container.id().to_string();
container
.start()
.await
.map_err(|e| PyRuntimeError::new_err(format!("Failed to start run container: {}", e)))?;
if detach {
return Ok(ComposeRunResult {
container_id,
output: None,
exit_code: None,
});
}
let log_opts = LogsOpts::builder()
.stdout(true)
.stderr(true)
.follow(true)
.build();
let log_stream = container.logs(&log_opts);
let log_chunks: Vec<Vec<u8>> = log_stream
.map(|chunk| match chunk {
Ok(chunk) => chunk.to_vec(),
Err(_) => vec![],
})
.collect()
.await;
let log_bytes: Vec<u8> = log_chunks.into_iter().flatten().collect();
let output = String::from_utf8_lossy(&log_bytes).to_string();
let wait_result = container.wait().await.ok();
let exit_code = wait_result.map(|r| r.status_code);
Ok(ComposeRunResult {
container_id,
output: Some(output),
exit_code,
})
}
fn get_service_order(compose: &Pyo3ComposeFile) -> Vec<String> {
let mut result = Vec::new();
let mut visited = std::collections::HashSet::new();
fn visit(
name: &str,
compose: &Pyo3ComposeFile,
visited: &mut std::collections::HashSet<String>,
result: &mut Vec<String>,
) {
if visited.contains(name) {
return;
}
visited.insert(name.to_string());
if let Some(service) = compose.services.get(name) {
if let Some(depends) = &service.depends_on {
match depends {
DependsOn::List(deps) => {
for dep in deps {
visit(dep, compose, visited, result);
}
}
DependsOn::Map(deps) => {
for dep in deps.keys() {
visit(dep, compose, visited, result);
}
}
}
}
}
result.push(name.to_string());
}
for name in compose.services.keys() {
visit(name, compose, &mut visited, &mut result);
}
result
}