Skip to main content

solverforge_solver/manager/
solver_manager.rs

1/* SolverManager for async job management.
2
3Provides the high-level API for:
4- Starting solve jobs that stream solutions via tokio channels
5- Tracking solver status per job
6- Early termination of solving jobs
7
8# Zero-Erasure Design
9
10This implementation uses tokio channels for ownership transfer.
11The solver sends owned solutions through the channel - no Clone required.
12Fixed-size slot arrays avoid heap indirection.
13*/
14
15use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
16
17use solverforge_core::score::Score;
18use tokio::sync::mpsc;
19
/// Maximum concurrent jobs per SolverManager instance.
pub const MAX_JOBS: usize = 16;

// Slot states for job lifecycle. These are the raw values stored in
// `JobSlot::state`.
// The slot is unclaimed; `SolverManager::solve` may take it.
const SLOT_FREE: u8 = 0;
// The slot has been claimed by `SolverManager::solve` and its job is running.
const SLOT_SOLVING: u8 = 1;
// The job's `Solvable::solve` call has returned; the slot still has to be
// released with `SolverManager::free_slot` before it can be reused.
const SLOT_DONE: u8 = 2;
27
/// Trait for solutions that can be solved with channel-based solution streaming.
///
/// This trait is implemented by the `#[planning_solution]` macro when
/// `constraints` is specified. The solver sends owned solutions through
/// the channel - no Clone required.
///
/// Solver progress is logged via `tracing` at INFO/DEBUG levels.
///
/// # Type Parameters
///
/// The solution must be `Send + 'static` to support async job execution.
/// Note: `Clone` is NOT required - ownership is transferred via channel.
pub trait Solvable: solverforge_core::domain::PlanningSolution + Send + 'static {
    /// Solves the solution, sending each new best through the channel.
    ///
    /// Consumes `self`. The final solution is sent through the channel before
    /// this returns. Ownership of solutions transfers through the channel.
    ///
    /// # Arguments
    ///
    /// * `terminate` - Optional flag to request early termination
    /// * `sender` - Channel to send each new best solution (ownership
    ///   transferred); each item is a `(solution, score)` pair
    fn solve(
        self,
        terminate: Option<&AtomicBool>,
        sender: mpsc::UnboundedSender<(Self, Self::Score)>,
    );
}
57
/// Status of a solving job.
///
/// Serialized by serde using SCREAMING_SNAKE_CASE variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum SolverStatus {
    /// Not currently solving.
    NotSolving,
    /// Actively solving.
    Solving,
}
67
68impl SolverStatus {
69    pub fn as_str(self) -> &'static str {
70        match self {
71            SolverStatus::NotSolving => "NOT_SOLVING",
72            SolverStatus::Solving => "SOLVING",
73        }
74    }
75}
76
// A single job slot for tracking solve state.
//
// Both fields are atomics, so a slot can be read and updated through a
// shared reference.
struct JobSlot {
    // Current slot state (one of the `SLOT_*` values).
    state: AtomicU8,
    // Termination flag - solver checks this periodically.
    // Set to `true` to request that the job stop early.
    terminate: AtomicBool,
}
84
85impl JobSlot {
86    const fn new() -> Self {
87        Self {
88            state: AtomicU8::new(SLOT_FREE),
89            terminate: AtomicBool::new(false),
90        }
91    }
92
93    // Resets the slot to free state.
94    fn reset(&self) {
95        self.terminate.store(false, Ordering::Release);
96        self.state.store(SLOT_FREE, Ordering::Release);
97    }
98}
99
/// Manages async solve jobs with channel-based solution streaming.
///
/// Uses fixed-size slot array for job tracking. Solutions stream through
/// tokio channels - the solver sends owned solutions, users receive them
/// without cloning.
///
/// # Type Parameters
///
/// * `S` - Solution type that implements `Solvable`
///
/// # Thread Safety
///
/// `SolverManager` is thread-safe. Jobs can be started, queried, and terminated
/// from any thread.
pub struct SolverManager<S: Solvable> {
    // One entry per possible job; the index into this array is the job id.
    slots: [JobSlot; MAX_JOBS],
    // Ties the manager to `S` without storing a solution. Using `fn() -> S`
    // keeps this marker `Send + Sync` regardless of `S`.
    _phantom: std::marker::PhantomData<fn() -> S>,
}
118
119impl<S: Solvable> Default for SolverManager<S> {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl<S: Solvable> SolverManager<S>
126where
127    S::Score: Score,
128{
129    pub const fn new() -> Self {
130        Self {
131            slots: [
132                JobSlot::new(),
133                JobSlot::new(),
134                JobSlot::new(),
135                JobSlot::new(),
136                JobSlot::new(),
137                JobSlot::new(),
138                JobSlot::new(),
139                JobSlot::new(),
140                JobSlot::new(),
141                JobSlot::new(),
142                JobSlot::new(),
143                JobSlot::new(),
144                JobSlot::new(),
145                JobSlot::new(),
146                JobSlot::new(),
147                JobSlot::new(),
148            ],
149            _phantom: std::marker::PhantomData,
150        }
151    }
152
153    /// Starts solving and returns a receiver for streaming solutions.
154    ///
155    /// The solver runs asynchronously via rayon. Solutions stream through
156    /// the returned receiver as they're found. Each solution is owned -
157    /// no cloning occurs.
158    ///
159    /// # Arguments
160    ///
161    /// * `solution` - The starting solution (ownership transferred)
162    ///
163    /// # Returns
164    ///
165    /// A tuple of (job_id, receiver). The receiver yields `(solution, score)`
166    /// pairs as new best solutions are found.
167    ///
168    /// # Panics
169    ///
170    /// Panics if no free slots are available.
171    pub fn solve(&'static self, solution: S) -> (usize, mpsc::UnboundedReceiver<(S, S::Score)>) {
172        let (sender, receiver) = mpsc::unbounded_channel();
173
174        // Find a free slot
175        let slot_idx = self
176            .slots
177            .iter()
178            .position(|s| {
179                s.state
180                    .compare_exchange(SLOT_FREE, SLOT_SOLVING, Ordering::SeqCst, Ordering::SeqCst)
181                    .is_ok()
182            })
183            .expect("No free job slots available");
184
185        let slot = &self.slots[slot_idx];
186        slot.terminate.store(false, Ordering::SeqCst);
187
188        // Spawn the solver via rayon
189        rayon::spawn(move || {
190            let terminate_ref = &slot.terminate;
191
192            // solve sends all solutions (including final) through the channel
193            solution.solve(Some(terminate_ref), sender);
194
195            slot.state.store(SLOT_DONE, Ordering::Release);
196        });
197
198        (slot_idx, receiver)
199    }
200
201    pub fn get_status(&self, job_id: usize) -> SolverStatus {
202        if job_id >= MAX_JOBS {
203            return SolverStatus::NotSolving;
204        }
205        match self.slots[job_id].state.load(Ordering::Acquire) {
206            SLOT_SOLVING => SolverStatus::Solving,
207            _ => SolverStatus::NotSolving,
208        }
209    }
210
211    /// Requests early termination of a job.
212    ///
213    /// Returns `true` if the job was found and is currently solving.
214    pub fn terminate_early(&self, job_id: usize) -> bool {
215        if job_id >= MAX_JOBS {
216            return false;
217        }
218
219        let slot = &self.slots[job_id];
220        if slot.state.load(Ordering::Acquire) == SLOT_SOLVING {
221            slot.terminate.store(true, Ordering::SeqCst);
222            true
223        } else {
224            false
225        }
226    }
227
228    /// Frees a job slot after solving completes.
229    ///
230    /// Call this after the receiver is drained to allow reuse of the slot.
231    pub fn free_slot(&self, job_id: usize) {
232        if job_id < MAX_JOBS {
233            self.slots[job_id].reset();
234        }
235    }
236
237    pub fn active_job_count(&self) -> usize {
238        self.slots
239            .iter()
240            .filter(|s| s.state.load(Ordering::Relaxed) == SLOT_SOLVING)
241            .count()
242    }
243}