shipper_core/engine/parallel/mod.rs

use std::path::Path;
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::plan::PlannedWorkspace;
use crate::state::events;
use shipper_registry::HttpRegistryClient as RegistryClient;
use shipper_types::{
    ExecutionResult, ExecutionState, PackageEvidence, PackageReceipt, PackageState, RuntimeOptions,
};

mod policy;
mod publish;
mod readiness;
mod reconcile;
mod webhook;

pub use crate::plan::chunking::chunk_by_max_concurrent;

use publish::run_publish_level;
use webhook::{WebhookEvent, maybe_send_event};

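/// Reporting interface used inside the parallel publish engine; the host-facing
/// `crate::engine::Reporter` is bridged onto this trait via `HostReporterAdapter` below.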
pub trait Reporter {
    fn info(&mut self, msg: &str);
    fn warn(&mut self, msg: &str);
    fn error(&mut self, msg: &str);
}

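/// Forwards messages from the local `Reporter` trait to the host-facing
/// `crate::engine::Reporter`.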
struct HostReporterAdapter<'a> {
    inner: &'a mut dyn crate::engine::Reporter,
}

impl<'a> Reporter for HostReporterAdapter<'a> {
    fn info(&mut self, msg: &str) {
        self.inner.info(msg);
    }
    fn warn(&mut self, msg: &str) {
        self.inner.warn(msg);
    }
    fn error(&mut self, msg: &str) {
        self.inner.error(msg);
    }
}

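/// Public entry point: builds an HTTP registry client from the configured registry's
/// API base (trailing slash trimmed) and delegates to `run_publish_parallel_inner`.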
pub fn run_publish_parallel(
    ws: &crate::plan::PlannedWorkspace,
    opts: &RuntimeOptions,
    st: &mut ExecutionState,
    state_dir: &Path,
    reg: &crate::registry::RegistryClient,
    reporter: &mut dyn crate::engine::Reporter,
) -> Result<Vec<PackageReceipt>> {
    let api_base = reg.registry().api_base.trim_end_matches('/');
    let reg_inner = shipper_registry::HttpRegistryClient::new(api_base);
    let mut adapter = HostReporterAdapter { inner: reporter };
    run_publish_parallel_inner(ws, opts, st, state_dir, &reg_inner, &mut adapter)
}

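/// Publishes the planned packages level by level, honoring any resume point,
/// emitting webhook events at start and completion, and returning the accumulated
/// package receipts.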
pub(crate) fn run_publish_parallel_inner(
    ws: &PlannedWorkspace,
    opts: &RuntimeOptions,
    st: &mut ExecutionState,
    state_dir: &Path,
    reg: &RegistryClient,
    reporter: &mut dyn Reporter,
) -> Result<Vec<PackageReceipt>> {
    let levels = ws.plan.group_by_levels();

    reporter.info(&format!(
        "parallel publish: {} levels, {} packages total",
        levels.len(),
        ws.plan.packages.len()
    ));

    webhook::maybe_send_event(
        &opts.webhook,
        WebhookEvent::PublishStarted {
            plan_id: ws.plan.plan_id.clone(),
            package_count: ws.plan.packages.len(),
            registry: ws.plan.registry.name.clone(),
        },
    );

    let events_path = events::events_path(state_dir);
    let event_log = Arc::new(Mutex::new(events::EventLog::new()));

    let st_arc = Arc::new(Mutex::new(st.clone()));

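    // SendReporter buffers messages reported during `run_publish_level` in
    // per-severity vectors; they are replayed to the host reporter after the
    // level loop finishes.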
    struct SendReporter {
        infos: Mutex<Vec<String>>,
        warns: Mutex<Vec<String>>,
        errors: Mutex<Vec<String>>,
    }
    impl Reporter for SendReporter {
        fn info(&mut self, msg: &str) {
            self.infos.lock().unwrap().push(msg.to_string());
        }
        fn warn(&mut self, msg: &str) {
            self.warns.lock().unwrap().push(msg.to_string());
        }
        fn error(&mut self, msg: &str) {
            self.errors.lock().unwrap().push(msg.to_string());
        }
    }

    let send_reporter = Arc::new(Mutex::new(SendReporter {
        infos: Mutex::new(Vec::new()),
        warns: Mutex::new(Vec::new()),
        errors: Mutex::new(Vec::new()),
    }));

    let mut all_receipts: Vec<PackageReceipt> = Vec::new();

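    // When `opts.resume_from` names a package, levels before that package are not
    // re-run; their receipts are reconstructed from the recorded execution state.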
    let mut reached_resume_point = opts.resume_from.is_none();

    for level in &levels {
        if !reached_resume_point {
            if level
                .packages
                .iter()
                .any(|p| Some(&p.name) == opts.resume_from.as_ref())
            {
                reached_resume_point = true;
            } else {
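                // Check whether every package in this level is already Published
                // or Skipped according to the shared execution state.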
                let mut level_done = true;
                {
                    let st_guard = st_arc.lock().unwrap();
                    for p in &level.packages {
                        let key = crate::runtime::execution::pkg_key(&p.name, &p.version);
                        if let Some(progress) = st_guard.packages.get(&key) {
                            if !matches!(
                                progress.state,
                                PackageState::Published | PackageState::Skipped { .. }
                            ) {
                                level_done = false;
                                break;
                            }
                        } else {
                            level_done = false;
                            break;
                        }
                    }
                }

                if level_done {
                    reporter.info(&format!(
                        "Level {}: already complete (skipping)",
                        level.level
                    ));
                } else {
                    reporter.warn(&format!(
                        "Level {}: skipping (before resume point {})",
                        level.level,
                        opts.resume_from.as_ref().unwrap()
                    ));
                }

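                // Synthesize receipts for the skipped packages from their recorded
                // progress so the final summary still covers them.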
                for p in &level.packages {
                    let key = crate::runtime::execution::pkg_key(&p.name, &p.version);
                    let st_guard = st_arc.lock().unwrap();
                    if let Some(progress) = st_guard.packages.get(&key) {
                        all_receipts.push(PackageReceipt {
                            name: p.name.clone(),
                            version: p.version.clone(),
                            attempts: progress.attempts,
                            state: progress.state.clone(),
                            started_at: chrono::Utc::now(),
                            finished_at: chrono::Utc::now(),
                            duration_ms: 0,
                            evidence: PackageEvidence {
                                attempts: vec![],
                                readiness_checks: vec![],
                            },
                            compromised_at: None,
                            compromised_by: None,
                            superseded_by: None,
                        });
                    }
                }
                continue;
            }
        }

        let level_receipts = run_publish_level(
            level,
            ws,
            opts,
            reg,
            &st_arc,
            state_dir,
            &event_log,
            &events_path,
            &(send_reporter.clone() as Arc<Mutex<dyn Reporter + Send>>),
        )?;
        all_receipts.extend(level_receipts);
    }

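    // Replay messages buffered by SendReporter to the host reporter, grouped by
    // severity: infos, then warnings, then errors.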
    {
        let sr = send_reporter.lock().unwrap();
        for msg in sr.infos.lock().unwrap().iter() {
            reporter.info(msg);
        }
        for msg in sr.warns.lock().unwrap().iter() {
            reporter.warn(msg);
        }
        for msg in sr.errors.lock().unwrap().iter() {
            reporter.error(msg);
        }
    }

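    // Copy the final shared execution state back into the caller's `st`.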
    let updated_st = st_arc.lock().unwrap();
    *st = updated_st.clone();

    let total_packages = all_receipts.len();
    let success_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Published))
        .count();
    let failure_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Failed { .. }))
        .count();
    let skipped_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Skipped { .. }))
        .count();

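    // Classify the run: success if every package ended Published, Uploaded, or
    // Skipped; complete failure if nothing was published; otherwise partial.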
    let exec_result = if all_receipts.iter().all(|r| {
        matches!(
            r.state,
            PackageState::Published | PackageState::Uploaded | PackageState::Skipped { .. }
        )
    }) {
        ExecutionResult::Success
    } else if success_count == 0 {
        ExecutionResult::CompleteFailure
    } else {
        ExecutionResult::PartialFailure
    };

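    // Emit the final webhook event summarizing the counts and overall result.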
    maybe_send_event(
        &opts.webhook,
        WebhookEvent::PublishCompleted {
            plan_id: ws.plan.plan_id.clone(),
            total_packages,
            success_count,
            failure_count,
            skipped_count,
            result: match exec_result {
                ExecutionResult::Success => "success".to_string(),
                ExecutionResult::PartialFailure => "partial_failure".to_string(),
                ExecutionResult::CompleteFailure => "complete_failure".to_string(),
            },
        },
    );

    Ok(all_receipts)
}

#[cfg(test)]
mod property_tests {
    use proptest::prelude::*;

    use super::chunk_by_max_concurrent;

    fn names() -> impl Strategy<Value = Vec<String>> {
        prop::collection::vec("[a-z]{1,8}", 0..64)
    }

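    // Property: chunking preserves input order and no chunk exceeds max(limit, 1).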
    proptest! {
        #[test]
        fn chunking_preserves_order_and_limits_size(items in names(), limit in 0usize..64) {
            let chunks = chunk_by_max_concurrent(&items, limit);
            let flattened: Vec<String> = chunks.iter().flatten().cloned().collect();

            prop_assert_eq!(flattened.as_slice(), items.as_slice());

            let max_size = limit.max(1);
            for chunk in &chunks {
                prop_assert!(chunk.len() <= max_size);
            }

            if !flattened.is_empty() {
                if max_size == 1 {
                    prop_assert!(chunks.iter().all(|chunk| chunk.len() <= 1));
                } else {
                    prop_assert!(chunks.iter().all(|chunk| !chunk.is_empty() && chunk.len() <= max_size));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests;