shipper_core/engine/parallel/mod.rs

//! Wave-based parallel publishing engine.
//!
//! Schedules independent crates into concurrent publish waves based on the
//! dependency graph produced by `shipper_plan::ReleasePlan::group_by_levels`.
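//!
//! For example, given a hypothetical workspace where `core` has no internal
//! dependencies and `cli` and `api` both depend only on `core`, the waves are
//! `[core]` followed by `[cli, api]`, and the crates in the second wave
//! publish concurrently.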
//!
//! Absorbed from the standalone `shipper-engine-parallel` crate. See
//! `CLAUDE.md` alongside this module for module-level guidance.

use std::path::Path;
use std::sync::{Arc, Mutex};

use anyhow::Result;

use crate::plan::PlannedWorkspace;
use crate::state::events;
use shipper_registry::HttpRegistryClient as RegistryClient;
use shipper_types::{
    ExecutionResult, ExecutionState, PackageEvidence, PackageReceipt, PackageState, RuntimeOptions,
};

mod policy;
mod publish;
mod readiness;
mod reconcile;
mod webhook;

/// Re-exported for parallel publish wave planning.
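///
/// A minimal sketch of the chunking contract (hypothetical values; the
/// invariants shown match this module's property tests: input order is
/// preserved and no chunk exceeds `max(limit, 1)`):
///
/// ```ignore
/// let names: Vec<String> = vec!["a".into(), "b".into(), "c".into()];
/// let waves = chunk_by_max_concurrent(&names, 2);
/// assert_eq!(waves.iter().flatten().count(), names.len());
/// assert!(waves.iter().all(|wave| wave.len() <= 2));
/// ```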
pub use crate::plan::chunking::chunk_by_max_concurrent;

use publish::run_publish_level;
use webhook::{WebhookEvent, maybe_send_event};

/// Reporter interface shared with the host crate. Parallel publish forwards
/// status updates and warnings through this trait.
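///
/// A minimal sketch of an implementation (hypothetical; callers inside
/// `shipper` instead pass their existing reporters through
/// `HostReporterAdapter` below):
///
/// ```ignore
/// struct StderrReporter;
///
/// impl Reporter for StderrReporter {
///     fn info(&mut self, msg: &str) { eprintln!("info: {msg}"); }
///     fn warn(&mut self, msg: &str) { eprintln!("warn: {msg}"); }
///     fn error(&mut self, msg: &str) { eprintln!("error: {msg}"); }
/// }
/// ```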
pub trait Reporter {
    fn info(&mut self, msg: &str);
    fn warn(&mut self, msg: &str);
    fn error(&mut self, msg: &str);
}

/// Adapter that bridges the host crate's `crate::engine::Reporter` trait into
/// this module's local `Reporter` trait. Allows callers inside `shipper` to
/// pass their existing reporters without any wrapping at the call site.
struct HostReporterAdapter<'a> {
    inner: &'a mut dyn crate::engine::Reporter,
}

impl<'a> Reporter for HostReporterAdapter<'a> {
    fn info(&mut self, msg: &str) {
        self.inner.info(msg);
    }
    fn warn(&mut self, msg: &str) {
        self.inner.warn(msg);
    }
    fn error(&mut self, msg: &str) {
        self.inner.error(msg);
    }
}

/// Run publish in parallel mode using `shipper`'s wrapped `RegistryClient`.
///
/// This is the entry point called by `engine::run_publish`. It adapts the
/// host crate's types (`crate::registry::RegistryClient`,
/// `crate::engine::Reporter`) into the inner ones expected by the parallel
/// engine.
///
/// Constructs a fresh `shipper_registry::HttpRegistryClient` from the host
/// registry's configuration so the call works regardless of which `registry`
/// impl variant is active (micro wrapper vs. in-tree legacy).
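///
/// A sketch of the call shape (hypothetical call site; `engine::run_publish`
/// supplies the real arguments):
///
/// ```ignore
/// // Receipts cover published, failed, and skipped packages.
/// let receipts =
///     run_publish_parallel(&ws, &opts, &mut st, state_dir, &reg, &mut reporter)?;
/// ```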
pub fn run_publish_parallel(
    ws: &crate::plan::PlannedWorkspace,
    opts: &RuntimeOptions,
    st: &mut ExecutionState,
    state_dir: &Path,
    reg: &crate::registry::RegistryClient,
    reporter: &mut dyn crate::engine::Reporter,
) -> Result<Vec<PackageReceipt>> {
    let api_base = reg.registry().api_base.trim_end_matches('/');
    let reg_inner = shipper_registry::HttpRegistryClient::new(api_base);
    let mut adapter = HostReporterAdapter { inner: reporter };
    run_publish_parallel_inner(ws, opts, st, state_dir, &reg_inner, &mut adapter)
}

/// Inner entry point operating on `shipper_registry::HttpRegistryClient` and
/// the local `Reporter` trait. Kept `pub(crate)` so tests elsewhere in the
/// crate can exercise it directly.
pub(crate) fn run_publish_parallel_inner(
    ws: &PlannedWorkspace,
    opts: &RuntimeOptions,
    st: &mut ExecutionState,
    state_dir: &Path,
    reg: &RegistryClient,
    reporter: &mut dyn Reporter,
) -> Result<Vec<PackageReceipt>> {
    let levels = ws.plan.group_by_levels();

    reporter.info(&format!(
        "parallel publish: {} levels, {} packages total",
        levels.len(),
        ws.plan.packages.len()
    ));

    // Send webhook notification: publish started
    maybe_send_event(
        &opts.webhook,
        WebhookEvent::PublishStarted {
            plan_id: ws.plan.plan_id.clone(),
            package_count: ws.plan.packages.len(),
            registry: ws.plan.registry.name.clone(),
        },
    );

    // Initialize event log
    let events_path = events::events_path(state_dir);
    let event_log = Arc::new(Mutex::new(events::EventLog::new()));

    // Wrap a copy of the state in Arc<Mutex<..>> so publish workers can
    // update it concurrently; it is copied back to `st` after all levels run.
    let st_arc = Arc::new(Mutex::new(st.clone()));

    // Thread-safe reporter that buffers messages from publish workers; the
    // buffered messages are replayed to the real reporter after all levels
    // complete.
    struct SendReporter {
        infos: Mutex<Vec<String>>,
        warns: Mutex<Vec<String>>,
        errors: Mutex<Vec<String>>,
    }
    impl Reporter for SendReporter {
        fn info(&mut self, msg: &str) {
            self.infos.lock().unwrap().push(msg.to_string());
        }
        fn warn(&mut self, msg: &str) {
            self.warns.lock().unwrap().push(msg.to_string());
        }
        fn error(&mut self, msg: &str) {
            self.errors.lock().unwrap().push(msg.to_string());
        }
    }

    let send_reporter = Arc::new(Mutex::new(SendReporter {
        infos: Mutex::new(Vec::new()),
        warns: Mutex::new(Vec::new()),
        errors: Mutex::new(Vec::new()),
    }));

    let mut all_receipts: Vec<PackageReceipt> = Vec::new();

    // Track whether we've reached the resume point, when one was specified
    let mut reached_resume_point = opts.resume_from.is_none();

    for level in &levels {
        // If we haven't reached the resume point yet, check whether it's in this level
        if !reached_resume_point {
            if level
                .packages
                .iter()
                .any(|p| Some(&p.name) == opts.resume_from.as_ref())
            {
                reached_resume_point = true;
            } else {
                // Check whether every package in this level is already done in state.
                // If so, skip the level silently (it is already complete).
                // If not, skip it with a warning, since it precedes the resume point.
                let mut level_done = true;
                {
                    let st_guard = st_arc.lock().unwrap();
                    for p in &level.packages {
                        let key = crate::runtime::execution::pkg_key(&p.name, &p.version);
                        if let Some(progress) = st_guard.packages.get(&key) {
                            if !matches!(
                                progress.state,
                                PackageState::Published | PackageState::Skipped { .. }
                            ) {
                                level_done = false;
                                break;
                            }
                        } else {
                            level_done = false;
                            break;
                        }
                    }
                }

                if level_done {
                    reporter.info(&format!(
                        "Level {}: already complete (skipping)",
                        level.level
                    ));
                } else {
                    reporter.warn(&format!(
                        "Level {}: skipping (before resume point {})",
                        level.level,
                        opts.resume_from.as_ref().unwrap()
                    ));
                }

                // Still collect receipts for these skipped packages so they
                // appear in the final receipt
                for p in &level.packages {
                    let key = crate::runtime::execution::pkg_key(&p.name, &p.version);
                    let st_guard = st_arc.lock().unwrap();
                    if let Some(progress) = st_guard.packages.get(&key) {
                        all_receipts.push(PackageReceipt {
                            name: p.name.clone(),
                            version: p.version.clone(),
                            attempts: progress.attempts,
                            state: progress.state.clone(),
                            started_at: chrono::Utc::now(),
                            finished_at: chrono::Utc::now(),
                            duration_ms: 0,
                            evidence: PackageEvidence {
                                attempts: vec![],
                                readiness_checks: vec![],
                            },
                            compromised_at: None,
                            compromised_by: None,
                            superseded_by: None,
                        });
                    }
                }
                continue;
            }
        }

        let level_receipts = run_publish_level(
            level,
            ws,
            opts,
            reg,
            &st_arc,
            state_dir,
            &event_log,
            &events_path,
            &(send_reporter.clone() as Arc<Mutex<dyn Reporter + Send>>),
        )?;
        all_receipts.extend(level_receipts);
    }

    // Replay messages to the real reporter
    {
        let sr = send_reporter.lock().unwrap();
        for msg in sr.infos.lock().unwrap().iter() {
            reporter.info(msg);
        }
        for msg in sr.warns.lock().unwrap().iter() {
            reporter.warn(msg);
        }
        for msg in sr.errors.lock().unwrap().iter() {
            reporter.error(msg);
        }
    }

    // Copy updated state back
    let updated_st = st_arc.lock().unwrap();
    *st = updated_st.clone();

    // Calculate publish completion statistics
    let total_packages = all_receipts.len();
    let success_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Published))
        .count();
    let failure_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Failed { .. }))
        .count();
    let skipped_count = all_receipts
        .iter()
        .filter(|r| matches!(r.state, PackageState::Skipped { .. }))
        .count();

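    // Classify the overall run: `Success` when every receipt ended in a good
    // terminal state (`Published`, `Uploaded`, or `Skipped`),
    // `CompleteFailure` when nothing reached `Published`, otherwise
    // `PartialFailure`. Note that `Uploaded` counts toward an overall
    // `Success` but not toward `success_count`.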
    let exec_result = if all_receipts.iter().all(|r| {
        matches!(
            r.state,
            PackageState::Published | PackageState::Uploaded | PackageState::Skipped { .. }
        )
    }) {
        ExecutionResult::Success
    } else if success_count == 0 {
        ExecutionResult::CompleteFailure
    } else {
        ExecutionResult::PartialFailure
    };

    // Send webhook notification: all complete
    maybe_send_event(
        &opts.webhook,
        WebhookEvent::PublishCompleted {
            plan_id: ws.plan.plan_id.clone(),
            total_packages,
            success_count,
            failure_count,
            skipped_count,
            result: match exec_result {
                ExecutionResult::Success => "success".to_string(),
                ExecutionResult::PartialFailure => "partial_failure".to_string(),
                ExecutionResult::CompleteFailure => "complete_failure".to_string(),
            },
        },
    );

    Ok(all_receipts)
}

#[cfg(test)]
mod property_tests {
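    //! Property tests for `chunk_by_max_concurrent`: chunking must preserve
    //! input order and must never emit a chunk larger than `max(limit, 1)`.
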
    use proptest::prelude::*;

    use super::chunk_by_max_concurrent;

    fn names() -> impl Strategy<Value = Vec<String>> {
        prop::collection::vec("[a-z]{1,8}", 0..64)
    }

    proptest! {
        #[test]
        fn chunking_preserves_order_and_limits_size(items in names(), limit in 0usize..64) {
            let chunks = chunk_by_max_concurrent(&items, limit);
            let flattened: Vec<String> = chunks.iter().flatten().cloned().collect();

            prop_assert_eq!(flattened.as_slice(), items.as_slice());

            let max_size = limit.max(1);
            for chunk in &chunks {
                prop_assert!(chunk.len() <= max_size);
            }

            if !flattened.is_empty() {
                if max_size == 1 {
                    prop_assert!(chunks.iter().all(|chunk| chunk.len() <= 1));
                } else {
                    prop_assert!(chunks.iter().all(|chunk| !chunk.is_empty() && chunk.len() <= max_size));
                }
            }
        }
    }
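
    // A concrete spot check complementing the property above (a minimal
    // sketch; assumes the same `Vec<String>` input shape the property test
    // uses): order is preserved and no chunk exceeds the limit.
    #[test]
    fn chunking_spot_check_preserves_order() {
        let items: Vec<String> = vec!["a".into(), "b".into(), "c".into()];
        let chunks = chunk_by_max_concurrent(&items, 2);
        let flattened: Vec<String> = chunks.iter().flatten().cloned().collect();
        assert_eq!(flattened, items);
        assert!(chunks.iter().all(|chunk| chunk.len() <= 2));
    }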
}

#[cfg(test)]
mod tests;