Skip to main content

lean_rs_worker/
planning.rs

1//! Import-set planning for worker-pool execution.
2//!
3//! The planner groups module work into stable worker session batches. It does
4//! not choose workers, run commands, or define downstream cache semantics.
5
6use std::collections::BTreeMap;
7use std::fmt;
8use std::path::PathBuf;
9
10use lean_toolchain::{
11    LeanLakeProjectModules, LeanModuleDescriptor, LeanModuleDiscoveryDiagnostic, LeanModuleDiscoveryOptions,
12    LeanModuleSetFingerprint, ToolchainFingerprint, discover_lake_modules, lake_target_declared,
13};
14use serde_json::Value;
15
16use crate::capability::LeanWorkerCapabilityBuilder;
17use crate::pool::{LeanWorkerRestartPolicyClass, LeanWorkerSessionKey};
18use crate::session::LeanWorkerCapabilityMetadata;
19use crate::supervisor::LeanWorkerRestartPolicy;
20
21/// Capability and session requirements used to plan worker batches.
22#[derive(Clone, Debug, Eq, PartialEq)]
23pub struct LeanWorkerImportPlanConfig {
24    project_root: PathBuf,
25    package: String,
26    lib_name: String,
27    source_roots: Option<Vec<String>>,
28    base_imports: Vec<String>,
29    metadata_expectation: Option<LeanWorkerPlanMetadataExpectation>,
30    restart_policy: Option<LeanWorkerRestartPolicy>,
31}
32
33impl LeanWorkerImportPlanConfig {
34    /// Create planner configuration for a Lake capability target.
35    #[must_use]
36    pub fn new(project_root: impl Into<PathBuf>, package: impl Into<String>, lib_name: impl Into<String>) -> Self {
37        Self {
38            project_root: project_root.into(),
39            package: package.into(),
40            lib_name: lib_name.into(),
41            source_roots: None,
42            base_imports: Vec::new(),
43            metadata_expectation: None,
44            restart_policy: None,
45        }
46    }
47
48    /// Restrict discovery to these source roots.
49    #[must_use]
50    pub fn source_roots(mut self, roots: impl IntoIterator<Item = impl Into<String>>) -> Self {
51        self.source_roots = Some(roots.into_iter().map(Into::into).collect());
52        self
53    }
54
55    /// Add imports required by every planned session batch.
56    #[must_use]
57    pub fn base_imports(mut self, imports: impl IntoIterator<Item = impl Into<String>>) -> Self {
58        self.base_imports = imports.into_iter().map(Into::into).collect();
59        self
60    }
61
62    /// Validate metadata before a worker batch is used.
63    #[must_use]
64    pub fn validate_metadata(mut self, export: impl Into<String>, request: Value) -> Self {
65        self.metadata_expectation = Some(LeanWorkerPlanMetadataExpectation {
66            export: export.into(),
67            request,
68            expected: None,
69        });
70        self
71    }
72
73    /// Require exact generic capability metadata before a worker batch is used.
74    #[must_use]
75    pub fn expect_metadata(
76        mut self,
77        export: impl Into<String>,
78        request: Value,
79        expected: LeanWorkerCapabilityMetadata,
80    ) -> Self {
81        self.metadata_expectation = Some(LeanWorkerPlanMetadataExpectation {
82            export: export.into(),
83            request,
84            expected: Some(expected),
85        });
86        self
87    }
88
89    /// Use a worker restart policy for builders derived from planned batches.
90    #[must_use]
91    pub fn restart_policy(mut self, policy: LeanWorkerRestartPolicy) -> Self {
92        self.restart_policy = Some(policy);
93        self
94    }
95}
96
97/// Metadata expectation carried into planned session keys.
98#[derive(Clone, Debug, Eq, PartialEq)]
99pub struct LeanWorkerPlanMetadataExpectation {
100    pub export: String,
101    pub request: Value,
102    pub expected: Option<LeanWorkerCapabilityMetadata>,
103}
104
105/// One module-sized unit of planned worker work.
106#[derive(Clone, Debug, Eq, PartialEq)]
107pub struct LeanWorkerModuleWork {
108    pub module: String,
109    pub path: PathBuf,
110    pub source_root: String,
111    pub imports: Vec<String>,
112}
113
114impl LeanWorkerModuleWork {
115    /// Create one module work item with explicit imports.
116    #[must_use]
117    pub fn new(
118        module: impl Into<String>,
119        path: impl Into<PathBuf>,
120        source_root: impl Into<String>,
121        imports: impl IntoIterator<Item = impl Into<String>>,
122    ) -> Self {
123        Self {
124            module: module.into(),
125            path: path.into(),
126            source_root: source_root.into(),
127            imports: imports.into_iter().map(Into::into).collect(),
128        }
129    }
130
131    fn from_descriptor(descriptor: &LeanModuleDescriptor, imports: &[String]) -> Self {
132        Self {
133            module: descriptor.module.clone(),
134            path: descriptor.path.clone(),
135            source_root: descriptor.source_root.clone(),
136            imports: imports.to_vec(),
137        }
138    }
139}
140
141/// One stable pool-execution batch.
142#[derive(Clone, Debug, Eq, PartialEq)]
143pub struct LeanWorkerPlannedBatch {
144    pub session_key: LeanWorkerSessionKey,
145    pub project_root: PathBuf,
146    pub package: String,
147    pub lib_name: String,
148    pub source_root: String,
149    pub imports: Vec<String>,
150    pub modules: Vec<LeanWorkerModuleWork>,
151    pub fingerprint: LeanWorkerBatchFingerprint,
152    metadata_expectation: Option<LeanWorkerPlanMetadataExpectation>,
153    restart_policy: Option<LeanWorkerRestartPolicy>,
154}
155
156impl LeanWorkerPlannedBatch {
157    /// Create a capability builder for this batch.
158    ///
159    /// The caller may still add packaging-specific details such as an explicit
160    /// worker executable. The batch supplies the stable session material.
161    #[must_use]
162    pub fn capability_builder(&self) -> LeanWorkerCapabilityBuilder {
163        let mut builder = LeanWorkerCapabilityBuilder::new(
164            self.project_root.clone(),
165            self.package.clone(),
166            self.lib_name.clone(),
167            self.imports.clone(),
168        );
169        if let Some(policy) = &self.restart_policy {
170            builder = builder.restart_policy(policy.clone());
171        }
172        if let Some(expectation) = &self.metadata_expectation {
173            builder = match &expectation.expected {
174                Some(expected) => builder.expect_metadata(
175                    expectation.export.clone(),
176                    expectation.request.clone(),
177                    expected.clone(),
178                ),
179                None => builder.validate_metadata(expectation.export.clone(), expectation.request.clone()),
180            };
181        }
182        builder
183    }
184}
185
186/// Stable cache-key-relevant facts for a planned batch.
187#[derive(Clone, Debug, Eq, PartialEq)]
188pub struct LeanWorkerBatchFingerprint {
189    pub toolchain: ToolchainFingerprint,
190    pub source_set: LeanModuleSetFingerprint,
191    pub batch_key: String,
192}
193
194/// Import planning diagnostics.
195#[non_exhaustive]
196#[derive(Debug)]
197pub enum LeanWorkerImportPlanError {
198    ModuleDiscovery { diagnostic: LeanModuleDiscoveryDiagnostic },
199    InvalidModuleName { module: String, reason: String },
200    UnresolvedCapabilityTarget { project_root: PathBuf, target_name: String },
201}
202
203impl fmt::Display for LeanWorkerImportPlanError {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        match self {
206            Self::ModuleDiscovery { diagnostic } => write!(f, "{diagnostic}"),
207            Self::InvalidModuleName { module, reason } => {
208                write!(f, "lean-rs-worker: invalid module `{module}` in import plan: {reason}")
209            }
210            Self::UnresolvedCapabilityTarget {
211                project_root,
212                target_name,
213            } => {
214                write!(
215                    f,
216                    "lean-rs-worker: capability target `{target_name}` is not declared in {}",
217                    project_root.display()
218                )
219            }
220        }
221    }
222}
223
224impl std::error::Error for LeanWorkerImportPlanError {
225    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
226        match self {
227            Self::ModuleDiscovery { diagnostic } => Some(diagnostic),
228            Self::InvalidModuleName { .. } | Self::UnresolvedCapabilityTarget { .. } => None,
229        }
230    }
231}
232
233/// Planner for worker-pool import/session batches.
234#[derive(Clone, Debug, Eq, PartialEq)]
235pub struct LeanWorkerImportPlanner {
236    config: LeanWorkerImportPlanConfig,
237}
238
239impl LeanWorkerImportPlanner {
240    /// Create a planner from capability/session requirements.
241    #[must_use]
242    pub fn new(config: LeanWorkerImportPlanConfig) -> Self {
243        Self { config }
244    }
245
246    /// Discover a Lake project and return stable worker batches.
247    ///
248    /// # Errors
249    ///
250    /// Returns typed planning diagnostics for missing Lake roots, selected
251    /// module roots, invalid module names, unsupported toolchains, or an
252    /// unresolved capability target.
253    pub fn plan_lake_project(&self) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
254        let mut options = LeanModuleDiscoveryOptions::new(&self.config.project_root);
255        if let Some(roots) = &self.config.source_roots {
256            options = options.selected_roots(roots.clone());
257        }
258        let discovered = discover_lake_modules(options)
259            .map_err(|diagnostic| LeanWorkerImportPlanError::ModuleDiscovery { diagnostic })?;
260        let target_declared = lake_target_declared(&discovered.project_root, &self.config.lib_name)
261            .map_err(|diagnostic| LeanWorkerImportPlanError::ModuleDiscovery { diagnostic })?;
262        if !target_declared {
263            return Err(LeanWorkerImportPlanError::UnresolvedCapabilityTarget {
264                project_root: discovered.project_root,
265                target_name: self.config.lib_name.clone(),
266            });
267        }
268        self.plan_discovered(&discovered)
269    }
270
271    /// Plan batches from already discovered module descriptors.
272    ///
273    /// # Errors
274    ///
275    /// Returns a typed error if a supplied module descriptor has an invalid
276    /// module name.
277    pub fn plan_discovered(
278        &self,
279        discovered: &LeanLakeProjectModules,
280    ) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
281        let work = discovered
282            .modules
283            .iter()
284            .map(|module| LeanWorkerModuleWork::from_descriptor(module, &self.config.base_imports));
285        self.plan_work_items(work, &discovered.fingerprint)
286    }
287
288    /// Plan batches from caller-provided module work items.
289    ///
290    /// # Errors
291    ///
292    /// Returns a typed error if a supplied work item has an invalid module
293    /// name.
294    pub fn plan_work_items(
295        &self,
296        modules: impl IntoIterator<Item = LeanWorkerModuleWork>,
297        source_set: &LeanModuleSetFingerprint,
298    ) -> Result<Vec<LeanWorkerPlannedBatch>, LeanWorkerImportPlanError> {
299        let mut groups = BTreeMap::<BatchGroupKey, Vec<LeanWorkerModuleWork>>::new();
300        for module in modules {
301            validate_module_name(&module.module)?;
302            validate_module_name(&module.source_root)?;
303            let key = BatchGroupKey {
304                project_root: self.config.project_root.clone(),
305                package: self.config.package.clone(),
306                lib_name: self.config.lib_name.clone(),
307                source_root: module.source_root.clone(),
308                imports: module.imports.clone(),
309                restart_policy_class: restart_policy_class(self.config.restart_policy.as_ref()),
310            };
311            groups.entry(key).or_default().push(module);
312        }
313
314        let mut batches = Vec::new();
315        for (key, mut modules) in groups {
316            modules.sort_by(|left, right| left.module.cmp(&right.module));
317            let mut session_key = LeanWorkerSessionKey::new(
318                key.project_root.clone(),
319                key.package.clone(),
320                key.lib_name.clone(),
321                key.imports.clone(),
322            )
323            .restart_policy_class(key.restart_policy_class);
324            if let Some(expectation) = &self.config.metadata_expectation {
325                session_key = session_key.metadata_expectation(
326                    expectation.export.clone(),
327                    expectation.request.clone(),
328                    expectation.expected.clone(),
329                );
330            }
331            let batch_key = batch_key_string(&key, &modules);
332            batches.push(LeanWorkerPlannedBatch {
333                session_key,
334                project_root: key.project_root,
335                package: key.package,
336                lib_name: key.lib_name,
337                source_root: key.source_root,
338                imports: key.imports,
339                modules,
340                fingerprint: LeanWorkerBatchFingerprint {
341                    toolchain: source_set.toolchain.clone(),
342                    source_set: source_set.clone(),
343                    batch_key,
344                },
345                metadata_expectation: self.config.metadata_expectation.clone(),
346                restart_policy: self.config.restart_policy.clone(),
347            });
348        }
349        Ok(batches)
350    }
351}
352
353#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
354struct BatchGroupKey {
355    project_root: PathBuf,
356    package: String,
357    lib_name: String,
358    source_root: String,
359    imports: Vec<String>,
360    restart_policy_class: LeanWorkerRestartPolicyClass,
361}
362
363fn restart_policy_class(policy: Option<&LeanWorkerRestartPolicy>) -> LeanWorkerRestartPolicyClass {
364    match policy {
365        Some(policy) if policy == &LeanWorkerRestartPolicy::default() => LeanWorkerRestartPolicyClass::Default,
366        Some(_policy) => LeanWorkerRestartPolicyClass::Custom,
367        None => LeanWorkerRestartPolicyClass::Default,
368    }
369}
370
371fn batch_key_string(key: &BatchGroupKey, modules: &[LeanWorkerModuleWork]) -> String {
372    let module_list = modules
373        .iter()
374        .map(|module| module.module.as_str())
375        .collect::<Vec<_>>()
376        .join(",");
377    format!(
378        "project={};package={};lib={};source_root={};imports={};policy={:?};modules={module_list}",
379        key.project_root.display(),
380        key.package,
381        key.lib_name,
382        key.source_root,
383        key.imports.join(","),
384        key.restart_policy_class,
385    )
386}
387
388fn validate_module_name(module: &str) -> Result<(), LeanWorkerImportPlanError> {
389    if module.is_empty() {
390        return Err(LeanWorkerImportPlanError::InvalidModuleName {
391            module: module.to_owned(),
392            reason: "module name is empty".to_owned(),
393        });
394    }
395    for component in module.split('.') {
396        if component.is_empty() {
397            return Err(LeanWorkerImportPlanError::InvalidModuleName {
398                module: module.to_owned(),
399                reason: "module name contains an empty component".to_owned(),
400            });
401        }
402        let mut chars = component.chars();
403        let Some(first) = chars.next() else {
404            return Err(LeanWorkerImportPlanError::InvalidModuleName {
405                module: module.to_owned(),
406                reason: "module name contains an empty component".to_owned(),
407            });
408        };
409        if !(first == '_' || first.is_alphabetic()) {
410            return Err(LeanWorkerImportPlanError::InvalidModuleName {
411                module: module.to_owned(),
412                reason: "module components must begin with a letter or underscore".to_owned(),
413            });
414        }
415        if chars.any(|ch| !(ch == '_' || ch == '\'' || ch.is_alphanumeric())) {
416            return Err(LeanWorkerImportPlanError::InvalidModuleName {
417                module: module.to_owned(),
418                reason: "module components may contain only letters, digits, underscores, or apostrophes".to_owned(),
419            });
420        }
421    }
422    Ok(())
423}