Skip to main content

solverforge_solver/manager/solver_manager/
manager.rs

1use std::marker::PhantomData;
2use std::panic::AssertUnwindSafe;
3use std::sync::atomic::Ordering;
4
5use solverforge_core::domain::PlanningSolution;
6use solverforge_core::score::Score;
7use tokio::sync::mpsc;
8
9use super::super::solution_manager::Analyzable;
10use super::runtime::{panic_payload_to_string, SolverRuntime};
11use super::slot::{JobSlot, SLOT_FREE, SLOT_SOLVING};
12use super::types::{
13    SolverEvent, SolverLifecycleState, SolverManagerError, SolverSnapshot, SolverSnapshotAnalysis,
14    SolverStatus,
15};
16
/// Upper bound on the number of jobs a single [`SolverManager`] can retain at once.
///
/// Each job occupies one fixed slot until it is deleted; once every slot is in use,
/// `SolverManager::solve` refuses new work with `SolverManagerError::NoFreeJobSlots`.
pub const MAX_JOBS: usize = 16;
19
20/// Trait for solutions that can run inside the retained lifecycle manager.
21pub trait Solvable: PlanningSolution + Send + 'static {
22    fn solve(self, runtime: SolverRuntime<Self>);
23}
24
25/// Manages retained async solve jobs with lifecycle-complete event streaming.
26pub struct SolverManager<S: Solvable> {
27    slots: [JobSlot<S>; MAX_JOBS],
28    _phantom: PhantomData<fn() -> S>,
29}
30
31impl<S: Solvable> Default for SolverManager<S> {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl<S: Solvable> SolverManager<S>
38where
39    S::Score: Score,
40{
41    pub const fn new() -> Self {
42        Self {
43            slots: [
44                JobSlot::new(),
45                JobSlot::new(),
46                JobSlot::new(),
47                JobSlot::new(),
48                JobSlot::new(),
49                JobSlot::new(),
50                JobSlot::new(),
51                JobSlot::new(),
52                JobSlot::new(),
53                JobSlot::new(),
54                JobSlot::new(),
55                JobSlot::new(),
56                JobSlot::new(),
57                JobSlot::new(),
58                JobSlot::new(),
59                JobSlot::new(),
60            ],
61            _phantom: PhantomData,
62        }
63    }
64
65    pub fn solve(
66        &'static self,
67        solution: S,
68    ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
69        let (sender, receiver) = mpsc::unbounded_channel();
70
71        let Some(slot_idx) = self.slots.iter().position(|slot| {
72            slot.state
73                .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
74                .is_ok()
75        }) else {
76            return Err(SolverManagerError::NoFreeJobSlots);
77        };
78
79        let slot = &self.slots[slot_idx];
80        slot.initialize(sender);
81        let runtime = SolverRuntime::new(slot_idx, slot);
82
83        rayon::spawn(move || {
84            let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
85
86            match result {
87                Ok(()) => {
88                    if !runtime.is_terminal() {
89                        if runtime.is_cancel_requested() {
90                            let (current_score, best_score, telemetry) = {
91                                let record = runtime.slot.record.lock().unwrap();
92                                (record.current_score, record.best_score, record.telemetry)
93                            };
94                            runtime.emit_cancelled(current_score, best_score, telemetry);
95                        } else {
96                            runtime.emit_failed(
97                                "solver returned without emitting a terminal lifecycle event"
98                                    .to_string(),
99                            );
100                        }
101                    }
102                }
103                Err(payload) => {
104                    runtime.emit_failed(panic_payload_to_string(payload));
105                }
106            }
107
108            runtime.slot.worker_exited();
109        });
110
111        Ok((slot_idx, receiver))
112    }
113
114    pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
115        let slot = self.slot(job_id)?;
116        let state = slot
117            .public_state()
118            .ok_or(SolverManagerError::JobNotFound { job_id })?;
119        let record = slot.record.lock().unwrap();
120        Ok(record.status(job_id, state))
121    }
122
123    pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
124        let slot = self.slot(job_id)?;
125        let paused = slot.with_publication(|sender, record| {
126            match slot.state.compare_exchange(
127                SLOT_SOLVING,
128                super::slot::SLOT_PAUSE_REQUESTED,
129                Ordering::SeqCst,
130                Ordering::SeqCst,
131            ) {
132                Ok(_) => {
133                    slot.pause_requested.store(true, Ordering::SeqCst);
134                    let metadata =
135                        record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
136                    if let Some(sender) = sender {
137                        let _ = sender.send(SolverEvent::PauseRequested { metadata });
138                    }
139                    true
140                }
141                Err(_) => false,
142            }
143        });
144        if paused {
145            Ok(())
146        } else {
147            let state = slot
148                .public_state()
149                .ok_or(SolverManagerError::JobNotFound { job_id })?;
150            Err(SolverManagerError::InvalidStateTransition {
151                job_id,
152                action: "pause",
153                state,
154            })
155        }
156    }
157
158    pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
159        let slot = self.slot(job_id)?;
160        let state = slot
161            .public_state()
162            .ok_or(SolverManagerError::JobNotFound { job_id })?;
163        if state != SolverLifecycleState::Paused {
164            return Err(SolverManagerError::InvalidStateTransition {
165                job_id,
166                action: "resume",
167                state,
168            });
169        }
170
171        slot.pause_requested.store(false, Ordering::SeqCst);
172        slot.pause_condvar.notify_one();
173        Ok(())
174    }
175
176    pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
177        let slot = self.slot(job_id)?;
178        let state = slot
179            .public_state()
180            .ok_or(SolverManagerError::JobNotFound { job_id })?;
181        if !matches!(
182            state,
183            SolverLifecycleState::Solving
184                | SolverLifecycleState::PauseRequested
185                | SolverLifecycleState::Paused
186        ) {
187            return Err(SolverManagerError::InvalidStateTransition {
188                job_id,
189                action: "cancel",
190                state,
191            });
192        }
193
194        slot.terminate.store(true, Ordering::SeqCst);
195        slot.pause_requested.store(false, Ordering::SeqCst);
196        slot.pause_condvar.notify_one();
197        Ok(())
198    }
199
200    pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
201        let slot = self.slot(job_id)?;
202        let state = slot
203            .public_state()
204            .ok_or(SolverManagerError::JobNotFound { job_id })?;
205        if !state.is_terminal() {
206            return Err(SolverManagerError::InvalidStateTransition {
207                job_id,
208                action: "delete",
209                state,
210            });
211        }
212
213        slot.mark_deleted();
214        slot.try_reset_deleted();
215        Ok(())
216    }
217
218    pub fn get_snapshot(
219        &self,
220        job_id: usize,
221        snapshot_revision: Option<u64>,
222    ) -> Result<SolverSnapshot<S>, SolverManagerError> {
223        let slot = self.slot(job_id)?;
224        if slot.public_state().is_none() {
225            return Err(SolverManagerError::JobNotFound { job_id });
226        }
227
228        let record = slot.record.lock().unwrap();
229        if record.snapshots.is_empty() {
230            return Err(SolverManagerError::NoSnapshotAvailable { job_id });
231        }
232
233        match snapshot_revision {
234            Some(revision) => record
235                .snapshots
236                .iter()
237                .find(|snapshot| snapshot.snapshot_revision == revision)
238                .cloned()
239                .ok_or(SolverManagerError::SnapshotNotFound {
240                    job_id,
241                    snapshot_revision: revision,
242                }),
243            None => Ok(record
244                .snapshots
245                .last()
246                .expect("checked non-empty snapshots")
247                .clone()),
248        }
249    }
250
251    pub fn analyze_snapshot(
252        &self,
253        job_id: usize,
254        snapshot_revision: Option<u64>,
255    ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
256    where
257        S: Analyzable,
258    {
259        let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
260        Ok(SolverSnapshotAnalysis {
261            job_id,
262            lifecycle_state: snapshot.lifecycle_state,
263            terminal_reason: snapshot.terminal_reason,
264            snapshot_revision: snapshot.snapshot_revision,
265            analysis: snapshot.solution.analyze(),
266        })
267    }
268
269    pub fn active_job_count(&self) -> usize {
270        self.slots
271            .iter()
272            .filter(|slot| slot.public_state().is_some())
273            .count()
274    }
275
276    #[cfg(test)]
277    pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
278        self.slots
279            .get(job_id)
280            .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
281    }
282
283    fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
284        self.slots
285            .get(job_id)
286            .ok_or(SolverManagerError::JobNotFound { job_id })
287    }
288}