Skip to main content

lean_rs_worker/
planning.rs

1//! Import-set planning for worker-pool execution.
2//!
3//! The planner groups module work into stable worker session batches. It does
4//! not choose workers, run commands, or define downstream cache semantics.
5
6use std::collections::BTreeMap;
7use std::fmt;
8use std::path::PathBuf;
9
10use lean_toolchain::{
11    LeanLakeProjectModules, LeanModuleDescriptor, LeanModuleDiscoveryDiagnostic, LeanModuleDiscoveryOptions,
12    LeanModuleSetFingerprint, ToolchainFingerprint, discover_lake_modules, lake_target_declared,
13};
14use serde_json::Value;
15
16use crate::capability::LeanWorkerCapabilityBuilder;
17use crate::pool::{LeanWorkerRestartPolicyClass, LeanWorkerSessionKey};
18use crate::supervisor::LeanWorkerRestartPolicy;
19use crate::types::LeanWorkerCapabilityMetadata;
20
/// Capability and session requirements used to plan worker batches.
///
/// Created with [`LeanWorkerImportPlanConfig::new`], refined through the
/// consuming builder methods, and handed to [`LeanWorkerImportPlanner::new`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerImportPlanConfig {
    /// Project root passed to module discovery and copied into every planned batch.
    project_root: PathBuf,
    /// Package name copied into every planned batch and its session key.
    package: String,
    /// Library target name; `plan_lake_project` requires Lake to declare it.
    lib_name: String,
    /// Optional source-root selection forwarded to discovery; `None` leaves
    /// the discovery options untouched.
    source_roots: Option<Vec<String>>,
    /// Imports given to every work item derived from discovered module descriptors.
    base_imports: Vec<String>,
    /// Optional metadata check copied into session keys and planned batches.
    metadata_expectation: Option<LeanWorkerPlanMetadataExpectation>,
    /// Optional restart policy copied into planned batches; it also determines
    /// the restart-policy class used when grouping work.
    restart_policy: Option<LeanWorkerRestartPolicy>,
}
32
33impl LeanWorkerImportPlanConfig {
34    /// Create planner configuration for a Lake capability target.
35    #[must_use]
36    pub fn new(project_root: impl Into<PathBuf>, package: impl Into<String>, lib_name: impl Into<String>) -> Self {
37        Self {
38            project_root: project_root.into(),
39            package: package.into(),
40            lib_name: lib_name.into(),
41            source_roots: None,
42            base_imports: Vec::new(),
43            metadata_expectation: None,
44            restart_policy: None,
45        }
46    }
47
48    /// Restrict discovery to these source roots.
49    #[must_use]
50    pub fn source_roots(mut self, roots: impl IntoIterator<Item = impl Into<String>>) -> Self {
51        self.source_roots = Some(roots.into_iter().map(Into::into).collect());
52        self
53    }
54
55    /// Add imports required by every planned session batch.
56    #[must_use]
57    pub fn base_imports(mut self, imports: impl IntoIterator<Item = impl Into<String>>) -> Self {
58        self.base_imports = imports.into_iter().map(Into::into).collect();
59        self
60    }
61
62    /// Validate metadata before a worker batch is used.
63    #[must_use]
64    pub fn validate_metadata(mut self, export: impl Into<String>, request: Value) -> Self {
65        self.metadata_expectation = Some(LeanWorkerPlanMetadataExpectation {
66            export: export.into(),
67            request,
68            expected: None,
69        });
70        self
71    }
72
73    /// Require exact generic capability metadata before a worker batch is used.
74    #[must_use]
75    pub fn expect_metadata(
76        mut self,
77        export: impl Into<String>,
78        request: Value,
79        expected: LeanWorkerCapabilityMetadata,
80    ) -> Self {
81        self.metadata_expectation = Some(LeanWorkerPlanMetadataExpectation {
82            export: export.into(),
83            request,
84            expected: Some(expected),
85        });
86        self
87    }
88
89    /// Use a worker restart policy for builders derived from planned batches.
90    #[must_use]
91    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
92        self.restart_policy = Some(policy);
93        self
94    }
95}
96
/// Metadata expectation carried into planned session keys.
///
/// The same values are later replayed onto the capability builder created by
/// `LeanWorkerPlannedBatch::capability_builder`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPlanMetadataExpectation {
    /// Export name passed to the builder's metadata check.
    pub export: String,
    /// JSON request value passed alongside the export.
    pub request: Value,
    /// `Some` selects the exact-match check (`expect_metadata`); `None`
    /// selects validation only (`validate_metadata`).
    pub expected: Option<LeanWorkerCapabilityMetadata>,
}
104
/// One module-sized unit of planned worker work.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerModuleWork {
    /// Dotted module name; validated by the planner and used to order modules
    /// inside a batch.
    pub module: String,
    /// Path associated with the module (copied from the descriptor's `path`
    /// during discovery). The planner in this file does not inspect it.
    pub path: PathBuf,
    /// Source root the module belongs to; validated with the module-name rules
    /// and used as part of the batch grouping key.
    pub source_root: String,
    /// Imports for this module; modules are batched together only when this
    /// list (including its order) is equal.
    pub imports: Vec<String>,
}
113
114impl LeanWorkerModuleWork {
115    /// Create one module work item with explicit imports.
116    #[must_use]
117    pub fn new(
118        module: impl Into<String>,
119        path: impl Into<PathBuf>,
120        source_root: impl Into<String>,
121        imports: impl IntoIterator<Item = impl Into<String>>,
122    ) -> Self {
123        Self {
124            module: module.into(),
125            path: path.into(),
126            source_root: source_root.into(),
127            imports: imports.into_iter().map(Into::into).collect(),
128        }
129    }
130
131    fn from_descriptor(descriptor: &LeanModuleDescriptor, imports: &[String]) -> Self {
132        Self {
133            module: descriptor.module.clone(),
134            path: descriptor.path.clone(),
135            source_root: descriptor.source_root.clone(),
136            imports: imports.to_vec(),
137        }
138    }
139}
140
/// One stable pool-execution batch.
///
/// Produced by `LeanWorkerImportPlanner::plan_work_items`; every module in
/// `modules` shares the same source root and import list.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerPlannedBatch {
    /// Session key built from the project root, package, library, imports,
    /// restart-policy class, and (when configured) the metadata expectation.
    pub session_key: LeanWorkerSessionKey,
    /// Project root copied from the planner configuration.
    pub project_root: PathBuf,
    /// Package name copied from the planner configuration.
    pub package: String,
    /// Library target name copied from the planner configuration.
    pub lib_name: String,
    /// Source root shared by every module in this batch.
    pub source_root: String,
    /// Import list shared by every module in this batch.
    pub imports: Vec<String>,
    /// Work items of this batch, sorted by module name.
    pub modules: Vec<LeanWorkerModuleWork>,
    /// Cache-key-relevant facts for this batch.
    pub fingerprint: LeanWorkerBatchFingerprint,
    // Kept private: replayed onto the builder by `capability_builder`.
    metadata_expectation: Option<LeanWorkerPlanMetadataExpectation>,
    // Kept private: replayed onto the builder by `capability_builder`.
    restart_policy: Option<LeanWorkerRestartPolicy>,
}
155
156impl LeanWorkerPlannedBatch {
157    /// Create a capability builder for this batch.
158    ///
159    /// The caller may still add packaging-specific details such as an explicit
160    /// worker executable. The batch supplies the stable session material.
161    #[must_use]
162    pub fn capability_builder(&self) -> LeanWorkerCapabilityBuilder {
163        let mut builder = LeanWorkerCapabilityBuilder::new(
164            self.project_root.clone(),
165            self.package.clone(),
166            self.lib_name.clone(),
167            self.imports.clone(),
168        );
169        if let Some(policy) = &self.restart_policy {
170            builder = builder.restart_policy(policy.clone());
171        }
172        if let Some(expectation) = &self.metadata_expectation {
173            builder = match &expectation.expected {
174                Some(expected) => builder.expect_metadata(
175                    expectation.export.clone(),
176                    expectation.request.clone(),
177                    expected.clone(),
178                ),
179                None => builder.validate_metadata(expectation.export.clone(), expectation.request.clone()),
180            };
181        }
182        builder
183    }
184}
185
/// Stable cache-key-relevant facts for a planned batch.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerBatchFingerprint {
    /// Toolchain fingerprint, cloned from the source-set fingerprint's `toolchain`.
    pub toolchain: ToolchainFingerprint,
    /// Fingerprint of the module set the plan was computed from.
    pub source_set: LeanModuleSetFingerprint,
    /// Textual key listing the batch's project, package, library, source root,
    /// imports, restart-policy class, and module names.
    pub batch_key: String,
}
193
/// Import planning diagnostics.
#[derive(Debug)]
pub enum LeanWorkerImportPlanError {
    /// Module discovery or the Lake target lookup reported a diagnostic.
    ModuleDiscovery { diagnostic: LeanModuleDiscoveryDiagnostic },
    /// A module name or source root failed name validation; `reason` says why.
    InvalidModuleName { module: String, reason: String },
    /// The configured library target is not declared in the discovered project.
    UnresolvedCapabilityTarget { project_root: PathBuf, target_name: String },
}
201
202impl fmt::Display for LeanWorkerImportPlanError {
203    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204        match self {
205            Self::ModuleDiscovery { diagnostic } => write!(f, "{diagnostic}"),
206            Self::InvalidModuleName { module, reason } => {
207                write!(f, "lean-rs-worker: invalid module `{module}` in import plan: {reason}")
208            }
209            Self::UnresolvedCapabilityTarget {
210                project_root,
211                target_name,
212            } => {
213                write!(
214                    f,
215                    "lean-rs-worker: capability target `{target_name}` is not declared in {}",
216                    project_root.display()
217                )
218            }
219        }
220    }
221}
222
223impl std::error::Error for LeanWorkerImportPlanError {
224    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
225        match self {
226            Self::ModuleDiscovery { diagnostic } => Some(diagnostic),
227            Self::InvalidModuleName { .. } | Self::UnresolvedCapabilityTarget { .. } => None,
228        }
229    }
230}
231
/// Planner for worker-pool import/session batches.
///
/// Holds only its configuration; each planning method takes `&self` and
/// returns freshly built batches.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LeanWorkerImportPlanner {
    /// Requirements applied to every batch this planner produces.
    config: LeanWorkerImportPlanConfig,
}
237
238impl LeanWorkerImportPlanner {
239    /// Create a planner from capability/session requirements.
240    #[must_use]
241    pub fn new(config: LeanWorkerImportPlanConfig) -> Self {
242        Self { config }
243    }
244
245    /// Discover a Lake project and return stable worker batches.
246    ///
247    /// # Errors
248    ///
249    /// Returns typed planning diagnostics for missing Lake roots, selected
250    /// module roots, invalid module names, unsupported toolchains, or an
251    /// unresolved capability target.
252    pub fn plan_lake_project(&self) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
253        let mut options = LeanModuleDiscoveryOptions::new(&self.config.project_root);
254        if let Some(roots) = &self.config.source_roots {
255            options = options.selected_roots(roots.clone());
256        }
257        let discovered = discover_lake_modules(options)
258            .map_err(|diagnostic| LeanWorkerImportPlanError::ModuleDiscovery { diagnostic })?;
259        let target_declared = lake_target_declared(&discovered.project_root, &self.config.lib_name)
260            .map_err(|diagnostic| LeanWorkerImportPlanError::ModuleDiscovery { diagnostic })?;
261        if !target_declared {
262            return Err(LeanWorkerImportPlanError::UnresolvedCapabilityTarget {
263                project_root: discovered.project_root,
264                target_name: self.config.lib_name.clone(),
265            });
266        }
267        self.plan_discovered(&discovered)
268    }
269
270    /// Plan batches from already discovered module descriptors.
271    ///
272    /// # Errors
273    ///
274    /// Returns a typed error if a supplied module descriptor has an invalid
275    /// module name.
276    pub fn plan_discovered(
277        &self,
278        discovered: &LeanLakeProjectModules,
279    ) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
280        let work = discovered
281            .modules
282            .iter()
283            .map(|module| LeanWorkerModuleWork::from_descriptor(module, &self.config.base_imports));
284        self.plan_work_items(work, &discovered.fingerprint)
285    }
286
287    /// Plan batches from caller-provided module work items.
288    ///
289    /// # Errors
290    ///
291    /// Returns a typed error if a supplied work item has an invalid module
292    /// name.
293    pub fn plan_work_items(
294        &self,
295        modules: impl IntoIterator<Item = LeanWorkerModuleWork>,
296        source_set: &LeanModuleSetFingerprint,
297    ) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
298        let mut groups = BTreeMap::<BatchGroupKey, Vec<LeanWorkerModuleWork>>::new();
299        for module in modules {
300            validate_module_name(&module.module)?;
301            validate_module_name(&module.source_root)?;
302            let key = BatchGroupKey {
303                project_root: self.config.project_root.clone(),
304                package: self.config.package.clone(),
305                lib_name: self.config.lib_name.clone(),
306                source_root: module.source_root.clone(),
307                imports: module.imports.clone(),
308                restart_policy_class: restart_policy_class(self.config.restart_policy.as_ref()),
309            };
310            groups.entry(key).or_default().push(module);
311        }
312
313        let mut batches = Vec::new();
314        for (key, mut modules) in groups {
315            modules.sort_by(|left, right| left.module.cmp(&right.module));
316            let mut session_key = LeanWorkerSessionKey::new(
317                key.project_root.clone(),
318                key.package.clone(),
319                key.lib_name.clone(),
320                key.imports.clone(),
321            )
322            .restart_policy_class(key.restart_policy_class);
323            if let Some(expectation) = &self.config.metadata_expectation {
324                session_key = session_key.metadata_expectation(
325                    expectation.export.clone(),
326                    expectation.request.clone(),
327                    expectation.expected.clone(),
328                );
329            }
330            let batch_key = batch_key_string(&key, &modules);
331            batches.push(LeanWorkerPlannedBatch {
332                session_key,
333                project_root: key.project_root,
334                package: key.package,
335                lib_name: key.lib_name,
336                source_root: key.source_root,
337                imports: key.imports,
338                modules,
339                fingerprint: LeanWorkerBatchFingerprint {
340                    toolchain: source_set.toolchain.clone(),
341                    source_set: source_set.clone(),
342                    batch_key,
343                },
344                metadata_expectation: self.config.metadata_expectation.clone(),
345                restart_policy: self.config.restart_policy.clone(),
346            });
347        }
348        Ok(batches)
349    }
350}
351
/// Key describing one planned batch: work items that agree on every field
/// belong to the same batch.
///
/// The derived `Ord` compares fields in declaration order. Project root,
/// package, library, and restart-policy class all come from the planner
/// configuration, so within one plan the ordering is decided by `source_root`
/// and then `imports`.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct BatchGroupKey {
    // Copied from the planner configuration.
    project_root: PathBuf,
    // Copied from the planner configuration.
    package: String,
    // Copied from the planner configuration.
    lib_name: String,
    // Taken from the work item.
    source_root: String,
    // Taken from the work item; compared element by element, order included.
    imports: Vec<String>,
    // Derived from the configured restart policy by `restart_policy_class`.
    restart_policy_class: LeanWorkerRestartPolicyClass,
}
361
362fn restart_policy_class(policy: Option<&LeanWorkerRestartPolicy>) -> LeanWorkerRestartPolicyClass {
363    match policy {
364        Some(policy) if policy == &LeanWorkerRestartPolicy::default() => LeanWorkerRestartPolicyClass::Default,
365        Some(_policy) => LeanWorkerRestartPolicyClass::Custom,
366        None => LeanWorkerRestartPolicyClass::Default,
367    }
368}
369
370fn batch_key_string(key: &BatchGroupKey, modules: &[LeanWorkerModuleWork]) -> String {
371    let module_list = modules
372        .iter()
373        .map(|module| module.module.as_str())
374        .collect::<Vec<_>>()
375        .join(",");
376    format!(
377        "project={};package={};lib={};source_root={};imports={};policy={:?};modules={module_list}",
378        key.project_root.display(),
379        key.package,
380        key.lib_name,
381        key.source_root,
382        key.imports.join(","),
383        key.restart_policy_class,
384    )
385}
386
387fn validate_module_name(module: &str) -> Result<(), LeanWorkerImportPlanError> {
388    if module.is_empty() {
389        return Err(LeanWorkerImportPlanError::InvalidModuleName {
390            module: module.to_owned(),
391            reason: "module name is empty".to_owned(),
392        });
393    }
394    for component in module.split('.') {
395        if component.is_empty() {
396            return Err(LeanWorkerImportPlanError::InvalidModuleName {
397                module: module.to_owned(),
398                reason: "module name contains an empty component".to_owned(),
399            });
400        }
401        let mut chars = component.chars();
402        let Some(first) = chars.next() else {
403            return Err(LeanWorkerImportPlanError::InvalidModuleName {
404                module: module.to_owned(),
405                reason: "module name contains an empty component".to_owned(),
406            });
407        };
408        if !(first == '_' || first.is_alphabetic()) {
409            return Err(LeanWorkerImportPlanError::InvalidModuleName {
410                module: module.to_owned(),
411                reason: "module components must begin with a letter or underscore".to_owned(),
412            });
413        }
414        if chars.any(|ch| !(ch == '_' || ch == '\'' || ch.is_alphanumeric())) {
415            return Err(LeanWorkerImportPlanError::InvalidModuleName {
416                module: module.to_owned(),
417                reason: "module components may contain only letters, digits, underscores, or apostrophes".to_owned(),
418            });
419        }
420    }
421    Ok(())
422}