Skip to main content

oxios_kernel/
workers.rs

//! Background worker management for the Learning layer.
//!
//! Manages 12 background workers that perform periodic optimization,
//! analysis, and learning tasks. Each worker has a type, priority,
//! interval, and a dispatch function.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Instant;
10
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13
14// ---------------------------------------------------------------------------
15// Worker trait
16// ---------------------------------------------------------------------------
17
/// Execution logic for a single background worker.
///
/// One implementation is expected per worker type (Audit, Optimize, ...).
/// Implementations are handed to `WorkerManager::register_implementation()`
/// and invoked through `WorkerManager::dispatch()`.
pub trait Worker: Send + Sync {
    /// Runs the worker once.
    ///
    /// Returns `Ok` carrying a summary string on success, or `Err` carrying
    /// an error message on failure.
    fn execute(&self) -> Result<String, String>;
}
28
29// ---------------------------------------------------------------------------
30// Worker types
31// ---------------------------------------------------------------------------
32
33/// All available worker types.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum WorkerType {
37    /// Deep knowledge acquisition.
38    Ultralearn,
39    /// Security analysis.
40    Audit,
41    /// Performance optimization.
42    Optimize,
43    /// Memory consolidation.
44    Consolidate,
45    /// Predictive preloading.
46    Predict,
47    /// Codebase mapping.
48    Map,
49    /// Deep code analysis.
50    Deepdive,
51    /// Auto-documentation.
52    Document,
53    /// Refactoring suggestions.
54    Refactor,
55    /// Performance benchmarking.
56    Benchmark,
57    /// Test coverage analysis.
58    Testgaps,
59    /// Neural pattern training.
60    Learning,
61}
62
63impl WorkerType {
64    /// All worker type variants.
65    pub fn all() -> &'static [WorkerType] {
66        &[
67            WorkerType::Ultralearn,
68            WorkerType::Audit,
69            WorkerType::Optimize,
70            WorkerType::Consolidate,
71            WorkerType::Predict,
72            WorkerType::Map,
73            WorkerType::Deepdive,
74            WorkerType::Document,
75            WorkerType::Refactor,
76            WorkerType::Benchmark,
77            WorkerType::Testgaps,
78            WorkerType::Learning,
79        ]
80    }
81
82    /// Human-readable name.
83    pub fn name(&self) -> &'static str {
84        match self {
85            WorkerType::Ultralearn => "ultralearn",
86            WorkerType::Audit => "audit",
87            WorkerType::Optimize => "optimize",
88            WorkerType::Consolidate => "consolidate",
89            WorkerType::Predict => "predict",
90            WorkerType::Map => "map",
91            WorkerType::Deepdive => "deepdive",
92            WorkerType::Document => "document",
93            WorkerType::Refactor => "refactor",
94            WorkerType::Benchmark => "benchmark",
95            WorkerType::Testgaps => "testgaps",
96            WorkerType::Learning => "learning",
97        }
98    }
99
100    /// Default interval in milliseconds.
101    pub fn default_interval_ms(&self) -> u64 {
102        match self {
103            WorkerType::Audit => 600_000,         // 10 min
104            WorkerType::Optimize => 300_000,      // 5 min
105            WorkerType::Consolidate => 1_800_000, // 30 min
106            WorkerType::Ultralearn => 60_000,     // 1 min
107            WorkerType::Predict => 300_000,       // 5 min
108            WorkerType::Map => 600_000,           // 10 min
109            WorkerType::Deepdive => 600_000,      // 10 min
110            WorkerType::Document => 1_800_000,    // 30 min
111            WorkerType::Refactor => 600_000,      // 10 min
112            WorkerType::Benchmark => 600_000,     // 10 min
113            WorkerType::Testgaps => 600_000,      // 10 min
114            WorkerType::Learning => 900_000,      // 15 min
115        }
116    }
117}
118
119// ---------------------------------------------------------------------------
120// Worker priority
121// ---------------------------------------------------------------------------
122
123/// Worker priority level.
124#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub enum WorkerPriority {
127    /// Critical — must run on time.
128    Critical = 4,
129    /// High priority.
130    High = 3,
131    /// Normal priority.
132    Normal = 2,
133    /// Low priority — can be delayed.
134    Low = 1,
135}
136
137impl WorkerType {
138    /// Default priority for this worker type.
139    pub fn default_priority(&self) -> WorkerPriority {
140        match self {
141            WorkerType::Audit => WorkerPriority::Critical,
142            WorkerType::Optimize => WorkerPriority::High,
143            WorkerType::Ultralearn => WorkerPriority::Normal,
144            WorkerType::Consolidate => WorkerPriority::Low,
145            WorkerType::Predict => WorkerPriority::Normal,
146            WorkerType::Map => WorkerPriority::Normal,
147            WorkerType::Deepdive => WorkerPriority::Normal,
148            WorkerType::Document => WorkerPriority::Normal,
149            WorkerType::Refactor => WorkerPriority::Normal,
150            WorkerType::Benchmark => WorkerPriority::Normal,
151            WorkerType::Testgaps => WorkerPriority::Normal,
152            WorkerType::Learning => WorkerPriority::Normal,
153        }
154    }
155}
156
157// ---------------------------------------------------------------------------
158// Worker config and result
159// ---------------------------------------------------------------------------
160
161/// Configuration for a single worker.
162#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct WorkerConfig {
164    /// Worker type.
165    pub worker_type: WorkerType,
166    /// Priority level.
167    pub priority: WorkerPriority,
168    /// Interval between runs in milliseconds.
169    pub interval_ms: u64,
170    /// Whether this worker is enabled.
171    pub enabled: bool,
172}
173
174impl WorkerConfig {
175    /// Create a config with default settings for a worker type.
176    pub fn default_for(worker_type: WorkerType) -> Self {
177        Self {
178            worker_type,
179            priority: worker_type.default_priority(),
180            interval_ms: worker_type.default_interval_ms(),
181            enabled: true,
182        }
183    }
184}
185
186/// Result of a worker execution.
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct WorkerResult {
189    /// Worker type that produced this result.
190    pub worker: WorkerType,
191    /// Whether the execution succeeded.
192    pub success: bool,
193    /// Execution duration in milliseconds.
194    pub duration_ms: u64,
195    /// Output message or summary.
196    pub output: String,
197}
198
199/// Status summary of the worker manager.
200#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct WorkerManagerStatus {
202    /// Number of registered workers.
203    pub registered: usize,
204    /// Number of enabled workers.
205    pub enabled: usize,
206    /// Currently running worker types.
207    pub running: Vec<String>,
208    /// Last results from each worker.
209    pub last_results: HashMap<String, WorkerResult>,
210}
211
212// ---------------------------------------------------------------------------
213// WorkerManager
214// ---------------------------------------------------------------------------
215
216/// Manages background workers for the Learning layer.
217///
218/// Workers are registered with configs and can be dispatched individually
219/// or all at once. The manager tracks running state and results.
220pub struct WorkerManager {
221    /// Registered worker configs.
222    configs: RwLock<HashMap<WorkerType, WorkerConfig>>,
223    /// Registered worker implementations.
224    implementations: RwLock<HashMap<WorkerType, Box<dyn Worker>>>,
225    /// Currently running workers.
226    running: Arc<RwLock<std::collections::HashSet<WorkerType>>>,
227    /// Last execution results.
228    last_results: RwLock<HashMap<WorkerType, WorkerResult>>,
229}
230
231impl std::fmt::Debug for WorkerManager {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("WorkerManager")
234            .field("registered", &self.configs.read().len())
235            .field("running", &self.running.read().len())
236            .finish()
237    }
238}
239
240impl WorkerManager {
241    /// Create a new empty worker manager.
242    pub fn new() -> Self {
243        Self {
244            configs: RwLock::new(HashMap::new()),
245            implementations: RwLock::new(HashMap::new()),
246            running: Arc::new(RwLock::new(std::collections::HashSet::new())),
247            last_results: RwLock::new(HashMap::new()),
248        }
249    }
250
251    /// Create a worker manager with all 12 default workers registered.
252    pub fn with_defaults() -> Self {
253        let mgr = Self::new();
254        for wt in WorkerType::all() {
255            mgr.register(*wt, WorkerConfig::default_for(*wt));
256        }
257        mgr
258    }
259
260    /// Register a worker with its configuration.
261    ///
262    /// Replaces any existing config for the same worker type.
263    pub fn register(&self, worker_type: WorkerType, config: WorkerConfig) {
264        self.configs.write().insert(worker_type, config);
265        tracing::debug!(worker = %worker_type.name(), "Worker registered");
266    }
267
268    /// Register a concrete implementation for a worker type.
269    ///
270    /// The implementation's `execute()` method will be called when the
271    /// worker is dispatched. If no implementation is registered for a
272    /// worker type, dispatching that worker will return an error.
273    pub fn register_implementation(
274        &self,
275        worker_type: WorkerType,
276        implementation: Box<dyn Worker>,
277    ) {
278        self.implementations
279            .write()
280            .insert(worker_type, implementation);
281        tracing::debug!(worker = %worker_type.name(), "Worker implementation registered");
282    }
283
284    /// Check if a worker has a registered implementation.
285    pub fn has_implementation(&self, worker_type: WorkerType) -> bool {
286        self.implementations.read().contains_key(&worker_type)
287    }
288
289    /// Unregister a worker implementation.
290    pub fn unregister_implementation(&self, worker_type: WorkerType) -> bool {
291        self.implementations.write().remove(&worker_type).is_some()
292    }
293
294    /// Unregister a worker.
295    pub fn unregister(&self, worker_type: WorkerType) -> bool {
296        self.configs.write().remove(&worker_type).is_some()
297    }
298
299    /// Check if a worker is registered.
300    pub fn is_registered(&self, worker_type: WorkerType) -> bool {
301        self.configs.read().contains_key(&worker_type)
302    }
303
304    /// Dispatch a single worker for execution.
305    ///
306    /// Returns the result of the worker's execution.
307    /// If the worker is already running, returns an error.
308    /// If the worker is not registered or disabled, returns an error.
309    pub fn dispatch(&self, worker_type: WorkerType) -> Result<WorkerResult, String> {
310        // Check if registered and enabled
311        {
312            let configs = self.configs.read();
313            let config = configs
314                .get(&worker_type)
315                .ok_or_else(|| format!("Worker '{}' not registered", worker_type.name()))?;
316            if !config.enabled {
317                return Err(format!("Worker '{}' is disabled", worker_type.name()));
318            }
319        }
320
321        // Check if already running
322        {
323            let mut running = self.running.write();
324            if running.contains(&worker_type) {
325                return Err(format!(
326                    "Worker '{}' is already running",
327                    worker_type.name()
328                ));
329            }
330            running.insert(worker_type);
331        }
332
333        let start = Instant::now();
334        let result = self.execute_worker(worker_type);
335        let duration_ms = start.elapsed().as_millis() as u64;
336
337        let worker_result = WorkerResult {
338            worker: worker_type,
339            success: result.is_ok(),
340            duration_ms,
341            output: result.unwrap_or_else(|e| e),
342        };
343
344        // Update state
345        {
346            let mut running = self.running.write();
347            running.remove(&worker_type);
348        }
349        {
350            let mut last = self.last_results.write();
351            last.insert(worker_type, worker_result.clone());
352        }
353
354        tracing::info!(
355            worker = %worker_type.name(),
356            success = worker_result.success,
357            duration_ms,
358            "Worker completed"
359        );
360
361        Ok(worker_result)
362    }
363
364    /// Dispatch all enabled workers.
365    ///
366    /// Returns results for each dispatched worker.
367    pub fn dispatch_all(&self) -> Vec<WorkerResult> {
368        let worker_types: Vec<WorkerType> = {
369            let configs = self.configs.read();
370            configs
371                .iter()
372                .filter(|(_, c)| c.enabled)
373                .map(|(wt, _)| *wt)
374                .collect()
375        };
376
377        let mut results = Vec::new();
378        for wt in worker_types {
379            match self.dispatch(wt) {
380                Ok(result) => results.push(result),
381                Err(e) => {
382                    tracing::warn!(worker = %wt.name(), error = %e, "Failed to dispatch worker");
383                    results.push(WorkerResult {
384                        worker: wt,
385                        success: false,
386                        duration_ms: 0,
387                        output: e,
388                    });
389                }
390            }
391        }
392        results
393    }
394
395    /// Get the current status of the worker manager.
396    pub fn status(&self) -> WorkerManagerStatus {
397        let configs = self.configs.read();
398        let running = self.running.read();
399        let last = self.last_results.read();
400
401        let enabled = configs.values().filter(|c| c.enabled).count();
402        let running_names: Vec<String> = running.iter().map(|wt| wt.name().to_string()).collect();
403        let last_results_map: HashMap<String, WorkerResult> = last
404            .iter()
405            .map(|(wt, r)| (wt.name().to_string(), r.clone()))
406            .collect();
407
408        WorkerManagerStatus {
409            registered: configs.len(),
410            enabled,
411            running: running_names,
412            last_results: last_results_map,
413        }
414    }
415
416    /// Enable a worker.
417    pub fn enable(&self, worker_type: WorkerType) -> bool {
418        let mut configs = self.configs.write();
419        if let Some(config) = configs.get_mut(&worker_type) {
420            config.enabled = true;
421            true
422        } else {
423            false
424        }
425    }
426
427    /// Disable a worker.
428    pub fn disable(&self, worker_type: WorkerType) -> bool {
429        let mut configs = self.configs.write();
430        if let Some(config) = configs.get_mut(&worker_type) {
431            config.enabled = false;
432            true
433        } else {
434            false
435        }
436    }
437
438    // -----------------------------------------------------------------------
439    // Worker execution — delegates to registered trait implementation
440    // -----------------------------------------------------------------------
441
442    /// Execute the actual worker logic by dispatching to the registered
443    /// `Worker` trait implementation.
444    ///
445    /// Returns an error if no implementation has been registered for the
446    /// given worker type.
447    fn execute_worker(&self, worker_type: WorkerType) -> Result<String, String> {
448        let impls = self.implementations.read();
449        let worker = impls.get(&worker_type).ok_or_else(|| {
450            format!(
451                "No implementation registered for worker '{}'",
452                worker_type.name()
453            )
454        })?;
455        worker.execute()
456    }
457}
458
459impl Default for WorkerManager {
460    fn default() -> Self {
461        Self::new()
462    }
463}
464
465// ---------------------------------------------------------------------------
466// Tests
467// ---------------------------------------------------------------------------
468
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal worker that always succeeds with a fixed message.
    struct EchoWorker;

    impl Worker for EchoWorker {
        fn execute(&self) -> Result<String, String> {
            Ok(String::from("echo: ok"))
        }
    }

    /// Builds an empty manager and registers the default config for one
    /// worker type (no implementation).
    fn manager_with(worker_type: WorkerType) -> WorkerManager {
        let manager = WorkerManager::new();
        manager.register(worker_type, WorkerConfig::default_for(worker_type));
        manager
    }

    /// Builds a manager with all 12 workers registered and implemented.
    fn manager_with_all_implementations() -> WorkerManager {
        let manager = WorkerManager::with_defaults();
        WorkerType::all()
            .iter()
            .for_each(|&wt| manager.register_implementation(wt, Box::new(EchoWorker)));
        manager
    }

    #[test]
    fn test_worker_type_all() {
        let every_type = WorkerType::all();
        assert_eq!(every_type.len(), 12);
    }

    #[test]
    fn test_worker_type_name() {
        let expected = [
            (WorkerType::Audit, "audit"),
            (WorkerType::Learning, "learning"),
        ];
        for (worker_type, name) in expected {
            assert_eq!(worker_type.name(), name);
        }
    }

    #[test]
    fn test_worker_type_default_interval() {
        let expected = [
            (WorkerType::Audit, 600_000),
            (WorkerType::Ultralearn, 60_000),
        ];
        for (worker_type, interval_ms) in expected {
            assert_eq!(worker_type.default_interval_ms(), interval_ms);
        }
    }

    #[test]
    fn test_worker_priority_ordering() {
        let descending = [
            WorkerPriority::Critical,
            WorkerPriority::High,
            WorkerPriority::Normal,
            WorkerPriority::Low,
        ];
        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    #[test]
    fn test_worker_config_default() {
        let audit = WorkerConfig::default_for(WorkerType::Audit);
        assert_eq!(audit.worker_type, WorkerType::Audit);
        assert_eq!(audit.priority, WorkerPriority::Critical);
        assert!(audit.enabled);
    }

    #[test]
    fn test_new_manager_is_empty() {
        let snapshot = WorkerManager::new().status();
        assert_eq!(snapshot.registered, 0);
        assert_eq!(snapshot.enabled, 0);
    }

    #[test]
    fn test_with_defaults() {
        let snapshot = WorkerManager::with_defaults().status();
        assert_eq!(snapshot.registered, 12);
        assert_eq!(snapshot.enabled, 12);
    }

    #[test]
    fn test_register_and_dispatch() {
        let manager = manager_with(WorkerType::Audit);
        manager.register_implementation(WorkerType::Audit, Box::new(EchoWorker));

        let outcome = manager.dispatch(WorkerType::Audit).unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.worker, WorkerType::Audit);
    }

    #[test]
    fn test_dispatch_no_implementation() {
        let manager = manager_with(WorkerType::Audit);

        let outcome = manager.dispatch(WorkerType::Audit).unwrap();
        assert!(!outcome.success);
        assert!(outcome.output.contains("No implementation registered"));
    }

    #[test]
    fn test_dispatch_unregistered() {
        let manager = WorkerManager::new();

        let outcome = manager.dispatch(WorkerType::Audit);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("not registered"));
    }

    #[test]
    fn test_dispatch_disabled() {
        let manager = WorkerManager::new();
        let disabled_config = WorkerConfig {
            enabled: false,
            ..WorkerConfig::default_for(WorkerType::Audit)
        };
        manager.register(WorkerType::Audit, disabled_config);

        let outcome = manager.dispatch(WorkerType::Audit);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("disabled"));
    }

    #[test]
    fn test_dispatch_all() {
        let outcomes = manager_with_all_implementations().dispatch_all();
        assert_eq!(outcomes.len(), 12);
        assert!(outcomes.iter().all(|outcome| outcome.success));
    }

    #[test]
    fn test_dispatch_all_missing_impl() {
        // Configs only, no implementations: every worker should fail.
        let outcomes = WorkerManager::with_defaults().dispatch_all();
        assert_eq!(outcomes.len(), 12);
        assert!(outcomes.iter().all(|outcome| !outcome.success));
    }

    #[test]
    fn test_enable_disable() {
        let manager = WorkerManager::with_defaults();

        manager.disable(WorkerType::Audit);
        assert_eq!(manager.status().enabled, 11);

        manager.enable(WorkerType::Audit);
        assert_eq!(manager.status().enabled, 12);
    }

    #[test]
    fn test_unregister() {
        let manager = WorkerManager::with_defaults();

        let removed_first = manager.unregister(WorkerType::Audit);
        assert!(removed_first);
        assert_eq!(manager.status().registered, 11);

        // The config is already gone, so a second removal reports `false`.
        let removed_again = manager.unregister(WorkerType::Audit);
        assert!(!removed_again);
    }

    #[test]
    fn test_status_last_results() {
        let manager = manager_with(WorkerType::Learning);
        manager.register_implementation(WorkerType::Learning, Box::new(EchoWorker));
        manager.dispatch(WorkerType::Learning).unwrap();

        let snapshot = manager.status();
        assert!(snapshot.last_results.contains_key("learning"));
    }

    #[test]
    fn test_is_registered() {
        let manager = WorkerManager::new();
        assert!(!manager.is_registered(WorkerType::Audit));

        manager.register(
            WorkerType::Audit,
            WorkerConfig::default_for(WorkerType::Audit),
        );
        assert!(manager.is_registered(WorkerType::Audit));
    }

    #[test]
    fn test_has_implementation() {
        let manager = WorkerManager::new();
        assert!(!manager.has_implementation(WorkerType::Audit));

        manager.register_implementation(WorkerType::Audit, Box::new(EchoWorker));
        assert!(manager.has_implementation(WorkerType::Audit));
    }

    #[test]
    fn test_unregister_implementation() {
        let manager = WorkerManager::new();
        manager.register_implementation(WorkerType::Audit, Box::new(EchoWorker));
        assert!(manager.has_implementation(WorkerType::Audit));

        let removed_first = manager.unregister_implementation(WorkerType::Audit);
        assert!(removed_first);
        assert!(!manager.has_implementation(WorkerType::Audit));

        let removed_again = manager.unregister_implementation(WorkerType::Audit);
        assert!(!removed_again);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let original = WorkerConfig::default_for(WorkerType::Audit);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: WorkerConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.worker_type, WorkerType::Audit);
        assert_eq!(decoded.priority, WorkerPriority::Critical);
    }
}