solverforge_solver/manager/solver_manager/
manager.rs1use std::marker::PhantomData;
2use std::panic::AssertUnwindSafe;
3use std::sync::atomic::Ordering;
4
5use solverforge_core::domain::PlanningSolution;
6use solverforge_core::score::Score;
7use tokio::sync::mpsc;
8
9use super::super::solution_manager::Analyzable;
10use super::runtime::{panic_payload_to_string, SolverRuntime};
11use super::slot::{JobSlot, SLOT_FREE, SLOT_SOLVING};
12use super::types::{
13 SolverEvent, SolverLifecycleState, SolverManagerError, SolverSnapshot, SolverSnapshotAnalysis,
14 SolverStatus,
15};
16
/// Number of job slots owned by a [`SolverManager`], i.e. the maximum number of jobs it
/// can track at the same time. A job's id is always in `0..MAX_JOBS`.
pub const MAX_JOBS: usize = 16_usize;
19
20pub trait Solvable: PlanningSolution + Send + 'static {
22 fn solve(self, runtime: SolverRuntime<Self>);
23}
24
25pub struct SolverManager<S: Solvable> {
27 slots: [JobSlot<S>; MAX_JOBS],
28 _phantom: PhantomData<fn() -> S>,
29}
30
31impl<S: Solvable> Default for SolverManager<S> {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl<S: Solvable> SolverManager<S>
38where
39 S::Score: Score,
40{
41 pub const fn new() -> Self {
42 Self {
43 slots: [
44 JobSlot::new(),
45 JobSlot::new(),
46 JobSlot::new(),
47 JobSlot::new(),
48 JobSlot::new(),
49 JobSlot::new(),
50 JobSlot::new(),
51 JobSlot::new(),
52 JobSlot::new(),
53 JobSlot::new(),
54 JobSlot::new(),
55 JobSlot::new(),
56 JobSlot::new(),
57 JobSlot::new(),
58 JobSlot::new(),
59 JobSlot::new(),
60 ],
61 _phantom: PhantomData,
62 }
63 }
64
65 pub fn solve(
66 &'static self,
67 solution: S,
68 ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
69 let (sender, receiver) = mpsc::unbounded_channel();
70
71 let Some(slot_idx) = self.slots.iter().position(|slot| {
72 slot.state
73 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
74 .is_ok()
75 }) else {
76 return Err(SolverManagerError::NoFreeJobSlots);
77 };
78
79 let slot = &self.slots[slot_idx];
80 slot.initialize(sender);
81 let runtime = SolverRuntime::new(slot_idx, slot);
82
83 rayon::spawn(move || {
84 let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
85
86 match result {
87 Ok(()) => {
88 if !runtime.is_terminal() {
89 if runtime.is_cancel_requested() {
90 let (current_score, best_score, telemetry) = {
91 let record = runtime.slot.record.lock().unwrap();
92 (
93 record.current_score,
94 record.best_score,
95 record.telemetry.clone(),
96 )
97 };
98 runtime.emit_cancelled(current_score, best_score, telemetry);
99 } else {
100 runtime.emit_failed(
101 "solver returned without emitting a terminal lifecycle event"
102 .to_string(),
103 );
104 }
105 }
106 }
107 Err(payload) => {
108 runtime.emit_failed(panic_payload_to_string(payload));
109 }
110 }
111
112 runtime.slot.worker_exited();
113 });
114
115 Ok((slot_idx, receiver))
116 }
117
118 pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
119 let slot = self.slot(job_id)?;
120 let state = slot
121 .public_state()
122 .ok_or(SolverManagerError::JobNotFound { job_id })?;
123 let record = slot.record.lock().unwrap();
124 Ok(record.status(job_id, state))
125 }
126
127 pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
128 let slot = self.slot(job_id)?;
129 let paused = slot.with_publication(|sender, record| {
130 match slot.state.compare_exchange(
131 SLOT_SOLVING,
132 super::slot::SLOT_PAUSE_REQUESTED,
133 Ordering::SeqCst,
134 Ordering::SeqCst,
135 ) {
136 Ok(_) => {
137 slot.pause_requested.store(true, Ordering::SeqCst);
138 let metadata =
139 record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
140 if let Some(sender) = sender {
141 let _ = sender.send(SolverEvent::PauseRequested { metadata });
142 }
143 true
144 }
145 Err(_) => false,
146 }
147 });
148 if paused {
149 Ok(())
150 } else {
151 let state = slot
152 .public_state()
153 .ok_or(SolverManagerError::JobNotFound { job_id })?;
154 Err(SolverManagerError::InvalidStateTransition {
155 job_id,
156 action: "pause",
157 state,
158 })
159 }
160 }
161
162 pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
163 let slot = self.slot(job_id)?;
164 let state = slot
165 .public_state()
166 .ok_or(SolverManagerError::JobNotFound { job_id })?;
167 if state != SolverLifecycleState::Paused {
168 return Err(SolverManagerError::InvalidStateTransition {
169 job_id,
170 action: "resume",
171 state,
172 });
173 }
174
175 slot.pause_requested.store(false, Ordering::SeqCst);
176 slot.pause_condvar.notify_one();
177 Ok(())
178 }
179
180 pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
181 let slot = self.slot(job_id)?;
182 let state = slot
183 .public_state()
184 .ok_or(SolverManagerError::JobNotFound { job_id })?;
185 if !matches!(
186 state,
187 SolverLifecycleState::Solving
188 | SolverLifecycleState::PauseRequested
189 | SolverLifecycleState::Paused
190 ) {
191 return Err(SolverManagerError::InvalidStateTransition {
192 job_id,
193 action: "cancel",
194 state,
195 });
196 }
197
198 slot.terminate.store(true, Ordering::SeqCst);
199 slot.pause_requested.store(false, Ordering::SeqCst);
200 slot.pause_condvar.notify_one();
201 Ok(())
202 }
203
204 pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
205 let slot = self.slot(job_id)?;
206 let state = slot
207 .public_state()
208 .ok_or(SolverManagerError::JobNotFound { job_id })?;
209 if !state.is_terminal() {
210 return Err(SolverManagerError::InvalidStateTransition {
211 job_id,
212 action: "delete",
213 state,
214 });
215 }
216
217 slot.mark_deleted();
218 slot.try_reset_deleted();
219 Ok(())
220 }
221
222 pub fn get_snapshot(
223 &self,
224 job_id: usize,
225 snapshot_revision: Option<u64>,
226 ) -> Result<SolverSnapshot<S>, SolverManagerError> {
227 let slot = self.slot(job_id)?;
228 if slot.public_state().is_none() {
229 return Err(SolverManagerError::JobNotFound { job_id });
230 }
231
232 let record = slot.record.lock().unwrap();
233 if record.snapshots.is_empty() {
234 return Err(SolverManagerError::NoSnapshotAvailable { job_id });
235 }
236
237 match snapshot_revision {
238 Some(revision) => record
239 .snapshots
240 .iter()
241 .find(|snapshot| snapshot.snapshot_revision == revision)
242 .cloned()
243 .ok_or(SolverManagerError::SnapshotNotFound {
244 job_id,
245 snapshot_revision: revision,
246 }),
247 None => Ok(record
248 .snapshots
249 .last()
250 .expect("checked non-empty snapshots")
251 .clone()),
252 }
253 }
254
255 pub fn analyze_snapshot(
256 &self,
257 job_id: usize,
258 snapshot_revision: Option<u64>,
259 ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
260 where
261 S: Analyzable,
262 {
263 let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
264 Ok(SolverSnapshotAnalysis {
265 job_id,
266 lifecycle_state: snapshot.lifecycle_state,
267 terminal_reason: snapshot.terminal_reason,
268 snapshot_revision: snapshot.snapshot_revision,
269 analysis: snapshot.solution.analyze(),
270 })
271 }
272
273 pub fn active_job_count(&self) -> usize {
274 self.slots
275 .iter()
276 .filter(|slot| slot.public_state().is_some())
277 .count()
278 }
279
280 #[cfg(test)]
281 pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
282 self.slots
283 .get(job_id)
284 .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
285 }
286
287 fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
288 self.slots
289 .get(job_id)
290 .ok_or(SolverManagerError::JobNotFound { job_id })
291 }
292}