rolling_deployer/
docker_client.rs1use std::io::{Read, Write};
2use std::os::unix::net::UnixStream;
3
4use crate::types::Container;
5
6pub struct DockerClient {
7 socket_path: String,
8}
9
10impl DockerClient {
11 pub fn new(socket_path: String) -> Self {
12 Self { socket_path }
13 }
14
15 async fn api_call(&self, endpoint: &str) -> Result<String, Box<dyn std::error::Error>> {
16 let stream = UnixStream::connect(&self.socket_path)?;
17 self.send_request(stream, endpoint).await
18 }
19
20 async fn send_request(
21 &self,
22 mut stream: UnixStream,
23 endpoint: &str,
24 ) -> Result<String, Box<dyn std::error::Error>> {
25 let request = format!(
26 "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
27 endpoint
28 );
29
30 stream.write_all(request.as_bytes())?;
31 self.read_response(stream)
32 }
33
34 fn read_response(&self, mut stream: UnixStream) -> Result<String, Box<dyn std::error::Error>> {
35 let mut response = String::new();
36 stream.read_to_string(&mut response)?;
37
38 if let Some(json_start) = response.find("\r\n\r\n") {
40 let body = &response[json_start + 4..];
41 Ok(self.clean_chunked_response(body))
43 } else {
44 Ok(response)
45 }
46 }
47
48 fn clean_chunked_response(&self, body: &str) -> String {
49 let mut cleaned = body.to_string();
51
52 if let Some(first_newline) = cleaned.find("\r\n") {
54 if cleaned[..first_newline]
55 .chars()
56 .all(|c| c.is_ascii_hexdigit())
57 {
58 cleaned = cleaned[first_newline + 2..].to_string();
59 }
60 }
61
62 if cleaned.ends_with("\r\n0\r\n\r\n") {
64 cleaned.truncate(cleaned.len() - 7);
65 } else if cleaned.ends_with("\n\r\n0\r\n\r\n") {
66 cleaned.truncate(cleaned.len() - 8);
67 }
68
69 cleaned
70 }
71
72 pub async fn list_containers(
73 &self,
74 all: bool,
75 ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
76 let endpoint = if all {
77 "/containers/json?all=true"
78 } else {
79 "/containers/json"
80 };
81 let json_response = self.api_call(endpoint).await?;
82 let containers: Vec<Container> = serde_json::from_str(&json_response)?;
83 Ok(containers)
84 }
85
86 pub async fn get_running_containers_by_image_substring(
87 &self,
88 image_substring: &str,
89 ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
90 let containers = self.list_containers(true).await?;
91 Ok(containers
92 .into_iter()
93 .filter(|container| {
94 container.state == "running" && container.image.contains(image_substring)
95 })
96 .collect())
97 }
98
99 pub async fn remove_container(
100 &self,
101 container_id: &str,
102 ) -> Result<(), Box<dyn std::error::Error>> {
103 let endpoint = &format!("/containers/{}?force=true", container_id);
104 let stream = UnixStream::connect(&self.socket_path)?;
105 self.send_delete_request(stream, endpoint).await?;
106 Ok(())
107 }
108
109 async fn send_delete_request(
110 &self,
111 mut stream: UnixStream,
112 endpoint: &str,
113 ) -> Result<String, Box<dyn std::error::Error>> {
114 let request = format!(
115 "DELETE {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n",
116 endpoint
117 );
118
119 stream.write_all(request.as_bytes())?;
120 self.read_response(stream)
121 }
122
123 pub async fn stop_container(
124 &self,
125 container_id: &str,
126 ) -> Result<(), Box<dyn std::error::Error>> {
127 let endpoint = &format!("/containers/{}/stop", container_id);
128 let stream = UnixStream::connect(&self.socket_path)?;
129 self.send_post_request(stream, endpoint, "").await?;
130 Ok(())
131 }
132
133 pub async fn start_container(
134 &self,
135 container_id: &str,
136 ) -> Result<(), Box<dyn std::error::Error>> {
137 let endpoint = &format!("/containers/{}/start", container_id);
138 let stream = UnixStream::connect(&self.socket_path)?;
139 self.send_post_request(stream, endpoint, "").await?;
140 Ok(())
141 }
142
143 async fn send_post_request(
144 &self,
145 mut stream: UnixStream,
146 endpoint: &str,
147 body: &str,
148 ) -> Result<String, Box<dyn std::error::Error>> {
149 let request = format!(
150 "POST {} HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
151 endpoint,
152 body.len(),
153 body
154 );
155
156 stream.write_all(request.as_bytes())?;
157 self.read_response(stream)
158 }
159
160 pub async fn get_running_containers_by_name(
161 &self,
162 name: &str,
163 ) -> Result<Vec<Container>, Box<dyn std::error::Error>> {
164 let containers = self.list_containers(true).await?;
165 Ok(containers
166 .into_iter()
167 .filter(|container| {
168 container.state == "running" && container.names.iter().any(|n| n.contains(name))
169 })
170 .collect())
171 }
172}