Skip to main content

solverforge_solver/manager/
solver_manager.rs

/* SolverManager for retained async job lifecycle management.

Provides the high-level API for:
- Starting retained solve jobs that stream lifecycle events
- Tracking authoritative job lifecycle state
- Pausing and resuming jobs at exact runtime-safe boundaries
- Cancelling and deleting retained jobs
- Retrieving snapshot-bound solutions and score analysis
*/
10
11use std::error::Error;
12use std::fmt::{self, Debug, Display};
13use std::panic::AssertUnwindSafe;
14use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
15use std::sync::{Condvar, Mutex};
16
17use solverforge_core::domain::PlanningSolution;
18use solverforge_core::score::Score;
19use tokio::sync::mpsc;
20
21use super::solution_manager::{Analyzable, ScoreAnalysis};
22use crate::scope::{ProgressCallback, SolverScope};
23use crate::stats::SolverTelemetry;
24
/// Maximum concurrent jobs per SolverManager instance.
///
/// This is also the fixed length of the `SolverManager` slot array.
pub const MAX_JOBS: usize = 16;

// Raw values stored in `JobSlot::state`. `SLOT_FREE` marks a slot that is not
// bound to any job; every other value corresponds to the
// `SolverLifecycleState` variant of the same name (see `JobSlot::raw_state`).
const SLOT_FREE: u8 = 0;
const SLOT_SOLVING: u8 = 1;
const SLOT_PAUSE_REQUESTED: u8 = 2;
const SLOT_PAUSED: u8 = 3;
const SLOT_COMPLETED: u8 = 4;
const SLOT_CANCELLED: u8 = 5;
const SLOT_FAILED: u8 = 6;

// Raw values stored in `JobSlot::visibility`. Only `SLOT_VISIBLE` slots are
// reported by `JobSlot::public_state`. `SLOT_DELETED` marks a slot waiting to
// be reset, and `SLOT_DELETING` is set by the single caller that wins the
// compare-exchange in `JobSlot::try_reset_deleted` and performs the reset.
const SLOT_VISIBLE: u8 = 0;
const SLOT_DELETED: u8 = 1;
const SLOT_DELETING: u8 = 2;
39
/// Lifecycle state of a retained solve job as exposed to callers.
///
/// Serialized with `SCREAMING_SNAKE_CASE` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum SolverLifecycleState {
    /// The job has been started and has not been paused or terminated.
    Solving,
    /// A pause was requested but the worker has not settled it yet.
    PauseRequested,
    /// The worker published a pause snapshot and is waiting to be resumed.
    Paused,
    /// Terminal: the job finished and published a final solution.
    Completed,
    /// Terminal: the job was cancelled.
    Cancelled,
    /// Terminal: the job failed.
    Failed,
}
50
51impl SolverLifecycleState {
52    pub fn is_terminal(self) -> bool {
53        matches!(self, Self::Completed | Self::Cancelled | Self::Failed)
54    }
55}
56
/// Why a job reached a terminal lifecycle state.
///
/// Serialized with `snake_case` variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SolverTerminalReason {
    /// The solve ran to completion.
    Completed,
    /// NOTE(review): presumably the solve was stopped by a configured
    /// termination condition — confirm against the solver runtime.
    TerminatedByConfig,
    /// The job was cancelled.
    Cancelled,
    /// The job failed.
    Failed,
}
65
/// Point-in-time status of a retained job, as returned by
/// `SolverManager::get_status`.
#[derive(Debug, Clone, PartialEq)]
pub struct SolverStatus<Sc: Score> {
    /// Identifier of the job (the index of its slot in the manager).
    pub job_id: usize,
    /// Lifecycle state observed when the status was taken.
    pub lifecycle_state: SolverLifecycleState,
    /// Set once the job has recorded a terminal reason; `None` before that.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// `true` while a pause snapshot is the most recent recorded event.
    pub checkpoint_available: bool,
    /// Sequence number of the most recently generated event (0 if none).
    pub event_sequence: u64,
    /// Revision of the newest retained snapshot, if any snapshot exists.
    pub latest_snapshot_revision: Option<u64>,
    /// Most recently reported current score.
    pub current_score: Option<Sc>,
    /// Most recently reported best score.
    pub best_score: Option<Sc>,
    /// Most recently reported telemetry.
    pub telemetry: SolverTelemetry,
}
78
79impl<Sc: Score> SolverStatus<Sc> {
80    pub fn is_terminal(&self) -> bool {
81        self.lifecycle_state.is_terminal()
82    }
83}
84
/// Metadata attached to every `SolverEvent`.
#[derive(Debug, Clone, PartialEq)]
pub struct SolverEventMetadata<Sc: Score> {
    /// Identifier of the job that produced the event.
    pub job_id: usize,
    /// Per-job sequence number; incremented by one for each generated event.
    pub event_sequence: u64,
    /// Lifecycle state associated with the event.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason recorded for the job at the time of the event, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Telemetry recorded for the job at the time of the event.
    pub telemetry: SolverTelemetry,
    /// Current score recorded for the job at the time of the event.
    pub current_score: Option<Sc>,
    /// Best score recorded for the job at the time of the event.
    pub best_score: Option<Sc>,
    /// Revision of the snapshot created by this event, or otherwise the
    /// latest retained revision (if any).
    pub snapshot_revision: Option<u64>,
}
96
/// A retained copy of a solution together with the job state at the moment
/// it was captured.
#[derive(Debug, Clone)]
pub struct SolverSnapshot<S: PlanningSolution> {
    /// Identifier of the job that produced the snapshot.
    pub job_id: usize,
    /// Revision number assigned when the snapshot was retained (starts at 1).
    pub snapshot_revision: u64,
    /// Lifecycle state recorded with the snapshot.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason recorded with the snapshot, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Current score reported when the snapshot was taken.
    pub current_score: Option<S::Score>,
    /// Best score reported when the snapshot was taken.
    pub best_score: Option<S::Score>,
    /// Telemetry reported when the snapshot was taken.
    pub telemetry: SolverTelemetry,
    /// The captured solution.
    pub solution: S,
}
108
/// Score analysis tied to one specific snapshot revision of a job.
#[derive(Debug, Clone)]
pub struct SolverSnapshotAnalysis<Sc: Score> {
    /// Identifier of the job the analysis belongs to.
    pub job_id: usize,
    /// Lifecycle state associated with the analysed snapshot.
    pub lifecycle_state: SolverLifecycleState,
    /// Terminal reason associated with the analysed snapshot, if any.
    pub terminal_reason: Option<SolverTerminalReason>,
    /// Revision of the snapshot that was analysed.
    pub snapshot_revision: u64,
    /// The score analysis itself.
    pub analysis: ScoreAnalysis<Sc>,
}
117
/// Lifecycle event streamed to the receiver returned by `SolverManager::solve`.
///
/// Every variant carries a `SolverEventMetadata`; some also carry a payload.
#[derive(Debug, Clone)]
pub enum SolverEvent<S: PlanningSolution> {
    /// Periodic progress report without a solution payload.
    Progress {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// A new best solution was found and retained as a snapshot.
    BestSolution {
        metadata: SolverEventMetadata<S::Score>,
        solution: S,
    },
    /// A pause was requested through `SolverManager::pause`.
    PauseRequested {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The worker settled the pause and retained a pause snapshot.
    Paused {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// The worker left the paused state and continued solving.
    Resumed {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// Terminal: the job completed; carries the final solution.
    Completed {
        metadata: SolverEventMetadata<S::Score>,
        solution: S,
    },
    /// Terminal: the job was cancelled.
    Cancelled {
        metadata: SolverEventMetadata<S::Score>,
    },
    /// Terminal: the job failed; carries the failure message.
    Failed {
        metadata: SolverEventMetadata<S::Score>,
        error: String,
    },
}
148
149impl<S: PlanningSolution> SolverEvent<S> {
150    pub fn metadata(&self) -> &SolverEventMetadata<S::Score> {
151        match self {
152            Self::Progress { metadata }
153            | Self::PauseRequested { metadata }
154            | Self::Paused { metadata }
155            | Self::Resumed { metadata }
156            | Self::Cancelled { metadata } => metadata,
157            Self::BestSolution { metadata, .. }
158            | Self::Completed { metadata, .. }
159            | Self::Failed { metadata, .. } => metadata,
160        }
161    }
162}
163
/// Errors returned by the `SolverManager` API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SolverManagerError {
    /// Every job slot is currently occupied.
    NoFreeJobSlots,
    /// No visible job exists for the given id.
    JobNotFound {
        job_id: usize,
    },
    /// The requested action is not allowed in the job's current state.
    InvalidStateTransition {
        job_id: usize,
        /// Name of the rejected action, e.g. `"pause"` or `"delete"`.
        action: &'static str,
        /// State the job was in when the action was rejected.
        state: SolverLifecycleState,
    },
    /// The job exists but has not retained any snapshot yet.
    NoSnapshotAvailable {
        job_id: usize,
    },
    /// The job has snapshots, but none with the requested revision.
    SnapshotNotFound {
        job_id: usize,
        snapshot_revision: u64,
    },
}
183
184impl Display for SolverManagerError {
185    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
186        match self {
187            Self::NoFreeJobSlots => write!(f, "no free job slots available"),
188            Self::JobNotFound { job_id } => write!(f, "job {job_id} was not found"),
189            Self::InvalidStateTransition {
190                job_id,
191                action,
192                state,
193            } => write!(
194                f,
195                "cannot {action} job {job_id} while it is in state {state:?}"
196            ),
197            Self::NoSnapshotAvailable { job_id } => {
198                write!(f, "job {job_id} has no retained snapshots")
199            }
200            Self::SnapshotNotFound {
201                job_id,
202                snapshot_revision,
203            } => write!(
204                f,
205                "job {job_id} has no retained snapshot revision {snapshot_revision}"
206            ),
207        }
208    }
209}
210
// Marker impl: `Debug` and `Display` above supply everything `Error` needs.
impl Error for SolverManagerError {}
212
/// Runtime context for a retained solve job.
///
/// This is passed into `Solvable::solve()` so the runtime path can publish
/// lifecycle events, settle exact pauses, and observe cancellation.
pub struct SolverRuntime<S: PlanningSolution> {
    // Identifier of the job; `SolverManager::solve` passes the slot index.
    job_id: usize,
    // Slot holding the job's shared state (flags, record, event sender).
    slot: &'static JobSlot<S>,
}
221
// `Clone` and `Copy` are implemented by hand rather than derived.
// NOTE(review): presumably so that they do not require `S: Clone` / `S: Copy`,
// which `#[derive]` would add as bounds — confirm intent.
impl<S: PlanningSolution> Clone for SolverRuntime<S> {
    fn clone(&self) -> Self {
        // The struct is `Copy` (a `usize` and a shared reference).
        *self
    }
}

impl<S: PlanningSolution> Copy for SolverRuntime<S> {}
229
230impl<S: PlanningSolution> Debug for SolverRuntime<S> {
231    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
232        f.debug_struct("SolverRuntime")
233            .field("job_id", &self.job_id)
234            .finish()
235    }
236}
237
238impl<S: PlanningSolution> SolverRuntime<S> {
    /// Creates the runtime handle for `job_id`, bound to its job slot.
    fn new(job_id: usize, slot: &'static JobSlot<S>) -> Self {
        Self { job_id, slot }
    }
242
    /// Returns the identifier of the job this runtime belongs to.
    pub fn job_id(&self) -> usize {
        self.job_id
    }
246
    /// Returns `true` once the slot's `terminate` flag has been set
    /// (`SolverManager::cancel` sets it).
    pub fn is_cancel_requested(&self) -> bool {
        self.slot.terminate.load(Ordering::Acquire)
    }
250
    /// Exposes the slot's `terminate` flag so crate-internal code can poll it
    /// directly.
    pub(crate) fn cancel_flag(&self) -> &'static AtomicBool {
        &self.slot.terminate
    }
254
255    pub fn emit_progress(
256        &self,
257        current_score: Option<S::Score>,
258        best_score: Option<S::Score>,
259        telemetry: SolverTelemetry,
260    ) {
261        let lifecycle_state = self.current_state();
262        self.emit_non_snapshot_event(
263            lifecycle_state,
264            current_score,
265            best_score,
266            telemetry,
267            EventKind::Progress,
268        );
269    }
270
271    pub fn emit_best_solution(
272        &self,
273        solution: S,
274        current_score: Option<S::Score>,
275        best_score: S::Score,
276        telemetry: SolverTelemetry,
277    ) {
278        let state = self.current_state();
279        let (sender, event) = {
280            let mut record = self.slot.record.lock().unwrap();
281            let terminal_reason = record.terminal_reason;
282            record.current_score = current_score;
283            record.best_score = Some(best_score);
284            record.telemetry = telemetry;
285
286            let snapshot_revision = record.push_snapshot(SolverSnapshot {
287                job_id: self.job_id,
288                snapshot_revision: 0,
289                lifecycle_state: state,
290                terminal_reason,
291                current_score,
292                best_score: Some(best_score),
293                telemetry,
294                solution: solution.clone(),
295            });
296
297            let metadata = record.next_metadata(self.job_id, state, Some(snapshot_revision));
298            let sender = self.slot.sender_clone();
299            (sender, SolverEvent::BestSolution { metadata, solution })
300        };
301
302        if let Some(sender) = sender {
303            let _ = sender.send(event);
304        }
305    }
306
307    pub fn emit_completed(
308        &self,
309        solution: S,
310        current_score: Option<S::Score>,
311        best_score: S::Score,
312        telemetry: SolverTelemetry,
313        terminal_reason: SolverTerminalReason,
314    ) {
315        self.slot.state.store(SLOT_COMPLETED, Ordering::SeqCst);
316        let (sender, event) = {
317            let mut record = self.slot.record.lock().unwrap();
318            record.terminal_reason = Some(terminal_reason);
319            record.checkpoint_available = false;
320            record.current_score = current_score;
321            record.best_score = Some(best_score);
322            record.telemetry = telemetry;
323
324            let snapshot_revision = record.push_snapshot(SolverSnapshot {
325                job_id: self.job_id,
326                snapshot_revision: 0,
327                lifecycle_state: SolverLifecycleState::Completed,
328                terminal_reason: Some(terminal_reason),
329                current_score,
330                best_score: Some(best_score),
331                telemetry,
332                solution: solution.clone(),
333            });
334
335            let metadata = record.next_metadata(
336                self.job_id,
337                SolverLifecycleState::Completed,
338                Some(snapshot_revision),
339            );
340            let sender = self.slot.sender_clone();
341            (sender, SolverEvent::Completed { metadata, solution })
342        };
343
344        if let Some(sender) = sender {
345            let _ = sender.send(event);
346        }
347    }
348
    /// Moves the job to `Cancelled` and publishes a `Cancelled` event.
    ///
    /// The given scores and telemetry are stored on the job record, the
    /// terminal reason is set to `Cancelled`, and no snapshot is retained.
    pub fn emit_cancelled(
        &self,
        current_score: Option<S::Score>,
        best_score: Option<S::Score>,
        telemetry: SolverTelemetry,
    ) {
        // Publish the terminal state before the event is generated.
        self.slot.state.store(SLOT_CANCELLED, Ordering::SeqCst);
        self.emit_non_snapshot_terminal_event(
            SolverLifecycleState::Cancelled,
            SolverTerminalReason::Cancelled,
            current_score,
            best_score,
            telemetry,
            EventKind::Cancelled,
        );
    }
365
366    pub fn emit_failed(&self, error: String) {
367        if matches!(
368            self.current_state(),
369            SolverLifecycleState::Completed
370                | SolverLifecycleState::Cancelled
371                | SolverLifecycleState::Failed
372        ) {
373            return;
374        }
375
376        self.slot.state.store(SLOT_FAILED, Ordering::SeqCst);
377        let (sender, event) = {
378            let mut record = self.slot.record.lock().unwrap();
379            record.terminal_reason = Some(SolverTerminalReason::Failed);
380            record.checkpoint_available = false;
381            record.failure_message = Some(error.clone());
382            let telemetry = record.telemetry;
383            let metadata = record.next_metadata(self.job_id, SolverLifecycleState::Failed, None);
384            let sender = self.slot.sender_clone();
385            (
386                sender,
387                SolverEvent::Failed {
388                    metadata: SolverEventMetadata {
389                        telemetry,
390                        ..metadata
391                    },
392                    error,
393                },
394            )
395        };
396
397        if let Some(sender) = sender {
398            let _ = sender.send(event);
399        }
400    }
401
402    pub(crate) fn pause_if_requested<D, ProgressCb>(
403        &self,
404        solver_scope: &mut SolverScope<'_, S, D, ProgressCb>,
405    ) where
406        D: solverforge_scoring::Director<S>,
407        ProgressCb: ProgressCallback<S>,
408    {
409        if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
410            return;
411        }
412
413        solver_scope.pause_timers();
414
415        let solution = solver_scope.score_director().clone_working_solution();
416        let current_score = solver_scope.current_score().copied();
417        let best_score = solver_scope.best_score().copied();
418        let telemetry = solver_scope.stats().snapshot();
419        let _ = self.pause_with_snapshot(solution, current_score, best_score, telemetry);
420        solver_scope.resume_timers();
421    }
422
    /// Settles a pending pause: retains `solution` as a pause snapshot,
    /// publishes `Paused`, then blocks the calling thread until the pause
    /// request is cleared or a cancel is requested.
    ///
    /// Returns `true` when the job was paused and then resumed (a `Resumed`
    /// event is published and the state returns to `Solving`). Returns
    /// `false` when no pause was pending, a cancel was already requested, or
    /// a cancel arrived while paused.
    pub fn pause_with_snapshot(
        &self,
        solution: S,
        current_score: Option<S::Score>,
        best_score: Option<S::Score>,
        telemetry: SolverTelemetry,
    ) -> bool {
        // Nothing to settle: no pause pending, or the job is being cancelled.
        if !self.slot.pause_requested.load(Ordering::Acquire) || self.is_cancel_requested() {
            return false;
        }

        self.slot.state.store(SLOT_PAUSED, Ordering::SeqCst);
        let (sender, event) = {
            let mut record = self.slot.record.lock().unwrap();
            let terminal_reason = record.terminal_reason;
            // A pause snapshot is what makes a checkpoint available.
            record.checkpoint_available = true;
            record.current_score = current_score;
            record.best_score = best_score;
            record.telemetry = telemetry;

            let snapshot_revision = record.push_snapshot(SolverSnapshot {
                job_id: self.job_id,
                // Placeholder; `push_snapshot` assigns the real revision.
                snapshot_revision: 0,
                lifecycle_state: SolverLifecycleState::Paused,
                terminal_reason,
                current_score,
                best_score,
                telemetry,
                solution,
            });

            let metadata = record.next_metadata(
                self.job_id,
                SolverLifecycleState::Paused,
                Some(snapshot_revision),
            );
            let sender = self.slot.sender_clone();
            (sender, SolverEvent::Paused { metadata })
        };

        // Send outside the record lock; send failures are ignored.
        if let Some(sender) = sender {
            let _ = sender.send(event);
        }

        // Block until the pause request is cleared or a cancel is requested.
        // The loop re-checks the flags after every wakeup, so spurious
        // wakeups are harmless.
        // NOTE(review): the flags are atomics checked while holding
        // `pause_gate`; a waker that changes them without holding the gate
        // can notify between this check and `wait`, and the wakeup is lost.
        let mut guard = self.slot.pause_gate.lock().unwrap();
        while self.slot.pause_requested.load(Ordering::Acquire) && !self.is_cancel_requested() {
            guard = self.slot.pause_condvar.wait(guard).unwrap();
        }
        drop(guard);

        // Cancelled while paused: the state stays `Paused` and no `Resumed`
        // event is published.
        if self.is_cancel_requested() {
            return false;
        }

        self.slot.state.store(SLOT_SOLVING, Ordering::SeqCst);
        // The `Resumed` event reports the scores and telemetry captured at
        // pause time.
        self.emit_non_snapshot_event(
            SolverLifecycleState::Solving,
            current_score,
            best_score,
            telemetry,
            EventKind::Resumed,
        );
        true
    }
487
    /// Returns `true` when the job's current lifecycle state is terminal.
    ///
    /// Panics if the job slot has already been freed (see `current_state`).
    pub(crate) fn is_terminal(&self) -> bool {
        self.current_state().is_terminal()
    }
491
    /// Reads the slot's lifecycle state.
    ///
    /// Panics when the slot holds no lifecycle state (it is free), which a
    /// live runtime should never observe.
    fn current_state(&self) -> SolverLifecycleState {
        self.slot
            .raw_state()
            .expect("runtime accessed a freed job slot")
    }
497
498    fn emit_non_snapshot_event(
499        &self,
500        lifecycle_state: SolverLifecycleState,
501        current_score: Option<S::Score>,
502        best_score: Option<S::Score>,
503        telemetry: SolverTelemetry,
504        kind: EventKind,
505    ) {
506        let (sender, event) = {
507            let mut record = self.slot.record.lock().unwrap();
508            record.current_score = current_score;
509            record.best_score = best_score;
510            record.telemetry = telemetry;
511            if lifecycle_state != SolverLifecycleState::Paused {
512                record.checkpoint_available = false;
513            }
514            let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
515            let sender = self.slot.sender_clone();
516            let event = match kind {
517                EventKind::Progress => SolverEvent::Progress { metadata },
518                EventKind::Resumed => SolverEvent::Resumed { metadata },
519                EventKind::Cancelled => unreachable!(),
520            };
521            (sender, event)
522        };
523
524        if let Some(sender) = sender {
525            let _ = sender.send(event);
526        }
527    }
528
529    fn emit_non_snapshot_terminal_event(
530        &self,
531        lifecycle_state: SolverLifecycleState,
532        terminal_reason: SolverTerminalReason,
533        current_score: Option<S::Score>,
534        best_score: Option<S::Score>,
535        telemetry: SolverTelemetry,
536        kind: EventKind,
537    ) {
538        let (sender, event) = {
539            let mut record = self.slot.record.lock().unwrap();
540            record.terminal_reason = Some(terminal_reason);
541            record.checkpoint_available = false;
542            record.current_score = current_score;
543            record.best_score = best_score;
544            record.telemetry = telemetry;
545            let metadata = record.next_metadata(self.job_id, lifecycle_state, None);
546            let sender = self.slot.sender_clone();
547            let event = match kind {
548                EventKind::Cancelled => SolverEvent::Cancelled { metadata },
549                EventKind::Progress | EventKind::Resumed => unreachable!(),
550            };
551            (sender, event)
552        };
553
554        if let Some(sender) = sender {
555            let _ = sender.send(event);
556        }
557    }
558}
559
/// Trait for solutions that can run inside the retained lifecycle manager.
///
/// `solve` is invoked on a worker thread by `SolverManager::solve`. It is
/// expected to publish a terminal event through `runtime` before returning;
/// if it returns without one, the manager reports the job as cancelled (when
/// a cancel was requested) or failed.
pub trait Solvable: PlanningSolution + Send + 'static {
    /// Runs the solve to its end, reporting lifecycle events via `runtime`.
    fn solve(self, runtime: SolverRuntime<Self>);
}
564
/// Selects which payload-free event the private `emit_non_snapshot_*`
/// helpers build.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventKind {
    /// Build `SolverEvent::Progress`.
    Progress,
    /// Build `SolverEvent::Resumed`.
    Resumed,
    /// Build `SolverEvent::Cancelled`.
    Cancelled,
}
571
/// Mutable per-job bookkeeping, guarded by `JobSlot::record`.
struct JobRecord<S: PlanningSolution> {
    // Why the job ended; `None` until a terminal event is recorded.
    terminal_reason: Option<SolverTerminalReason>,
    // Sequence number of the last generated event (0 before the first).
    event_sequence: u64,
    // Revision of the newest retained snapshot, if any.
    latest_snapshot_revision: Option<u64>,
    // Most recently reported scores and telemetry.
    current_score: Option<S::Score>,
    best_score: Option<S::Score>,
    telemetry: SolverTelemetry,
    // Set when a pause snapshot is retained; cleared by later events.
    checkpoint_available: bool,
    // Every retained snapshot, in the order it was pushed.
    snapshots: Vec<SolverSnapshot<S>>,
    // Message passed to `emit_failed`, if the job failed.
    failure_message: Option<String>,
}
583
584impl<S: PlanningSolution> JobRecord<S> {
    /// Creates an empty record: no events, no snapshots, zeroed telemetry.
    ///
    /// `const` so it can be used while building a `const` `JobSlot`.
    const fn new() -> Self {
        Self {
            terminal_reason: None,
            event_sequence: 0,
            latest_snapshot_revision: None,
            current_score: None,
            best_score: None,
            // Spelled out field by field so the constructor stays `const`.
            telemetry: SolverTelemetry {
                elapsed_ms: 0,
                step_count: 0,
                moves_evaluated: 0,
                moves_accepted: 0,
                score_calculations: 0,
                moves_per_second: 0,
                acceptance_rate: 0.0,
            },
            checkpoint_available: false,
            snapshots: Vec::new(),
            failure_message: None,
        }
    }
606
607    fn reset(&mut self) {
608        self.terminal_reason = None;
609        self.event_sequence = 0;
610        self.latest_snapshot_revision = None;
611        self.current_score = None;
612        self.best_score = None;
613        self.telemetry = SolverTelemetry {
614            elapsed_ms: 0,
615            step_count: 0,
616            moves_evaluated: 0,
617            moves_accepted: 0,
618            score_calculations: 0,
619            moves_per_second: 0,
620            acceptance_rate: 0.0,
621        };
622        self.checkpoint_available = false;
623        self.snapshots.clear();
624        self.failure_message = None;
625    }
626
627    fn push_snapshot(&mut self, mut snapshot: SolverSnapshot<S>) -> u64 {
628        let next = self.latest_snapshot_revision.unwrap_or(0) + 1;
629        snapshot.snapshot_revision = next;
630        self.latest_snapshot_revision = Some(next);
631        self.snapshots.push(snapshot);
632        next
633    }
634
635    fn next_metadata(
636        &mut self,
637        job_id: usize,
638        lifecycle_state: SolverLifecycleState,
639        snapshot_revision: Option<u64>,
640    ) -> SolverEventMetadata<S::Score> {
641        self.event_sequence += 1;
642        SolverEventMetadata {
643            job_id,
644            event_sequence: self.event_sequence,
645            lifecycle_state,
646            terminal_reason: self.terminal_reason,
647            telemetry: self.telemetry,
648            current_score: self.current_score,
649            best_score: self.best_score,
650            snapshot_revision: snapshot_revision.or(self.latest_snapshot_revision),
651        }
652    }
653
    /// Builds a `SolverStatus` from the record's current contents.
    ///
    /// `lifecycle_state` is supplied by the caller because it is stored on
    /// the slot, not in the record.
    fn status(
        &self,
        job_id: usize,
        lifecycle_state: SolverLifecycleState,
    ) -> SolverStatus<S::Score> {
        SolverStatus {
            job_id,
            lifecycle_state,
            terminal_reason: self.terminal_reason,
            checkpoint_available: self.checkpoint_available,
            event_sequence: self.event_sequence,
            latest_snapshot_revision: self.latest_snapshot_revision,
            current_score: self.current_score,
            best_score: self.best_score,
            telemetry: self.telemetry,
        }
    }
671}
672
/// Shared state for one job slot of a `SolverManager`.
struct JobSlot<S: PlanningSolution> {
    // One of the `SLOT_FREE` .. `SLOT_FAILED` values.
    state: AtomicU8,
    // One of `SLOT_VISIBLE`, `SLOT_DELETED`, `SLOT_DELETING`.
    visibility: AtomicU8,
    // Set when cancellation has been requested.
    terminate: AtomicBool,
    // Set while a pause is requested or in effect; cleared to resume.
    pause_requested: AtomicBool,
    // True from `initialize` until the worker calls `worker_exited`.
    worker_running: AtomicBool,
    // Event channel to the job's receiver; `None` when free or deleted.
    sender: Mutex<Option<mpsc::UnboundedSender<SolverEvent<S>>>>,
    // Scores, telemetry, snapshots and event bookkeeping for the job.
    record: Mutex<JobRecord<S>>,
    // Mutex/condvar pair the worker blocks on while the job is paused.
    pause_gate: Mutex<()>,
    pause_condvar: Condvar,
}
684
685impl<S: PlanningSolution> JobSlot<S> {
    /// Creates a free, visible slot with no sender and an empty record.
    ///
    /// `const` so that `SolverManager::new` can itself be `const`.
    const fn new() -> Self {
        Self {
            state: AtomicU8::new(SLOT_FREE),
            visibility: AtomicU8::new(SLOT_VISIBLE),
            terminate: AtomicBool::new(false),
            pause_requested: AtomicBool::new(false),
            worker_running: AtomicBool::new(false),
            sender: Mutex::new(None),
            record: Mutex::new(JobRecord::new()),
            pause_gate: Mutex::new(()),
            pause_condvar: Condvar::new(),
        }
    }
699
700    fn sender_clone(&self) -> Option<mpsc::UnboundedSender<SolverEvent<S>>> {
701        self.sender.lock().unwrap().clone()
702    }
703
    /// Prepares a slot for a new job: clears the cancel and pause flags,
    /// marks the worker as running and the slot as visible, installs
    /// `sender`, and resets the record.
    ///
    /// Does not touch `state`; `SolverManager::solve` sets it when claiming
    /// the slot, before calling this.
    fn initialize(&self, sender: mpsc::UnboundedSender<SolverEvent<S>>) {
        self.terminate.store(false, Ordering::Release);
        self.pause_requested.store(false, Ordering::Release);
        self.worker_running.store(true, Ordering::Release);
        self.visibility.store(SLOT_VISIBLE, Ordering::Release);
        *self.sender.lock().unwrap() = Some(sender);
        self.record.lock().unwrap().reset();
    }
712
    /// Returns the slot to its free state: clears all flags, drops the
    /// sender, resets the record, then marks the slot free and visible.
    fn reset(&self) {
        self.terminate.store(false, Ordering::Release);
        self.pause_requested.store(false, Ordering::Release);
        self.worker_running.store(false, Ordering::Release);
        *self.sender.lock().unwrap() = None;
        self.record.lock().unwrap().reset();
        // `state` becomes FREE only after everything above is cleared.
        // NOTE(review): presumably so a concurrent `solve()` cannot claim a
        // half-reset slot — confirm before reordering.
        self.state.store(SLOT_FREE, Ordering::Release);
        self.visibility.store(SLOT_VISIBLE, Ordering::Release);
    }
722
    /// Hides the slot from the public API and drops its event sender.
    ///
    /// The slot is not freed here; `try_reset_deleted` does that.
    fn mark_deleted(&self) {
        self.visibility.store(SLOT_DELETED, Ordering::Release);
        *self.sender.lock().unwrap() = None;
    }
727
    /// Records that the worker has finished, then frees the slot if it was
    /// already marked deleted.
    fn worker_exited(&self) {
        self.worker_running.store(false, Ordering::Release);
        self.try_reset_deleted();
    }
732
733    fn try_reset_deleted(&self) {
734        if self.worker_running.load(Ordering::Acquire) {
735            return;
736        }
737
738        if self
739            .visibility
740            .compare_exchange(
741                SLOT_DELETED,
742                SLOT_DELETING,
743                Ordering::AcqRel,
744                Ordering::Acquire,
745            )
746            .is_ok()
747        {
748            self.reset();
749        }
750    }
751
752    fn raw_state(&self) -> Option<SolverLifecycleState> {
753        match self.state.load(Ordering::Acquire) {
754            SLOT_SOLVING => Some(SolverLifecycleState::Solving),
755            SLOT_PAUSE_REQUESTED => Some(SolverLifecycleState::PauseRequested),
756            SLOT_PAUSED => Some(SolverLifecycleState::Paused),
757            SLOT_COMPLETED => Some(SolverLifecycleState::Completed),
758            SLOT_CANCELLED => Some(SolverLifecycleState::Cancelled),
759            SLOT_FAILED => Some(SolverLifecycleState::Failed),
760            _ => None,
761        }
762    }
763
764    fn public_state(&self) -> Option<SolverLifecycleState> {
765        if self.visibility.load(Ordering::Acquire) != SLOT_VISIBLE {
766            return None;
767        }
768
769        self.raw_state()
770    }
771}
772
/// Manages retained async solve jobs with lifecycle-complete event streaming.
///
/// Holds a fixed array of `MAX_JOBS` slots; a job's id is the index of the
/// slot it occupies.
pub struct SolverManager<S: Solvable> {
    slots: [JobSlot<S>; MAX_JOBS],
    // Ties the type to `S` without owning a value of it.
    _phantom: std::marker::PhantomData<fn() -> S>,
}
778
impl<S: Solvable> Default for SolverManager<S> {
    /// Equivalent to `SolverManager::new()`.
    fn default() -> Self {
        Self::new()
    }
}
784
785impl<S: Solvable> SolverManager<S>
786where
787    S::Score: Score,
788{
    /// Creates a manager whose job slots are all free.
    ///
    /// `const`, so a manager can be built in a `static`; `solve` requires
    /// `&'static self`.
    pub const fn new() -> Self {
        Self {
            // One entry per slot; the count must equal `MAX_JOBS`, otherwise
            // the array type does not match the field.
            slots: [
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
                JobSlot::new(),
            ],
            _phantom: std::marker::PhantomData,
        }
    }
812
813    pub fn solve(
814        &'static self,
815        solution: S,
816    ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
817        let (sender, receiver) = mpsc::unbounded_channel();
818
819        let Some(slot_idx) = self.slots.iter().position(|slot| {
820            slot.state
821                .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
822                .is_ok()
823        }) else {
824            return Err(SolverManagerError::NoFreeJobSlots);
825        };
826
827        let slot = &self.slots[slot_idx];
828        slot.initialize(sender);
829        let runtime = SolverRuntime::new(slot_idx, slot);
830
831        rayon::spawn(move || {
832            let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
833
834            match result {
835                Ok(()) => {
836                    if !runtime.is_terminal() {
837                        if runtime.is_cancel_requested() {
838                            let (current_score, best_score, telemetry) = {
839                                let record = runtime.slot.record.lock().unwrap();
840                                (record.current_score, record.best_score, record.telemetry)
841                            };
842                            runtime.emit_cancelled(current_score, best_score, telemetry);
843                        } else {
844                            runtime.emit_failed(
845                                "solver returned without emitting a terminal lifecycle event"
846                                    .to_string(),
847                            );
848                        }
849                    }
850                }
851                Err(payload) => {
852                    runtime.emit_failed(panic_payload_to_string(payload));
853                }
854            }
855
856            runtime.slot.worker_exited();
857        });
858
859        Ok((slot_idx, receiver))
860    }
861
862    pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
863        let slot = self.slot(job_id)?;
864        let state = slot
865            .public_state()
866            .ok_or(SolverManagerError::JobNotFound { job_id })?;
867        let record = slot.record.lock().unwrap();
868        Ok(record.status(job_id, state))
869    }
870
871    pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
872        let slot = self.slot(job_id)?;
873        match slot.state.compare_exchange(
874            SLOT_SOLVING,
875            SLOT_PAUSE_REQUESTED,
876            Ordering::SeqCst,
877            Ordering::SeqCst,
878        ) {
879            Ok(_) => {
880                slot.pause_requested.store(true, Ordering::SeqCst);
881                let (sender, event) = {
882                    let mut record = slot.record.lock().unwrap();
883                    let metadata =
884                        record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
885                    let sender = slot.sender_clone();
886                    (sender, SolverEvent::PauseRequested { metadata })
887                };
888                if let Some(sender) = sender {
889                    let _ = sender.send(event);
890                }
891                Ok(())
892            }
893            Err(_) => {
894                let state = slot
895                    .public_state()
896                    .ok_or(SolverManagerError::JobNotFound { job_id })?;
897                Err(SolverManagerError::InvalidStateTransition {
898                    job_id,
899                    action: "pause",
900                    state,
901                })
902            }
903        }
904    }
905
906    pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
907        let slot = self.slot(job_id)?;
908        let state = slot
909            .public_state()
910            .ok_or(SolverManagerError::JobNotFound { job_id })?;
911        if state != SolverLifecycleState::Paused {
912            return Err(SolverManagerError::InvalidStateTransition {
913                job_id,
914                action: "resume",
915                state,
916            });
917        }
918
919        slot.pause_requested.store(false, Ordering::SeqCst);
920        slot.pause_condvar.notify_one();
921        Ok(())
922    }
923
924    pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
925        let slot = self.slot(job_id)?;
926        let state = slot
927            .public_state()
928            .ok_or(SolverManagerError::JobNotFound { job_id })?;
929        if !matches!(
930            state,
931            SolverLifecycleState::Solving
932                | SolverLifecycleState::PauseRequested
933                | SolverLifecycleState::Paused
934        ) {
935            return Err(SolverManagerError::InvalidStateTransition {
936                job_id,
937                action: "cancel",
938                state,
939            });
940        }
941
942        slot.terminate.store(true, Ordering::SeqCst);
943        slot.pause_requested.store(false, Ordering::SeqCst);
944        slot.pause_condvar.notify_one();
945        Ok(())
946    }
947
948    pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
949        let slot = self.slot(job_id)?;
950        let state = slot
951            .public_state()
952            .ok_or(SolverManagerError::JobNotFound { job_id })?;
953        if !state.is_terminal() {
954            return Err(SolverManagerError::InvalidStateTransition {
955                job_id,
956                action: "delete",
957                state,
958            });
959        }
960
961        slot.mark_deleted();
962        slot.try_reset_deleted();
963        Ok(())
964    }
965
966    pub fn get_snapshot(
967        &self,
968        job_id: usize,
969        snapshot_revision: Option<u64>,
970    ) -> Result<SolverSnapshot<S>, SolverManagerError> {
971        let slot = self.slot(job_id)?;
972        if slot.public_state().is_none() {
973            return Err(SolverManagerError::JobNotFound { job_id });
974        }
975
976        let record = slot.record.lock().unwrap();
977        if record.snapshots.is_empty() {
978            return Err(SolverManagerError::NoSnapshotAvailable { job_id });
979        }
980
981        match snapshot_revision {
982            Some(revision) => record
983                .snapshots
984                .iter()
985                .find(|snapshot| snapshot.snapshot_revision == revision)
986                .cloned()
987                .ok_or(SolverManagerError::SnapshotNotFound {
988                    job_id,
989                    snapshot_revision: revision,
990                }),
991            None => Ok(record
992                .snapshots
993                .last()
994                .expect("checked non-empty snapshots")
995                .clone()),
996        }
997    }
998
999    pub fn analyze_snapshot(
1000        &self,
1001        job_id: usize,
1002        snapshot_revision: Option<u64>,
1003    ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
1004    where
1005        S: Analyzable,
1006    {
1007        let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
1008        Ok(SolverSnapshotAnalysis {
1009            job_id,
1010            lifecycle_state: snapshot.lifecycle_state,
1011            terminal_reason: snapshot.terminal_reason,
1012            snapshot_revision: snapshot.snapshot_revision,
1013            analysis: snapshot.solution.analyze(),
1014        })
1015    }
1016
1017    pub fn active_job_count(&self) -> usize {
1018        self.slots
1019            .iter()
1020            .filter(|slot| slot.public_state().is_some())
1021            .count()
1022    }
1023
1024    #[cfg(test)]
1025    pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
1026        self.slots
1027            .get(job_id)
1028            .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
1029    }
1030
1031    fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
1032        self.slots
1033            .get(job_id)
1034            .ok_or(SolverManagerError::JobNotFound { job_id })
1035    }
1036}
1037
/// Extracts a human-readable message from a panic payload.
///
/// Payloads that are a `&'static str` or a `String` are returned as their
/// text; any other payload type yields a fixed fallback message.
fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
    payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "solver panicked with a non-string payload".to_string())
}