mockforge_core/consistency/
engine.rs1use crate::consistency::adapters::ProtocolAdapter;
7use crate::consistency::types::{EntityState, ProtocolState, StateChangeEvent, UnifiedState};
8use crate::protocol_abstraction::Protocol;
9use crate::reality::RealityLevel;
10use crate::Result;
11type ChaosScenario = serde_json::Value;
13use mockforge_data::PersonaProfile;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, error, info, warn};
18
/// Capacity used for the state-change broadcast channel when no valid
/// override is supplied through the environment.
const DEFAULT_BROADCAST_CAPACITY: usize = 1000;

/// Resolves the capacity of the state-change broadcast channel.
///
/// Reads the `MOCKFORGE_BROADCAST_CAPACITY` environment variable and returns
/// its value when it parses as a `usize` in the range `1..=usize::MAX / 2`.
/// In every other case (unset, not valid Unicode, not a number, zero, or too
/// large) [`DEFAULT_BROADCAST_CAPACITY`] is returned.
///
/// The range check matters because the result is handed to
/// `tokio::sync::broadcast::channel`, which panics for a capacity of `0` or
/// one larger than `usize::MAX / 2`; a misconfigured environment must not be
/// able to crash engine construction.
fn get_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
        .unwrap_or(DEFAULT_BROADCAST_CAPACITY)
}
29
30pub struct ConsistencyEngine {
36 states: Arc<RwLock<HashMap<String, UnifiedState>>>,
38 event_tx: broadcast::Sender<StateChangeEvent>,
40 adapters: Arc<RwLock<Vec<Arc<dyn ProtocolAdapter + Send + Sync>>>>,
42}
43
44impl ConsistencyEngine {
45 pub fn new() -> Self {
50 let capacity = get_broadcast_capacity();
51 let (event_tx, _) = broadcast::channel(capacity);
52 Self {
53 states: Arc::new(RwLock::new(HashMap::new())),
54 event_tx,
55 adapters: Arc::new(RwLock::new(Vec::new())),
56 }
57 }
58
59 pub async fn register_adapter(&self, adapter: Arc<dyn ProtocolAdapter + Send + Sync>) {
63 let mut adapters = self.adapters.write().await;
64 adapters.push(adapter);
65 info!("Registered protocol adapter");
66 }
67
68 pub async fn get_or_create_state(&self, workspace_id: &str) -> UnifiedState {
70 let mut states = self.states.write().await;
71 states
72 .entry(workspace_id.to_string())
73 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()))
74 .clone()
75 }
76
77 pub async fn get_state(&self, workspace_id: &str) -> Option<UnifiedState> {
79 let states = self.states.read().await;
80 states.get(workspace_id).cloned()
81 }
82
83 pub async fn set_active_persona(
88 &self,
89 workspace_id: &str,
90 persona: PersonaProfile,
91 ) -> Result<()> {
92 let mut states = self.states.write().await;
93 let state = states
94 .entry(workspace_id.to_string())
95 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
96
97 state.active_persona = Some(persona.clone());
98 state.increment_version();
99
100 let event = StateChangeEvent::PersonaChanged {
101 workspace_id: workspace_id.to_string(),
102 persona,
103 };
104
105 drop(states); self.broadcast_event(&event).await;
108 info!("Set active persona for workspace {}", workspace_id);
109 Ok(())
110 }
111
112 pub async fn set_active_scenario(&self, workspace_id: &str, scenario_id: String) -> Result<()> {
114 let mut states = self.states.write().await;
115 let state = states
116 .entry(workspace_id.to_string())
117 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
118
119 state.active_scenario = Some(scenario_id.clone());
120 state.increment_version();
121
122 let event = StateChangeEvent::ScenarioChanged {
123 workspace_id: workspace_id.to_string(),
124 scenario_id,
125 };
126
127 drop(states);
128
129 self.broadcast_event(&event).await;
130 info!("Set active scenario for workspace {}", workspace_id);
131 Ok(())
132 }
133
134 pub async fn set_reality_level(&self, workspace_id: &str, level: RealityLevel) -> Result<()> {
136 let mut states = self.states.write().await;
137 let state = states
138 .entry(workspace_id.to_string())
139 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
140
141 state.reality_level = level;
142 state.increment_version();
143
144 let event = StateChangeEvent::RealityLevelChanged {
145 workspace_id: workspace_id.to_string(),
146 level,
147 };
148
149 drop(states);
150
151 self.broadcast_event(&event).await;
152 debug!("Set reality level {:?} for workspace {}", level, workspace_id);
153 Ok(())
154 }
155
156 pub async fn set_reality_ratio(&self, workspace_id: &str, ratio: f64) -> Result<()> {
158 let ratio = ratio.clamp(0.0, 1.0);
159 let mut states = self.states.write().await;
160 let state = states
161 .entry(workspace_id.to_string())
162 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
163
164 state.reality_continuum_ratio = ratio;
165 state.increment_version();
166
167 let event = StateChangeEvent::RealityRatioChanged {
168 workspace_id: workspace_id.to_string(),
169 ratio,
170 };
171
172 drop(states);
173
174 self.broadcast_event(&event).await;
175 debug!("Set reality ratio {} for workspace {}", ratio, workspace_id);
176 Ok(())
177 }
178
179 pub async fn register_entity(&self, workspace_id: &str, entity: EntityState) -> Result<()> {
185 let mut states = self.states.write().await;
186 let state = states
187 .entry(workspace_id.to_string())
188 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
189
190 let is_new = !state
191 .entity_state
192 .contains_key(&UnifiedState::entity_key(&entity.entity_type, &entity.entity_id));
193
194 #[cfg(feature = "persona-graph")]
196 if let Some(ref persona_id) = entity.persona_id {
197 let graph = state.get_or_create_persona_graph();
198 graph.get_or_create_node_with_links(persona_id, &entity.entity_type, None, None);
199
200 #[cfg(feature = "persona-graph")]
202 if let Some(user_id) = entity.data.get("user_id").or_else(|| entity.data.get("userId"))
203 {
204 if let Some(user_id_str) = user_id.as_str() {
205 let user_persona_id = format!("user:{}", user_id_str);
206 graph.link_entity_types(
207 &user_persona_id,
208 "user",
209 persona_id,
210 &entity.entity_type,
211 );
212 }
213 }
214
215 #[cfg(feature = "persona-graph")]
217 if entity.entity_type == "payment" {
218 if let Some(order_id) =
219 entity.data.get("order_id").or_else(|| entity.data.get("orderId"))
220 {
221 if let Some(order_id_str) = order_id.as_str() {
222 let order_persona_id = format!("order:{}", order_id_str);
223 graph.link_entity_types(&order_persona_id, "order", persona_id, "payment");
224 }
225 }
226 }
227 }
228
229 let entity_clone = entity.clone();
230 state.register_entity(entity_clone.clone());
231
232 let event = if is_new {
233 StateChangeEvent::EntityCreated {
234 workspace_id: workspace_id.to_string(),
235 entity: entity_clone,
236 }
237 } else {
238 StateChangeEvent::EntityUpdated {
239 workspace_id: workspace_id.to_string(),
240 entity: entity_clone,
241 }
242 };
243
244 drop(states);
245
246 self.broadcast_event(&event).await;
247 debug!(
248 "Registered entity {}:{} for workspace {}",
249 entity.entity_type, entity.entity_id, workspace_id
250 );
251 Ok(())
252 }
253
254 pub async fn get_entity(
256 &self,
257 workspace_id: &str,
258 entity_type: &str,
259 entity_id: &str,
260 ) -> Option<EntityState> {
261 let states = self.states.read().await;
262 states.get(workspace_id)?.get_entity(entity_type, entity_id).cloned()
263 }
264
265 pub async fn find_related_entities(
279 &self,
280 workspace_id: &str,
281 persona_id: &str,
282 target_entity_type: &str,
283 relationship_type: Option<&str>,
284 ) -> Vec<EntityState> {
285 let states = self.states.read().await;
286 let state = match states.get(workspace_id) {
287 Some(s) => s,
288 None => return Vec::new(),
289 };
290
291 #[cfg(feature = "persona-graph")]
292 let graph = match state.persona_graph() {
293 Some(g) => g,
294 None => return Vec::new(),
295 };
296
297 #[cfg(feature = "persona-graph")]
299 let related_persona_ids =
300 graph.find_related_by_entity_type(persona_id, target_entity_type, relationship_type);
301
302 #[cfg(not(feature = "persona-graph"))]
303 let related_persona_ids: Vec<String> = Vec::new();
304
305 let mut related_entities = Vec::new();
307 for related_persona_id in related_persona_ids {
308 if let Some((_, entity_id)) = related_persona_id.split_once(':') {
310 if let Some(entity) = state.get_entity(target_entity_type, entity_id) {
311 related_entities.push(entity.clone());
312 }
313 }
314 }
315
316 related_entities
317 }
318
319 pub async fn activate_chaos_rule(&self, workspace_id: &str, rule: ChaosScenario) -> Result<()> {
321 let mut states = self.states.write().await;
322 let state = states
323 .entry(workspace_id.to_string())
324 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
325
326 if let Some(rule_name) = rule.get("name").and_then(|v| v.as_str()) {
329 state
330 .active_chaos_rules
331 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
332 }
333 state.active_chaos_rules.push(rule.clone());
334 state.increment_version();
335
336 let event = StateChangeEvent::ChaosRuleActivated {
337 workspace_id: workspace_id.to_string(),
338 rule,
339 };
340
341 drop(states);
342
343 self.broadcast_event(&event).await;
344 info!("Activated chaos rule for workspace {}", workspace_id);
345 Ok(())
346 }
347
348 pub async fn deactivate_chaos_rule(&self, workspace_id: &str, rule_name: &str) -> Result<()> {
350 let mut states = self.states.write().await;
351 let state = states
352 .entry(workspace_id.to_string())
353 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
354
355 let removed = state
357 .active_chaos_rules
358 .iter()
359 .any(|r| r.get("name").and_then(|v| v.as_str()) == Some(rule_name));
360
361 if removed {
362 state
363 .active_chaos_rules
364 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
365 state.increment_version();
366
367 let event = StateChangeEvent::ChaosRuleDeactivated {
368 workspace_id: workspace_id.to_string(),
369 rule_name: rule_name.to_string(),
370 };
371
372 drop(states);
373
374 self.broadcast_event(&event).await;
375 info!("Deactivated chaos rule {} for workspace {}", rule_name, workspace_id);
376 }
377
378 Ok(())
379 }
380
381 pub async fn update_protocol_state(
383 &self,
384 workspace_id: &str,
385 protocol: Protocol,
386 protocol_state: ProtocolState,
387 ) -> Result<()> {
388 let mut states = self.states.write().await;
389 let state = states
390 .entry(workspace_id.to_string())
391 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
392
393 state.protocol_states.insert(protocol, protocol_state);
394 state.increment_version();
395
396 Ok(())
397 }
398
399 pub async fn get_protocol_state(
401 &self,
402 workspace_id: &str,
403 protocol: Protocol,
404 ) -> Option<ProtocolState> {
405 let states = self.states.read().await;
406 states.get(workspace_id)?.protocol_states.get(&protocol).cloned()
407 }
408
409 pub fn subscribe_to_events(
414 &self,
415 _workspace_id: Option<&str>,
416 ) -> broadcast::Receiver<StateChangeEvent> {
417 self.event_tx.subscribe()
418 }
419
420 pub async fn restore_state(&self, state: UnifiedState) -> Result<()> {
425 let workspace_id = state.workspace_id.clone();
426 let mut states = self.states.write().await;
427 states.insert(workspace_id.clone(), state.clone());
428 drop(states);
429
430 if let Some(ref persona) = state.active_persona {
432 self.broadcast_event(&StateChangeEvent::PersonaChanged {
433 workspace_id: workspace_id.clone(),
434 persona: persona.clone(),
435 })
436 .await;
437 }
438
439 if let Some(ref scenario_id) = state.active_scenario {
440 self.broadcast_event(&StateChangeEvent::ScenarioChanged {
441 workspace_id: workspace_id.clone(),
442 scenario_id: scenario_id.clone(),
443 })
444 .await;
445 }
446
447 self.broadcast_event(&StateChangeEvent::RealityLevelChanged {
448 workspace_id: workspace_id.clone(),
449 level: state.reality_level,
450 })
451 .await;
452
453 self.broadcast_event(&StateChangeEvent::RealityRatioChanged {
454 workspace_id: workspace_id.clone(),
455 ratio: state.reality_continuum_ratio,
456 })
457 .await;
458
459 for entity in state.entity_state.values() {
461 self.broadcast_event(&StateChangeEvent::EntityCreated {
462 workspace_id: workspace_id.clone(),
463 entity: entity.clone(),
464 })
465 .await;
466 }
467
468 for rule in &state.active_chaos_rules {
470 self.broadcast_event(&StateChangeEvent::ChaosRuleActivated {
471 workspace_id: workspace_id.clone(),
472 rule: rule.clone(),
473 })
474 .await;
475 }
476
477 info!("Restored state for workspace {}", workspace_id);
478 Ok(())
479 }
480
481 async fn broadcast_event(&self, event: &StateChangeEvent) {
483 if let Err(e) = self.event_tx.send(event.clone()) {
485 warn!("Failed to broadcast state change event: {}", e);
486 }
487
488 let adapters = self.adapters.read().await;
490 for adapter in adapters.iter() {
491 if let Err(e) = adapter.on_state_change(event).await {
492 error!(
493 "Protocol adapter {:?} failed to handle state change: {}",
494 adapter.protocol(),
495 e
496 );
497 }
498 }
499 }
500}
501
502impl Default for ConsistencyEngine {
503 fn default() -> Self {
504 Self::new()
505 }
506}