rolling_deployer/
docker_client.rs

1use std::io::{Read, Write};
2use std::os::unix::net::UnixStream;
3
4use crate::types::Container;
5
/// Minimal client for the Docker Engine HTTP API, reached through a Unix
/// domain socket.
///
/// The client only stores the socket path; it holds no open connection, so
/// cloning it is cheap and clones are fully independent.
#[derive(Debug, Clone)]
pub struct DockerClient {
    /// Filesystem path of the Docker daemon's Unix socket.
    socket_path: String,
}
9
10impl DockerClient {
11    pub fn new(socket_path: String) -> Self {
12        Self { socket_path }
13    }
14
15    async fn api_call(&self, endpoint: &str) -> Result<String, Box<dyn std::error::Error>> {
16        let stream = UnixStream::connect(&self.socket_path)?;
17        self.send_request(stream, endpoint).await
18    }
19
20    async fn send_request(
21        &self,
22        mut stream: UnixStream,
23        endpoint: &str,
24    ) -> Result<String, Box<dyn std::error::Error>> {
25        let request = format!(
26            "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
27            endpoint
28        );
29
30        stream.write_all(request.as_bytes())?;
31        self.read_response(stream)
32    }
33
34    fn read_response(&self, mut stream: UnixStream) -> Result<String, Box<dyn std::error::Error>> {
35        let mut response = String::new();
36        stream.read_to_string(&mut response)?;
37
38        // Clean up HTTP chunked encoding and extract JSON body
39        if let Some(json_start) = response.find("\r\n\r\n") {
40            let body = &response[json_start + 4..];
41            // Handle chunked encoding - remove chunk size markers
42            Ok(self.clean_chunked_response(body))
43        } else {
44            Ok(response)
45        }
46    }
47
48    fn clean_chunked_response(&self, body: &str) -> String {
49        // Remove HTTP chunked encoding artifacts
50        let mut cleaned = body.to_string();
51
52        // Remove chunk size at the beginning (like "f053\r\n")
53        if let Some(first_newline) = cleaned.find("\r\n") {
54            if cleaned[..first_newline]
55                .chars()
56                .all(|c| c.is_ascii_hexdigit())
57            {
58                cleaned = cleaned[first_newline + 2..].to_string();
59            }
60        }
61
62        // Remove trailing chunk markers (like "\r\n0\r\n\r\n")
63        if cleaned.ends_with("\r\n0\r\n\r\n") {
64            cleaned.truncate(cleaned.len() - 7);
65        } else if cleaned.ends_with("\n\r\n0\r\n\r\n") {
66            cleaned.truncate(cleaned.len() - 8);
67        }
68
69        cleaned
70    }
71
72    pub async fn list_containers(
73        &self,
74        all: bool,
75    ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
76        let endpoint = if all {
77            "/containers/json?all=true"
78        } else {
79            "/containers/json"
80        };
81        let json_response = self.api_call(endpoint).await?;
82        let containers: Vec<Container> = serde_json::from_str(&json_response)?;
83        Ok(containers)
84    }
85
86    pub async fn get_running_containers_by_image_substring(
87        &self,
88        image_substring: &str,
89    ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
90        let containers = self.list_containers(true).await?;
91        Ok(containers
92            .into_iter()
93            .filter(|container| {
94                container.state == "running" && container.image.contains(image_substring)
95            })
96            .collect())
97    }
98
99    pub async fn remove_container(
100        &self,
101        container_id: &str,
102    ) -> Result<(), Box<dyn std::error::Error>> {
103        let endpoint = &format!("/containers/{}?force=true", container_id);
104        let stream = UnixStream::connect(&self.socket_path)?;
105        self.send_delete_request(stream, endpoint).await?;
106        Ok(())
107    }
108
109    async fn send_delete_request(
110        &self,
111        mut stream: UnixStream,
112        endpoint: &str,
113    ) -> Result<String, Box<dyn std::error::Error>> {
114        let request = format!(
115            "DELETE {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
116            endpoint
117        );
118
119        stream.write_all(request.as_bytes())?;
120        self.read_response(stream)
121    }
122
123    pub async fn stop_container(
124        &self,
125        container_id: &str,
126    ) -> Result<(), Box<dyn std::error::Error>> {
127        let endpoint = &format!("/containers/{}/stop", container_id);
128        let stream = UnixStream::connect(&self.socket_path)?;
129        self.send_post_request(stream, endpoint, "").await?;
130        Ok(())
131    }
132
133    pub async fn start_container(
134        &self,
135        container_id: &str,
136    ) -> Result<(), Box<dyn std::error::Error>> {
137        let endpoint = &format!("/containers/{}/start", container_id);
138        let stream = UnixStream::connect(&self.socket_path)?;
139        self.send_post_request(stream, endpoint, "").await?;
140        Ok(())
141    }
142
143    async fn send_post_request(
144        &self,
145        mut stream: UnixStream,
146        endpoint: &str,
147        body: &str,
148    ) -> Result<String, Box<dyn std::error::Error>> {
149        let request = format!(
150            "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
151            endpoint,
152            body.len(),
153            body
154        );
155
156        stream.write_all(request.as_bytes())?;
157        self.read_response(stream)
158    }
159
160    pub async fn get_running_containers_by_name(
161        &self,
162        name: &str,
163    ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
164        let containers = self.list_containers(true).await?;
165        Ok(containers
166            .into_iter()
167            .filter(|container| {
168                container.state == "running" && container.names.iter().any(|n| n.contains(name))
169            })
170            .collect())
171    }
172}