solverforge_solver/manager/solver_manager.rs
1/* SolverManager for async job management.
2
3Provides the high-level API for:
4- Starting solve jobs that stream solutions via tokio channels
5- Tracking solver status per job
6- Early termination of solving jobs
7
8# Zero-Erasure Design
9
10This implementation uses tokio channels for ownership transfer.
11The solver sends owned solutions through the channel - no Clone required.
12Fixed-size slot arrays avoid heap indirection.
13*/
14
15use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
16
17use solverforge_core::domain::PlanningSolution;
18use solverforge_core::score::Score;
19use tokio::sync::mpsc;
20
21use crate::stats::SolverTelemetry;
22
/// Number of job slots held by each `SolverManager`, i.e. the upper bound on
/// how many jobs one manager can track at the same time.
pub const MAX_JOBS: usize = 16;
25
// Lifecycle states stored in `JobSlot::state`.
// A slot is claimed FREE -> SOLVING when a job starts, is marked DONE once
// the solver has returned, and goes back to FREE when the slot is reset.
const SLOT_FREE: u8 = 0;
const SLOT_SOLVING: u8 = 1;
const SLOT_DONE: u8 = 2;
30
31#[derive(Debug)]
32pub enum SolverEvent<S: PlanningSolution> {
33 Progress {
34 current_score: Option<S::Score>,
35 best_score: Option<S::Score>,
36 telemetry: SolverTelemetry,
37 },
38 BestSolution {
39 solution: S,
40 score: S::Score,
41 telemetry: SolverTelemetry,
42 },
43 Finished {
44 solution: S,
45 score: S::Score,
46 telemetry: SolverTelemetry,
47 },
48}
49
50/// Trait for solutions that can be solved with channel-based solution streaming.
51///
52/// This trait is implemented by the `#[planning_solution]` macro when
53/// `constraints` is specified. The solver sends owned solutions through
54/// the channel - no Clone required.
55///
56/// Solver progress is logged via `tracing` at INFO/DEBUG levels.
57///
58/// # Type Parameters
59///
60/// The solution must be `Send + 'static` to support async job execution.
61/// Note: `Clone` is NOT required - ownership is transferred via channel.
62pub trait Solvable: solverforge_core::domain::PlanningSolution + Send + 'static {
63 /* Solves the solution, sending progress and owned solutions through the channel.
64
65 The final solution is sent through the channel before this returns.
66 Ownership of solutions transfers through the channel.
67
68 # Arguments
69
70 * `terminate` - Optional flag to request early termination
71 * `sender` - Channel to send each new best solution (ownership transferred)
72 */
73 fn solve(
74 self,
75 terminate: Option<&AtomicBool>,
76 sender: mpsc::UnboundedSender<SolverEvent<Self>>,
77 );
78}
79
80// Status of a solving job.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
82#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
83pub enum SolverStatus {
84 // Not currently solving.
85 NotSolving,
86 // Actively solving.
87 Solving,
88}
89
90impl SolverStatus {
91 pub fn as_str(self) -> &'static str {
92 match self {
93 SolverStatus::NotSolving => "NOT_SOLVING",
94 SolverStatus::Solving => "SOLVING",
95 }
96 }
97}
98
// Bookkeeping for one job: a lifecycle state plus a cancellation flag.
// Both fields are atomics, so a slot can be used through a shared reference.
struct JobSlot {
    // One of SLOT_FREE, SLOT_SOLVING or SLOT_DONE.
    state: AtomicU8,
    // Set to request early termination; handed to the solver, which is
    // expected to poll it.
    terminate: AtomicBool,
}
106
107impl JobSlot {
108 const fn new() -> Self {
109 Self {
110 state: AtomicU8::new(SLOT_FREE),
111 terminate: AtomicBool::new(false),
112 }
113 }
114
115 // Resets the slot to free state.
116 fn reset(&self) {
117 self.terminate.store(false, Ordering::Release);
118 self.state.store(SLOT_FREE, Ordering::Release);
119 }
120}
121
122/// Manages async solve jobs with channel-based solution streaming.
123///
124/// Uses fixed-size slot array for job tracking. Solutions stream through
125/// tokio channels - the solver sends owned solutions, users receive them
126/// without cloning.
127///
128/// # Type Parameters
129///
130/// * `S` - Solution type that implements `Solvable`
131///
132/// # Thread Safety
133///
134/// `SolverManager` is thread-safe. Jobs can be started, queried, and terminated
135/// from any thread.
136pub struct SolverManager<S: Solvable> {
137 slots: [JobSlot; MAX_JOBS],
138 _phantom: std::marker::PhantomData<fn() -> S>,
139}
140
141impl<S: Solvable> Default for SolverManager<S> {
142 fn default() -> Self {
143 Self::new()
144 }
145}
146
147impl<S: Solvable> SolverManager<S>
148where
149 S::Score: Score,
150{
151 pub const fn new() -> Self {
152 Self {
153 slots: [
154 JobSlot::new(),
155 JobSlot::new(),
156 JobSlot::new(),
157 JobSlot::new(),
158 JobSlot::new(),
159 JobSlot::new(),
160 JobSlot::new(),
161 JobSlot::new(),
162 JobSlot::new(),
163 JobSlot::new(),
164 JobSlot::new(),
165 JobSlot::new(),
166 JobSlot::new(),
167 JobSlot::new(),
168 JobSlot::new(),
169 JobSlot::new(),
170 ],
171 _phantom: std::marker::PhantomData,
172 }
173 }
174
175 /// Starts solving and returns a receiver for streaming solutions.
176 ///
177 /// The solver runs asynchronously via rayon. Solutions stream through
178 /// the returned receiver as they're found. Each solution is owned -
179 /// no cloning occurs.
180 ///
181 /// # Arguments
182 ///
183 /// * `solution` - The starting solution (ownership transferred)
184 ///
185 /// # Returns
186 ///
187 /// A tuple of (job_id, receiver). The receiver yields `(solution, score)`
188 /// pairs as new best solutions are found.
189 ///
190 /// # Panics
191 ///
192 /// Panics if no free slots are available.
193 pub fn solve(&'static self, solution: S) -> (usize, mpsc::UnboundedReceiver<SolverEvent<S>>) {
194 let (sender, receiver) = mpsc::unbounded_channel();
195
196 // Find a free slot
197 let slot_idx = self
198 .slots
199 .iter()
200 .position(|s| {
201 s.state
202 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
203 .is_ok()
204 })
205 .expect("No free job slots available");
206
207 let slot = &self.slots[slot_idx];
208 slot.terminate.store(false, Ordering::SeqCst);
209
210 // Spawn the solver via rayon
211 rayon::spawn(move || {
212 let terminate_ref = &slot.terminate;
213
214 // solve sends all solutions (including final) through the channel
215 solution.solve(Some(terminate_ref), sender);
216
217 slot.state.store(SLOT_DONE, Ordering::Release);
218 });
219
220 (slot_idx, receiver)
221 }
222
223 pub fn get_status(&self, job_id: usize) -> SolverStatus {
224 if job_id >= MAX_JOBS {
225 return SolverStatus::NotSolving;
226 }
227 match self.slots[job_id].state.load(Ordering::Acquire) {
228 SLOT_SOLVING => SolverStatus::Solving,
229 _ => SolverStatus::NotSolving,
230 }
231 }
232
233 /// Requests early termination of a job.
234 ///
235 /// Returns `true` if the job was found and is currently solving.
236 pub fn terminate_early(&self, job_id: usize) -> bool {
237 if job_id >= MAX_JOBS {
238 return false;
239 }
240
241 let slot = &self.slots[job_id];
242 if slot.state.load(Ordering::Acquire) == SLOT_SOLVING {
243 slot.terminate.store(true, Ordering::SeqCst);
244 true
245 } else {
246 false
247 }
248 }
249
250 /// Frees a job slot after solving completes.
251 ///
252 /// Call this after the receiver is drained to allow reuse of the slot.
253 pub fn free_slot(&self, job_id: usize) {
254 if job_id < MAX_JOBS {
255 self.slots[job_id].reset();
256 }
257 }
258
259 pub fn active_job_count(&self) -> usize {
260 self.slots
261 .iter()
262 .filter(|s| s.state.load(Ordering::Relaxed) == SLOT_SOLVING)
263 .count()
264 }
265}