Skip to main content

solverforge_solver/manager/
solver_manager.rs

1/* SolverManager for retained async job lifecycle management.
2
3Provides the high-level API for:
4- Starting retained solve jobs that stream lifecycle events
5- Tracking authoritative job lifecycle state
6- Pausing and resuming jobs at exact runtime-safe boundaries
7- Cancelling and deleting retained jobs
8- Retrieving snapshot-bound solutions and score analysis
9*/
10
11use std::error::Error;
12use std::fmt::{self, Debug, Display};
13use std::panic::AssertUnwindSafe;
14use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
15use std::sync::{Condvar, Mutex};
16
17use solverforge_core::domain::PlanningSolution;
18use solverforge_core::score::Score;
19use tokio::sync::mpsc;
20
21use super::solution_manager::{Analyzable, ScoreAnalysis};
22use crate::scope::{ProgressCallback, SolverScope};
23use crate::stats::SolverTelemetry;
24
/// Maximum concurrent jobs per SolverManager instance.
pub const MAX_JOBS: usize = 16;

// Raw values held in `JobSlot::state`. `SLOT_FREE` marks an unclaimed slot;
// every other value maps onto one `SolverLifecycleState` variant (see
// `JobSlot::raw_state`).
const SLOT_FREE: u8 = 0;
const SLOT_SOLVING: u8 = 1;
const SLOT_PAUSE_REQUESTED: u8 = 2;
const SLOT_PAUSED: u8 = 3;
const SLOT_COMPLETED: u8 = 4;
const SLOT_CANCELLED: u8 = 5;
const SLOT_FAILED: u8 = 6;

// Raw values held in `JobSlot::visibility`.
// VISIBLE: the job is reachable through `JobSlot::public_state`.
const SLOT_VISIBLE: u8 = 0;
// DELETED: `JobSlot::mark_deleted` ran; the slot has not been reset yet.
const SLOT_DELETED: u8 = 1;
// DELETING: one caller won the DELETED -> DELETING exchange and is resetting the slot.
const SLOT_DELETING: u8 = 2;
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
41#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
42pub enum SolverLifecycleState {
43    Solving,
44    PauseRequested,
45    Paused,
46    Completed,
47    Cancelled,
48    Failed,
49}
50
51impl SolverLifecycleState {
52    pub fn is_terminal(self) -> bool {
53        matches!(self, Self::Completed | Self::Cancelled | Self::Failed)
54    }
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
58#[serde(rename_all = "snake_case")]
59pub enum SolverTerminalReason {
60    Completed,
61    TerminatedByConfig,
62    Cancelled,
63    Failed,
64}
65
66#[derive(Debug, Clone, PartialEq)]
67pub struct SolverStatus<Sc: Score> {
68    pub job_id: usize,
69    pub lifecycle_state: SolverLifecycleState,
70    pub terminal_reason: Option<SolverTerminalReason>,
71    pub checkpoint_available: bool,
72    pub event_sequence: u64,
73    pub latest_snapshot_revision: Option<u64>,
74    pub current_score: Option<Sc>,
75    pub best_score: Option<Sc>,
76    pub telemetry: SolverTelemetry,
77}
78
79impl<Sc: Score> SolverStatus<Sc> {
80    pub fn is_terminal(&self) -> bool {
81        self.lifecycle_state.is_terminal()
82    }
83}
84
85#[derive(Debug, Clone, PartialEq)]
86pub struct SolverEventMetadata<Sc: Score> {
87    pub job_id: usize,
88    pub event_sequence: u64,
89    pub lifecycle_state: SolverLifecycleState,
90    pub terminal_reason: Option<SolverTerminalReason>,
91    pub telemetry: SolverTelemetry,
92    pub current_score: Option<Sc>,
93    pub best_score: Option<Sc>,
94    pub snapshot_revision: Option<u64>,
95}
96
97#[derive(Debug, Clone)]
98pub struct SolverSnapshot<S: PlanningSolution> {
99    pub job_id: usize,
100    pub snapshot_revision: u64,
101    pub lifecycle_state: SolverLifecycleState,
102    pub terminal_reason: Option<SolverTerminalReason>,
103    pub current_score: Option<S::Score>,
104    pub best_score: Option<S::Score>,
105    pub telemetry: SolverTelemetry,
106    pub solution: S,
107}
108
109#[derive(Debug, Clone)]
110pub struct SolverSnapshotAnalysis<Sc: Score> {
111    pub job_id: usize,
112    pub lifecycle_state: SolverLifecycleState,
113    pub terminal_reason: Option<SolverTerminalReason>,
114    pub snapshot_revision: u64,
115    pub analysis: ScoreAnalysis<Sc>,
116}
117
118#[derive(Debug, Clone)]
119pub enum SolverEvent<S: PlanningSolution> {
120    Progress {
121        metadata: SolverEventMetadata<S::Score>,
122    },
123    BestSolution {
124        metadata: SolverEventMetadata<S::Score>,
125        solution: S,
126    },
127    PauseRequested {
128        metadata: SolverEventMetadata<S::Score>,
129    },
130    Paused {
131        metadata: SolverEventMetadata<S::Score>,
132    },
133    Resumed {
134        metadata: SolverEventMetadata<S::Score>,
135    },
136    Completed {
137        metadata: SolverEventMetadata<S::Score>,
138        solution: S,
139    },
140    Cancelled {
141        metadata: SolverEventMetadata<S::Score>,
142    },
143    Failed {
144        metadata: SolverEventMetadata<S::Score>,
145        error: String,
146    },
147}
148
149impl<S: PlanningSolution> SolverEvent<S> {
150    pub fn metadata(&self) -> &SolverEventMetadata<S::Score> {
151        match self {
152            Self::Progress { metadata }
153            | Self::PauseRequested { metadata }
154            | Self::Paused { metadata }
155            | Self::Resumed { metadata }
156            | Self::Cancelled { metadata } => metadata,
157            Self::BestSolution { metadata, .. }
158            | Self::Completed { metadata, .. }
159            | Self::Failed { metadata, .. } => metadata,
160        }
161    }
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub enum SolverManagerError {
166    NoFreeJobSlots,
167    JobNotFound {
168        job_id: usize,
169    },
170    InvalidStateTransition {
171        job_id: usize,
172        action: &'static str,
173        state: SolverLifecycleState,
174    },
175    NoSnapshotAvailable {
176        job_id: usize,
177    },
178    SnapshotNotFound {
179        job_id: usize,
180        snapshot_revision: u64,
181    },
182}
183
184impl Display for SolverManagerError {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        match self {
187            Self::NoFreeJobSlots => write!(f, "no free job slots available"),
188            Self::JobNotFound { job_id } => write!(f, "job {job_id} was not found"),
189            Self::InvalidStateTransition {
190                job_id,
191                action,
192                state,
193            } => write!(
194                f,
195                "cannot {action} job {job_id} while it is in state {state:?}"
196            ),
197            Self::NoSnapshotAvailable { job_id } => {
198                write!(f, "job {job_id} has no retained snapshots")
199            }
200            Self::SnapshotNotFound {
201                job_id,
202                snapshot_revision,
203            } => write!(
204                f,
205                "job {job_id} has no retained snapshot revision {snapshot_revision}"
206            ),
207        }
208    }
209}
210
211impl Error for SolverManagerError {}
212
213/// Runtime context for a retained solve job.
214///
215/// This is passed into `Solvable::solve()` so the runtime path can publish
216/// lifecycle events, settle exact pauses, and observe cancellation.
217pub struct SolverRuntime<S: PlanningSolution> {
218    job_id: usize,
219    slot: &'static JobSlot<S>,
220}
221
222impl<S: PlanningSolution> Clone for SolverRuntime<S> {
223    fn clone(&self) -> Self {
224        *self
225    }
226}
227
228impl<S: PlanningSolution> Copy for SolverRuntime<S> {}
229
230impl<S: PlanningSolution> Debug for SolverRuntime<S> {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        f.debug_struct("SolverRuntime")
233            .field("job_id", &self.job_id)
234            .finish()
235    }
236}
237
238impl<S: PlanningSolution> SolverRuntime<S> {
239    fn new(job_id: usize, slot: &'static JobSlot<S>) -> Self {
240        Self { job_id, slot }
241    }
242
243    pub fn job_id(&self) -> usize {
244        self.job_id
245    }
246
247    pub fn is_cancel_requested(&self) -> bool {
248        self.slot.terminate.load(Ordering::Acquire)
249    }
250
251    pub(crate) fn is_pause_requested(&self) -> bool {
252        self.slot.pause_requested.load(Ordering::Acquire)
253    }
254
255    pub(crate) fn cancel_flag(&self) -> &'static AtomicBool {
256        &self.slot.terminate
257    }
258
259    pub fn emit_progress(
260        &self,
261        current_score: Option<S::Score>,
262        best_score: Option<S::Score>,
263        telemetry: SolverTelemetry,
264    ) {
265        let lifecycle_state = self.current_state();
266        self.emit_non_snapshot_event(
267            lifecycle_state,
268            current_score,
269            best_score,
270            telemetry,
271            EventKind::Progress,
272        );
273    }
274
275    pub fn emit_best_solution(
276        &self,
277        solution: S,
278        current_score: Option<S::Score>,
279        best_score: S::Score,
280        telemetry: SolverTelemetry,
281    ) {
282        let state = self.current_state();
283        self.slot.with_publication(|sender, record| {
284            let terminal_reason = record.terminal_reason;
285            record.current_score = current_score;
286            record.best_score = Some(best_score);
287            record.telemetry = telemetry;
288
289            let snapshot_revision = record.push_snapshot(SolverSnapshot {
290                job_id: self.job_id,
291                snapshot_revision: 0,
292                lifecycle_state: state,
293                terminal_reason,
294                current_score,
295                best_score: Some(best_score),
296                telemetry,
297                solution: solution.clone(),
298            });
299
300            let metadata = record.next_metadata(self.job_id, state, Some(snapshot_revision));
301            if let Some(sender) = sender {
302                let _ = sender.send(SolverEvent::BestSolution { metadata, solution });
303            }
304        });
305    }
306
307    pub fn emit_completed(
308        &self,
309        solution: S,
310        current_score: Option<S::Score>,
311        best_score: S::Score,
312        telemetry: SolverTelemetry,
313        terminal_reason: SolverTerminalReason,
314    ) {
315        self.slot.with_publication(|sender, record| {
316            self.slot.state.store(SLOT_COMPLETED, Ordering::SeqCst);
317            record.terminal_reason = Some(terminal_reason);
318            record.checkpoint_available = false;
319            record.current_score = current_score;
320            record.best_score = Some(best_score);
321            record.telemetry = telemetry;
322
323            let snapshot_revision = record.push_snapshot(SolverSnapshot {
324                job_id: self.job_id,
325                snapshot_revision: 0,
326                lifecycle_state: SolverLifecycleState::Completed,
327                terminal_reason: Some(terminal_reason),
328                current_score,
329                best_score: Some(best_score),
330                telemetry,
331                solution: solution.clone(),
332            });
333
334            let metadata = record.next_metadata(
335                self.job_id,
336                SolverLifecycleState::Completed,
337                Some(snapshot_revision),
338            );
339            if let Some(sender) = sender {
340                let _ = sender.send(SolverEvent::Completed { metadata, solution });
341            }
342        });
343    }
344
345    pub fn emit_cancelled(
346        &self,
347        current_score: Option<S::Score>,
348        best_score: Option<S::Score>,
349        telemetry: SolverTelemetry,
350    ) {
351        self.emit_non_snapshot_terminal_event(
352            SolverLifecycleState::Cancelled,
353            SolverTerminalReason::Cancelled,
354            current_score,
355            best_score,
356            telemetry,
357            EventKind::Cancelled,
358        );
359    }
360
361    pub fn emit_failed(&self, error: String) {
362        if matches!(
363            self.current_state(),
364            SolverLifecycleState::Completed
365                | SolverLifecycleState::Cancelled
366                | SolverLifecycleState::Failed
367        ) {
368            return;
369        }
370
371        self.slot.with_publication(|sender, record| {
372            self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
373            record.terminal_reason = Some(SolverTerminalReason::Failed);
374            record.checkpoint_available = false;
375            record.failure_message = Some(error.clone());
376            let telemetry = record.telemetry;
377            let metadata = record.next_metadata(self.job_id, SolverLifecycleState::Failed, None);
378            if let Some(sender) = sender {
379                let _ = sender.send(SolverEvent::Failed {
380                    metadata: SolverEventMetadata {
381                        telemetry,
382                        ..metadata
383                    },
384                    error,
385                });
386            }
387        });
388    }
389
390    pub(crate) fn pause_if_requested<D, ProgressCb>(
391        &self,
392        solver_scope: &mut SolverScope<'_, S, D, ProgressCb>,
393    ) where
394        D: solverforge_scoring::Director<S>,
395        ProgressCb: ProgressCallback<S>,
396    {
397        if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
398            return;
399        }
400
401        solver_scope.pause_timers();
402
403        let solution = solver_scope.score_director().clone_working_solution();
404        let current_score = solver_scope.current_score().copied();
405        let best_score = solver_scope.best_score().copied();
406        let telemetry = solver_scope.stats().snapshot();
407        let _ = self.pause_with_snapshot(solution, current_score, best_score, telemetry);
408        solver_scope.resume_timers();
409    }
410
411    pub fn pause_with_snapshot(
412        &self,
413        solution: S,
414        current_score: Option<S::Score>,
415        best_score: Option<S::Score>,
416        telemetry: SolverTelemetry,
417    ) -> bool {
418        if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
419            return false;
420        }
421
422        self.slot.with_publication(|sender, record| {
423            self.slot.state.store(SLOT_PAUSED, Ordering::SeqCst);
424            let terminal_reason = record.terminal_reason;
425            record.checkpoint_available = true;
426            record.current_score = current_score;
427            record.best_score = best_score;
428            record.telemetry = telemetry;
429
430            let snapshot_revision = record.push_snapshot(SolverSnapshot {
431                job_id: self.job_id,
432                snapshot_revision: 0,
433                lifecycle_state: SolverLifecycleState::Paused,
434                terminal_reason,
435                current_score,
436                best_score,
437                telemetry,
438                solution,
439            });
440
441            let metadata = record.next_metadata(
442                self.job_id,
443                SolverLifecycleState::Paused,
444                Some(snapshot_revision),
445            );
446            if let Some(sender) = sender {
447                let _ = sender.send(SolverEvent::Paused { metadata });
448            }
449        });
450
451        let mut guard = self.slot.pause_gate.lock().unwrap();
452        while self.slot.pause_requested.load(Ordering::Acquire) && !self.is_cancel_requested() {
453            guard = self.slot.pause_condvar.wait(guard).unwrap();
454        }
455        drop(guard);
456
457        if self.is_cancel_requested() {
458            return false;
459        }
460
461        self.slot.state.store(SLOT_SOLVING, Ordering::SeqCst);
462        self.emit_non_snapshot_event(
463            SolverLifecycleState::Solving,
464            current_score,
465            best_score,
466            telemetry,
467            EventKind::Resumed,
468        );
469        true
470    }
471
472    pub(crate) fn is_terminal(&self) -> bool {
473        self.current_state().is_terminal()
474    }
475
476    fn current_state(&self) -> SolverLifecycleState {
477        self.slot
478            .raw_state()
479            .expect("runtime accessed a freed job slot")
480    }
481
482    fn emit_non_snapshot_event(
483        &self,
484        lifecycle_state: SolverLifecycleState,
485        current_score: Option<S::Score>,
486        best_score: Option<S::Score>,
487        telemetry: SolverTelemetry,
488        kind: EventKind,
489    ) {
490        self.slot.with_publication(|sender, record| {
491            record.current_score = current_score;
492            record.best_score = best_score;
493            record.telemetry = telemetry;
494            if lifecycle_state != SolverLifecycleState::Paused {
495                record.checkpoint_available = false;
496            }
497            let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
498            let event = match kind {
499                EventKind::Progress => SolverEvent::Progress { metadata },
500                EventKind::Resumed => SolverEvent::Resumed { metadata },
501                EventKind::Cancelled => unreachable!(),
502            };
503            if let Some(sender) = sender {
504                let _ = sender.send(event);
505            }
506        });
507    }
508
509    fn emit_non_snapshot_terminal_event(
510        &self,
511        lifecycle_state: SolverLifecycleState,
512        terminal_reason: SolverTerminalReason,
513        current_score: Option<S::Score>,
514        best_score: Option<S::Score>,
515        telemetry: SolverTelemetry,
516        kind: EventKind,
517    ) {
518        self.slot.with_publication(|sender, record| {
519            match lifecycle_state {
520                SolverLifecycleState::Cancelled => {
521                    self.slot.state.store(SLOT_CANCELLED, Ordering::SeqCst);
522                }
523                SolverLifecycleState::Failed => {
524                    self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
525                }
526                _ => {}
527            }
528            record.terminal_reason = Some(terminal_reason);
529            record.checkpoint_available = false;
530            record.current_score = current_score;
531            record.best_score = best_score;
532            record.telemetry = telemetry;
533            let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
534            let event = match kind {
535                EventKind::Cancelled => SolverEvent::Cancelled { metadata },
536                EventKind::Progress | EventKind::Resumed => unreachable!(),
537            };
538            if let Some(sender) = sender {
539                let _ = sender.send(event);
540            }
541        });
542    }
543}
544
545/// Trait for solutions that can run inside the retained lifecycle manager.
546pub trait Solvable: PlanningSolution + Send + 'static {
547    fn solve(self, runtime: SolverRuntime<Self>);
548}
549
/// Selects which snapshot-less event the shared emit helpers build.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventKind {
    /// Build `SolverEvent::Progress`.
    Progress,
    /// Build `SolverEvent::Resumed`.
    Resumed,
    /// Build `SolverEvent::Cancelled`.
    Cancelled,
}
556
557struct JobRecord<S: PlanningSolution> {
558    terminal_reason: Option<SolverTerminalReason>,
559    event_sequence: u64,
560    latest_snapshot_revision: Option<u64>,
561    current_score: Option<S::Score>,
562    best_score: Option<S::Score>,
563    telemetry: SolverTelemetry,
564    checkpoint_available: bool,
565    snapshots: Vec<SolverSnapshot<S>>,
566    failure_message: Option<String>,
567}
568
569impl<S: PlanningSolution> JobRecord<S> {
570    const fn new() -> Self {
571        Self {
572            terminal_reason: None,
573            event_sequence: 0,
574            latest_snapshot_revision: None,
575            current_score: None,
576            best_score: None,
577            telemetry: SolverTelemetry {
578                elapsed_ms: 0,
579                step_count: 0,
580                moves_evaluated: 0,
581                moves_accepted: 0,
582                score_calculations: 0,
583                moves_per_second: 0,
584                acceptance_rate: 0.0,
585            },
586            checkpoint_available: false,
587            snapshots: Vec::new(),
588            failure_message: None,
589        }
590    }
591
592    fn reset(&mut self) {
593        self.terminal_reason = None;
594        self.event_sequence = 0;
595        self.latest_snapshot_revision = None;
596        self.current_score = None;
597        self.best_score = None;
598        self.telemetry = SolverTelemetry {
599            elapsed_ms: 0,
600            step_count: 0,
601            moves_evaluated: 0,
602            moves_accepted: 0,
603            score_calculations: 0,
604            moves_per_second: 0,
605            acceptance_rate: 0.0,
606        };
607        self.checkpoint_available = false;
608        self.snapshots.clear();
609        self.failure_message = None;
610    }
611
612    fn push_snapshot(&mut self, mut snapshot: SolverSnapshot<S>) -> u64 {
613        let next = self.latest_snapshot_revision.unwrap_or(0) + 1;
614        snapshot.snapshot_revision = next;
615        self.latest_snapshot_revision = Some(next);
616        self.snapshots.push(snapshot);
617        next
618    }
619
620    fn next_metadata(
621        &mut self,
622        job_id: usize,
623        lifecycle_state: SolverLifecycleState,
624        snapshot_revision: Option<u64>,
625    ) -> SolverEventMetadata<S::Score> {
626        self.event_sequence += 1;
627        SolverEventMetadata {
628            job_id,
629            event_sequence: self.event_sequence,
630            lifecycle_state,
631            terminal_reason: self.terminal_reason,
632            telemetry: self.telemetry,
633            current_score: self.current_score,
634            best_score: self.best_score,
635            snapshot_revision: snapshot_revision.or(self.latest_snapshot_revision),
636        }
637    }
638
639    fn status(
640        &self,
641        job_id: usize,
642        lifecycle_state: SolverLifecycleState,
643    ) -> SolverStatus<S::Score> {
644        SolverStatus {
645            job_id,
646            lifecycle_state,
647            terminal_reason: self.terminal_reason,
648            checkpoint_available: self.checkpoint_available,
649            event_sequence: self.event_sequence,
650            latest_snapshot_revision: self.latest_snapshot_revision,
651            current_score: self.current_score,
652            best_score: self.best_score,
653            telemetry: self.telemetry,
654        }
655    }
656}
657
658struct JobSlot<S: PlanningSolution> {
659    state: AtomicU8,
660    visibility: AtomicU8,
661    terminate: AtomicBool,
662    pause_requested: AtomicBool,
663    worker_running: AtomicBool,
664    publication: Mutex<()>,
665    sender: Mutex<Option<mpsc::UnboundedSender<SolverEvent<S>>>>,
666    record: Mutex<JobRecord<S>>,
667    pause_gate: Mutex<()>,
668    pause_condvar: Condvar,
669}
670
671impl<S: PlanningSolution> JobSlot<S> {
672    const fn new() -> Self {
673        Self {
674            state: AtomicU8::new(SLOT_FREE),
675            visibility: AtomicU8::new(SLOT_VISIBLE),
676            terminate: AtomicBool::new(false),
677            pause_requested: AtomicBool::new(false),
678            worker_running: AtomicBool::new(false),
679            publication: Mutex::new(()),
680            sender: Mutex::new(None),
681            record: Mutex::new(JobRecord::new()),
682            pause_gate: Mutex::new(()),
683            pause_condvar: Condvar::new(),
684        }
685    }
686
687    fn sender_clone(&self) -> Option<mpsc::UnboundedSender<SolverEvent<S>>> {
688        self.sender.lock().unwrap().clone()
689    }
690
691    fn with_publication<R>(
692        &self,
693        f: impl FnOnce(Option<mpsc::UnboundedSender<SolverEvent<S>>>, &mut JobRecord<S>) -> R,
694    ) -> R {
695        let _publication = self.publication.lock().unwrap();
696        let sender = self.sender_clone();
697        let mut record = self.record.lock().unwrap();
698        f(sender, &mut record)
699    }
700
701    fn initialize(&self, sender: mpsc::UnboundedSender<SolverEvent<S>>) {
702        self.terminate.store(false, Ordering::Release);
703        self.pause_requested.store(false, Ordering::Release);
704        self.worker_running.store(true, Ordering::Release);
705        self.visibility.store(SLOT_VISIBLE, Ordering::Release);
706        *self.sender.lock().unwrap() = Some(sender);
707        self.record.lock().unwrap().reset();
708    }
709
710    fn reset(&self) {
711        self.terminate.store(false, Ordering::Release);
712        self.pause_requested.store(false, Ordering::Release);
713        self.worker_running.store(false, Ordering::Release);
714        *self.sender.lock().unwrap() = None;
715        self.record.lock().unwrap().reset();
716        self.state.store(SLOT_FREE, Ordering::Release);
717        self.visibility.store(SLOT_VISIBLE, Ordering::Release);
718    }
719
720    fn mark_deleted(&self) {
721        self.visibility.store(SLOT_DELETED, Ordering::Release);
722        *self.sender.lock().unwrap() = None;
723    }
724
725    fn worker_exited(&self) {
726        self.worker_running.store(false, Ordering::Release);
727        self.try_reset_deleted();
728    }
729
730    fn try_reset_deleted(&self) {
731        if self.worker_running.load(Ordering::Acquire) {
732            return;
733        }
734
735        if self
736            .visibility
737            .compare_exchange(
738                SLOT_DELETED,
739                SLOT_DELETING,
740                Ordering::AcqRel,
741                Ordering::Acquire,
742            )
743            .is_ok()
744        {
745            self.reset();
746        }
747    }
748
749    fn raw_state(&self) -> Option<SolverLifecycleState> {
750        match self.state.load(Ordering::Acquire) {
751            SLOT_SOLVING => Some(SolverLifecycleState::Solving),
752            SLOT_PAUSE_REQUESTED => Some(SolverLifecycleState::PauseRequested),
753            SLOT_PAUSED => Some(SolverLifecycleState::Paused),
754            SLOT_COMPLETED => Some(SolverLifecycleState::Completed),
755            SLOT_CANCELLED => Some(SolverLifecycleState::Cancelled),
756            SLOT_FAILED => Some(SolverLifecycleState::Failed),
757            _ => None,
758        }
759    }
760
761    fn public_state(&self) -> Option<SolverLifecycleState> {
762        if self.visibility.load(Ordering::Acquire) != SLOT_VISIBLE {
763            return None;
764        }
765
766        self.raw_state()
767    }
768}
769
770/// Manages retained async solve jobs with lifecycle-complete event streaming.
771pub struct SolverManager<S: Solvable> {
772    slots: [JobSlot<S>; MAX_JOBS],
773    _phantom: std::marker::PhantomData<fn() -> S>,
774}
775
776impl<S: Solvable> Default for SolverManager<S> {
777    fn default() -> Self {
778        Self::new()
779    }
780}
781
782impl<S: Solvable> SolverManager<S>
783where
784    S::Score: Score,
785{
786    pub const fn new() -> Self {
787        Self {
788            slots: [
789                JobSlot::new(),
790                JobSlot::new(),
791                JobSlot::new(),
792                JobSlot::new(),
793                JobSlot::new(),
794                JobSlot::new(),
795                JobSlot::new(),
796                JobSlot::new(),
797                JobSlot::new(),
798                JobSlot::new(),
799                JobSlot::new(),
800                JobSlot::new(),
801                JobSlot::new(),
802                JobSlot::new(),
803                JobSlot::new(),
804                JobSlot::new(),
805            ],
806            _phantom: std::marker::PhantomData,
807        }
808    }
809
810    pub fn solve(
811        &'static self,
812        solution: S,
813    ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
814        let (sender, receiver) = mpsc::unbounded_channel();
815
816        let Some(slot_idx) = self.slots.iter().position(|slot| {
817            slot.state
818                .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
819                .is_ok()
820        }) else {
821            return Err(SolverManagerError::NoFreeJobSlots);
822        };
823
824        let slot = &self.slots[slot_idx];
825        slot.initialize(sender);
826        let runtime = SolverRuntime::new(slot_idx, slot);
827
828        rayon::spawn(move || {
829            let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
830
831            match result {
832                Ok(()) => {
833                    if !runtime.is_terminal() {
834                        if runtime.is_cancel_requested() {
835                            let (current_score, best_score, telemetry) = {
836                                let record = runtime.slot.record.lock().unwrap();
837                                (record.current_score, record.best_score, record.telemetry)
838                            };
839                            runtime.emit_cancelled(current_score, best_score, telemetry);
840                        } else {
841                            runtime.emit_failed(
842                                "solver returned without emitting a terminal lifecycle event"
843                                    .to_string(),
844                            );
845                        }
846                    }
847                }
848                Err(payload) => {
849                    runtime.emit_failed(panic_payload_to_string(payload));
850                }
851            }
852
853            runtime.slot.worker_exited();
854        });
855
856        Ok((slot_idx, receiver))
857    }
858
859    pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
860        let slot = self.slot(job_id)?;
861        let state = slot
862            .public_state()
863            .ok_or(SolverManagerError::JobNotFound { job_id })?;
864        let record = slot.record.lock().unwrap();
865        Ok(record.status(job_id, state))
866    }
867
868    pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
869        let slot = self.slot(job_id)?;
870        let paused = slot.with_publication(|sender, record| {
871            match slot.state.compare_exchange(
872                SLOT_SOLVING,
873                SLOT_PAUSE_REQUESTED,
874                Ordering::SeqCst,
875                Ordering::SeqCst,
876            ) {
877                Ok(_) => {
878                    slot.pause_requested.store(true, Ordering::SeqCst);
879                    let metadata =
880                        record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
881                    if let Some(sender) = sender {
882                        let _ = sender.send(SolverEvent::PauseRequested { metadata });
883                    }
884                    true
885                }
886                Err(_) => false,
887            }
888        });
889        if paused {
890            Ok(())
891        } else {
892            let state = slot
893                .public_state()
894                .ok_or(SolverManagerError::JobNotFound { job_id })?;
895            Err(SolverManagerError::InvalidStateTransition {
896                job_id,
897                action: "pause",
898                state,
899            })
900        }
901    }
902
903    pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
904        let slot = self.slot(job_id)?;
905        let state = slot
906            .public_state()
907            .ok_or(SolverManagerError::JobNotFound { job_id })?;
908        if state != SolverLifecycleState::Paused {
909            return Err(SolverManagerError::InvalidStateTransition {
910                job_id,
911                action: "resume",
912                state,
913            });
914        }
915
916        slot.pause_requested.store(false, Ordering::SeqCst);
917        slot.pause_condvar.notify_one();
918        Ok(())
919    }
920
921    pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
922        let slot = self.slot(job_id)?;
923        let state = slot
924            .public_state()
925            .ok_or(SolverManagerError::JobNotFound { job_id })?;
926        if !matches!(
927            state,
928            SolverLifecycleState::Solving
929                | SolverLifecycleState::PauseRequested
930                | SolverLifecycleState::Paused
931        ) {
932            return Err(SolverManagerError::InvalidStateTransition {
933                job_id,
934                action: "cancel",
935                state,
936            });
937        }
938
939        slot.terminate.store(true, Ordering::SeqCst);
940        slot.pause_requested.store(false, Ordering::SeqCst);
941        slot.pause_condvar.notify_one();
942        Ok(())
943    }
944
945    pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
946        let slot = self.slot(job_id)?;
947        let state = slot
948            .public_state()
949            .ok_or(SolverManagerError::JobNotFound { job_id })?;
950        if !state.is_terminal() {
951            return Err(SolverManagerError::InvalidStateTransition {
952                job_id,
953                action: "delete",
954                state,
955            });
956        }
957
958        slot.mark_deleted();
959        slot.try_reset_deleted();
960        Ok(())
961    }
962
963    pub fn get_snapshot(
964        &self,
965        job_id: usize,
966        snapshot_revision: Option<u64>,
967    ) -> Result<SolverSnapshot<S>, SolverManagerError> {
968        let slot = self.slot(job_id)?;
969        if slot.public_state().is_none() {
970            return Err(SolverManagerError::JobNotFound { job_id });
971        }
972
973        let record = slot.record.lock().unwrap();
974        if record.snapshots.is_empty() {
975            return Err(SolverManagerError::NoSnapshotAvailable { job_id });
976        }
977
978        match snapshot_revision {
979            Some(revision) => record
980                .snapshots
981                .iter()
982                .find(|snapshot| snapshot.snapshot_revision == revision)
983                .cloned()
984                .ok_or(SolverManagerError::SnapshotNotFound {
985                    job_id,
986                    snapshot_revision: revision,
987                }),
988            None => Ok(record
989                .snapshots
990                .last()
991                .expect("checked non-empty snapshots")
992                .clone()),
993        }
994    }
995
996    pub fn analyze_snapshot(
997        &self,
998        job_id: usize,
999        snapshot_revision: Option<u64>,
1000    ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
1001    where
1002        S: Analyzable,
1003    {
1004        let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
1005        Ok(SolverSnapshotAnalysis {
1006            job_id,
1007            lifecycle_state: snapshot.lifecycle_state,
1008            terminal_reason: snapshot.terminal_reason,
1009            snapshot_revision: snapshot.snapshot_revision,
1010            analysis: snapshot.solution.analyze(),
1011        })
1012    }
1013
1014    pub fn active_job_count(&self) -> usize {
1015        self.slots
1016            .iter()
1017            .filter(|slot| slot.public_state().is_some())
1018            .count()
1019    }
1020
1021    #[cfg(test)]
1022    pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
1023        self.slots
1024            .get(job_id)
1025            .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
1026    }
1027
1028    fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
1029        self.slots
1030            .get(job_id)
1031            .ok_or(SolverManagerError::JobNotFound { job_id })
1032    }
1033}
1034
/// Renders a panic payload as a human-readable message.
///
/// Payloads that are a `&'static str` or a `String` are returned as their
/// text; any other payload type yields a fixed fallback message.
fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "solver panicked with a non-string payload".to_string())
}