1use std::sync::Arc;
32
33use chrono::{DateTime, Timelike, Utc};
34use serde::{Deserialize, Serialize};
35use thiserror::Error;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38
39use super::{
40 ActionAllowlist, AnalysisBlocked, Analyzer, CircuitBreaker, CircuitState, ExecutionBlocked,
41 Executor, HealthReport, Learner, LearningBlocked, Monitor, SelfDiagnosis,
42 SelfImprovementConfig, SelfImprovementPipes,
43};
44use crate::langbase::LangbaseClient;
45use crate::storage::SqliteStorage;
46
47#[derive(Debug, Error)]
53pub enum SelfImprovementError {
54 #[error("Self-improvement system is disabled")]
56 Disabled,
57
58 #[error("Circuit breaker is open: {consecutive_failures} consecutive failures")]
60 CircuitBreakerOpen {
61 consecutive_failures: u32,
63 },
64
65 #[error("System in cooldown until {until}")]
67 InCooldown {
68 until: DateTime<Utc>,
70 },
71
72 #[error("Rate limit exceeded: {count}/{max} actions this hour")]
74 RateLimitExceeded {
75 count: u32,
77 max: u32,
79 },
80
81 #[error("Monitor phase failed: {message}")]
83 MonitorFailed {
84 message: String,
86 },
87
88 #[error("Analyzer phase failed: {message}")]
90 AnalyzerFailed {
91 message: String,
93 },
94
95 #[error("Executor phase failed: {message}")]
97 ExecutorFailed {
98 message: String,
100 },
101
102 #[error("Learner phase failed: {message}")]
104 LearnerFailed {
105 message: String,
107 },
108
109 #[error("Storage error: {message}")]
111 StorageError {
112 message: String,
114 },
115
116 #[error("Internal error: {message}")]
118 Internal {
119 message: String,
121 },
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct InvocationEvent {
133 pub tool_name: String,
135 pub latency_ms: i64,
137 pub success: bool,
139 pub quality_score: Option<f64>,
141 pub timestamp: DateTime<Utc>,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct CycleResult {
152 pub success: bool,
154 pub action_taken: bool,
156 pub diagnosis: Option<SelfDiagnosis>,
158 pub reward: Option<f64>,
160 pub lessons: Option<String>,
162 pub error: Option<String>,
164 pub duration_ms: u64,
166}
167
168#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct SystemStatus {
175 pub enabled: bool,
177 pub circuit_state: CircuitState,
179 pub consecutive_failures: u32,
181 pub in_cooldown: bool,
183 pub cooldown_ends_at: Option<DateTime<Utc>>,
185 pub actions_this_hour: u32,
187 pub max_actions_per_hour: u32,
189 pub total_cycles: u64,
191 pub total_successes: u64,
193 pub total_rollbacks: u64,
195 pub last_cycle_at: Option<DateTime<Utc>>,
197}
198
199struct SystemState {
205 cooldown_until: Option<DateTime<Utc>>,
207 actions_this_hour: u32,
209 rate_limit_hour: DateTime<Utc>,
211 total_cycles: u64,
213 total_successes: u64,
215 total_rollbacks: u64,
217 last_cycle_at: Option<DateTime<Utc>>,
219}
220
221impl Default for SystemState {
222 fn default() -> Self {
223 Self {
224 cooldown_until: None,
225 actions_this_hour: 0,
226 rate_limit_hour: Utc::now(),
227 total_cycles: 0,
228 total_successes: 0,
229 total_rollbacks: 0,
230 last_cycle_at: None,
231 }
232 }
233}
234
235pub struct SelfImprovementSystem {
256 config: SelfImprovementConfig,
258 monitor: Monitor,
260 analyzer: Analyzer,
262 executor: Executor,
264 learner: Learner,
266 circuit_breaker: Arc<RwLock<CircuitBreaker>>,
268 allowlist: ActionAllowlist,
270 state: Arc<RwLock<SystemState>>,
272}
273
274impl SelfImprovementSystem {
275 pub fn new(
287 config: SelfImprovementConfig,
288 _storage: SqliteStorage,
289 langbase: LangbaseClient,
290 ) -> Self {
291 info!(
292 enabled = config.enabled,
293 max_actions_per_hour = config.executor.max_actions_per_hour,
294 "Initializing SelfImprovementSystem"
295 );
296
297 let circuit_breaker = Arc::new(RwLock::new(CircuitBreaker::new(
299 config.circuit_breaker.clone(),
300 )));
301
302 let langbase_arc = Arc::new(langbase);
304 let pipes = Arc::new(SelfImprovementPipes::new(
305 langbase_arc,
306 config.pipes.clone(),
307 ));
308
309 let allowlist = ActionAllowlist::default_allowlist();
311
312 let monitor = Monitor::new(config.clone());
314 let analyzer = Analyzer::new(config.clone(), pipes.clone(), circuit_breaker.clone());
315 let executor = Executor::new(config.clone(), allowlist.clone(), circuit_breaker.clone());
316 let learner = Learner::new(config.clone(), pipes, circuit_breaker.clone());
317
318 Self {
319 config,
320 monitor,
321 analyzer,
322 executor,
323 learner,
324 circuit_breaker,
325 allowlist,
326 state: Arc::new(RwLock::new(SystemState::default())),
327 }
328 }
329
330 pub fn is_enabled(&self) -> bool {
332 self.config.enabled
333 }
334
335 pub async fn on_invocation(&self, event: InvocationEvent) {
339 if !self.config.enabled {
340 return;
341 }
342
343 debug!(
344 tool = %event.tool_name,
345 latency_ms = event.latency_ms,
346 success = event.success,
347 "Recording invocation"
348 );
349
350 self.monitor
352 .record_invocation(
353 !event.success,
354 event.latency_ms,
355 event.quality_score.unwrap_or(0.8),
356 false, )
358 .await;
359 }
360
361 pub async fn check_health(&self) -> Option<HealthReport> {
366 self.monitor.check_health().await
367 }
368
369 pub async fn force_health_check(&self) -> Option<HealthReport> {
371 self.monitor.force_check().await
372 }
373
374 pub async fn status(&self) -> SystemStatus {
376 let state = self.state.read().await;
377 let cb = self.circuit_breaker.read().await;
378 let cb_summary = cb.summary();
379
380 let in_cooldown = state
381 .cooldown_until
382 .map(|until| Utc::now() < until)
383 .unwrap_or(false);
384
385 SystemStatus {
386 enabled: self.config.enabled,
387 circuit_state: cb_summary.state,
388 consecutive_failures: cb_summary.consecutive_failures,
389 in_cooldown,
390 cooldown_ends_at: if in_cooldown {
391 state.cooldown_until
392 } else {
393 None
394 },
395 actions_this_hour: state.actions_this_hour,
396 max_actions_per_hour: self.config.executor.max_actions_per_hour,
397 total_cycles: state.total_cycles,
398 total_successes: state.total_successes,
399 total_rollbacks: state.total_rollbacks,
400 last_cycle_at: state.last_cycle_at,
401 }
402 }
403
404 pub async fn run_cycle(&self) -> Result<CycleResult, SelfImprovementError> {
411 let start = std::time::Instant::now();
412
413 if !self.config.enabled {
415 return Err(SelfImprovementError::Disabled);
416 }
417
418 {
420 let mut cb = self.circuit_breaker.write().await;
421 if !cb.can_execute() {
422 let summary = cb.summary();
423 return Err(SelfImprovementError::CircuitBreakerOpen {
424 consecutive_failures: summary.consecutive_failures,
425 });
426 }
427 }
428
429 {
431 let state = self.state.read().await;
432 if let Some(until) = state.cooldown_until {
433 if Utc::now() < until {
434 return Err(SelfImprovementError::InCooldown { until });
435 }
436 }
437 }
438
439 self.check_and_update_rate_limit().await?;
441
442 {
444 let mut state = self.state.write().await;
445 state.total_cycles += 1;
446 state.last_cycle_at = Some(Utc::now());
447 }
448
449 info!("Starting self-improvement cycle");
450
451 let health = match self.monitor.force_check().await {
453 Some(report) => report,
454 None => {
455 info!("Not enough samples for health check");
456 return Ok(CycleResult {
457 success: true,
458 action_taken: false,
459 diagnosis: None,
460 reward: None,
461 lessons: None,
462 error: Some("Insufficient samples for health check".to_string()),
463 duration_ms: start.elapsed().as_millis() as u64,
464 });
465 }
466 };
467 debug!(?health, "Health report");
468
469 if !health.needs_action() {
470 info!("No action needed - system healthy");
471 return Ok(CycleResult {
472 success: true,
473 action_taken: false,
474 diagnosis: None,
475 reward: None,
476 lessons: None,
477 error: None,
478 duration_ms: start.elapsed().as_millis() as u64,
479 });
480 }
481
482 let analysis_result = match self.analyzer.analyze(&health).await {
484 Ok(result) => result,
485 Err(blocked) => {
486 let msg = match &blocked {
487 AnalysisBlocked::CircuitOpen { remaining_secs } => {
488 format!("Circuit open, {} seconds until recovery", remaining_secs)
489 }
490 AnalysisBlocked::NoTriggers => "No triggers to analyze".to_string(),
491 AnalysisBlocked::PipeUnavailable { pipe, error } => {
492 format!("Pipe '{}' unavailable: {}", pipe, error)
493 }
494 AnalysisBlocked::MaxPendingReached { count } => {
495 format!("Max pending diagnoses reached: {}", count)
496 }
497 AnalysisBlocked::SeverityTooLow { severity, minimum } => {
498 format!("Severity {:?} below minimum {:?}", severity, minimum)
499 }
500 };
501 warn!(?blocked, "Analysis blocked");
502 return Ok(CycleResult {
503 success: true,
504 action_taken: false,
505 diagnosis: None,
506 reward: None,
507 lessons: None,
508 error: Some(format!("Analysis blocked: {}", msg)),
509 duration_ms: start.elapsed().as_millis() as u64,
510 });
511 }
512 };
513
514 let diagnosis = analysis_result.diagnosis.clone();
515
516 info!(
517 diagnosis_id = %diagnosis.id,
518 severity = ?diagnosis.severity,
519 action = ?diagnosis.suggested_action.action_type(),
520 "Diagnosis generated"
521 );
522
523 if let Err(e) = self.allowlist.validate(&diagnosis.suggested_action) {
525 warn!(error = %e, "Action not allowed");
526 return Ok(CycleResult {
527 success: true,
528 action_taken: false,
529 diagnosis: Some(diagnosis),
530 reward: None,
531 lessons: None,
532 error: Some(format!("Action not allowed: {}", e)),
533 duration_ms: start.elapsed().as_millis() as u64,
534 });
535 }
536
537 let current_metrics = self.monitor.get_current_metrics().await;
539 let execution_result = match self.executor.execute(&diagnosis, ¤t_metrics).await {
540 Ok(result) => result,
541 Err(blocked) => {
542 let msg = match &blocked {
543 ExecutionBlocked::CircuitOpen { remaining_secs } => {
544 format!("Circuit open, {} seconds until recovery", remaining_secs)
545 }
546 ExecutionBlocked::CooldownActive { remaining_secs } => {
547 format!("Cooldown active, {} seconds remaining", remaining_secs)
548 }
549 ExecutionBlocked::RateLimitExceeded { count, max } => {
550 format!("Rate limit exceeded: {}/{}", count, max)
551 }
552 ExecutionBlocked::NotAllowed { reason } => {
553 format!("Action not allowed: {}", reason)
554 }
555 ExecutionBlocked::NoOpAction { reason } => {
556 format!("NoOp action: {}", reason)
557 }
558 ExecutionBlocked::AwaitingApproval { diagnosis_id } => {
559 format!("Awaiting approval for diagnosis: {}", diagnosis_id)
560 }
561 };
562 warn!(?blocked, "Execution blocked");
563 return Ok(CycleResult {
564 success: true,
565 action_taken: false,
566 diagnosis: Some(diagnosis),
567 reward: None,
568 lessons: None,
569 error: Some(format!("Execution blocked: {}", msg)),
570 duration_ms: start.elapsed().as_millis() as u64,
571 });
572 }
573 };
574
575 info!(
576 action_id = %execution_result.action_id,
577 outcome = ?execution_result.outcome,
578 "Action executed"
579 );
580
581 let baselines = self.monitor.get_baselines().await;
583 let post_metrics = self.monitor.get_current_metrics().await;
584
585 let learning_result = match self
587 .learner
588 .learn(&execution_result, &diagnosis, &post_metrics, &baselines)
589 .await
590 {
591 Ok(outcome) => Some(outcome),
592 Err(blocked) => {
593 let msg = match &blocked {
594 LearningBlocked::ExecutionNotCompleted { status } => {
595 format!("Execution not completed: {:?}", status)
596 }
597 LearningBlocked::InsufficientSamples { required, actual } => {
598 format!("Insufficient samples: {} < {}", actual, required)
599 }
600 LearningBlocked::PipeUnavailable { message } => {
601 format!("Pipe unavailable: {}", message)
602 }
603 };
604 warn!(?blocked, "Learning blocked: {}", msg);
605 None
606 }
607 };
608
609 let (reward, lessons) = if let Some(outcome) = learning_result {
610 let lesson_text = outcome
611 .learning_synthesis
612 .map(|ls| ls.lessons.join("; "));
613 (Some(outcome.reward.value), lesson_text)
614 } else {
615 (None, None)
616 };
617
618 if execution_result.outcome == super::types::ActionOutcome::Success {
620 self.record_success().await;
621 let mut state = self.state.write().await;
622 state.total_successes += 1;
623 } else if execution_result.outcome == super::types::ActionOutcome::RolledBack {
624 self.record_failure().await;
625 let mut state = self.state.write().await;
626 state.total_rollbacks += 1;
627 }
628
629 self.set_cooldown().await;
631
632 info!(
633 reward = ?reward,
634 "Improvement cycle completed"
635 );
636
637 Ok(CycleResult {
638 success: true,
639 action_taken: true,
640 diagnosis: Some(diagnosis),
641 reward,
642 lessons,
643 error: None,
644 duration_ms: start.elapsed().as_millis() as u64,
645 })
646 }
647
648 pub async fn force_cycle(&self) -> Result<CycleResult, SelfImprovementError> {
652 info!("Force-running improvement cycle");
653 self.run_cycle().await
654 }
655
656 pub async fn rollback(&self, action_id: &str) -> Result<(), SelfImprovementError> {
658 info!(action_id = action_id, "Manual rollback requested");
659
660 self.executor.rollback_by_id(action_id).await.map_err(|e| {
661 SelfImprovementError::ExecutorFailed {
662 message: format!("Rollback failed: {}", e),
663 }
664 })?;
665
666 Ok(())
667 }
668
669 pub async fn pause(&self, duration: std::time::Duration) {
671 let until = Utc::now() + chrono::Duration::from_std(duration).unwrap_or_default();
672 let mut state = self.state.write().await;
673 state.cooldown_until = Some(until);
674 info!(until = %until, "System paused");
675 }
676
677 pub async fn resume(&self) {
679 let mut state = self.state.write().await;
680 state.cooldown_until = None;
681 info!("System resumed");
682 }
683
684 async fn check_and_update_rate_limit(&self) -> Result<(), SelfImprovementError> {
689 let mut state = self.state.write().await;
690 let now = Utc::now();
691
692 let current_hour = now.date_naive().and_hms_opt(now.time().hour(), 0, 0);
694 let limit_hour = state
695 .rate_limit_hour
696 .date_naive()
697 .and_hms_opt(state.rate_limit_hour.time().hour(), 0, 0);
698
699 if current_hour != limit_hour {
700 state.actions_this_hour = 0;
701 state.rate_limit_hour = now;
702 }
703
704 if state.actions_this_hour >= self.config.executor.max_actions_per_hour {
706 return Err(SelfImprovementError::RateLimitExceeded {
707 count: state.actions_this_hour,
708 max: self.config.executor.max_actions_per_hour,
709 });
710 }
711
712 state.actions_this_hour += 1;
714
715 Ok(())
716 }
717
718 async fn record_success(&self) {
719 let mut cb = self.circuit_breaker.write().await;
720 cb.record_success();
721 }
722
723 async fn record_failure(&self) {
724 let mut cb = self.circuit_breaker.write().await;
725 cb.record_failure();
726 }
727
728 async fn set_cooldown(&self) {
729 let cooldown = self.config.executor.cooldown_duration();
730 let until = Utc::now() + chrono::Duration::from_std(cooldown).unwrap_or_default();
731 let mut state = self.state.write().await;
732 state.cooldown_until = Some(until);
733 debug!(until = %until, "Cooldown set");
734 }
735}
736
#[cfg(test)]
mod tests {
    use super::*;

    /// Fields supplied at construction are readable back unchanged.
    #[test]
    fn test_invocation_event_creation() {
        let tool = "reasoning_linear";
        let latency: i64 = 150;

        let event = InvocationEvent {
            tool_name: tool.to_string(),
            latency_ms: latency,
            success: true,
            quality_score: Some(0.85),
            timestamp: Utc::now(),
        };

        assert_eq!(event.tool_name, "reasoning_linear");
        assert_eq!(event.latency_ms, 150);
        assert!(event.success);
    }

    /// A result describing a cycle that took no action.
    #[test]
    fn test_cycle_result_no_action() {
        let result = CycleResult {
            success: true,
            action_taken: false,
            diagnosis: None,
            reward: None,
            lessons: None,
            error: None,
            duration_ms: 50,
        };

        assert!(result.success);
        assert!(!result.action_taken);
    }

    /// A hand-built status for an idle, enabled system.
    #[test]
    fn test_system_status_default() {
        let status = SystemStatus {
            enabled: true,
            circuit_state: CircuitState::Closed,
            consecutive_failures: 0,
            in_cooldown: false,
            cooldown_ends_at: None,
            actions_this_hour: 0,
            max_actions_per_hour: 3,
            total_cycles: 0,
            total_successes: 0,
            total_rollbacks: 0,
            last_cycle_at: None,
        };

        assert!(status.enabled);
        assert_eq!(status.circuit_state, CircuitState::Closed);
    }

    /// `Display` output follows the `#[error(...)]` templates.
    #[test]
    fn test_error_display() {
        let disabled = SelfImprovementError::Disabled;
        assert_eq!(disabled.to_string(), "Self-improvement system is disabled");

        let breaker_open = SelfImprovementError::CircuitBreakerOpen {
            consecutive_failures: 3,
        };
        assert!(breaker_open.to_string().contains("3 consecutive failures"));
    }
}