stormchaser_runner_docker/container_machine/transitions/
running.rs1use crate::container_machine::{state, ContainerMetrics, ContainerState, DockerContainerMachine};
2use anyhow::Result;
3use bollard::container::{
4 Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::service::HostConfig;
7use chrono::Utc;
8use cloudevents::EventBuilder;
9use futures::StreamExt;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::time::Duration;
13use stormchaser_model::dsl::CommonContainerSpec;
14use tokio::time::sleep;
15use tracing::{error, info};
16use uuid::Uuid;
17
18impl DockerContainerMachine<state::Running> {
19 pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
21 let container_name = self.state.container_name.clone();
22 let dispatched_at = self.state.dispatched_at;
23 let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
24 let storage_names = self.state.storage_names.clone();
25 let mounts = self.state.mounts.clone();
26
27 let mut wait_stream = self.docker.wait_container(
28 &container_name,
29 Some(WaitContainerOptions {
30 condition: "not-running",
31 }),
32 );
33
34 let exit_code = if let Some(wait_result) = wait_stream.next().await {
35 match wait_result {
36 Ok(response) => Some(response.status_code),
37 Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
38 Some(code)
39 }
40 Err(e) => {
41 error!("Error waiting for container {}: {:?}", container_name, e);
42 None
43 }
44 }
45 } else {
46 None
47 };
48
49 let latency_ms = (dispatched_at - self.metadata.received_at)
50 .num_milliseconds()
51 .max(0) as u64;
52 let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
53
54 let mut metrics = ContainerMetrics {
55 exit_code,
56 duration_ms,
57 latency_ms,
58 ..Default::default()
59 };
60
61 let spec: Option<CommonContainerSpec> =
63 serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
64
65 if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
66 let agent_image = "stormchaser-agent:v1";
67 let park_container_name = format!("park-{}", Uuid::new_v4());
68
69 let mut parking_urls = HashMap::new();
70 let mut mount_paths = HashMap::new();
71 let mut artifact_urls = HashMap::new();
72
73 if let Some(storage_data) = &self.metadata.storage {
74 for name in &storage_names {
75 if let Some(urls) = storage_data.get(name) {
76 parking_urls.insert(name.clone(), urls.clone());
77 if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
78 let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
79 for (art_name, art_val) in artifacts {
80 if let Some(allowed) = allowed_artifacts {
81 if !allowed.contains(art_name) {
82 continue;
83 }
84 } else {
85 continue;
87 }
88
89 let mut cloned_art = art_val.clone();
90 if let Some(m) = spec
91 .as_ref()
92 .and_then(|s| s.storage_mounts.as_ref())
93 .and_then(|ms| ms.iter().find(|m| &m.name == name))
94 {
95 if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
96 let abs_path = std::path::Path::new(&m.mount_path).join(p);
97 cloned_art["path"] =
98 Value::String(abs_path.to_string_lossy().to_string());
99 }
100 }
101 artifact_urls.insert(art_name.clone(), cloned_art);
102 }
103 }
104 }
105 if let Some(m) = spec
106 .as_ref()
107 .and_then(|s| s.storage_mounts.as_ref())
108 .and_then(|ms| ms.iter().find(|m| &m.name == name))
109 {
110 mount_paths.insert(name.clone(), m.mount_path.clone());
111 }
112 }
113 }
114
115 let mut agent_args = vec![
116 "run".to_string(),
117 "--parking-urls".to_string(),
118 serde_json::to_string(&parking_urls)?,
119 "--mount-paths".to_string(),
120 serde_json::to_string(&mount_paths)?,
121 ];
122
123 if !artifact_urls.is_empty() {
124 agent_args.push("--artifact-urls".to_string());
125 agent_args.push(serde_json::to_string(&artifact_urls)?);
126 }
127
128 if let Some(reports) = &self.metadata.test_report_urls {
129 if !reports.is_empty() {
130 agent_args.push("--report-urls".to_string());
131 agent_args.push(serde_json::to_string(reports)?);
132 }
133 }
134
135 if !self.metadata.step_dsl.reports.is_empty() {
136 agent_args.push("--test-reports".to_string());
137 agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
138 }
139
140 if exit_code != Some(0) {
141 }
144
145 agent_args.push("--".to_string());
146 agent_args.push("/bin/true".to_string());
147
148 let agent_config = Config {
149 image: Some(agent_image.to_string()),
150 cmd: Some(agent_args),
151 entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
152 host_config: Some(HostConfig {
153 mounts: Some(mounts),
154 network_mode: self.get_network_mode().await,
155 ..Default::default()
156 }),
157 ..Default::default()
158 };
159
160 info!("Running agent for parking/reports: {}", park_container_name);
161 if let Some(nats) = &self.nats {
162 let packing_event = serde_json::json!({
163 "run_id": self.metadata.run_id,
164 "step_id": self.metadata.step_id,
165 "status": "packing_sfs",
166 "timestamp": chrono::Utc::now(),
167 });
168 if let Ok(ce) = cloudevents::EventBuilderV10::new()
169 .id(uuid::Uuid::new_v4().to_string())
170 .ty("stormchaser.v1.step.packing_sfs")
171 .source("/stormchaser/runner")
172 .time(chrono::Utc::now())
173 .data("application/json", packing_event)
174 .build()
175 {
176 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
177 let _ = nats
178 .publish("stormchaser.v1.step.packing_sfs", payload_bytes.into())
179 .await;
180 }
181 }
182 }
183
184 self.docker
185 .create_container(
186 Some(CreateContainerOptions {
187 name: park_container_name.clone(),
188 ..Default::default()
189 }),
190 agent_config,
191 )
192 .await?;
193
194 self.docker
195 .start_container(&park_container_name, None::<StartContainerOptions<String>>)
196 .await?;
197
198 let mut agent_wait_stream = self.docker.wait_container(
199 &park_container_name,
200 Some(WaitContainerOptions {
201 condition: "not-running",
202 }),
203 );
204
205 let _ = agent_wait_stream.next().await;
206
207 if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
208 metrics.artifacts = Some(artifacts);
209 }
210 if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
211 metrics.storage_hashes = Some(hashes);
212 }
213 if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
214 metrics.test_reports = Some(reports);
215 }
216
217 sleep(Duration::from_secs(15)).await;
219 let _ = self
220 .docker
221 .remove_container(&park_container_name, None)
222 .await;
223 } else {
224 if let Ok(Some(artifacts)) = self.get_artifact_meta(&container_name).await {
226 metrics.artifacts = Some(artifacts);
227 }
228 if let Ok(Some(hashes)) = self.get_storage_hashes(&container_name).await {
229 metrics.storage_hashes = Some(hashes);
230 }
231 if let Ok(Some(reports)) = self.get_test_reports(&container_name).await {
232 metrics.test_reports = Some(reports);
233 }
234 }
235
236 sleep(Duration::from_secs(15)).await;
239 let _ = self.docker.remove_container(&container_name, None).await;
240
241 for vol in volumes_to_cleanup {
242 info!("Cleaning up volume: {}", vol);
243 let _ = self.docker.remove_volume(&vol, None).await;
244 }
245
246 let result = if exit_code == Some(0) {
247 info!("Container {} completed successfully", container_name);
248 ContainerState::Succeeded(metrics)
249 } else {
250 let error_msg = format!("Container exited with code {:?}", exit_code);
251 error!("Container {} failed: {}", container_name, error_msg);
252 ContainerState::Failed(error_msg, metrics)
253 };
254
255 Ok(DockerContainerMachine {
256 nats: self.nats.clone(),
257 docker: self.docker,
258 metadata: self.metadata,
259 state: state::Finished { result },
260 })
261 }
262
263 async fn get_artifact_meta(
264 &self,
265 container_name: &str,
266 ) -> Result<Option<HashMap<String, Value>>> {
267 let mut logs = self.docker.logs(
268 container_name,
269 Some(LogsOptions::<String> {
270 stdout: true,
271 stderr: true,
272 ..Default::default()
273 }),
274 );
275
276 while let Some(log_result) = logs.next().await {
277 if let Ok(output) = log_result {
278 let line = output.to_string();
279 if line.contains("Parked artifacts:") {
280 if let Some(json_start) = line.find('{') {
281 let json_part = &line[json_start..];
282 if let Ok(meta) = serde_json::from_str(json_part) {
283 return Ok(Some(meta));
284 }
285 }
286 }
287 }
288 }
289
290 Ok(None)
291 }
292
293 async fn get_storage_hashes(
294 &self,
295 container_name: &str,
296 ) -> Result<Option<HashMap<String, String>>> {
297 let mut logs = self.docker.logs(
298 container_name,
299 Some(LogsOptions::<String> {
300 stdout: true,
301 stderr: true,
302 ..Default::default()
303 }),
304 );
305
306 while let Some(log_result) = logs.next().await {
307 if let Ok(output) = log_result {
308 let line = output.to_string();
309 if line.contains("Parked storage hashes:") {
310 if let Some(json_start) = line.find('{') {
311 let json_part = &line[json_start..];
312 if let Ok(hashes) = serde_json::from_str(json_part) {
313 return Ok(Some(hashes));
314 }
315 }
316 }
317 }
318 }
319
320 Ok(None)
321 }
322
323 async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
324 let mut logs = self.docker.logs(
325 container_name,
326 Some(LogsOptions::<String> {
327 stdout: true,
328 stderr: true,
329 ..Default::default()
330 }),
331 );
332
333 while let Some(log_result) = logs.next().await {
334 if let Ok(output) = log_result {
335 let line = output.to_string();
336 if line.contains("Collected test reports JSON:") {
337 if let Some(json_start) = line.find('{') {
338 let json_part = &line[json_start..];
339 if let Ok(reports) = serde_json::from_str(json_part) {
340 return Ok(Some(reports));
341 }
342 }
343 }
344 }
345 }
346
347 Ok(None)
348 }
349}