solverforge_solver/manager/solver_manager.rs
1//! SolverManager for async job management.
2//!
3//! Provides the high-level API for:
4//! - Starting solve jobs that stream solutions via tokio channels
5//! - Tracking solver status per job
6//! - Early termination of solving jobs
7//!
8//! # Zero-Erasure Design
9//!
10//! This implementation uses tokio channels for ownership transfer.
11//! The solver sends owned solutions through the channel - no Clone required.
12//! Fixed-size slot arrays avoid heap indirection.
13
14use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
15
16use solverforge_core::score::Score;
17use tokio::sync::mpsc;
18
/// Upper bound on the number of jobs a single `SolverManager` tracks at once.
pub const MAX_JOBS: usize = 16;

// Values stored in a job slot's atomic state byte.

/// The slot is unused and may be claimed by a new job.
const SLOT_FREE: u8 = 0;
/// A job has claimed the slot and its solver is running.
const SLOT_SOLVING: u8 = 1;
/// The job's solver has finished; the slot has not been released yet.
const SLOT_DONE: u8 = 2;
26
27/// Trait for solutions that can be solved with channel-based solution streaming.
28///
29/// This trait is implemented by the `#[planning_solution]` macro when
30/// `constraints` is specified. The solver sends owned solutions through
31/// the channel - no Clone required.
32///
33/// Solver progress is logged via `tracing` at INFO/DEBUG levels.
34///
35/// # Type Parameters
36///
37/// The solution must be `Send + 'static` to support async job execution.
38/// Note: `Clone` is NOT required - ownership is transferred via channel.
39pub trait Solvable: solverforge_core::domain::PlanningSolution + Send + 'static {
40 /// Solves the solution, sending each new best through the channel.
41 ///
42 /// The final solution is sent through the channel before this returns.
43 /// Ownership of solutions transfers through the channel.
44 ///
45 /// # Arguments
46 ///
47 /// * `terminate` - Optional flag to request early termination
48 /// * `sender` - Channel to send each new best solution (ownership transferred)
49 fn solve(
50 self,
51 terminate: Option<&AtomicBool>,
52 sender: mpsc::UnboundedSender<(Self, Self::Score)>,
53 );
54}
55
56/// Status of a solving job.
57#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
58#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
59pub enum SolverStatus {
60 /// Not currently solving.
61 NotSolving,
62 /// Actively solving.
63 Solving,
64}
65
66impl SolverStatus {
67 /// Returns the status as a string.
68 pub fn as_str(self) -> &'static str {
69 match self {
70 SolverStatus::NotSolving => "NOT_SOLVING",
71 SolverStatus::Solving => "SOLVING",
72 }
73 }
74}
75
/// Bookkeeping for one job: where it is in its lifecycle and whether it has
/// been asked to stop.
struct JobSlot {
    /// Lifecycle state of the slot, stored as one of the `SLOT_*` values.
    state: AtomicU8,
    /// Set to request termination; the solver is expected to poll it.
    terminate: AtomicBool,
}
83
84impl JobSlot {
85 /// Creates an empty job slot.
86 const fn new() -> Self {
87 Self {
88 state: AtomicU8::new(SLOT_FREE),
89 terminate: AtomicBool::new(false),
90 }
91 }
92
93 /// Resets the slot to free state.
94 fn reset(&self) {
95 self.terminate.store(false, Ordering::Release);
96 self.state.store(SLOT_FREE, Ordering::Release);
97 }
98}
99
100/// Manages async solve jobs with channel-based solution streaming.
101///
102/// Uses fixed-size slot array for job tracking. Solutions stream through
103/// tokio channels - the solver sends owned solutions, users receive them
104/// without cloning.
105///
106/// # Type Parameters
107///
108/// * `S` - Solution type that implements `Solvable`
109///
110/// # Thread Safety
111///
112/// `SolverManager` is thread-safe. Jobs can be started, queried, and terminated
113/// from any thread.
114pub struct SolverManager<S: Solvable> {
115 slots: [JobSlot; MAX_JOBS],
116 _phantom: std::marker::PhantomData<fn() -> S>,
117}
118
119impl<S: Solvable> Default for SolverManager<S> {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl<S: Solvable> SolverManager<S>
126where
127 S::Score: Score,
128{
129 /// Creates a new SolverManager with empty slots.
130 pub const fn new() -> Self {
131 Self {
132 slots: [
133 JobSlot::new(),
134 JobSlot::new(),
135 JobSlot::new(),
136 JobSlot::new(),
137 JobSlot::new(),
138 JobSlot::new(),
139 JobSlot::new(),
140 JobSlot::new(),
141 JobSlot::new(),
142 JobSlot::new(),
143 JobSlot::new(),
144 JobSlot::new(),
145 JobSlot::new(),
146 JobSlot::new(),
147 JobSlot::new(),
148 JobSlot::new(),
149 ],
150 _phantom: std::marker::PhantomData,
151 }
152 }
153
154 /// Starts solving and returns a receiver for streaming solutions.
155 ///
156 /// The solver runs asynchronously via rayon. Solutions stream through
157 /// the returned receiver as they're found. Each solution is owned -
158 /// no cloning occurs.
159 ///
160 /// # Arguments
161 ///
162 /// * `solution` - The starting solution (ownership transferred)
163 ///
164 /// # Returns
165 ///
166 /// A tuple of (job_id, receiver). The receiver yields `(solution, score)`
167 /// pairs as new best solutions are found.
168 ///
169 /// # Panics
170 ///
171 /// Panics if no free slots are available.
172 pub fn solve(&'static self, solution: S) -> (usize, mpsc::UnboundedReceiver<(S, S::Score)>) {
173 let (sender, receiver) = mpsc::unbounded_channel();
174
175 // Find a free slot
176 let slot_idx = self
177 .slots
178 .iter()
179 .position(|s| {
180 s.state
181 .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
182 .is_ok()
183 })
184 .expect("No free job slots available");
185
186 let slot = &self.slots[slot_idx];
187 slot.terminate.store(false, Ordering::SeqCst);
188
189 // Spawn the solver via rayon
190 rayon::spawn(move || {
191 let terminate_ref = &slot.terminate;
192
193 // solve sends all solutions (including final) through the channel
194 solution.solve(Some(terminate_ref), sender);
195
196 slot.state.store(SLOT_DONE, Ordering::Release);
197 });
198
199 (slot_idx, receiver)
200 }
201
202 /// Gets the solver status for a job.
203 pub fn get_status(&self, job_id: usize) -> SolverStatus {
204 if job_id >= MAX_JOBS {
205 return SolverStatus::NotSolving;
206 }
207 match self.slots[job_id].state.load(Ordering::Acquire) {
208 SLOT_SOLVING => SolverStatus::Solving,
209 _ => SolverStatus::NotSolving,
210 }
211 }
212
213 /// Requests early termination of a job.
214 ///
215 /// Returns `true` if the job was found and is currently solving.
216 pub fn terminate_early(&self, job_id: usize) -> bool {
217 if job_id >= MAX_JOBS {
218 return false;
219 }
220
221 let slot = &self.slots[job_id];
222 if slot.state.load(Ordering::Acquire) == SLOT_SOLVING {
223 slot.terminate.store(true, Ordering::SeqCst);
224 true
225 } else {
226 false
227 }
228 }
229
230 /// Frees a job slot after solving completes.
231 ///
232 /// Call this after the receiver is drained to allow reuse of the slot.
233 pub fn free_slot(&self, job_id: usize) {
234 if job_id < MAX_JOBS {
235 self.slots[job_id].reset();
236 }
237 }
238
239 /// Returns the number of active (solving) jobs.
240 pub fn active_job_count(&self) -> usize {
241 self.slots
242 .iter()
243 .filter(|s| s.state.load(Ordering::Relaxed) == SLOT_SOLVING)
244 .count()
245 }
246}