mockforge_core/consistency/
engine.rs1use crate::consistency::adapters::ProtocolAdapter;
7use crate::consistency::types::{EntityState, ProtocolState, StateChangeEvent, UnifiedState};
8use crate::protocol_abstraction::Protocol;
9use crate::reality::RealityLevel;
10use crate::Result;
11type ChaosScenario = serde_json::Value;
13use mockforge_data::PersonaProfile;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, error, info, warn};
18
/// Capacity used for the state-change broadcast channel when
/// `MOCKFORGE_BROADCAST_CAPACITY` is unset or holds an unusable value.
const DEFAULT_BROADCAST_CAPACITY: usize = 1000;

/// Resolves the capacity of the state-change broadcast channel.
///
/// Reads the `MOCKFORGE_BROADCAST_CAPACITY` environment variable and parses it
/// as a `usize`. The default is returned when the variable is missing, is not
/// valid Unicode, does not parse, or lies outside the range accepted by
/// `tokio::sync::broadcast::channel`.
fn get_broadcast_capacity() -> usize {
    // `broadcast::channel` panics for a capacity of 0 or above `usize::MAX / 2`,
    // so such values must never reach it.
    const MAX_CHANNEL_CAPACITY: usize = usize::MAX / 2;

    std::env::var("MOCKFORGE_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
        .filter(|capacity| (1..=MAX_CHANNEL_CAPACITY).contains(capacity))
        .unwrap_or(DEFAULT_BROADCAST_CAPACITY)
}
29
30pub struct ConsistencyEngine {
36 states: Arc<RwLock<HashMap<String, UnifiedState>>>,
38 event_tx: broadcast::Sender<StateChangeEvent>,
40 adapters: Arc<RwLock<Vec<Arc<dyn ProtocolAdapter + Send + Sync>>>>,
42}
43
44impl ConsistencyEngine {
45 pub fn new() -> Self {
50 let capacity = get_broadcast_capacity();
51 let (event_tx, _) = broadcast::channel(capacity);
52 Self {
53 states: Arc::new(RwLock::new(HashMap::new())),
54 event_tx,
55 adapters: Arc::new(RwLock::new(Vec::new())),
56 }
57 }
58
59 pub async fn register_adapter(&self, adapter: Arc<dyn ProtocolAdapter + Send + Sync>) {
63 let mut adapters = self.adapters.write().await;
64 adapters.push(adapter);
65 info!("Registered protocol adapter");
66 }
67
68 pub async fn get_or_create_state(&self, workspace_id: &str) -> UnifiedState {
70 let mut states = self.states.write().await;
71 states
72 .entry(workspace_id.to_string())
73 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()))
74 .clone()
75 }
76
77 pub async fn get_state(&self, workspace_id: &str) -> Option<UnifiedState> {
79 let states = self.states.read().await;
80 states.get(workspace_id).cloned()
81 }
82
83 pub async fn set_active_persona(
88 &self,
89 workspace_id: &str,
90 persona: PersonaProfile,
91 ) -> Result<()> {
92 let mut states = self.states.write().await;
93 let state = states
94 .entry(workspace_id.to_string())
95 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
96
97 state.active_persona = Some(persona.clone());
98 state.increment_version();
99
100 let event = StateChangeEvent::PersonaChanged {
101 workspace_id: workspace_id.to_string(),
102 persona,
103 };
104
105 drop(states); self.broadcast_event(&event).await;
108 info!("Set active persona for workspace {}", workspace_id);
109 Ok(())
110 }
111
112 pub async fn set_active_scenario(&self, workspace_id: &str, scenario_id: String) -> Result<()> {
114 let mut states = self.states.write().await;
115 let state = states
116 .entry(workspace_id.to_string())
117 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
118
119 state.active_scenario = Some(scenario_id.clone());
120 state.increment_version();
121
122 let event = StateChangeEvent::ScenarioChanged {
123 workspace_id: workspace_id.to_string(),
124 scenario_id,
125 };
126
127 drop(states);
128
129 self.broadcast_event(&event).await;
130 info!("Set active scenario for workspace {}", workspace_id);
131 Ok(())
132 }
133
134 pub async fn set_reality_level(&self, workspace_id: &str, level: RealityLevel) -> Result<()> {
136 let mut states = self.states.write().await;
137 let state = states
138 .entry(workspace_id.to_string())
139 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
140
141 state.reality_level = level;
142 state.increment_version();
143
144 let event = StateChangeEvent::RealityLevelChanged {
145 workspace_id: workspace_id.to_string(),
146 level,
147 };
148
149 drop(states);
150
151 self.broadcast_event(&event).await;
152 debug!("Set reality level {:?} for workspace {}", level, workspace_id);
153 Ok(())
154 }
155
156 pub async fn set_reality_ratio(&self, workspace_id: &str, ratio: f64) -> Result<()> {
158 let ratio = ratio.clamp(0.0, 1.0);
159 let mut states = self.states.write().await;
160 let state = states
161 .entry(workspace_id.to_string())
162 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
163
164 state.reality_continuum_ratio = ratio;
165 state.increment_version();
166
167 let event = StateChangeEvent::RealityRatioChanged {
168 workspace_id: workspace_id.to_string(),
169 ratio,
170 };
171
172 drop(states);
173
174 self.broadcast_event(&event).await;
175 debug!("Set reality ratio {} for workspace {}", ratio, workspace_id);
176 Ok(())
177 }
178
179 pub async fn register_entity(&self, workspace_id: &str, entity: EntityState) -> Result<()> {
185 let mut states = self.states.write().await;
186 let state = states
187 .entry(workspace_id.to_string())
188 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
189
190 let is_new = !state
191 .entity_state
192 .contains_key(&UnifiedState::entity_key(&entity.entity_type, &entity.entity_id));
193
194 #[cfg(feature = "persona-graph")]
196 if let Some(ref persona_id) = entity.persona_id {
197 let graph = state.get_or_create_persona_graph();
198 graph.get_or_create_node_with_links(persona_id, &entity.entity_type, None, None);
199
200 #[cfg(feature = "persona-graph")]
202 if let Some(user_id) = entity.data.get("user_id").or_else(|| entity.data.get("userId"))
203 {
204 if let Some(user_id_str) = user_id.as_str() {
205 let user_persona_id = format!("user:{}", user_id_str);
206 graph.link_entity_types(
207 &user_persona_id,
208 "user",
209 persona_id,
210 &entity.entity_type,
211 );
212 }
213 }
214
215 #[cfg(feature = "persona-graph")]
217 if entity.entity_type == "payment" {
218 if let Some(order_id) =
219 entity.data.get("order_id").or_else(|| entity.data.get("orderId"))
220 {
221 if let Some(order_id_str) = order_id.as_str() {
222 let order_persona_id = format!("order:{}", order_id_str);
223 graph.link_entity_types(&order_persona_id, "order", persona_id, "payment");
224 }
225 }
226 }
227 }
228
229 let entity_clone = entity.clone();
230 state.register_entity(entity_clone.clone());
231
232 let event = if is_new {
233 StateChangeEvent::EntityCreated {
234 workspace_id: workspace_id.to_string(),
235 entity: entity_clone,
236 }
237 } else {
238 StateChangeEvent::EntityUpdated {
239 workspace_id: workspace_id.to_string(),
240 entity: entity_clone,
241 }
242 };
243
244 drop(states);
245
246 self.broadcast_event(&event).await;
247 debug!(
248 "Registered entity {}:{} for workspace {}",
249 entity.entity_type, entity.entity_id, workspace_id
250 );
251 Ok(())
252 }
253
254 pub async fn get_entity(
256 &self,
257 workspace_id: &str,
258 entity_type: &str,
259 entity_id: &str,
260 ) -> Option<EntityState> {
261 let states = self.states.read().await;
262 states.get(workspace_id)?.get_entity(entity_type, entity_id).cloned()
263 }
264
265 pub async fn find_related_entities(
279 &self,
280 workspace_id: &str,
281 persona_id: &str,
282 target_entity_type: &str,
283 relationship_type: Option<&str>,
284 ) -> Vec<EntityState> {
285 #[cfg(not(feature = "persona-graph"))]
286 let _ = (persona_id, relationship_type);
287
288 let states = self.states.read().await;
289 let state = match states.get(workspace_id) {
290 Some(s) => s,
291 None => return Vec::new(),
292 };
293
294 #[cfg(feature = "persona-graph")]
295 let graph = match state.persona_graph() {
296 Some(g) => g,
297 None => return Vec::new(),
298 };
299
300 #[cfg(feature = "persona-graph")]
302 let related_persona_ids =
303 graph.find_related_by_entity_type(persona_id, target_entity_type, relationship_type);
304
305 #[cfg(not(feature = "persona-graph"))]
306 let related_persona_ids: Vec<String> = Vec::new();
307
308 let mut related_entities = Vec::new();
310 for related_persona_id in related_persona_ids {
311 if let Some((_, entity_id)) = related_persona_id.split_once(':') {
313 if let Some(entity) = state.get_entity(target_entity_type, entity_id) {
314 related_entities.push(entity.clone());
315 }
316 }
317 }
318
319 related_entities
320 }
321
322 pub async fn activate_chaos_rule(&self, workspace_id: &str, rule: ChaosScenario) -> Result<()> {
324 let mut states = self.states.write().await;
325 let state = states
326 .entry(workspace_id.to_string())
327 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
328
329 if let Some(rule_name) = rule.get("name").and_then(|v| v.as_str()) {
332 state
333 .active_chaos_rules
334 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
335 }
336 state.active_chaos_rules.push(rule.clone());
337 state.increment_version();
338
339 let event = StateChangeEvent::ChaosRuleActivated {
340 workspace_id: workspace_id.to_string(),
341 rule,
342 };
343
344 drop(states);
345
346 self.broadcast_event(&event).await;
347 info!("Activated chaos rule for workspace {}", workspace_id);
348 Ok(())
349 }
350
351 pub async fn deactivate_chaos_rule(&self, workspace_id: &str, rule_name: &str) -> Result<()> {
353 let mut states = self.states.write().await;
354 let state = states
355 .entry(workspace_id.to_string())
356 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
357
358 let removed = state
360 .active_chaos_rules
361 .iter()
362 .any(|r| r.get("name").and_then(|v| v.as_str()) == Some(rule_name));
363
364 if removed {
365 state
366 .active_chaos_rules
367 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
368 state.increment_version();
369
370 let event = StateChangeEvent::ChaosRuleDeactivated {
371 workspace_id: workspace_id.to_string(),
372 rule_name: rule_name.to_string(),
373 };
374
375 drop(states);
376
377 self.broadcast_event(&event).await;
378 info!("Deactivated chaos rule {} for workspace {}", rule_name, workspace_id);
379 }
380
381 Ok(())
382 }
383
384 pub async fn update_protocol_state(
386 &self,
387 workspace_id: &str,
388 protocol: Protocol,
389 protocol_state: ProtocolState,
390 ) -> Result<()> {
391 let mut states = self.states.write().await;
392 let state = states
393 .entry(workspace_id.to_string())
394 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
395
396 state.protocol_states.insert(protocol, protocol_state);
397 state.increment_version();
398
399 Ok(())
400 }
401
402 pub async fn get_protocol_state(
404 &self,
405 workspace_id: &str,
406 protocol: Protocol,
407 ) -> Option<ProtocolState> {
408 let states = self.states.read().await;
409 states.get(workspace_id)?.protocol_states.get(&protocol).cloned()
410 }
411
412 pub fn subscribe_to_events(
417 &self,
418 _workspace_id: Option<&str>,
419 ) -> broadcast::Receiver<StateChangeEvent> {
420 self.event_tx.subscribe()
421 }
422
423 pub async fn restore_state(&self, state: UnifiedState) -> Result<()> {
428 let workspace_id = state.workspace_id.clone();
429 let mut states = self.states.write().await;
430 states.insert(workspace_id.clone(), state.clone());
431 drop(states);
432
433 if let Some(ref persona) = state.active_persona {
435 self.broadcast_event(&StateChangeEvent::PersonaChanged {
436 workspace_id: workspace_id.clone(),
437 persona: persona.clone(),
438 })
439 .await;
440 }
441
442 if let Some(ref scenario_id) = state.active_scenario {
443 self.broadcast_event(&StateChangeEvent::ScenarioChanged {
444 workspace_id: workspace_id.clone(),
445 scenario_id: scenario_id.clone(),
446 })
447 .await;
448 }
449
450 self.broadcast_event(&StateChangeEvent::RealityLevelChanged {
451 workspace_id: workspace_id.clone(),
452 level: state.reality_level,
453 })
454 .await;
455
456 self.broadcast_event(&StateChangeEvent::RealityRatioChanged {
457 workspace_id: workspace_id.clone(),
458 ratio: state.reality_continuum_ratio,
459 })
460 .await;
461
462 for entity in state.entity_state.values() {
464 self.broadcast_event(&StateChangeEvent::EntityCreated {
465 workspace_id: workspace_id.clone(),
466 entity: entity.clone(),
467 })
468 .await;
469 }
470
471 for rule in &state.active_chaos_rules {
473 self.broadcast_event(&StateChangeEvent::ChaosRuleActivated {
474 workspace_id: workspace_id.clone(),
475 rule: rule.clone(),
476 })
477 .await;
478 }
479
480 info!("Restored state for workspace {}", workspace_id);
481 Ok(())
482 }
483
484 async fn broadcast_event(&self, event: &StateChangeEvent) {
486 if let Err(e) = self.event_tx.send(event.clone()) {
488 warn!("Failed to broadcast state change event: {}", e);
489 }
490
491 let adapters = self.adapters.read().await;
493 for adapter in adapters.iter() {
494 if let Err(e) = adapter.on_state_change(event).await {
495 error!(
496 "Protocol adapter {:?} failed to handle state change: {}",
497 adapter.protocol(),
498 e
499 );
500 }
501 }
502 }
503}
504
505impl Default for ConsistencyEngine {
506 fn default() -> Self {
507 Self::new()
508 }
509}