Skip to main content

solverforge_solver/manager/solver_manager/
manager.rs

1use std::marker::PhantomData;
2use std::panic::AssertUnwindSafe;
3use std::sync::atomic::Ordering;
4
5use solverforge_core::domain::PlanningSolution;
6use solverforge_core::score::Score;
7use tokio::sync::mpsc;
8
9use super::super::solution_manager::Analyzable;
10use super::runtime::{panic_payload_to_string, SolverRuntime};
11use super::slot::{JobSlot, SLOT_FREE, SLOT_SOLVING};
12use super::types::{
13    SolverEvent, SolverLifecycleState, SolverManagerError, SolverSnapshot, SolverSnapshotAnalysis,
14    SolverStatus,
15};
16
/// Maximum concurrent jobs per SolverManager instance.
///
/// This is the length of the fixed slot array inside `SolverManager`. A job id is an index
/// into that array, so valid job ids are `0..MAX_JOBS`. `SolverManager::new` spells out one
/// slot initializer per entry, so that list has to be kept in step with this value.
pub const MAX_JOBS: usize = 16;
19
20/// Trait for solutions that can run inside the retained lifecycle manager.
21pub trait Solvable: PlanningSolution + Send + 'static {
22    fn solve(self, runtime: SolverRuntime<Self>);
23}
24
25/// Manages retained async solve jobs with lifecycle-complete event streaming.
26pub struct SolverManager<S: Solvable> {
27    slots: [JobSlot<S>; MAX_JOBS],
28    _phantom: PhantomData<fn() -> S>,
29}
30
31impl<S: Solvable> Default for SolverManager<S> {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl<S: Solvable> SolverManager<S>
38where
39    S::Score: Score,
40{
41    pub const fn new() -> Self {
42        Self {
43            slots: [
44                JobSlot::new(),
45                JobSlot::new(),
46                JobSlot::new(),
47                JobSlot::new(),
48                JobSlot::new(),
49                JobSlot::new(),
50                JobSlot::new(),
51                JobSlot::new(),
52                JobSlot::new(),
53                JobSlot::new(),
54                JobSlot::new(),
55                JobSlot::new(),
56                JobSlot::new(),
57                JobSlot::new(),
58                JobSlot::new(),
59                JobSlot::new(),
60            ],
61            _phantom: PhantomData,
62        }
63    }
64
65    pub fn solve(
66        &'static self,
67        solution: S,
68    ) -> Result<(usize, mpsc::UnboundedReceiver<SolverEvent<S>>), SolverManagerError> {
69        let (sender, receiver) = mpsc::unbounded_channel();
70
71        let Some(slot_idx) = self.slots.iter().position(|slot| {
72            slot.state
73                .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
74                .is_ok()
75        }) else {
76            return Err(SolverManagerError::NoFreeJobSlots);
77        };
78
79        let slot = &self.slots[slot_idx];
80        slot.initialize(sender);
81        let runtime = SolverRuntime::new(slot_idx, slot);
82
83        rayon::spawn(move || {
84            let result = std::panic::catch_unwind(AssertUnwindSafe(|| solution.solve(runtime)));
85
86            match result {
87                Ok(()) => {
88                    if !runtime.is_terminal() {
89                        if runtime.is_cancel_requested() {
90                            let (current_score, best_score, telemetry) = {
91                                let record = runtime.slot.record.lock().unwrap();
92                                (
93                                    record.current_score,
94                                    record.best_score,
95                                    record.telemetry.clone(),
96                                )
97                            };
98                            runtime.emit_cancelled(current_score, best_score, telemetry);
99                        } else {
100                            runtime.emit_failed(
101                                "solver returned without emitting a terminal lifecycle event"
102                                    .to_string(),
103                            );
104                        }
105                    }
106                }
107                Err(payload) => {
108                    runtime.emit_failed(panic_payload_to_string(payload));
109                }
110            }
111
112            runtime.slot.worker_exited();
113        });
114
115        Ok((slot_idx, receiver))
116    }
117
118    pub fn get_status(&self, job_id: usize) -> Result<SolverStatus<S::Score>, SolverManagerError> {
119        let slot = self.slot(job_id)?;
120        let state = slot
121            .public_state()
122            .ok_or(SolverManagerError::JobNotFound { job_id })?;
123        let record = slot.record.lock().unwrap();
124        Ok(record.status(job_id, state))
125    }
126
127    pub fn pause(&self, job_id: usize) -> Result<(), SolverManagerError> {
128        let slot = self.slot(job_id)?;
129        let paused = slot.with_publication(|sender, record| {
130            match slot.state.compare_exchange(
131                SLOT_SOLVING,
132                super::slot::SLOT_PAUSE_REQUESTED,
133                Ordering::SeqCst,
134                Ordering::SeqCst,
135            ) {
136                Ok(_) => {
137                    slot.pause_requested.store(true, Ordering::SeqCst);
138                    let metadata =
139                        record.next_metadata(job_id, SolverLifecycleState::PauseRequested, None);
140                    if let Some(sender) = sender {
141                        let _ = sender.send(SolverEvent::PauseRequested { metadata });
142                    }
143                    true
144                }
145                Err(_) => false,
146            }
147        });
148        if paused {
149            Ok(())
150        } else {
151            let state = slot
152                .public_state()
153                .ok_or(SolverManagerError::JobNotFound { job_id })?;
154            Err(SolverManagerError::InvalidStateTransition {
155                job_id,
156                action: "pause",
157                state,
158            })
159        }
160    }
161
162    pub fn resume(&self, job_id: usize) -> Result<(), SolverManagerError> {
163        let slot = self.slot(job_id)?;
164        let state = slot
165            .public_state()
166            .ok_or(SolverManagerError::JobNotFound { job_id })?;
167        if state != SolverLifecycleState::Paused {
168            return Err(SolverManagerError::InvalidStateTransition {
169                job_id,
170                action: "resume",
171                state,
172            });
173        }
174
175        slot.pause_requested.store(false, Ordering::SeqCst);
176        slot.pause_condvar.notify_one();
177        Ok(())
178    }
179
180    pub fn cancel(&self, job_id: usize) -> Result<(), SolverManagerError> {
181        let slot = self.slot(job_id)?;
182        let state = slot
183            .public_state()
184            .ok_or(SolverManagerError::JobNotFound { job_id })?;
185        if !matches!(
186            state,
187            SolverLifecycleState::Solving
188                | SolverLifecycleState::PauseRequested
189                | SolverLifecycleState::Paused
190        ) {
191            return Err(SolverManagerError::InvalidStateTransition {
192                job_id,
193                action: "cancel",
194                state,
195            });
196        }
197
198        slot.terminate.store(true, Ordering::SeqCst);
199        slot.pause_requested.store(false, Ordering::SeqCst);
200        slot.pause_condvar.notify_one();
201        Ok(())
202    }
203
204    pub fn delete(&self, job_id: usize) -> Result<(), SolverManagerError> {
205        let slot = self.slot(job_id)?;
206        let state = slot
207            .public_state()
208            .ok_or(SolverManagerError::JobNotFound { job_id })?;
209        if !state.is_terminal() {
210            return Err(SolverManagerError::InvalidStateTransition {
211                job_id,
212                action: "delete",
213                state,
214            });
215        }
216
217        slot.mark_deleted();
218        slot.try_reset_deleted();
219        Ok(())
220    }
221
222    pub fn get_snapshot(
223        &self,
224        job_id: usize,
225        snapshot_revision: Option<u64>,
226    ) -> Result<SolverSnapshot<S>, SolverManagerError> {
227        let slot = self.slot(job_id)?;
228        if slot.public_state().is_none() {
229            return Err(SolverManagerError::JobNotFound { job_id });
230        }
231
232        let record = slot.record.lock().unwrap();
233        if record.snapshots.is_empty() {
234            return Err(SolverManagerError::NoSnapshotAvailable { job_id });
235        }
236
237        match snapshot_revision {
238            Some(revision) => record
239                .snapshots
240                .iter()
241                .find(|snapshot| snapshot.snapshot_revision == revision)
242                .cloned()
243                .ok_or(SolverManagerError::SnapshotNotFound {
244                    job_id,
245                    snapshot_revision: revision,
246                }),
247            None => Ok(record
248                .snapshots
249                .last()
250                .expect("checked non-empty snapshots")
251                .clone()),
252        }
253    }
254
255    pub fn analyze_snapshot(
256        &self,
257        job_id: usize,
258        snapshot_revision: Option<u64>,
259    ) -> Result<SolverSnapshotAnalysis<S::Score>, SolverManagerError>
260    where
261        S: Analyzable,
262    {
263        let snapshot = self.get_snapshot(job_id, snapshot_revision)?;
264        Ok(SolverSnapshotAnalysis {
265            job_id,
266            lifecycle_state: snapshot.lifecycle_state,
267            terminal_reason: snapshot.terminal_reason,
268            snapshot_revision: snapshot.snapshot_revision,
269            analysis: snapshot.solution.analyze(),
270        })
271    }
272
273    pub fn active_job_count(&self) -> usize {
274        self.slots
275            .iter()
276            .filter(|slot| slot.public_state().is_some())
277            .count()
278    }
279
280    #[cfg(test)]
281    pub(crate) fn slot_is_free_for_test(&self, job_id: usize) -> bool {
282        self.slots
283            .get(job_id)
284            .is_some_and(|slot| slot.state.load(Ordering::Acquire) == SLOT_FREE)
285    }
286
287    fn slot(&self, job_id: usize) -> Result<&JobSlot<S>, SolverManagerError> {
288        self.slots
289            .get(job_id)
290            .ok_or(SolverManagerError::JobNotFound { job_id })
291    }
292}