solverforge_solver/manager/solver_manager.rs
1/* SolverManager for async job management.
2
3Provides the high-level API for:
4- Starting solve jobs that stream solutions via tokio channels
5- Tracking solver status per job
6- Early termination of solving jobs
7
8# Zero-Erasure Design
9
10This implementation uses tokio channels for ownership transfer.
11The solver sends owned solutions through the channel - no Clone required.
12Fixed-size slot arrays avoid heap indirection.
13*/
14
15use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
16
17use solverforge_core::score::Score;
18use tokio::sync::mpsc;
19
20/// Maximum concurrent jobs per SolverManager instance.
21pub const MAX_JOBS: usize = 16;
22
23// Slot states for job lifecycle.
24const SLOT_FREE: u8 = 0;
25const SLOT_SOLVING: u8 = 1;
26const SLOT_DONE: u8 = 2;
27
28/// Trait for solutions that can be solved with channel-based solution streaming.
29///
30/// This trait is implemented by the `#[planning_solution]` macro when
31/// `constraints` is specified. The solver sends owned solutions through
32/// the channel - no Clone required.
33///
34/// Solver progress is logged via `tracing` at INFO/DEBUG levels.
35///
36/// # Type Parameters
37///
38/// The solution must be `Send + 'static` to support async job execution.
39/// Note: `Clone` is NOT required - ownership is transferred via channel.
40pub trait Solvable: solverforge_core::domain::PlanningSolution + Send + 'static {
41 /* Solves the solution, sending each new best through the channel.
42
43 The final solution is sent through the channel before this returns.
44 Ownership of solutions transfers through the channel.
45
46 # Arguments
47
48 * `terminate` - Optional flag to request early termination
49 * `sender` - Channel to send each new best solution (ownership transferred)
50 */
51 fn solve(
52 self,
53 terminate: Option<&AtomicBool>,
54 sender: mpsc::UnboundedSender<(Self, Self::Score)>,
55 );
56}
57
58// Status of a solving job.
59#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
60#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
61pub enum SolverStatus {
62 // Not currently solving.
63 NotSolving,
64 // Actively solving.
65 Solving,
66}
67
68impl SolverStatus {
69 pub fn as_str(self) -> &'static str {
70 match self {
71 SolverStatus::NotSolving => "NOT_SOLVING",
72 SolverStatus::Solving => "SOLVING",
73 }
74 }
75}
76
77// A single job slot for tracking solve state.
78struct JobSlot {
79 // Current slot state (FREE, SOLVING, DONE).
80 state: AtomicU8,
81 // Termination flag - solver checks this periodically.
82 terminate: AtomicBool,
83}
84
85impl JobSlot {
86 const fn new() -> Self {
87 Self {
88 state: AtomicU8::new(SLOT_FREE),
89 terminate: AtomicBool::new(false),
90 }
91 }
92
93 // Resets the slot to free state.
94 fn reset(&self) {
95 self.terminate.store(false, Ordering::Release);
96 self.state.store(SLOT_FREE, Ordering::Release);
97 }
98}
99
100/// Manages async solve jobs with channel-based solution streaming.
101///
102/// Uses fixed-size slot array for job tracking. Solutions stream through
103/// tokio channels - the solver sends owned solutions, users receive them
104/// without cloning.
105///
106/// # Type Parameters
107///
108/// * `S` - Solution type that implements `Solvable`
109///
110/// # Thread Safety
111///
112/// `SolverManager` is thread-safe. Jobs can be started, queried, and terminated
113/// from any thread.
114pub struct SolverManager<S: Solvable> {
115 slots: [JobSlot; MAX_JOBS],
116 _phantom: std::marker::PhantomData<fn() -> S>,
117}
118
119impl<S: Solvable> Default for SolverManager<S> {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl<S: Solvable> SolverManager<S>
126where
127 S::Score: Score,
128{
129 pub const fn new() -> Self {
130 Self {
131 slots: [
132 JobSlot::new(),
133 JobSlot::new(),
134 JobSlot::new(),
135 JobSlot::new(),
136 JobSlot::new(),
137 JobSlot::new(),
138 JobSlot::new(),
139 JobSlot::new(),
140 JobSlot::new(),
141 JobSlot::new(),
142 JobSlot::new(),
143 JobSlot::new(),
144 JobSlot::new(),
145 JobSlot::new(),
146 JobSlot::new(),
147 JobSlot::new(),
148 ],
149 _phantom: std::marker::PhantomData,
150 }
151 }
152
153 /// Starts solving and returns a receiver for streaming solutions.
154 ///
155 /// The solver runs asynchronously via rayon. Solutions stream through
156 /// the returned receiver as they're found. Each solution is owned -
157 /// no cloning occurs.
158 ///
159 /// # Arguments
160 ///
161 /// * `solution` - The starting solution (ownership transferred)
162 ///
163 /// # Returns
164 ///
165 /// A tuple of (job_id, receiver). The receiver yields `(solution, score)`
166 /// pairs as new best solutions are found.
167 ///
168 /// # Panics
169 ///
170 /// Panics if no free slots are available.
171 pub fn solve(&'static self, solution: S) -> (usize, mpsc::UnboundedReceiver<(S, S::Score)>) {
172 let (sender, receiver) = mpsc::unbounded_channel();
173
174 // Find a free slot
175 let slot_idx = self
176 .slots
177 .iter()
178 .position(|s| {
179 s.state
180 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
181 .is_ok()
182 })
183 .expect("No free job slots available");
184
185 let slot = &self.slots[slot_idx];
186 slot.terminate.store(false, Ordering::SeqCst);
187
188 // Spawn the solver via rayon
189 rayon::spawn(move || {
190 let terminate_ref = &slot.terminate;
191
192 // solve sends all solutions (including final) through the channel
193 solution.solve(Some(terminate_ref), sender);
194
195 slot.state.store(SLOT_DONE, Ordering::Release);
196 });
197
198 (slot_idx, receiver)
199 }
200
201 pub fn get_status(&self, job_id: usize) -> SolverStatus {
202 if job_id >= MAX_JOBS {
203 return SolverStatus::NotSolving;
204 }
205 match self.slots[job_id].state.load(Ordering::Acquire) {
206 SLOT_SOLVING => SolverStatus::Solving,
207 _ => SolverStatus::NotSolving,
208 }
209 }
210
211 /// Requests early termination of a job.
212 ///
213 /// Returns `true` if the job was found and is currently solving.
214 pub fn terminate_early(&self, job_id: usize) -> bool {
215 if job_id >= MAX_JOBS {
216 return false;
217 }
218
219 let slot = &self.slots[job_id];
220 if slot.state.load(Ordering::Acquire) == SLOT_SOLVING {
221 slot.terminate.store(true, Ordering::SeqCst);
222 true
223 } else {
224 false
225 }
226 }
227
228 /// Frees a job slot after solving completes.
229 ///
230 /// Call this after the receiver is drained to allow reuse of the slot.
231 pub fn free_slot(&self, job_id: usize) {
232 if job_id < MAX_JOBS {
233 self.slots[job_id].reset();
234 }
235 }
236
237 pub fn active_job_count(&self) -> usize {
238 self.slots
239 .iter()
240 .filter(|s| s.state.load(Ordering::Relaxed) == SLOT_SOLVING)
241 .count()
242 }
243}