1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Instant;
10
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13
/// A unit of background work that a [`WorkerManager`] can run on demand.
///
/// Implementations must be `Send + Sync` because the manager stores them as
/// boxed trait objects behind a shared lock.
pub trait Worker: Send + Sync {
    /// Runs the worker once.
    ///
    /// Returns `Ok` with a human-readable summary on success, or `Err` with a
    /// human-readable failure description.
    fn execute(&self) -> Result<String, String>;
}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
35#[serde(rename_all = "snake_case")]
36pub enum WorkerType {
37 Ultralearn,
39 Audit,
41 Optimize,
43 Consolidate,
45 Predict,
47 Map,
49 Deepdive,
51 Document,
53 Refactor,
55 Benchmark,
57 Testgaps,
59 Learning,
61}
62
63impl WorkerType {
64 pub fn all() -> &'static [WorkerType] {
66 &[
67 WorkerType::Ultralearn,
68 WorkerType::Audit,
69 WorkerType::Optimize,
70 WorkerType::Consolidate,
71 WorkerType::Predict,
72 WorkerType::Map,
73 WorkerType::Deepdive,
74 WorkerType::Document,
75 WorkerType::Refactor,
76 WorkerType::Benchmark,
77 WorkerType::Testgaps,
78 WorkerType::Learning,
79 ]
80 }
81
82 pub fn name(&self) -> &'static str {
84 match self {
85 WorkerType::Ultralearn => "ultralearn",
86 WorkerType::Audit => "audit",
87 WorkerType::Optimize => "optimize",
88 WorkerType::Consolidate => "consolidate",
89 WorkerType::Predict => "predict",
90 WorkerType::Map => "map",
91 WorkerType::Deepdive => "deepdive",
92 WorkerType::Document => "document",
93 WorkerType::Refactor => "refactor",
94 WorkerType::Benchmark => "benchmark",
95 WorkerType::Testgaps => "testgaps",
96 WorkerType::Learning => "learning",
97 }
98 }
99
100 pub fn default_interval_ms(&self) -> u64 {
102 match self {
103 WorkerType::Audit => 600_000, WorkerType::Optimize => 300_000, WorkerType::Consolidate => 1_800_000, WorkerType::Ultralearn => 60_000, WorkerType::Predict => 300_000, WorkerType::Map => 600_000, WorkerType::Deepdive => 600_000, WorkerType::Document => 1_800_000, WorkerType::Refactor => 600_000, WorkerType::Benchmark => 600_000, WorkerType::Testgaps => 600_000, WorkerType::Learning => 900_000, }
116 }
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
125#[serde(rename_all = "snake_case")]
126pub enum WorkerPriority {
127 Critical = 4,
129 High = 3,
131 Normal = 2,
133 Low = 1,
135}
136
137impl WorkerType {
138 pub fn default_priority(&self) -> WorkerPriority {
140 match self {
141 WorkerType::Audit => WorkerPriority::Critical,
142 WorkerType::Optimize => WorkerPriority::High,
143 WorkerType::Ultralearn => WorkerPriority::Normal,
144 WorkerType::Consolidate => WorkerPriority::Low,
145 WorkerType::Predict => WorkerPriority::Normal,
146 WorkerType::Map => WorkerPriority::Normal,
147 WorkerType::Deepdive => WorkerPriority::Normal,
148 WorkerType::Document => WorkerPriority::Normal,
149 WorkerType::Refactor => WorkerPriority::Normal,
150 WorkerType::Benchmark => WorkerPriority::Normal,
151 WorkerType::Testgaps => WorkerPriority::Normal,
152 WorkerType::Learning => WorkerPriority::Normal,
153 }
154 }
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct WorkerConfig {
164 pub worker_type: WorkerType,
166 pub priority: WorkerPriority,
168 pub interval_ms: u64,
170 pub enabled: bool,
172}
173
174impl WorkerConfig {
175 pub fn default_for(worker_type: WorkerType) -> Self {
177 Self {
178 worker_type,
179 priority: worker_type.default_priority(),
180 interval_ms: worker_type.default_interval_ms(),
181 enabled: true,
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct WorkerResult {
189 pub worker: WorkerType,
191 pub success: bool,
193 pub duration_ms: u64,
195 pub output: String,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
201pub struct WorkerManagerStatus {
202 pub registered: usize,
204 pub enabled: usize,
206 pub running: Vec<String>,
208 pub last_results: HashMap<String, WorkerResult>,
210}
211
212pub struct WorkerManager {
221 configs: RwLock<HashMap<WorkerType, WorkerConfig>>,
223 implementations: RwLock<HashMap<WorkerType, Box<dyn Worker>>>,
225 running: Arc<RwLock<std::collections::HashSet<WorkerType>>>,
227 last_results: RwLock<HashMap<WorkerType, WorkerResult>>,
229}
230
231impl std::fmt::Debug for WorkerManager {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.debug_struct("WorkerManager")
234 .field("registered", &self.configs.read().len())
235 .field("running", &self.running.read().len())
236 .finish()
237 }
238}
239
240impl WorkerManager {
241 pub fn new() -> Self {
243 Self {
244 configs: RwLock::new(HashMap::new()),
245 implementations: RwLock::new(HashMap::new()),
246 running: Arc::new(RwLock::new(std::collections::HashSet::new())),
247 last_results: RwLock::new(HashMap::new()),
248 }
249 }
250
251 pub fn with_defaults() -> Self {
253 let mgr = Self::new();
254 for wt in WorkerType::all() {
255 mgr.register(*wt, WorkerConfig::default_for(*wt));
256 }
257 mgr
258 }
259
260 pub fn register(&self, worker_type: WorkerType, config: WorkerConfig) {
264 self.configs.write().insert(worker_type, config);
265 tracing::debug!(worker = %worker_type.name(), "Worker registered");
266 }
267
268 pub fn register_implementation(
274 &self,
275 worker_type: WorkerType,
276 implementation: Box<dyn Worker>,
277 ) {
278 self.implementations
279 .write()
280 .insert(worker_type, implementation);
281 tracing::debug!(worker = %worker_type.name(), "Worker implementation registered");
282 }
283
284 pub fn has_implementation(&self, worker_type: WorkerType) -> bool {
286 self.implementations.read().contains_key(&worker_type)
287 }
288
289 pub fn unregister_implementation(&self, worker_type: WorkerType) -> bool {
291 self.implementations.write().remove(&worker_type).is_some()
292 }
293
294 pub fn unregister(&self, worker_type: WorkerType) -> bool {
296 self.configs.write().remove(&worker_type).is_some()
297 }
298
299 pub fn is_registered(&self, worker_type: WorkerType) -> bool {
301 self.configs.read().contains_key(&worker_type)
302 }
303
304 pub fn dispatch(&self, worker_type: WorkerType) -> Result<WorkerResult, String> {
310 {
312 let configs = self.configs.read();
313 let config = configs
314 .get(&worker_type)
315 .ok_or_else(|| format!("Worker '{}' not registered", worker_type.name()))?;
316 if !config.enabled {
317 return Err(format!("Worker '{}' is disabled", worker_type.name()));
318 }
319 }
320
321 {
323 let mut running = self.running.write();
324 if running.contains(&worker_type) {
325 return Err(format!(
326 "Worker '{}' is already running",
327 worker_type.name()
328 ));
329 }
330 running.insert(worker_type);
331 }
332
333 let start = Instant::now();
334 let result = self.execute_worker(worker_type);
335 let duration_ms = start.elapsed().as_millis() as u64;
336
337 let worker_result = WorkerResult {
338 worker: worker_type,
339 success: result.is_ok(),
340 duration_ms,
341 output: result.unwrap_or_else(|e| e),
342 };
343
344 {
346 let mut running = self.running.write();
347 running.remove(&worker_type);
348 }
349 {
350 let mut last = self.last_results.write();
351 last.insert(worker_type, worker_result.clone());
352 }
353
354 tracing::info!(
355 worker = %worker_type.name(),
356 success = worker_result.success,
357 duration_ms,
358 "Worker completed"
359 );
360
361 Ok(worker_result)
362 }
363
364 pub fn dispatch_all(&self) -> Vec<WorkerResult> {
368 let worker_types: Vec<WorkerType> = {
369 let configs = self.configs.read();
370 configs
371 .iter()
372 .filter(|(_, c)| c.enabled)
373 .map(|(wt, _)| *wt)
374 .collect()
375 };
376
377 let mut results = Vec::new();
378 for wt in worker_types {
379 match self.dispatch(wt) {
380 Ok(result) => results.push(result),
381 Err(e) => {
382 tracing::warn!(worker = %wt.name(), error = %e, "Failed to dispatch worker");
383 results.push(WorkerResult {
384 worker: wt,
385 success: false,
386 duration_ms: 0,
387 output: e,
388 });
389 }
390 }
391 }
392 results
393 }
394
395 pub fn status(&self) -> WorkerManagerStatus {
397 let configs = self.configs.read();
398 let running = self.running.read();
399 let last = self.last_results.read();
400
401 let enabled = configs.values().filter(|c| c.enabled).count();
402 let running_names: Vec<String> = running.iter().map(|wt| wt.name().to_string()).collect();
403 let last_results_map: HashMap<String, WorkerResult> = last
404 .iter()
405 .map(|(wt, r)| (wt.name().to_string(), r.clone()))
406 .collect();
407
408 WorkerManagerStatus {
409 registered: configs.len(),
410 enabled,
411 running: running_names,
412 last_results: last_results_map,
413 }
414 }
415
416 pub fn enable(&self, worker_type: WorkerType) -> bool {
418 let mut configs = self.configs.write();
419 if let Some(config) = configs.get_mut(&worker_type) {
420 config.enabled = true;
421 true
422 } else {
423 false
424 }
425 }
426
427 pub fn disable(&self, worker_type: WorkerType) -> bool {
429 let mut configs = self.configs.write();
430 if let Some(config) = configs.get_mut(&worker_type) {
431 config.enabled = false;
432 true
433 } else {
434 false
435 }
436 }
437
438 fn execute_worker(&self, worker_type: WorkerType) -> Result<String, String> {
448 let impls = self.implementations.read();
449 let worker = impls.get(&worker_type).ok_or_else(|| {
450 format!(
451 "No implementation registered for worker '{}'",
452 worker_type.name()
453 )
454 })?;
455 worker.execute()
456 }
457}
458
459impl Default for WorkerManager {
460 fn default() -> Self {
461 Self::new()
462 }
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Worker that always succeeds with a fixed message.
    struct EchoWorker;

    impl Worker for EchoWorker {
        fn execute(&self) -> Result<String, String> {
            Ok(String::from("echo: ok"))
        }
    }

    /// Manager with default configs and an `EchoWorker` behind every type.
    fn manager_with_all_implementations() -> WorkerManager {
        let mgr = WorkerManager::with_defaults();
        WorkerType::all()
            .iter()
            .for_each(|&wt| mgr.register_implementation(wt, Box::new(EchoWorker)));
        mgr
    }

    #[test]
    fn test_worker_type_all() {
        let all = WorkerType::all();
        assert_eq!(all.len(), 12);
    }

    #[test]
    fn test_worker_type_name() {
        let cases = [
            (WorkerType::Audit, "audit"),
            (WorkerType::Learning, "learning"),
        ];
        for (wt, expected) in cases {
            assert_eq!(wt.name(), expected);
        }
    }

    #[test]
    fn test_worker_type_default_interval() {
        let cases = [
            (WorkerType::Audit, 600_000),
            (WorkerType::Ultralearn, 60_000),
        ];
        for (wt, expected) in cases {
            assert_eq!(wt.default_interval_ms(), expected);
        }
    }

    #[test]
    fn test_worker_priority_ordering() {
        use WorkerPriority::{Critical, High, Low, Normal};

        assert!(Critical > High);
        assert!(High > Normal);
        assert!(Normal > Low);
    }

    #[test]
    fn test_worker_config_default() {
        let WorkerConfig {
            worker_type,
            priority,
            enabled,
            ..
        } = WorkerConfig::default_for(WorkerType::Audit);

        assert_eq!(worker_type, WorkerType::Audit);
        assert_eq!(priority, WorkerPriority::Critical);
        assert!(enabled);
    }

    #[test]
    fn test_new_manager_is_empty() {
        let status = WorkerManager::new().status();
        assert_eq!(status.registered, 0);
        assert_eq!(status.enabled, 0);
    }

    #[test]
    fn test_with_defaults() {
        let status = WorkerManager::with_defaults().status();
        assert_eq!(status.registered, 12);
        assert_eq!(status.enabled, 12);
    }

    #[test]
    fn test_register_and_dispatch() {
        let wt = WorkerType::Audit;
        let mgr = WorkerManager::new();
        mgr.register(wt, WorkerConfig::default_for(wt));
        mgr.register_implementation(wt, Box::new(EchoWorker));

        let result = mgr.dispatch(wt).unwrap();
        assert!(result.success);
        assert_eq!(result.worker, wt);
    }

    #[test]
    fn test_dispatch_no_implementation() {
        let wt = WorkerType::Audit;
        let mgr = WorkerManager::new();
        mgr.register(wt, WorkerConfig::default_for(wt));

        // A missing implementation is a failed run, not a dispatch error.
        let result = mgr.dispatch(wt).unwrap();
        assert!(!result.success);
        assert!(result.output.contains("No implementation registered"));
    }

    #[test]
    fn test_dispatch_unregistered() {
        let mgr = WorkerManager::new();
        let err = mgr
            .dispatch(WorkerType::Audit)
            .expect_err("dispatching an unregistered worker must fail");
        assert!(err.contains("not registered"));
    }

    #[test]
    fn test_dispatch_disabled() {
        let mgr = WorkerManager::new();
        let config = WorkerConfig {
            enabled: false,
            ..WorkerConfig::default_for(WorkerType::Audit)
        };
        mgr.register(WorkerType::Audit, config);

        let err = mgr
            .dispatch(WorkerType::Audit)
            .expect_err("dispatching a disabled worker must fail");
        assert!(err.contains("disabled"));
    }

    #[test]
    fn test_dispatch_all() {
        let results = manager_with_all_implementations().dispatch_all();
        assert_eq!(results.len(), 12);
        assert!(results.iter().all(|r| r.success));
    }

    #[test]
    fn test_dispatch_all_missing_impl() {
        let results = WorkerManager::with_defaults().dispatch_all();
        assert_eq!(results.len(), 12);
        assert!(!results.iter().any(|r| r.success));
    }

    #[test]
    fn test_enable_disable() {
        let mgr = WorkerManager::with_defaults();

        mgr.disable(WorkerType::Audit);
        assert_eq!(mgr.status().enabled, 11);

        mgr.enable(WorkerType::Audit);
        assert_eq!(mgr.status().enabled, 12);
    }

    #[test]
    fn test_unregister() {
        let mgr = WorkerManager::with_defaults();

        let removed_first = mgr.unregister(WorkerType::Audit);
        assert!(removed_first);
        assert_eq!(mgr.status().registered, 11);

        // A second removal finds nothing.
        let removed_again = mgr.unregister(WorkerType::Audit);
        assert!(!removed_again);
    }

    #[test]
    fn test_status_last_results() {
        let wt = WorkerType::Learning;
        let mgr = WorkerManager::new();
        mgr.register(wt, WorkerConfig::default_for(wt));
        mgr.register_implementation(wt, Box::new(EchoWorker));
        mgr.dispatch(wt).unwrap();

        let last_results = mgr.status().last_results;
        assert!(last_results.contains_key("learning"));
    }

    #[test]
    fn test_is_registered() {
        let wt = WorkerType::Audit;
        let mgr = WorkerManager::new();
        assert!(!mgr.is_registered(wt));

        mgr.register(wt, WorkerConfig::default_for(wt));
        assert!(mgr.is_registered(wt));
    }

    #[test]
    fn test_has_implementation() {
        let wt = WorkerType::Audit;
        let mgr = WorkerManager::new();
        assert!(!mgr.has_implementation(wt));

        mgr.register_implementation(wt, Box::new(EchoWorker));
        assert!(mgr.has_implementation(wt));
    }

    #[test]
    fn test_unregister_implementation() {
        let wt = WorkerType::Audit;
        let mgr = WorkerManager::new();
        mgr.register_implementation(wt, Box::new(EchoWorker));
        assert!(mgr.has_implementation(wt));

        assert!(mgr.unregister_implementation(wt));
        assert!(!mgr.has_implementation(wt));
        assert!(!mgr.unregister_implementation(wt));
    }

    #[test]
    fn test_serialization_roundtrip() {
        let original = WorkerConfig::default_for(WorkerType::Audit);
        let json = serde_json::to_string(&original).unwrap();
        let parsed: WorkerConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.worker_type, WorkerType::Audit);
        assert_eq!(parsed.priority, WorkerPriority::Critical);
    }
}