1use std::error::Error;
12use std::fmt::{self, Debug, Display};
13use std::panic::AssertUnwindSafe;
14use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
15use std::sync::{Condvar, Mutex};
16
17use solverforge_core::domain::PlanningSolution;
18use solverforge_core::score::Score;
19use tokio::sync::mpsc;
20
21use super::solution_manager::{Analyzable, ScoreAnalysis};
22use crate::scope::{ProgressCallback, SolverScope};
23use crate::stats::SolverTelemetry;
24
/// Number of job slots; `SolverManager` holds exactly this many `JobSlot`s.
pub const MAX_JOBS: usize = 16;

// Values stored in `JobSlot::state`. `SLOT_FREE` marks a slot that `solve()` may claim;
// every other value maps onto one `SolverLifecycleState` variant (see `JobSlot::raw_state`).
const SLOT_FREE: u8 = 0;
const SLOT_SOLVING: u8 = 1;
const SLOT_PAUSE_REQUESTED: u8 = 2;
const SLOT_PAUSED: u8 = 3;
const SLOT_COMPLETED: u8 = 4;
const SLOT_CANCELLED: u8 = 5;
const SLOT_FAILED: u8 = 6;

// Values stored in `JobSlot::visibility`. Only `SLOT_VISIBLE` slots report a public state;
// `SLOT_DELETED` is set by `mark_deleted`, and `SLOT_DELETING` is claimed (via compare-exchange)
// by the thread that goes on to reset the slot.
const SLOT_VISIBLE: u8 = 0;
const SLOT_DELETED: u8 = 1;
const SLOT_DELETING: u8 = 2;
39
/// Lifecycle state of a solver job as exposed through statuses, events and snapshots.
///
/// Serialized by serde with `SCREAMING_SNAKE_CASE` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum SolverLifecycleState {
    /// The job is running (initial state after `solve`, and again after a resume).
    Solving,
    /// `pause` was accepted but the worker has not parked yet.
    PauseRequested,
    /// The worker stored a checkpoint snapshot and is waiting to be resumed or cancelled.
    Paused,
    /// Terminal: the solver reported completion.
    Completed,
    /// Terminal: the job was cancelled.
    Cancelled,
    /// Terminal: the solver failed (panic or missing terminal event).
    Failed,
}
50
51impl SolverLifecycleState {
52 pub fn is_terminal(self) -> bool {
53 matches!(self, Self::Completed | Self::Cancelled | Self::Failed)
54 }
55}
56
/// Why a job reached a terminal state.
///
/// Serialized by serde with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SolverTerminalReason {
    /// Passed to `emit_completed` by the solver; presumably a natural finish — confirm with callers.
    Completed,
    /// Passed to `emit_completed` by the solver; presumably a configured termination
    /// condition fired — confirm with callers.
    TerminatedByConfig,
    /// Recorded by `emit_cancelled`.
    Cancelled,
    /// Recorded by `emit_failed`.
    Failed,
}
65
/// Point-in-time summary of a job, built from its record by `SolverManager::get_status`.
#[derive(Debug, Clone, PartialEq)]
pub struct SolverStatus<Sc: Score> {
    /// Job id (the index of the job's slot).
    pub job_id: usize,
    /// Lifecycle state at the time of the query.
    pub lifecycle_state: SolverLifecycleState,
    /// Set once a terminal event has been recorded for the job.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// `true` after a pause checkpoint was stored; cleared by resume, progress and terminal events.
    pub checkpoint_available: bool,
    /// Sequence number of the most recent event generated for the job (0 if none yet).
    pub event_sequence: u64,
    /// Revision of the newest retained snapshot, if any.
    pub latest_snapshot_revision: Option<u64>,
    /// Most recently reported current score.
    pub current_score: Option<Sc>,
    /// Most recently reported best score.
    pub best_score: Option<Sc>,
    /// Most recently reported telemetry.
    pub telemetry: SolverTelemetry,
}
78
79impl<Sc: Score> SolverStatus<Sc> {
80 pub fn is_terminal(&self) -> bool {
81 self.lifecycle_state.is_terminal()
82 }
83}
84
/// Data attached to every `SolverEvent`, captured from the job record when the event is built.
#[derive(Debug, Clone, PartialEq)]
pub struct SolverEventMetadata<Sc: Score> {
    /// Job id (the index of the job's slot).
    pub job_id: usize,
    /// Per-job event counter; the first event of a job carries 1.
    pub event_sequence: u64,
    /// Lifecycle state the event reports.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason recorded for the job at the time of the event, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Telemetry recorded for the job at the time of the event.
    pub telemetry: SolverTelemetry,
    /// Current score recorded for the job at the time of the event.
    pub current_score: Option<Sc>,
    /// Best score recorded for the job at the time of the event.
    pub best_score: Option<Sc>,
    /// Snapshot stored with this event, or otherwise the latest retained snapshot revision.
    pub snapshot_revision: Option<u64>,
}
96
/// A retained copy of a solution together with the job state at the time it was stored.
#[derive(Debug, Clone)]
pub struct SolverSnapshot<S: PlanningSolution> {
    /// Job id (the index of the job's slot).
    pub job_id: usize,
    /// Revision assigned when the snapshot is stored; revisions start at 1 and increase by 1.
    pub snapshot_revision: u64,
    /// Lifecycle state recorded with the snapshot.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason recorded with the snapshot, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Current score reported with the snapshot.
    pub current_score: Option<S::Score>,
    /// Best score reported with the snapshot.
    pub best_score: Option<S::Score>,
    /// Telemetry reported with the snapshot.
    pub telemetry: SolverTelemetry,
    /// The retained solution.
    pub solution: S,
}
108
/// Result of `SolverManager::analyze_snapshot`: a score analysis plus the analyzed snapshot's identity.
#[derive(Debug, Clone)]
pub struct SolverSnapshotAnalysis<Sc: Score> {
    /// Job id the snapshot belongs to.
    pub job_id: usize,
    /// Lifecycle state stored in the analyzed snapshot.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason stored in the analyzed snapshot, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Revision of the analyzed snapshot.
    pub snapshot_revision: u64,
    /// Output of `Analyzable::analyze` on the snapshot's solution.
    pub analysis: ScoreAnalysis<Sc>,
}
117
/// Events sent over a job's channel; every variant carries `SolverEventMetadata`.
#[derive(Debug, Clone)]
pub enum SolverEvent<S: PlanningSolution> {
    /// Scores/telemetry update without a snapshot (`SolverRuntime::emit_progress`).
    Progress {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// A new best solution was reported and retained as a snapshot.
    BestSolution {
        metadata: SolverEventMetadata<S::Score>,
        solution: S,
    },
    /// `SolverManager::pause` accepted a pause request.
    PauseRequested {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The worker stored a checkpoint snapshot and parked.
    Paused {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The worker left the paused state and is solving again.
    Resumed {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The solver reported completion; the final solution is also retained as a snapshot.
    Completed {
        metadata: SolverEventMetadata<S::Score>,
        solution: S,
    },
    /// The job was cancelled.
    Cancelled {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The job failed; `error` holds the failure message.
    Failed {
        metadata: SolverEventMetadata<S::Score>,
        error: String,
    },
}
148
149impl<S: PlanningSolution> SolverEvent<S> {
150 pub fn metadata(&self) -> &SolverEventMetadata<S::Score> {
151 match self {
152 Self::Progress { metadata }
153 | Self::PauseRequested { metadata }
154 | Self::Paused { metadata }
155 | Self::Resumed { metadata }
156 | Self::Cancelled { metadata } => metadata,
157 Self::BestSolution { metadata, .. }
158 | Self::Completed { metadata, .. }
159 | Self::Failed { metadata, .. } => metadata,
160 }
161 }
162}
163
/// Errors returned by `SolverManager` operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SolverManagerError {
    /// `solve` found no slot in the free state.
    NoFreeJobSlots,
    /// `job_id` is out of range or the slot holds no publicly visible job.
    JobNotFound {
        job_id: usize,
    },
    /// The requested `action` is not allowed while the job is in `state`.
    InvalidStateTransition {
        job_id: usize,
        action: &'static str,
        state: SolverLifecycleState,
    },
    /// The job exists but has no retained snapshots.
    NoSnapshotAvailable {
        job_id: usize,
    },
    /// The job has snapshots, but none with the requested revision.
    SnapshotNotFound {
        job_id: usize,
        snapshot_revision: u64,
    },
}
183
184impl Display for SolverManagerError {
185 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186 match self {
187 Self::NoFreeJobSlots => write!(f, "no free job slots available"),
188 Self::JobNotFound { job_id } => write!(f, "job {job_id} was not found"),
189 Self::InvalidStateTransition {
190 job_id,
191 action,
192 state,
193 } => write!(
194 f,
195 "cannot {action} job {job_id} while it is in state {state:?}"
196 ),
197 Self::NoSnapshotAvailable { job_id } => {
198 write!(f, "job {job_id} has no retained snapshots")
199 }
200 Self::SnapshotNotFound {
201 job_id,
202 snapshot_revision,
203 } => write!(
204 f,
205 "job {job_id} has no retained snapshot revision {snapshot_revision}"
206 ),
207 }
208 }
209}
210
211impl Error for SolverManagerError {}
212
/// Handle through which a running solver reports events for its own job.
///
/// Holds only the job id and a `'static` reference to the job's slot.
pub struct SolverRuntime<S: PlanningSolution> {
    // Index of the job's slot; also the public job id.
    job_id: usize,
    // Shared per-job state: lifecycle flags, record and event sender.
    slot: &'static JobSlot<S>,
}
221
// Implemented by hand rather than derived: a derive would add `S: Clone`/`S: Copy` bounds,
// while the fields (a `usize` and a shared reference) are copyable for any `S`.
impl<S: PlanningSolution> Clone for SolverRuntime<S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<S: PlanningSolution> Copy for SolverRuntime<S> {}
229
230impl<S: PlanningSolution> Debug for SolverRuntime<S> {
231 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232 f.debug_struct("SolverRuntime")
233 .field("job_id", &self.job_id)
234 .finish()
235 }
236}
237
238impl<S: PlanningSolution> SolverRuntime<S> {
    /// Creates the runtime handle for the job identified by `job_id` that occupies `slot`.
    fn new(job_id: usize, slot: &'static JobSlot<S>) -> Self {
        Self { job_id, slot }
    }
242
    /// Returns the id of the job this runtime reports for.
    pub fn job_id(&self) -> usize {
        self.job_id
    }
246
247 pub fn is_cancel_requested(&self) -> bool {
248 self.slot.terminate.load(Ordering::Acquire)
249 }
250
    /// Returns the slot's terminate flag, the same flag `is_cancel_requested` reads.
    pub(crate) fn cancel_flag(&self) -> &'static AtomicBool {
        &self.slot.terminate
    }
254
255 pub fn emit_progress(
256 &self,
257 current_score: Option<S::Score>,
258 best_score: Option<S::Score>,
259 telemetry: SolverTelemetry,
260 ) {
261 let lifecycle_state = self.current_state();
262 self.emit_non_snapshot_event(
263 lifecycle_state,
264 current_score,
265 best_score,
266 telemetry,
267 EventKind::Progress,
268 );
269 }
270
271 pub fn emit_best_solution(
272 &self,
273 solution: S,
274 current_score: Option<S::Score>,
275 best_score: S::Score,
276 telemetry: SolverTelemetry,
277 ) {
278 let state = self.current_state();
279 let (sender, event) = {
280 let mut record = self.slot.record.lock().unwrap();
281 let terminal_reason = record.terminal_reason;
282 record.current_score = current_score;
283 record.best_score = Some(best_score);
284 record.telemetry = telemetry;
285
286 let snapshot_revision = record.push_snapshot(SolverSnapshot {
287 job_id: self.job_id,
288 snapshot_revision: 0,
289 lifecycle_state: state,
290 terminal_reason,
291 current_score,
292 best_score: Some(best_score),
293 telemetry,
294 solution: solution.clone(),
295 });
296
297 let metadata = record.next_metadata(self.job_id, state, Some(snapshot_revision));
298 let sender = self.slot.sender_clone();
299 (sender, SolverEvent::BestSolution { metadata, solution })
300 };
301
302 if let Some(sender) = sender {
303 let _ = sender.send(event);
304 }
305 }
306
307 pub fn emit_completed(
308 &self,
309 solution: S,
310 current_score: Option<S::Score>,
311 best_score: S::Score,
312 telemetry: SolverTelemetry,
313 terminal_reason: SolverTerminalReason,
314 ) {
315 self.slot.state.store(SLOT_COMPLETED, Ordering::SeqCst);
316 let (sender, event) = {
317 let mut record = self.slot.record.lock().unwrap();
318 record.terminal_reason = Some(terminal_reason);
319 record.checkpoint_available = false;
320 record.current_score = current_score;
321 record.best_score = Some(best_score);
322 record.telemetry = telemetry;
323
324 let snapshot_revision = record.push_snapshot(SolverSnapshot {
325 job_id: self.job_id,
326 snapshot_revision: 0,
327 lifecycle_state: SolverLifecycleState::Completed,
328 terminal_reason: Some(terminal_reason),
329 current_score,
330 best_score: Some(best_score),
331 telemetry,
332 solution: solution.clone(),
333 });
334
335 let metadata = record.next_metadata(
336 self.job_id,
337 SolverLifecycleState::Completed,
338 Some(snapshot_revision),
339 );
340 let sender = self.slot.sender_clone();
341 (sender, SolverEvent::Completed { metadata, solution })
342 };
343
344 if let Some(sender) = sender {
345 let _ = sender.send(event);
346 }
347 }
348
349 pub fn emit_cancelled(
350 &self,
351 current_score: Option<S::Score>,
352 best_score: Option<S::Score>,
353 telemetry: SolverTelemetry,
354 ) {
355 self.slot.state.store(SLOT_CANCELLED, Ordering::SeqCst);
356 self.emit_non_snapshot_terminal_event(
357 SolverLifecycleState::Cancelled,
358 SolverTerminalReason::Cancelled,
359 current_score,
360 best_score,
361 telemetry,
362 EventKind::Cancelled,
363 );
364 }
365
366 pub fn emit_failed(&self, error: String) {
367 if matches!(
368 self.current_state(),
369 SolverLifecycleState::Completed
370 | SolverLifecycleState::Cancelled
371 | SolverLifecycleState::Failed
372 ) {
373 return;
374 }
375
376 self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
377 let (sender, event) = {
378 let mut record = self.slot.record.lock().unwrap();
379 record.terminal_reason = Some(SolverTerminalReason::Failed);
380 record.checkpoint_available = false;
381 record.failure_message = Some(error.clone());
382 let telemetry = record.telemetry;
383 let metadata = record.next_metadata(self.job_id, SolverLifecycleState::Failed, None);
384 let sender = self.slot.sender_clone();
385 (
386 sender,
387 SolverEvent::Failed {
388 metadata: SolverEventMetadata {
389 telemetry,
390 ..metadata
391 },
392 error,
393 },
394 )
395 };
396
397 if let Some(sender) = sender {
398 let _ = sender.send(event);
399 }
400 }
401
402 pub(crate) fn pause_if_requested<D, ProgressCb>(
403 &self,
404 solver_scope: &mut SolverScope<'_, S, D, ProgressCb>,
405 ) where
406 D: solverforge_scoring::Director<S>,
407 ProgressCb: ProgressCallback<S>,
408 {
409 if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
410 return;
411 }
412
413 solver_scope.pause_timers();
414
415 let solution = solver_scope.score_director().clone_working_solution();
416 let current_score = solver_scope.current_score().copied();
417 let best_score = solver_scope.best_score().copied();
418 let telemetry = solver_scope.stats().snapshot();
419 let _ = self.pause_with_snapshot(solution, current_score, best_score, telemetry);
420 solver_scope.resume_timers();
421 }
422
    /// Stores `solution` as a checkpoint snapshot, emits `Paused`, and blocks the calling
    /// thread until the pause request is cleared or cancellation is requested.
    ///
    /// Returns `true` if the job was paused and then resumed (a `Resumed` event is emitted and
    /// the state goes back to solving). Returns `false` if no pause was requested, or if
    /// cancellation was requested before or during the pause; in the latter case the state is
    /// left as `Paused` and no `Resumed` event is emitted.
    ///
    /// # Panics
    /// Panics if the record or pause-gate mutex is poisoned.
    pub fn pause_with_snapshot(
        &self,
        solution: S,
        current_score: Option<S::Score>,
        best_score: Option<S::Score>,
        telemetry: SolverTelemetry,
    ) -> bool {
        // Nothing to do unless a pause is pending and the job is not being cancelled.
        if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
            return false;
        }

        self.slot.state.store(SLOT_PAUSED, Ordering::SeqCst);
        let (sender, event) = {
            let mut record = self.slot.record.lock().unwrap();
            let terminal_reason = record.terminal_reason;
            record.checkpoint_available = true;
            record.current_score = current_score;
            record.best_score = best_score;
            record.telemetry = telemetry;

            let snapshot_revision = record.push_snapshot(SolverSnapshot {
                job_id: self.job_id,
                // Placeholder; `push_snapshot` assigns the real revision.
                snapshot_revision: 0,
                lifecycle_state: SolverLifecycleState::Paused,
                terminal_reason,
                current_score,
                best_score,
                telemetry,
                solution,
            });

            let metadata = record.next_metadata(
                self.job_id,
                SolverLifecycleState::Paused,
                Some(snapshot_revision),
            );
            let sender = self.slot.sender_clone();
            (sender, SolverEvent::Paused { metadata })
        };

        // The event is sent after the record lock is released; a send error is ignored.
        if let Some(sender) = sender {
            let _ = sender.send(event);
        }

        // Park on the condvar. The condition is re-checked in a loop under `pause_gate`, so
        // spurious wakeups simply go back to waiting.
        let mut guard = self.slot.pause_gate.lock().unwrap();
        while self.slot.pause_requested.load(Ordering::Acquire) && !self.is_cancel_requested() {
            guard = self.slot.pause_condvar.wait(guard).unwrap();
        }
        drop(guard);

        // Woken by cancellation rather than resume: leave the state untouched for the caller.
        if self.is_cancel_requested() {
            return false;
        }

        self.slot.state.store(SLOT_SOLVING, Ordering::SeqCst);
        // Emitting a non-paused event also clears `checkpoint_available` in the record.
        self.emit_non_snapshot_event(
            SolverLifecycleState::Solving,
            current_score,
            best_score,
            telemetry,
            EventKind::Resumed,
        );
        true
    }
487
    /// Returns `true` when the job's current lifecycle state is terminal.
    ///
    /// # Panics
    /// Panics if the job slot has already been freed (see `current_state`).
    pub(crate) fn is_terminal(&self) -> bool {
        self.current_state().is_terminal()
    }
491
    /// Reads the slot's lifecycle state, ignoring visibility.
    ///
    /// # Panics
    /// Panics if the slot holds no lifecycle state, i.e. it has been freed.
    fn current_state(&self) -> SolverLifecycleState {
        self.slot
            .raw_state()
            .expect("runtime accessed a freed job slot")
    }
497
498 fn emit_non_snapshot_event(
499 &self,
500 lifecycle_state: SolverLifecycleState,
501 current_score: Option<S::Score>,
502 best_score: Option<S::Score>,
503 telemetry: SolverTelemetry,
504 kind: EventKind,
505 ) {
506 let (sender, event) = {
507 let mut record = self.slot.record.lock().unwrap();
508 record.current_score = current_score;
509 record.best_score = best_score;
510 record.telemetry = telemetry;
511 if lifecycle_state != SolverLifecycleState::Paused {
512 record.checkpoint_available = false;
513 }
514 let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
515 let sender = self.slot.sender_clone();
516 let event = match kind {
517 EventKind::Progress => SolverEvent::Progress { metadata },
518 EventKind::Resumed => SolverEvent::Resumed { metadata },
519 EventKind::Cancelled => unreachable!(),
520 };
521 (sender, event)
522 };
523
524 if let Some(sender) = sender {
525 let _ = sender.send(event);
526 }
527 }
528
529 fn emit_non_snapshot_terminal_event(
530 &self,
531 lifecycle_state: SolverLifecycleState,
532 terminal_reason: SolverTerminalReason,
533 current_score: Option<S::Score>,
534 best_score: Option<S::Score>,
535 telemetry: SolverTelemetry,
536 kind: EventKind,
537 ) {
538 let (sender, event) = {
539 let mut record = self.slot.record.lock().unwrap();
540 record.terminal_reason = Some(terminal_reason);
541 record.checkpoint_available = false;
542 record.current_score = current_score;
543 record.best_score = best_score;
544 record.telemetry = telemetry;
545 let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
546 let sender = self.slot.sender_clone();
547 let event = match kind {
548 EventKind::Cancelled => SolverEvent::Cancelled { metadata },
549 EventKind::Progress | EventKind::Resumed => unreachable!(),
550 };
551 (sender, event)
552 };
553
554 if let Some(sender) = sender {
555 let _ = sender.send(event);
556 }
557 }
558}
559
/// A planning solution that a `SolverManager` can run on a worker thread.
pub trait Solvable: PlanningSolution + Send + 'static {
    /// Solves `self`, reporting progress and the outcome through `runtime`.
    ///
    /// `SolverManager::solve` treats a return without any terminal event as a cancellation if
    /// cancel was requested, and as a failure otherwise; a panic is reported as a failure.
    fn solve(self, runtime: SolverRuntime<Self>);
}
564
/// Selects which snapshot-less event the `emit_non_snapshot_*` helpers build.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventKind {
    /// Build `SolverEvent::Progress`.
    Progress,
    /// Build `SolverEvent::Resumed`.
    Resumed,
    /// Build `SolverEvent::Cancelled` (terminal helper only).
    Cancelled,
}
571
/// Mutable per-job bookkeeping, guarded by `JobSlot::record`.
struct JobRecord<S: PlanningSolution> {
    // Set when a terminal event is recorded; `None` while the job is live.
    terminal_reason: Option<SolverTerminalReason>,
    // Number of events generated so far for this job.
    event_sequence: u64,
    // Revision of the newest entry in `snapshots`, if any.
    latest_snapshot_revision: Option<u64>,
    // Last reported scores and telemetry.
    current_score: Option<S::Score>,
    best_score: Option<S::Score>,
    telemetry: SolverTelemetry,
    // True while a pause checkpoint is the latest state of the job.
    checkpoint_available: bool,
    // Every snapshot stored for the job, in revision order; entries are only removed by `reset`.
    snapshots: Vec<SolverSnapshot<S>>,
    // Message passed to `emit_failed`, if the job failed.
    failure_message: Option<String>,
}
583
584impl<S: PlanningSolution> JobRecord<S> {
585 const fn new() -> Self {
586 Self {
587 terminal_reason: None,
588 event_sequence: 0,
589 latest_snapshot_revision: None,
590 current_score: None,
591 best_score: None,
592 telemetry: SolverTelemetry {
593 elapsed_ms: 0,
594 step_count: 0,
595 moves_evaluated: 0,
596 moves_accepted: 0,
597 score_calculations: 0,
598 moves_per_second: 0,
599 acceptance_rate: 0.0,
600 },
601 checkpoint_available: false,
602 snapshots: Vec::new(),
603 failure_message: None,
604 }
605 }
606
607 fn reset(&mut self) {
608 self.terminal_reason = None;
609 self.event_sequence = 0;
610 self.latest_snapshot_revision = None;
611 self.current_score = None;
612 self.best_score = None;
613 self.telemetry = SolverTelemetry {
614 elapsed_ms: 0,
615 step_count: 0,
616 moves_evaluated: 0,
617 moves_accepted: 0,
618 score_calculations: 0,
619 moves_per_second: 0,
620 acceptance_rate: 0.0,
621 };
622 self.checkpoint_available = false;
623 self.snapshots.clear();
624 self.failure_message = None;
625 }
626
627 fn push_snapshot(&mut self, mut snapshot: SolverSnapshot<S>) -> u64 {
628 let next = self.latest_snapshot_revision.unwrap_or(0) + 1;
629 snapshot.snapshot_revision = next;
630 self.latest_snapshot_revision = Some(next);
631 self.snapshots.push(snapshot);
632 next
633 }
634
635 fn next_metadata(
636 &mut self,
637 job_id: usize,
638 lifecycle_state: SolverLifecycleState,
639 snapshot_revision: Option<u64>,
640 ) -> SolverEventMetadata<S::Score> {
641 self.event_sequence += 1;
642 SolverEventMetadata {
643 job_id,
644 event_sequence: self.event_sequence,
645 lifecycle_state,
646 terminal_reason: self.terminal_reason,
647 telemetry: self.telemetry,
648 current_score: self.current_score,
649 best_score: self.best_score,
650 snapshot_revision: snapshot_revision.or(self.latest_snapshot_revision),
651 }
652 }
653
654 fn status(
655 &self,
656 job_id: usize,
657 lifecycle_state: SolverLifecycleState,
658 ) -> SolverStatus<S::Score> {
659 SolverStatus {
660 job_id,
661 lifecycle_state,
662 terminal_reason: self.terminal_reason,
663 checkpoint_available: self.checkpoint_available,
664 event_sequence: self.event_sequence,
665 latest_snapshot_revision: self.latest_snapshot_revision,
666 current_score: self.current_score,
667 best_score: self.best_score,
668 telemetry: self.telemetry,
669 }
670 }
671}
672
/// Shared state for one job slot, accessed by the manager and by the job's worker.
struct JobSlot<S: PlanningSolution> {
    // One of the `SLOT_FREE` / `SLOT_SOLVING` / ... lifecycle codes.
    state: AtomicU8,
    // One of `SLOT_VISIBLE` / `SLOT_DELETED` / `SLOT_DELETING`.
    visibility: AtomicU8,
    // Set by `cancel`; read by the solver through `SolverRuntime`.
    terminate: AtomicBool,
    // Set by `pause`, cleared by `resume` and `cancel`.
    pause_requested: AtomicBool,
    // True from `initialize` until the worker closure calls `worker_exited`.
    worker_running: AtomicBool,
    // Event channel for the current job; `None` when free or deleted.
    sender: Mutex<Option<mpsc::UnboundedSender<SolverEvent<S>>>>,
    // Scores, telemetry, snapshots and counters for the current job.
    record: Mutex<JobRecord<S>>,
    // Mutex/condvar pair the worker parks on while paused.
    pause_gate: Mutex<()>,
    pause_condvar: Condvar,
}
684
685impl<S: PlanningSolution> JobSlot<S> {
686 const fn new() -> Self {
687 Self {
688 state: AtomicU8::new(SLOT_FREE),
689 visibility: AtomicU8::new(SLOT_VISIBLE),
690 terminate: AtomicBool::new(false),
691 pause_requested: AtomicBool::new(false),
692 worker_running: AtomicBool::new(false),
693 sender: Mutex::new(None),
694 record: Mutex::new(JobRecord::new()),
695 pause_gate: Mutex::new(()),
696 pause_condvar: Condvar::new(),
697 }
698 }
699
700 fn sender_clone(&self) -> Option<mpsc::UnboundedSender<SolverEvent<S>>> {
701 self.sender.lock().unwrap().clone()
702 }
703
704 fn initialize(&self, sender: mpsc::UnboundedSender<SolverEvent<S>>) {
705 self.terminate.store(false, Ordering::Release);
706 self.pause_requested.store(false, Ordering::Release);
707 self.worker_running.store(true, Ordering::Release);
708 self.visibility.store(SLOT_VISIBLE, Ordering::Release);
709 *self.sender.lock().unwrap() = Some(sender);
710 self.record.lock().unwrap().reset();
711 }
712
713 fn reset(&self) {
714 self.terminate.store(false, Ordering::Release);
715 self.pause_requested.store(false, Ordering::Release);
716 self.worker_running.store(false, Ordering::Release);
717 *self.sender.lock().unwrap() = None;
718 self.record.lock().unwrap().reset();
719 self.state.store(SLOT_FREE, Ordering::Release);
720 self.visibility.store(SLOT_VISIBLE, Ordering::Release);
721 }
722
723 fn mark_deleted(&self) {
724 self.visibility.store(SLOT_DELETED, Ordering::Release);
725 *self.sender.lock().unwrap() = None;
726 }
727
728 fn worker_exited(&self) {
729 self.worker_running.store(false, Ordering::Release);
730 self.try_reset_deleted();
731 }
732
733 fn try_reset_deleted(&self) {
734 if self.worker_running.load(Ordering::Acquire) {
735 return;
736 }
737
738 if self
739 .visibility
740 .compare_exchange(
741 SLOT_DELETED,
742 SLOT_DELETING,
743 Ordering::AcqRel,
744 Ordering::Acquire,
745 )
746 .is_ok()
747 {
748 self.reset();
749 }
750 }
751
752 fn raw_state(&self) -> Option<SolverLifecycleState> {
753 match self.state.load(Ordering::Acquire) {
754 SLOT_SOLVING => Some(SolverLifecycleState::Solving),
755 SLOT_PAUSE_REQUESTED => Some(SolverLifecycleState::PauseRequested),
756 SLOT_PAUSED => Some(SolverLifecycleState::Paused),
757 SLOT_COMPLETED => Some(SolverLifecycleState::Completed),
758 SLOT_CANCELLED => Some(SolverLifecycleState::Cancelled),
759 SLOT_FAILED => Some(SolverLifecycleState::Failed),
760 _ => None,
761 }
762 }
763
764 fn public_state(&self) -> Option<SolverLifecycleState> {
765 if self.visibility.load(Ordering::Acquire) != SLOT_VISIBLE {
766 return None;
767 }
768
769 self.raw_state()
770 }
771}
772
/// Runs up to `MAX_JOBS` solver jobs concurrently, one per fixed slot.
///
/// Job ids are slot indices, so an id is reused once its job has been deleted and the slot freed.
pub struct SolverManager<S: Solvable> {
    // Fixed pool of job slots; a job id is an index into this array.
    slots: [JobSlot<S>; MAX_JOBS],
    _phantom: std::marker::PhantomData<fn() -> S>,
}
778
779impl<S: Solvable> Default for SolverManager<S> {
780 fn default() -> Self {
781 Self::new()
782 }
783}
784
785impl<S: Solvable> SolverManager<S>
786where
787 S::Score: Score,
788{
    /// Creates a manager in which every slot is free.
    ///
    /// `solve` takes `&'static self`, so a manager is presumably stored in a `static`, which
    /// is what the `const fn` allows — confirm against callers.
    pub const fn new() -> Self {
        Self {
            // One initializer per slot; the field type `[JobSlot<S>; MAX_JOBS]` forces the
            // number of entries here to match `MAX_JOBS`.
            slots: [
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
            ],
            _phantom: std::marker::PhantomData,
        }
    }
812
813 pub fn solve(
814 &'static self,
815 solution: S,
816 ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
817 let (sender, receiver) = mpsc::unbounded_channel();
818
819 let Some(slot_idx) = self.slots.iter().position(|slot| {
820 slot.state
821 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
822 .is_ok()
823 }) else {
824 return Err(SolverManagerError::NoFreeJobSlots);
825 };
826
827 let slot = &self.slots[slot_idx];
828 slot.initialize(sender);
829 let runtime = SolverRuntime::new(slot_idx, slot);
830
831 rayon::spawn(move || {
832 let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
833
834 match result {
835 Ok(()) => {
836 if !runtime.is_terminal() {
837 if runtime.is_cancel_requested() {
838 let (current_score, best_score, telemetry) = {
839 let record = runtime.slot.record.lock().unwrap();
840 (record.current_score, record.best_score, record.telemetry)
841 };
842 runtime.emit_cancelled(current_score, best_score, telemetry);
843 } else {
844 runtime.emit_failed(
845 "solver returned without emitting a terminal lifecycle event"
846 .to_string(),
847 );
848 }
849 }
850 }
851 Err(payload) => {
852 runtime.emit_failed(panic_payload_to_string(payload));
853 }
854 }
855
856 runtime.slot.worker_exited();
857 });
858
859 Ok((slot_idx, receiver))
860 }
861
862 pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
863 let slot = self.slot(job_id)?;
864 let state = slot
865 .public_state()
866 .ok_or(SolverManagerError::JobNotFound { job_id })?;
867 let record = slot.record.lock().unwrap();
868 Ok(record.status(job_id, state))
869 }
870
871 pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
872 let slot = self.slot(job_id)?;
873 match slot.state.compare_exchange(
874 SLOT_SOLVING,
875 SLOT_PAUSE_REQUESTED,
876 Ordering::SeqCst,
877 Ordering::SeqCst,
878 ) {
879 Ok(_) => {
880 slot.pause_requested.store(true, Ordering::SeqCst);
881 let (sender, event) = {
882 let mut record = slot.record.lock().unwrap();
883 let metadata =
884 record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
885 let sender = slot.sender_clone();
886 (sender, SolverEvent::PauseRequested { metadata })
887 };
888 if let Some(sender) = sender {
889 let _ = sender.send(event);
890 }
891 Ok(())
892 }
893 Err(_) => {
894 let state = slot
895 .public_state()
896 .ok_or(SolverManagerError::JobNotFound { job_id })?;
897 Err(SolverManagerError::InvalidStateTransition {
898 job_id,
899 action: "pause",
900 state,
901 })
902 }
903 }
904 }
905
906 pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
907 let slot = self.slot(job_id)?;
908 let state = slot
909 .public_state()
910 .ok_or(SolverManagerError::JobNotFound { job_id })?;
911 if state != SolverLifecycleState::Paused {
912 return Err(SolverManagerError::InvalidStateTransition {
913 job_id,
914 action: "resume",
915 state,
916 });
917 }
918
919 slot.pause_requested.store(false, Ordering::SeqCst);
920 slot.pause_condvar.notify_one();
921 Ok(())
922 }
923
924 pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
925 let slot = self.slot(job_id)?;
926 let state = slot
927 .public_state()
928 .ok_or(SolverManagerError::JobNotFound { job_id })?;
929 if !matches!(
930 state,
931 SolverLifecycleState::Solving
932 | SolverLifecycleState::PauseRequested
933 | SolverLifecycleState::Paused
934 ) {
935 return Err(SolverManagerError::InvalidStateTransition {
936 job_id,
937 action: "cancel",
938 state,
939 });
940 }
941
942 slot.terminate.store(true, Ordering::SeqCst);
943 slot.pause_requested.store(false, Ordering::SeqCst);
944 slot.pause_condvar.notify_one();
945 Ok(())
946 }
947
948 pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
949 let slot = self.slot(job_id)?;
950 let state = slot
951 .public_state()
952 .ok_or(SolverManagerError::JobNotFound { job_id })?;
953 if !state.is_terminal() {
954 return Err(SolverManagerError::InvalidStateTransition {
955 job_id,
956 action: "delete",
957 state,
958 });
959 }
960
961 slot.mark_deleted();
962 slot.try_reset_deleted();
963 Ok(())
964 }
965
966 pub fn get_snapshot(
967 &self,
968 job_id: usize,
969 snapshot_revision: Option<u64>,
970 ) -> Result<SolverSnapshot<S>, SolverManagerError> {
971 let slot = self.slot(job_id)?;
972 if slot.public_state().is_none() {
973 return Err(SolverManagerError::JobNotFound { job_id });
974 }
975
976 let record = slot.record.lock().unwrap();
977 if record.snapshots.is_empty() {
978 return Err(SolverManagerError::NoSnapshotAvailable { job_id });
979 }
980
981 match snapshot_revision {
982 Some(revision) => record
983 .snapshots
984 .iter()
985 .find(|snapshot| snapshot.snapshot_revision == revision)
986 .cloned()
987 .ok_or(SolverManagerError::SnapshotNotFound {
988 job_id,
989 snapshot_revision: revision,
990 }),
991 None => Ok(record
992 .snapshots
993 .last()
994 .expect("checked non-empty snapshots")
995 .clone()),
996 }
997 }
998
999 pub fn analyze_snapshot(
1000 &self,
1001 job_id: usize,
1002 snapshot_revision: Option<u64>,
1003 ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
1004 where
1005 S: Analyzable,
1006 {
1007 let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
1008 Ok(SolverSnapshotAnalysis {
1009 job_id,
1010 lifecycle_state: snapshot.lifecycle_state,
1011 terminal_reason: snapshot.terminal_reason,
1012 snapshot_revision: snapshot.snapshot_revision,
1013 analysis: snapshot.solution.analyze(),
1014 })
1015 }
1016
1017 pub fn active_job_count(&self) -> usize {
1018 self.slots
1019 .iter()
1020 .filter(|slot| slot.public_state().is_some())
1021 .count()
1022 }
1023
1024 #[cfg(test)]
1025 pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
1026 self.slots
1027 .get(job_id)
1028 .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
1029 }
1030
1031 fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
1032 self.slots
1033 .get(job_id)
1034 .ok_or(SolverManagerError::JobNotFound { job_id })
1035 }
1036}
1037
/// Converts a panic payload into a message string.
///
/// `String` and `&'static str` payloads yield their text; any other payload type yields a
/// fixed fallback message.
fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
    match payload.downcast::<String>() {
        Ok(owned) => *owned,
        Err(other) => match other.downcast_ref::<&'static str>() {
            Some(literal) => String::from(*literal),
            None => String::from("solver panicked with a non-string payload"),
        },
    }
}