1use serde::{Deserialize, Serialize};
2use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11
12use crate::ontology::delta_proposer::DeltaSigmaProposer;
13#[cfg(test)]
14use crate::ontology::pattern_miner::ObservationSource;
15use crate::ontology::pattern_miner::{MinerConfig, Observation, PatternMiner};
16use crate::ontology::promotion::AtomicSnapshotPromoter;
17use crate::ontology::sigma_runtime::{SigmaReceipt, SigmaRuntime, SigmaSnapshot};
18use crate::ontology::validators::{CompositeValidator, Invariant, ValidationContext};
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct IterationTelemetry {
23 pub iteration: usize,
24 pub timestamp_ms: u64,
25 pub observation_count: usize,
26 pub patterns_detected: usize,
27 pub proposals_generated: usize,
28 pub proposals_validated: usize,
29 pub proposals_promoted: usize,
30 pub total_duration_ms: u64,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
35pub enum LoopState {
36 Idle,
37 Observing,
38 Detecting,
39 Proposing,
40 Validating,
41 Promoting,
42 Recording,
43 Error,
44}
45
46#[derive(Debug, Clone)]
48pub struct ControlLoopConfig {
49 pub iteration_interval_ms: u64,
51
52 pub max_iterations: Option<usize>,
54
55 pub auto_promote: bool,
57
58 pub sector: String,
60
61 pub min_proposal_confidence: f64,
63
64 pub miner_config: MinerConfig,
66}
67
68impl Default for ControlLoopConfig {
69 fn default() -> Self {
70 Self {
71 iteration_interval_ms: 5000,
72 max_iterations: None,
73 auto_promote: true,
74 sector: "support".to_string(),
75 min_proposal_confidence: 0.75,
76 miner_config: MinerConfig::default(),
77 }
78 }
79}
80
81pub struct AutonomousControlLoop {
83 config: ControlLoopConfig,
84 state: Arc<RwLock<LoopState>>,
85 sigma_runtime: Arc<RwLock<SigmaRuntime>>,
86 promoter: Arc<AtomicSnapshotPromoter>,
87 pattern_miner: Arc<RwLock<PatternMiner>>,
88 proposer: Arc<dyn DeltaSigmaProposer>,
89 validator: Arc<CompositeValidator>,
90 telemetry: Arc<RwLock<Vec<IterationTelemetry>>>,
91}
92
93impl AutonomousControlLoop {
94 pub fn new(
95 config: ControlLoopConfig, initial_snapshot: SigmaSnapshot,
96 proposer: Arc<dyn DeltaSigmaProposer>, validator: Arc<CompositeValidator>,
97 ) -> Self {
98 let sigma_runtime = SigmaRuntime::new(initial_snapshot.clone());
99 let miner_config = config.miner_config.clone();
100
101 Self {
102 config,
103 state: Arc::new(RwLock::new(LoopState::Idle)),
104 sigma_runtime: Arc::new(RwLock::new(sigma_runtime)),
105 promoter: Arc::new(AtomicSnapshotPromoter::new(Arc::new(initial_snapshot))),
106 pattern_miner: Arc::new(RwLock::new(PatternMiner::new(miner_config))),
107 proposer,
108 validator,
109 telemetry: Arc::new(RwLock::new(Vec::new())),
110 }
111 }
112
113 pub async fn state(&self) -> LoopState {
115 *self.state.read().await
116 }
117
118 pub async fn telemetry(&self) -> Vec<IterationTelemetry> {
120 self.telemetry.read().await.clone()
121 }
122
123 pub async fn observe(&self, obs: Observation) {
125 let mut miner = self.pattern_miner.write().await;
126 miner.add_observation(obs);
127 }
128
129 #[allow(clippy::expect_used)]
131 async fn iterate(&self) -> Result<IterationTelemetry, String> {
132 let start_ms = get_time_ms();
133 let mut telemetry = IterationTelemetry {
134 iteration: self.telemetry.read().await.len(),
135 timestamp_ms: start_ms,
136 observation_count: 0,
137 patterns_detected: 0,
138 proposals_generated: 0,
139 proposals_validated: 0,
140 proposals_promoted: 0,
141 total_duration_ms: 0,
142 };
143
144 let miner = self.pattern_miner.read().await;
146 telemetry.observation_count = miner.observation_count();
147 drop(miner);
148
149 *self.state.write().await = LoopState::Detecting;
151 let mut miner = self.pattern_miner.write().await;
152 let patterns = miner
153 .mine()
154 .map_err(|e| format!("Pattern mining failed: {}", e))?;
155 telemetry.patterns_detected = patterns.len();
156 drop(miner);
157
158 if patterns.is_empty() {
159 *self.state.write().await = LoopState::Recording;
160 return Ok(telemetry);
161 }
162
163 *self.state.write().await = LoopState::Proposing;
165 let current_snapshot = self
166 .promoter
167 .get_current()
168 .map_err(|e| format!("Failed to get current snapshot: {}", e))?;
169 let proposals = self
170 .proposer
171 .propose_deltas(patterns, current_snapshot.snapshot(), &self.config.sector)
172 .await
173 .map_err(|e| format!("Proposal generation failed: {}", e))?;
174
175 telemetry.proposals_generated = proposals.len();
176
177 let valid_proposals: Vec<_> = proposals
179 .iter()
180 .filter(|p| p.confidence >= self.config.min_proposal_confidence)
181 .cloned()
182 .collect();
183
184 *self.state.write().await = LoopState::Validating;
186 for proposal in &valid_proposals {
187 let current_snap = self
188 .promoter
189 .get_current()
190 .map_err(|e| format!("Failed to get current snapshot: {}", e))?;
191
192 let mut new_triples = current_snap.snapshot().triples.as_ref().clone();
194
195 for triple_pattern in &proposal.triples_to_remove {
197 new_triples.retain(|stmt| !stmt.subject.contains(triple_pattern));
198 }
199
200 for triple_str in &proposal.triples_to_add {
202 let parts: Vec<&str> = triple_str.split_whitespace().collect();
204 if parts.len() >= 3 {
205 new_triples.push(crate::ontology::sigma_runtime::Statement {
206 subject: parts[0].to_string(),
207 predicate: parts[1].to_string(),
208 object: parts[2].to_string(),
209 graph: None,
210 });
211 }
212 }
213
214 let new_snap = SigmaSnapshot::new(
215 Some(current_snap.snapshot().id.clone()),
216 new_triples,
217 format!("{}_updated", current_snap.snapshot().version),
218 "sig_updated".to_string(),
219 current_snap.snapshot().metadata.clone(),
220 );
221
222 let ctx = ValidationContext {
223 proposal: proposal.clone(),
224 current_snapshot: current_snap.snapshot(),
225 expected_new_snapshot: Arc::new(new_snap),
226 sector: self.config.sector.clone(),
227 invariants: vec![
228 Invariant::NoRetrocausation,
229 Invariant::TypeSoundness,
230 Invariant::SLOPreservation,
231 ],
232 };
233
234 let (static_ev, dynamic_ev, perf_ev) = self
235 .validator
236 .validate_all(&ctx)
237 .await
238 .map_err(|e| format!("Validation failed: {}", e))?;
239
240 telemetry.proposals_validated += 1;
241
242 if static_ev.passed && dynamic_ev.passed && perf_ev.passed {
244 *self.state.write().await = LoopState::Promoting;
246
247 if self.config.auto_promote {
248 let current_snap_for_promote = self.promoter.get_current().map_err(|e| {
250 format!("Failed to get current snapshot for promotion: {}", e)
251 })?;
252
253 let mut promoted_triples =
254 current_snap_for_promote.snapshot().triples.as_ref().clone();
255
256 for triple_pattern in &proposal.triples_to_remove {
258 promoted_triples.retain(|stmt| !stmt.subject.contains(triple_pattern));
259 }
260
261 for triple_str in &proposal.triples_to_add {
262 let parts: Vec<&str> = triple_str.split_whitespace().collect();
263 if parts.len() >= 3 {
264 promoted_triples.push(crate::ontology::sigma_runtime::Statement {
265 subject: parts[0].to_string(),
266 predicate: parts[1].to_string(),
267 object: parts[2].to_string(),
268 graph: None,
269 });
270 }
271 }
272
273 let promoted_snapshot = SigmaSnapshot::new(
274 Some(current_snap_for_promote.snapshot().id.clone()),
275 promoted_triples,
276 format!(
277 "{}_v{}",
278 current_snap_for_promote.snapshot().version,
279 telemetry.iteration
280 ),
281 "promoted_sig".to_string(),
282 current_snap_for_promote.snapshot().metadata.clone(),
283 );
284
285 let _promotion_result = self
286 .promoter
287 .promote(Arc::new(promoted_snapshot))
288 .expect("Failed to promote snapshot");
289
290 telemetry.proposals_promoted += 1;
291
292 let receipt = SigmaReceipt::new(
294 Default::default(),
295 Some(current_snap.snapshot().id.clone()),
296 format!("Proposal: {}", proposal.id),
297 );
298
299 let mut runtime = self.sigma_runtime.write().await;
300 runtime.record_receipt(receipt);
301 }
302 }
303 }
304
305 *self.state.write().await = LoopState::Recording;
307 telemetry.total_duration_ms = get_time_ms() - start_ms;
308
309 Ok(telemetry)
310 }
311
312 pub async fn run(&self) -> Result<(), String> {
314 *self.state.write().await = LoopState::Idle;
315
316 let mut iteration = 0;
317 loop {
318 if let Some(max) = self.config.max_iterations {
320 if iteration >= max {
321 break;
322 }
323 }
324
325 match self.iterate().await {
327 Ok(telemetry) => {
328 self.telemetry.write().await.push(telemetry);
329 }
330 Err(e) => {
331 *self.state.write().await = LoopState::Error;
332 return Err(e);
333 }
334 }
335
336 iteration += 1;
337
338 tokio::time::sleep(Duration::from_millis(self.config.iteration_interval_ms)).await;
340 }
341
342 *self.state.write().await = LoopState::Idle;
343 Ok(())
344 }
345
346 pub async fn run_bounded(&self, max_iters: usize) -> Result<(), String> {
348 for _ in 0..max_iters {
351 match self.iterate().await {
352 Ok(telemetry) => {
353 self.telemetry.write().await.push(telemetry);
354 }
355 Err(e) => {
356 *self.state.write().await = LoopState::Error;
357 return Err(e);
358 }
359 }
360
361 tokio::time::sleep(Duration::from_millis(self.config.iteration_interval_ms)).await;
362 }
363
364 *self.state.write().await = LoopState::Idle;
365 Ok(())
366 }
367
368 #[allow(clippy::expect_used)]
370 pub fn current_snapshot(&self) -> Arc<SigmaSnapshot> {
371 self.promoter
372 .get_current()
373 .expect("Failed to get current snapshot - lock poisoned")
374 .snapshot()
375 }
376}
377
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock is set before the epoch, and saturates at
/// `u64::MAX` instead of silently truncating the 128-bit millisecond count.
fn get_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
387
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ontology::delta_proposer::MockLLMProposer;
    use crate::ontology::validators::{
        MockDynamicValidator, MockPerformanceValidator, MockStaticValidator,
    };

    /// Builds a loop wired to mock collaborators, starting from an empty
    /// snapshot and using a short iteration interval.
    fn create_test_loop() -> AutonomousControlLoop {
        let snapshot = SigmaSnapshot::new(
            None,
            vec![],
            "1.0.0".to_string(),
            "sig".to_string(),
            Default::default(),
        );

        let proposer: Arc<dyn DeltaSigmaProposer> =
            Arc::new(MockLLMProposer::new(Default::default()));

        let static_v: Arc<dyn crate::ontology::validators::StaticValidator> =
            Arc::new(MockStaticValidator);
        let dynamic_v: Arc<dyn crate::ontology::validators::DynamicValidator> =
            Arc::new(MockDynamicValidator);
        let perf_v: Arc<dyn crate::ontology::validators::PerformanceValidator> =
            Arc::new(MockPerformanceValidator::new(1000, 1024 * 100));

        let validator = Arc::new(CompositeValidator::new(static_v, dynamic_v, perf_v));

        let config = ControlLoopConfig {
            iteration_interval_ms: 100,
            ..Default::default()
        };

        AutonomousControlLoop::new(config, snapshot, proposer, validator)
    }

    /// Builds a data-sourced observation carrying a single `type = test`
    /// property.
    fn test_observation(entity: String, timestamp: u64) -> Observation {
        Observation {
            entity,
            properties: [("type".to_string(), "test".to_string())]
                .iter()
                .cloned()
                .collect(),
            timestamp,
            source: ObservationSource::Data,
        }
    }

    /// Feeds `count` observations named `entity_0`, `entity_1`, ... with
    /// timestamps starting at 1000.
    async fn seed_observations(loop_sys: &AutonomousControlLoop, count: u64) {
        for i in 0..count {
            loop_sys
                .observe(test_observation(format!("entity_{}", i), 1000 + i))
                .await;
        }
    }

    #[tokio::test]
    async fn test_control_loop_creation() {
        let loop_sys = create_test_loop();
        assert_eq!(loop_sys.state().await, LoopState::Idle);
    }

    #[tokio::test]
    async fn test_control_loop_observation() {
        let loop_sys = create_test_loop();

        loop_sys
            .observe(test_observation("test_entity".to_string(), 1000))
            .await;

        let miner = loop_sys.pattern_miner.read().await;
        assert_eq!(miner.observations.len(), 1);
    }

    #[tokio::test]
    async fn test_control_loop_iteration() {
        let loop_sys = create_test_loop();
        seed_observations(&loop_sys, 3).await;

        let telemetry = loop_sys.iterate().await.unwrap();
        assert_eq!(telemetry.observation_count, 3);
    }

    #[tokio::test]
    async fn test_control_loop_bounded_run() {
        let loop_sys = create_test_loop();
        seed_observations(&loop_sys, 5).await;

        let result = loop_sys.run_bounded(1).await;
        assert!(result.is_ok());

        let telemetry = loop_sys.telemetry().await;
        assert_eq!(telemetry.len(), 1);
    }

    #[tokio::test]
    async fn test_control_loop_state_transitions() {
        let loop_sys = create_test_loop();
        seed_observations(&loop_sys, 3).await;

        loop_sys.iterate().await.unwrap();

        // A completed iteration always ends in the recording phase.
        let final_state = loop_sys.state().await;
        assert_eq!(final_state, LoopState::Recording);
    }
}
523}