solverforge_solver/manager/solver_manager/
manager.rs1use std::marker::PhantomData;
2use std::panic::AssertUnwindSafe;
3use std::sync::atomic::Ordering;
4
5use solverforge_core::domain::PlanningSolution;
6use solverforge_core::score::Score;
7use tokio::sync::mpsc;
8
9use super::super::solution_manager::Analyzable;
10use super::runtime::{panic_payload_to_string, SolverRuntime};
11use super::slot::{JobSlot, SLOT_FREE, SLOT_SOLVING};
12use super::types::{
13 SolverEvent, SolverLifecycleState, SolverManagerError, SolverSnapshot, SolverSnapshotAnalysis,
14 SolverStatus,
15};
16
/// Number of job slots owned by a [`SolverManager`].
///
/// The manager stores exactly this many slots in a fixed-size array, so at most
/// `MAX_JOBS` jobs can be retained at once; `SolverManager::solve` reports
/// `SolverManagerError::NoFreeJobSlots` when none of them is free.
pub const MAX_JOBS: usize = 16;
19
/// A planning solution that knows how to solve itself under a [`SolverManager`].
///
/// Implementors must be `Send + 'static` because the manager moves the solution
/// into a task spawned on the rayon pool.
pub trait Solvable: PlanningSolution + Send + 'static {
    /// Runs the solver, consuming the solution.
    ///
    /// `runtime` is the handle for the job slot this run was assigned to. The
    /// manager expects a terminal lifecycle event to have been emitted through it
    /// by the time this method returns; if none was, the manager reports the job
    /// as cancelled (when cancellation was requested) or as failed. A panic
    /// raised here is caught by the manager and reported as a failure.
    fn solve(self, runtime: SolverRuntime<Self>);
}
24
/// Fixed-capacity manager for solver jobs of solution type `S`.
///
/// A job id handed out by `solve` is the index of the slot the job occupies.
pub struct SolverManager<S: Solvable> {
    // One entry per potential job; indexed by job id.
    slots: [JobSlot<S>; MAX_JOBS],
    // Zero-sized marker for `S`.
    // NOTE(review): the `fn() -> S` form is presumably chosen so the marker does not
    // influence auto traits or drop checking — confirm the intent.
    _phantom: PhantomData<fn() -> S>,
}
30
31impl<S: Solvable> Default for SolverManager<S> {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl<S: Solvable> SolverManager<S>
38where
39 S::Score: Score,
40{
41 pub const fn new() -> Self {
42 Self {
43 slots: [
44 JobSlot::new(),
45 JobSlot::new(),
46 JobSlot::new(),
47 JobSlot::new(),
48 JobSlot::new(),
49 JobSlot::new(),
50 JobSlot::new(),
51 JobSlot::new(),
52 JobSlot::new(),
53 JobSlot::new(),
54 JobSlot::new(),
55 JobSlot::new(),
56 JobSlot::new(),
57 JobSlot::new(),
58 JobSlot::new(),
59 JobSlot::new(),
60 ],
61 _phantom: PhantomData,
62 }
63 }
64
65 pub fn solve(
66 &'static self,
67 solution: S,
68 ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
69 let (sender, receiver) = mpsc::unbounded_channel();
70
71 let Some(slot_idx) = self.slots.iter().position(|slot| {
72 slot.state
73 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
74 .is_ok()
75 }) else {
76 return Err(SolverManagerError::NoFreeJobSlots);
77 };
78
79 let slot = &self.slots[slot_idx];
80 slot.initialize(sender);
81 let runtime = SolverRuntime::new(slot_idx, slot);
82
83 rayon::spawn(move || {
84 let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
85
86 match result {
87 Ok(()) => {
88 if !runtime.is_terminal() {
89 if runtime.is_cancel_requested() {
90 let (current_score, best_score, telemetry) = {
91 let record = runtime.slot.record.lock().unwrap();
92 (record.current_score, record.best_score, record.telemetry)
93 };
94 runtime.emit_cancelled(current_score, best_score, telemetry);
95 } else {
96 runtime.emit_failed(
97 "solver returned without emitting a terminal lifecycle event"
98 .to_string(),
99 );
100 }
101 }
102 }
103 Err(payload) => {
104 runtime.emit_failed(panic_payload_to_string(payload));
105 }
106 }
107
108 runtime.slot.worker_exited();
109 });
110
111 Ok((slot_idx, receiver))
112 }
113
114 pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
115 let slot = self.slot(job_id)?;
116 let state = slot
117 .public_state()
118 .ok_or(SolverManagerError::JobNotFound { job_id })?;
119 let record = slot.record.lock().unwrap();
120 Ok(record.status(job_id, state))
121 }
122
123 pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
124 let slot = self.slot(job_id)?;
125 let paused = slot.with_publication(|sender, record| {
126 match slot.state.compare_exchange(
127 SLOT_SOLVING,
128 super::slot::SLOT_PAUSE_REQUESTED,
129 Ordering::SeqCst,
130 Ordering::SeqCst,
131 ) {
132 Ok(_) => {
133 slot.pause_requested.store(true, Ordering::SeqCst);
134 let metadata =
135 record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
136 if let Some(sender) = sender {
137 let _ = sender.send(SolverEvent::PauseRequested { metadata });
138 }
139 true
140 }
141 Err(_) => false,
142 }
143 });
144 if paused {
145 Ok(())
146 } else {
147 let state = slot
148 .public_state()
149 .ok_or(SolverManagerError::JobNotFound { job_id })?;
150 Err(SolverManagerError::InvalidStateTransition {
151 job_id,
152 action: "pause",
153 state,
154 })
155 }
156 }
157
158 pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
159 let slot = self.slot(job_id)?;
160 let state = slot
161 .public_state()
162 .ok_or(SolverManagerError::JobNotFound { job_id })?;
163 if state != SolverLifecycleState::Paused {
164 return Err(SolverManagerError::InvalidStateTransition {
165 job_id,
166 action: "resume",
167 state,
168 });
169 }
170
171 slot.pause_requested.store(false, Ordering::SeqCst);
172 slot.pause_condvar.notify_one();
173 Ok(())
174 }
175
176 pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
177 let slot = self.slot(job_id)?;
178 let state = slot
179 .public_state()
180 .ok_or(SolverManagerError::JobNotFound { job_id })?;
181 if !matches!(
182 state,
183 SolverLifecycleState::Solving
184 | SolverLifecycleState::PauseRequested
185 | SolverLifecycleState::Paused
186 ) {
187 return Err(SolverManagerError::InvalidStateTransition {
188 job_id,
189 action: "cancel",
190 state,
191 });
192 }
193
194 slot.terminate.store(true, Ordering::SeqCst);
195 slot.pause_requested.store(false, Ordering::SeqCst);
196 slot.pause_condvar.notify_one();
197 Ok(())
198 }
199
200 pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
201 let slot = self.slot(job_id)?;
202 let state = slot
203 .public_state()
204 .ok_or(SolverManagerError::JobNotFound { job_id })?;
205 if !state.is_terminal() {
206 return Err(SolverManagerError::InvalidStateTransition {
207 job_id,
208 action: "delete",
209 state,
210 });
211 }
212
213 slot.mark_deleted();
214 slot.try_reset_deleted();
215 Ok(())
216 }
217
218 pub fn get_snapshot(
219 &self,
220 job_id: usize,
221 snapshot_revision: Option<u64>,
222 ) -> Result<SolverSnapshot<S>, SolverManagerError> {
223 let slot = self.slot(job_id)?;
224 if slot.public_state().is_none() {
225 return Err(SolverManagerError::JobNotFound { job_id });
226 }
227
228 let record = slot.record.lock().unwrap();
229 if record.snapshots.is_empty() {
230 return Err(SolverManagerError::NoSnapshotAvailable { job_id });
231 }
232
233 match snapshot_revision {
234 Some(revision) => record
235 .snapshots
236 .iter()
237 .find(|snapshot| snapshot.snapshot_revision == revision)
238 .cloned()
239 .ok_or(SolverManagerError::SnapshotNotFound {
240 job_id,
241 snapshot_revision: revision,
242 }),
243 None => Ok(record
244 .snapshots
245 .last()
246 .expect("checked non-empty snapshots")
247 .clone()),
248 }
249 }
250
251 pub fn analyze_snapshot(
252 &self,
253 job_id: usize,
254 snapshot_revision: Option<u64>,
255 ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
256 where
257 S: Analyzable,
258 {
259 let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
260 Ok(SolverSnapshotAnalysis {
261 job_id,
262 lifecycle_state: snapshot.lifecycle_state,
263 terminal_reason: snapshot.terminal_reason,
264 snapshot_revision: snapshot.snapshot_revision,
265 analysis: snapshot.solution.analyze(),
266 })
267 }
268
269 pub fn active_job_count(&self) -> usize {
270 self.slots
271 .iter()
272 .filter(|slot| slot.public_state().is_some())
273 .count()
274 }
275
/// Test-only probe: `true` when `job_id` is in range and the raw slot state is
/// `SLOT_FREE`.
#[cfg(test)]
pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
    match self.slots.get(job_id) {
        Some(slot) => slot.state.load(Ordering::Acquire) == SLOT_FREE,
        None => false,
    }
}
282
283 fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
284 self.slots
285 .get(job_id)
286 .ok_or(SolverManagerError::JobNotFound { job_id })
287 }
288}