ggen_core/ontology/
control_loop.rs

1use serde::{Deserialize, Serialize};
2/// Autonomous Control Loop: Closed-Loop Ontology Evolution
3///
4/// Implements the complete feedback loop:
5/// Observe → Detect → Propose → Validate → Promote → Record → Repeat
6///
7/// Runs autonomously without human intervention in the editing loop.
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11
12use crate::ontology::delta_proposer::DeltaSigmaProposer;
13#[cfg(test)]
14use crate::ontology::pattern_miner::ObservationSource;
15use crate::ontology::pattern_miner::{MinerConfig, Observation, PatternMiner};
16use crate::ontology::promotion::AtomicSnapshotPromoter;
17use crate::ontology::sigma_runtime::{SigmaReceipt, SigmaRuntime, SigmaSnapshot};
18use crate::ontology::validators::{CompositeValidator, Invariant, ValidationContext};
19
/// Telemetry for a control loop iteration.
///
/// One record describes a single pass through the loop. All counters start at
/// zero and are filled in as the iteration advances through its phases.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IterationTelemetry {
    /// Zero-based index of the iteration: the number of telemetry records
    /// already stored when the iteration began.
    pub iteration: usize,
    /// Wall-clock time at the start of the iteration, in milliseconds since
    /// the Unix epoch.
    pub timestamp_ms: u64,
    /// Number of observations held by the pattern miner when the iteration began.
    pub observation_count: usize,
    /// Number of patterns returned by the mining step.
    pub patterns_detected: usize,
    /// Number of proposals returned by the proposer, before confidence filtering.
    pub proposals_generated: usize,
    /// Number of proposals for which validation ran to completion, whether or
    /// not the proposal passed.
    pub proposals_validated: usize,
    /// Number of proposals whose resulting snapshot was promoted.
    pub proposals_promoted: usize,
    /// Time spent in the iteration, in milliseconds.
    pub total_duration_ms: u64,
}
32
/// Control loop state machine.
///
/// Identifies which phase of the
/// Observe → Detect → Propose → Validate → Promote → Record cycle the loop
/// is currently in.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum LoopState {
    /// Not running: the initial state, and the state after a run finishes
    /// without error.
    Idle,
    /// Observation phase.
    Observing,
    /// Pattern mining is in progress.
    Detecting,
    /// Delta proposals are being generated.
    Proposing,
    /// Proposals are being checked against the invariants.
    Validating,
    /// Entered once a proposal has passed every validation.
    Promoting,
    /// Final phase of an iteration; the state stays here after a single
    /// iteration completes.
    Recording,
    /// An iteration failed; set by the run methods before they return the error.
    Error,
}
45
/// Autonomous control loop configuration.
#[derive(Debug, Clone)]
pub struct ControlLoopConfig {
    /// Pause between consecutive iterations, in milliseconds (default: 5000).
    pub iteration_interval_ms: u64,

    /// Maximum number of iterations `run` performs before stopping
    /// (`None` = infinite, which is the default). `run_bounded` does not
    /// consult this field; it uses its own argument.
    pub max_iterations: Option<usize>,

    /// Enable automatic promotion of valid proposals (default: `true`).
    /// When `false`, proposals are still validated and counted, but no
    /// snapshot is promoted and no receipt is recorded.
    pub auto_promote: bool,

    /// Sector to evolve; handed to the proposer and to the validation
    /// context (default: `"support"`).
    pub sector: String,

    /// Minimum confidence a proposal needs to proceed to validation;
    /// proposals scoring below this value are discarded (default: 0.75).
    pub min_proposal_confidence: f64,

    /// Configuration used to construct the pattern miner.
    pub miner_config: MinerConfig,
}
67
68impl Default for ControlLoopConfig {
69    fn default() -> Self {
70        Self {
71            iteration_interval_ms: 5000,
72            max_iterations: None,
73            auto_promote: true,
74            sector: "support".to_string(),
75            min_proposal_confidence: 0.75,
76            miner_config: MinerConfig::default(),
77        }
78    }
79}
80
/// The autonomous control loop.
///
/// Bundles the components that one loop iteration drives: the pattern miner
/// that collects observations, the proposer that turns mined patterns into
/// deltas, the validator that checks them, and the promoter that holds the
/// current snapshot.
pub struct AutonomousControlLoop {
    /// Loop settings (interval, sector, confidence floor, ...).
    config: ControlLoopConfig,
    /// Current phase of the loop, readable through `state()`.
    state: Arc<RwLock<LoopState>>,
    /// Runtime that receives a receipt for each promoted proposal.
    sigma_runtime: Arc<RwLock<SigmaRuntime>>,
    /// Holder of the current snapshot; promotions go through it.
    promoter: Arc<AtomicSnapshotPromoter>,
    /// Accumulates observations and mines them for patterns.
    pattern_miner: Arc<RwLock<PatternMiner>>,
    /// Generates delta proposals from mined patterns.
    proposer: Arc<dyn DeltaSigmaProposer>,
    /// Runs the static, dynamic and performance validations on a proposal.
    validator: Arc<CompositeValidator>,
    /// Telemetry records appended by the run methods, one per successful iteration.
    telemetry: Arc<RwLock<Vec<IterationTelemetry>>>,
}
92
93impl AutonomousControlLoop {
94    pub fn new(
95        config: ControlLoopConfig, initial_snapshot: SigmaSnapshot,
96        proposer: Arc<dyn DeltaSigmaProposer>, validator: Arc<CompositeValidator>,
97    ) -> Self {
98        let sigma_runtime = SigmaRuntime::new(initial_snapshot.clone());
99        let miner_config = config.miner_config.clone();
100
101        Self {
102            config,
103            state: Arc::new(RwLock::new(LoopState::Idle)),
104            sigma_runtime: Arc::new(RwLock::new(sigma_runtime)),
105            promoter: Arc::new(AtomicSnapshotPromoter::new(Arc::new(initial_snapshot))),
106            pattern_miner: Arc::new(RwLock::new(PatternMiner::new(miner_config))),
107            proposer,
108            validator,
109            telemetry: Arc::new(RwLock::new(Vec::new())),
110        }
111    }
112
113    /// Get current loop state
114    pub async fn state(&self) -> LoopState {
115        *self.state.read().await
116    }
117
118    /// Get telemetry
119    pub async fn telemetry(&self) -> Vec<IterationTelemetry> {
120        self.telemetry.read().await.clone()
121    }
122
123    /// Feed observation into the system
124    pub async fn observe(&self, obs: Observation) {
125        let mut miner = self.pattern_miner.write().await;
126        miner.add_observation(obs);
127    }
128
129    /// Run one iteration of the control loop
130    #[allow(clippy::expect_used)]
131    async fn iterate(&self) -> Result<IterationTelemetry, String> {
132        let start_ms = get_time_ms();
133        let mut telemetry = IterationTelemetry {
134            iteration: self.telemetry.read().await.len(),
135            timestamp_ms: start_ms,
136            observation_count: 0,
137            patterns_detected: 0,
138            proposals_generated: 0,
139            proposals_validated: 0,
140            proposals_promoted: 0,
141            total_duration_ms: 0,
142        };
143
144        // 1. OBSERVE: Already done via .observe() calls
145        let miner = self.pattern_miner.read().await;
146        telemetry.observation_count = miner.observation_count();
147        drop(miner);
148
149        // 2. DETECT: Run pattern mining
150        *self.state.write().await = LoopState::Detecting;
151        let mut miner = self.pattern_miner.write().await;
152        let patterns = miner
153            .mine()
154            .map_err(|e| format!("Pattern mining failed: {}", e))?;
155        telemetry.patterns_detected = patterns.len();
156        drop(miner);
157
158        if patterns.is_empty() {
159            *self.state.write().await = LoopState::Recording;
160            return Ok(telemetry);
161        }
162
163        // 3. PROPOSE: Generate ΔΣ² proposals
164        *self.state.write().await = LoopState::Proposing;
165        let current_snapshot = self
166            .promoter
167            .get_current()
168            .map_err(|e| format!("Failed to get current snapshot: {}", e))?;
169        let proposals = self
170            .proposer
171            .propose_deltas(patterns, current_snapshot.snapshot(), &self.config.sector)
172            .await
173            .map_err(|e| format!("Proposal generation failed: {}", e))?;
174
175        telemetry.proposals_generated = proposals.len();
176
177        // Filter by confidence
178        let valid_proposals: Vec<_> = proposals
179            .iter()
180            .filter(|p| p.confidence >= self.config.min_proposal_confidence)
181            .cloned()
182            .collect();
183
184        // 4. VALIDATE: Check invariants (Q)
185        *self.state.write().await = LoopState::Validating;
186        for proposal in &valid_proposals {
187            let current_snap = self
188                .promoter
189                .get_current()
190                .map_err(|e| format!("Failed to get current snapshot: {}", e))?;
191
192            // Apply proposal changes to create new snapshot
193            let mut new_triples = current_snap.snapshot().triples.as_ref().clone();
194
195            // Remove triples (for now, just filter out matching subjects)
196            for triple_pattern in &proposal.triples_to_remove {
197                new_triples.retain(|stmt| !stmt.subject.contains(triple_pattern));
198            }
199
200            // Add new triples
201            for triple_str in &proposal.triples_to_add {
202                // Parse simple triple format: "subject predicate object"
203                let parts: Vec<&str> = triple_str.split_whitespace().collect();
204                if parts.len() >= 3 {
205                    new_triples.push(crate::ontology::sigma_runtime::Statement {
206                        subject: parts[0].to_string(),
207                        predicate: parts[1].to_string(),
208                        object: parts[2].to_string(),
209                        graph: None,
210                    });
211                }
212            }
213
214            let new_snap = SigmaSnapshot::new(
215                Some(current_snap.snapshot().id.clone()),
216                new_triples,
217                format!("{}_updated", current_snap.snapshot().version),
218                "sig_updated".to_string(),
219                current_snap.snapshot().metadata.clone(),
220            );
221
222            let ctx = ValidationContext {
223                proposal: proposal.clone(),
224                current_snapshot: current_snap.snapshot(),
225                expected_new_snapshot: Arc::new(new_snap),
226                sector: self.config.sector.clone(),
227                invariants: vec![
228                    Invariant::NoRetrocausation,
229                    Invariant::TypeSoundness,
230                    Invariant::SLOPreservation,
231                ],
232            };
233
234            let (static_ev, dynamic_ev, perf_ev) = self
235                .validator
236                .validate_all(&ctx)
237                .await
238                .map_err(|e| format!("Validation failed: {}", e))?;
239
240            telemetry.proposals_validated += 1;
241
242            // Check if all validations passed
243            if static_ev.passed && dynamic_ev.passed && perf_ev.passed {
244                // 5. PROMOTE: Move to current
245                *self.state.write().await = LoopState::Promoting;
246
247                if self.config.auto_promote {
248                    // Create the promoted snapshot with applied changes
249                    let current_snap_for_promote = self.promoter.get_current().map_err(|e| {
250                        format!("Failed to get current snapshot for promotion: {}", e)
251                    })?;
252
253                    let mut promoted_triples =
254                        current_snap_for_promote.snapshot().triples.as_ref().clone();
255
256                    // Apply the same changes for promotion
257                    for triple_pattern in &proposal.triples_to_remove {
258                        promoted_triples.retain(|stmt| !stmt.subject.contains(triple_pattern));
259                    }
260
261                    for triple_str in &proposal.triples_to_add {
262                        let parts: Vec<&str> = triple_str.split_whitespace().collect();
263                        if parts.len() >= 3 {
264                            promoted_triples.push(crate::ontology::sigma_runtime::Statement {
265                                subject: parts[0].to_string(),
266                                predicate: parts[1].to_string(),
267                                object: parts[2].to_string(),
268                                graph: None,
269                            });
270                        }
271                    }
272
273                    let promoted_snapshot = SigmaSnapshot::new(
274                        Some(current_snap_for_promote.snapshot().id.clone()),
275                        promoted_triples,
276                        format!(
277                            "{}_v{}",
278                            current_snap_for_promote.snapshot().version,
279                            telemetry.iteration
280                        ),
281                        "promoted_sig".to_string(),
282                        current_snap_for_promote.snapshot().metadata.clone(),
283                    );
284
285                    let _promotion_result = self
286                        .promoter
287                        .promote(Arc::new(promoted_snapshot))
288                        .expect("Failed to promote snapshot");
289
290                    telemetry.proposals_promoted += 1;
291
292                    // 6. RECORD: Store receipt
293                    let receipt = SigmaReceipt::new(
294                        Default::default(),
295                        Some(current_snap.snapshot().id.clone()),
296                        format!("Proposal: {}", proposal.id),
297                    );
298
299                    let mut runtime = self.sigma_runtime.write().await;
300                    runtime.record_receipt(receipt);
301                }
302            }
303        }
304
305        // 7. RECORD: Update telemetry
306        *self.state.write().await = LoopState::Recording;
307        telemetry.total_duration_ms = get_time_ms() - start_ms;
308
309        Ok(telemetry)
310    }
311
312    /// Run the control loop
313    pub async fn run(&self) -> Result<(), String> {
314        *self.state.write().await = LoopState::Idle;
315
316        let mut iteration = 0;
317        loop {
318            // Check max iterations
319            if let Some(max) = self.config.max_iterations {
320                if iteration >= max {
321                    break;
322                }
323            }
324
325            // Run iteration
326            match self.iterate().await {
327                Ok(telemetry) => {
328                    self.telemetry.write().await.push(telemetry);
329                }
330                Err(e) => {
331                    *self.state.write().await = LoopState::Error;
332                    return Err(e);
333                }
334            }
335
336            iteration += 1;
337
338            // Wait before next iteration
339            tokio::time::sleep(Duration::from_millis(self.config.iteration_interval_ms)).await;
340        }
341
342        *self.state.write().await = LoopState::Idle;
343        Ok(())
344    }
345
346    /// Run with bounded iterations
347    pub async fn run_bounded(&self, max_iters: usize) -> Result<(), String> {
348        // Note: we can't modify self.config (it's not mutable), so we'll track manually
349
350        for _ in 0..max_iters {
351            match self.iterate().await {
352                Ok(telemetry) => {
353                    self.telemetry.write().await.push(telemetry);
354                }
355                Err(e) => {
356                    *self.state.write().await = LoopState::Error;
357                    return Err(e);
358                }
359            }
360
361            tokio::time::sleep(Duration::from_millis(self.config.iteration_interval_ms)).await;
362        }
363
364        *self.state.write().await = LoopState::Idle;
365        Ok(())
366    }
367
368    /// Get current snapshot
369    #[allow(clippy::expect_used)]
370    pub fn current_snapshot(&self) -> Arc<SigmaSnapshot> {
371        self.promoter
372            .get_current()
373            .expect("Failed to get current snapshot - lock poisoned")
374            .snapshot()
375    }
376}
377
/// Returns the wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn get_time_ms() -> u64 {
    let now = std::time::SystemTime::now();

    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ontology::delta_proposer::MockLLMProposer;
    use crate::ontology::validators::{
        MockDynamicValidator, MockPerformanceValidator, MockStaticValidator,
    };

    /// Builds a loop wired to mock proposer/validators, starting from an empty
    /// "1.0.0" snapshot and using a 100 ms iteration interval.
    fn create_test_loop() -> AutonomousControlLoop {
        let static_v: Arc<dyn crate::ontology::validators::StaticValidator> =
            Arc::new(MockStaticValidator);
        let dynamic_v: Arc<dyn crate::ontology::validators::DynamicValidator> =
            Arc::new(MockDynamicValidator);
        let perf_v: Arc<dyn crate::ontology::validators::PerformanceValidator> =
            Arc::new(MockPerformanceValidator::new(1000, 1024 * 100));
        let validator = Arc::new(CompositeValidator::new(static_v, dynamic_v, perf_v));

        let proposer: Arc<dyn DeltaSigmaProposer> =
            Arc::new(MockLLMProposer::new(Default::default()));

        let snapshot = SigmaSnapshot::new(
            None,
            Vec::new(),
            String::from("1.0.0"),
            String::from("sig"),
            Default::default(),
        );

        let config = ControlLoopConfig {
            iteration_interval_ms: 100,
            ..ControlLoopConfig::default()
        };

        AutonomousControlLoop::new(config, snapshot, proposer, validator)
    }

    /// Builds a data-sourced observation carrying the single property `type = test`.
    fn observation(entity: String, timestamp: u64) -> Observation {
        Observation {
            entity,
            properties: std::iter::once(("type".to_string(), "test".to_string())).collect(),
            timestamp,
            source: ObservationSource::Data,
        }
    }

    /// Feeds `count` observations named `entity_<i>` with timestamps `1000 + i`.
    async fn feed_entities(loop_sys: &AutonomousControlLoop, count: u64) {
        for i in 0..count {
            loop_sys
                .observe(observation(format!("entity_{}", i), 1000 + i))
                .await;
        }
    }

    #[tokio::test]
    async fn test_control_loop_creation() {
        let loop_sys = create_test_loop();

        let initial_state = loop_sys.state().await;
        assert_eq!(initial_state, LoopState::Idle);
    }

    #[tokio::test]
    async fn test_control_loop_observation() {
        let loop_sys = create_test_loop();

        loop_sys
            .observe(observation("test_entity".to_string(), 1000))
            .await;

        let miner = loop_sys.pattern_miner.read().await;
        assert_eq!(miner.observations.len(), 1);
    }

    #[tokio::test]
    async fn test_control_loop_iteration() {
        let loop_sys = create_test_loop();
        feed_entities(&loop_sys, 3).await;

        // A single iteration must see every observation fed in above.
        let telemetry = loop_sys.iterate().await.unwrap();
        assert_eq!(telemetry.observation_count, 3);
    }

    #[tokio::test]
    async fn test_control_loop_bounded_run() {
        let loop_sys = create_test_loop();
        feed_entities(&loop_sys, 5).await;

        // One bounded iteration should succeed and leave exactly one record.
        let result = loop_sys.run_bounded(1).await;
        assert!(result.is_ok());

        let telemetry = loop_sys.telemetry().await;
        assert_eq!(telemetry.len(), 1);
    }

    #[tokio::test]
    async fn test_control_loop_state_transitions() {
        let loop_sys = create_test_loop();
        feed_entities(&loop_sys, 3).await;

        loop_sys.iterate().await.unwrap();

        // A lone iteration finishes in the recording phase.
        let final_state = loop_sys.state().await;
        assert_eq!(final_state, LoopState::Recording);
    }
}
523}