solverforge_solver/manager/solver_manager/
runtime.rs1use std::fmt::{self, Debug};
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use solverforge_core::domain::PlanningSolution;
5
6use super::slot::{
7 JobSlot, SLOT_CANCELLED, SLOT_COMPLETED, SLOT_FAILED, SLOT_PAUSED, SLOT_SOLVING,
8};
9use super::types::{SolverEvent, SolverEventMetadata, SolverLifecycleState, SolverTerminalReason};
10use crate::scope::{ProgressCallback, SolverScope};
11use crate::stats::SolverTelemetry;
12
13pub struct SolverRuntime<S: PlanningSolution> {
18 job_id: usize,
19 pub(super) slot: &'static JobSlot<S>,
20}
21
22impl<S: PlanningSolution> Clone for SolverRuntime<S> {
23 fn clone(&self) -> Self {
24 *self
25 }
26}
27
28impl<S: PlanningSolution> Copy for SolverRuntime<S> {}
29
30impl<S: PlanningSolution> Debug for SolverRuntime<S> {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_struct("SolverRuntime")
33 .field("job_id", &self.job_id)
34 .finish()
35 }
36}
37
/// Selects which `SolverEvent` variant the private emit helpers build.
///
/// `Progress` and `Resumed` go through the non-terminal helper; `Cancelled`
/// goes through the terminal helper.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EventKind {
    /// Builds `SolverEvent::Progress`.
    Progress,
    /// Builds `SolverEvent::Resumed`.
    Resumed,
    /// Builds `SolverEvent::Cancelled`.
    Cancelled,
}
44
45impl<S: PlanningSolution> SolverRuntime<S> {
46 pub(super) fn new(job_id: usize, slot: &'static JobSlot<S>) -> Self {
47 Self { job_id, slot }
48 }
49
50 pub fn detached() -> Self {
57 let slot = Box::leak(Box::new(JobSlot::new()));
58 slot.state.store(SLOT_SOLVING, Ordering::Release);
59 slot.worker_running.store(true, Ordering::Release);
60 Self {
61 job_id: usize::MAX,
62 slot,
63 }
64 }
65
66 pub fn job_id(&self) -> usize {
67 self.job_id
68 }
69
70 pub fn is_cancel_requested(&self) -> bool {
71 self.slot.terminate.load(Ordering::Acquire)
72 }
73
74 pub(crate) fn is_pause_requested(&self) -> bool {
75 self.slot.pause_requested.load(Ordering::Acquire)
76 }
77
78 pub(crate) fn cancel_flag(&self) -> &'static AtomicBool {
79 &self.slot.terminate
80 }
81
82 pub fn emit_progress(
83 &self,
84 current_score: Option<S::Score>,
85 best_score: Option<S::Score>,
86 telemetry: SolverTelemetry,
87 ) {
88 let lifecycle_state = self.current_state();
89 self.emit_non_snapshot_event(
90 lifecycle_state,
91 current_score,
92 best_score,
93 telemetry,
94 EventKind::Progress,
95 );
96 }
97
98 pub fn emit_best_solution(
99 &self,
100 solution: S,
101 current_score: Option<S::Score>,
102 best_score: S::Score,
103 telemetry: SolverTelemetry,
104 ) {
105 let state = self.current_state();
106 self.slot.with_publication(|sender, record| {
107 let terminal_reason = record.terminal_reason;
108 record.current_score = current_score;
109 record.best_score = Some(best_score);
110 record.telemetry = telemetry.clone();
111
112 let snapshot_revision = record.push_snapshot(super::types::SolverSnapshot {
113 job_id: self.job_id,
114 snapshot_revision: 0,
115 lifecycle_state: state,
116 terminal_reason,
117 current_score,
118 best_score: Some(best_score),
119 telemetry: telemetry.clone(),
120 solution: solution.clone(),
121 });
122
123 let metadata = record.next_metadata(self.job_id, state, Some(snapshot_revision));
124 if let Some(sender) = sender {
125 let _ = sender.send(SolverEvent::BestSolution { metadata, solution });
126 }
127 });
128 }
129
130 pub fn emit_completed(
131 &self,
132 solution: S,
133 current_score: Option<S::Score>,
134 best_score: S::Score,
135 telemetry: SolverTelemetry,
136 terminal_reason: SolverTerminalReason,
137 ) {
138 self.slot.with_publication(|sender, record| {
139 self.slot.state.store(SLOT_COMPLETED, Ordering::SeqCst);
140 record.terminal_reason = Some(terminal_reason);
141 record.checkpoint_available = false;
142 record.current_score = current_score;
143 record.best_score = Some(best_score);
144 record.telemetry = telemetry.clone();
145
146 let snapshot_revision = record.push_snapshot(super::types::SolverSnapshot {
147 job_id: self.job_id,
148 snapshot_revision: 0,
149 lifecycle_state: SolverLifecycleState::Completed,
150 terminal_reason: Some(terminal_reason),
151 current_score,
152 best_score: Some(best_score),
153 telemetry: telemetry.clone(),
154 solution: solution.clone(),
155 });
156
157 let metadata = record.next_metadata(
158 self.job_id,
159 SolverLifecycleState::Completed,
160 Some(snapshot_revision),
161 );
162 if let Some(sender) = sender {
163 let _ = sender.send(SolverEvent::Completed { metadata, solution });
164 }
165 });
166 }
167
168 pub fn emit_cancelled(
169 &self,
170 current_score: Option<S::Score>,
171 best_score: Option<S::Score>,
172 telemetry: SolverTelemetry,
173 ) {
174 self.emit_non_snapshot_terminal_event(
175 SolverLifecycleState::Cancelled,
176 SolverTerminalReason::Cancelled,
177 current_score,
178 best_score,
179 telemetry,
180 EventKind::Cancelled,
181 );
182 }
183
184 pub fn emit_failed(&self, error: String) {
185 if matches!(
186 self.current_state(),
187 SolverLifecycleState::Completed
188 | SolverLifecycleState::Cancelled
189 | SolverLifecycleState::Failed
190 ) {
191 return;
192 }
193
194 self.slot.with_publication(|sender, record| {
195 self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
196 record.terminal_reason = Some(SolverTerminalReason::Failed);
197 record.checkpoint_available = false;
198 record.failure_message = Some(error.clone());
199 let telemetry = record.telemetry.clone();
200 let metadata = record.next_metadata(self.job_id, SolverLifecycleState::Failed, None);
201 if let Some(sender) = sender {
202 let _ = sender.send(SolverEvent::Failed {
203 metadata: SolverEventMetadata {
204 telemetry,
205 ..metadata
206 },
207 error,
208 });
209 }
210 });
211 }
212
213 pub(crate) fn pause_if_requested<D, ProgressCb>(
214 &self,
215 solver_scope: &mut SolverScope<'_, S, D, ProgressCb>,
216 ) where
217 D: solverforge_scoring::Director<S>,
218 ProgressCb: ProgressCallback<S>,
219 {
220 if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
221 return;
222 }
223
224 solver_scope.pause_timers();
225
226 let solution = solver_scope.score_director().clone_working_solution();
227 let current_score = solver_scope.current_score().copied();
228 let best_score = solver_scope.best_score().copied();
229 let telemetry = solver_scope.stats().snapshot();
230 let _ = self.pause_with_snapshot(solution, current_score, best_score, telemetry);
231 solver_scope.resume_timers();
232 }
233
234 pub fn pause_with_snapshot(
235 &self,
236 solution: S,
237 current_score: Option<S::Score>,
238 best_score: Option<S::Score>,
239 telemetry: SolverTelemetry,
240 ) -> bool {
241 if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
242 return false;
243 }
244
245 self.slot.with_publication(|sender, record| {
246 self.slot.state.store(SLOT_PAUSED, Ordering::SeqCst);
247 let terminal_reason = record.terminal_reason;
248 record.checkpoint_available = true;
249 record.current_score = current_score;
250 record.best_score = best_score;
251 record.telemetry = telemetry.clone();
252
253 let snapshot_revision = record.push_snapshot(super::types::SolverSnapshot {
254 job_id: self.job_id,
255 snapshot_revision: 0,
256 lifecycle_state: SolverLifecycleState::Paused,
257 terminal_reason,
258 current_score,
259 best_score,
260 telemetry: telemetry.clone(),
261 solution,
262 });
263
264 let metadata = record.next_metadata(
265 self.job_id,
266 SolverLifecycleState::Paused,
267 Some(snapshot_revision),
268 );
269 if let Some(sender) = sender {
270 let _ = sender.send(SolverEvent::Paused { metadata });
271 }
272 });
273
274 let mut guard = self.slot.pause_gate.lock().unwrap();
275 while self.slot.pause_requested.load(Ordering::Acquire) && !self.is_cancel_requested() {
276 guard = self.slot.pause_condvar.wait(guard).unwrap();
277 }
278 drop(guard);
279
280 if self.is_cancel_requested() {
281 return false;
282 }
283
284 self.slot.state.store(SLOT_SOLVING, Ordering::SeqCst);
285 self.emit_non_snapshot_event(
286 SolverLifecycleState::Solving,
287 current_score,
288 best_score,
289 telemetry,
290 EventKind::Resumed,
291 );
292 true
293 }
294
295 pub(crate) fn is_terminal(&self) -> bool {
296 self.current_state().is_terminal()
297 }
298
299 fn current_state(&self) -> SolverLifecycleState {
300 self.slot
301 .raw_state()
302 .expect("runtime accessed a freed job slot")
303 }
304
305 fn emit_non_snapshot_event(
306 &self,
307 lifecycle_state: SolverLifecycleState,
308 current_score: Option<S::Score>,
309 best_score: Option<S::Score>,
310 telemetry: SolverTelemetry,
311 kind: EventKind,
312 ) {
313 self.slot.with_publication(|sender, record| {
314 record.current_score = current_score;
315 record.best_score = best_score;
316 record.telemetry = telemetry.clone();
317 if lifecycle_state != SolverLifecycleState::Paused {
318 record.checkpoint_available = false;
319 }
320 let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
321 let event = match kind {
322 EventKind::Progress => SolverEvent::Progress { metadata },
323 EventKind::Resumed => SolverEvent::Resumed { metadata },
324 EventKind::Cancelled => unreachable!(),
325 };
326 if let Some(sender) = sender {
327 let _ = sender.send(event);
328 }
329 });
330 }
331
332 fn emit_non_snapshot_terminal_event(
333 &self,
334 lifecycle_state: SolverLifecycleState,
335 terminal_reason: SolverTerminalReason,
336 current_score: Option<S::Score>,
337 best_score: Option<S::Score>,
338 telemetry: SolverTelemetry,
339 kind: EventKind,
340 ) {
341 self.slot.with_publication(|sender, record| {
342 match lifecycle_state {
343 SolverLifecycleState::Cancelled => {
344 self.slot.state.store(SLOT_CANCELLED, Ordering::SeqCst);
345 }
346 SolverLifecycleState::Failed => {
347 self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
348 }
349 _ => {}
350 }
351 record.terminal_reason = Some(terminal_reason);
352 record.checkpoint_available = false;
353 record.current_score = current_score;
354 record.best_score = best_score;
355 record.telemetry = telemetry.clone();
356 let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
357 let event = match kind {
358 EventKind::Cancelled => SolverEvent::Cancelled { metadata },
359 EventKind::Progress | EventKind::Resumed => unreachable!(),
360 };
361 if let Some(sender) = sender {
362 let _ = sender.send(event);
363 }
364 });
365 }
366}
367
368pub(super) fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
369 if let Some(message) = payload.downcast_ref::<&'static str>() {
370 (*message).to_string()
371 } else if let Some(message) = payload.downcast_ref::<String>() {
372 message.clone()
373 } else {
374 "solver panicked with a non-string payload".to_string()
375 }
376}