mockforge_core/consistency/
engine.rs1use crate::consistency::adapters::ProtocolAdapter;
7use crate::consistency::types::{EntityState, ProtocolState, StateChangeEvent, UnifiedState};
8use crate::protocol_abstraction::Protocol;
9use crate::reality::RealityLevel;
10use crate::Result;
11type ChaosScenario = serde_json::Value;
13use mockforge_data::PersonaProfile;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, error, info, warn};
18
19pub struct ConsistencyEngine {
25 states: Arc<RwLock<HashMap<String, UnifiedState>>>,
27 event_tx: broadcast::Sender<StateChangeEvent>,
29 adapters: Arc<RwLock<Vec<Arc<dyn ProtocolAdapter + Send + Sync>>>>,
31}
32
33impl ConsistencyEngine {
34 pub fn new() -> Self {
36 let (event_tx, _) = broadcast::channel(1000);
37 Self {
38 states: Arc::new(RwLock::new(HashMap::new())),
39 event_tx,
40 adapters: Arc::new(RwLock::new(Vec::new())),
41 }
42 }
43
44 pub async fn register_adapter(&self, adapter: Arc<dyn ProtocolAdapter + Send + Sync>) {
48 let mut adapters = self.adapters.write().await;
49 adapters.push(adapter);
50 info!("Registered protocol adapter");
51 }
52
53 pub async fn get_or_create_state(&self, workspace_id: &str) -> UnifiedState {
55 let mut states = self.states.write().await;
56 states
57 .entry(workspace_id.to_string())
58 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()))
59 .clone()
60 }
61
62 pub async fn get_state(&self, workspace_id: &str) -> Option<UnifiedState> {
64 let states = self.states.read().await;
65 states.get(workspace_id).cloned()
66 }
67
68 pub async fn set_active_persona(
73 &self,
74 workspace_id: &str,
75 persona: PersonaProfile,
76 ) -> Result<()> {
77 let mut states = self.states.write().await;
78 let state = states
79 .entry(workspace_id.to_string())
80 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
81
82 state.active_persona = Some(persona.clone());
83 state.increment_version();
84
85 let event = StateChangeEvent::PersonaChanged {
86 workspace_id: workspace_id.to_string(),
87 persona,
88 };
89
90 drop(states); self.broadcast_event(&event).await;
93 info!("Set active persona for workspace {}", workspace_id);
94 Ok(())
95 }
96
97 pub async fn set_active_scenario(&self, workspace_id: &str, scenario_id: String) -> Result<()> {
99 let mut states = self.states.write().await;
100 let state = states
101 .entry(workspace_id.to_string())
102 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
103
104 state.active_scenario = Some(scenario_id.clone());
105 state.increment_version();
106
107 let event = StateChangeEvent::ScenarioChanged {
108 workspace_id: workspace_id.to_string(),
109 scenario_id,
110 };
111
112 drop(states);
113
114 self.broadcast_event(&event).await;
115 info!("Set active scenario for workspace {}", workspace_id);
116 Ok(())
117 }
118
119 pub async fn set_reality_level(&self, workspace_id: &str, level: RealityLevel) -> Result<()> {
121 let mut states = self.states.write().await;
122 let state = states
123 .entry(workspace_id.to_string())
124 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
125
126 state.reality_level = level;
127 state.increment_version();
128
129 let event = StateChangeEvent::RealityLevelChanged {
130 workspace_id: workspace_id.to_string(),
131 level,
132 };
133
134 drop(states);
135
136 self.broadcast_event(&event).await;
137 debug!("Set reality level {:?} for workspace {}", level, workspace_id);
138 Ok(())
139 }
140
141 pub async fn set_reality_ratio(&self, workspace_id: &str, ratio: f64) -> Result<()> {
143 let ratio = ratio.clamp(0.0, 1.0);
144 let mut states = self.states.write().await;
145 let state = states
146 .entry(workspace_id.to_string())
147 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
148
149 state.reality_continuum_ratio = ratio;
150 state.increment_version();
151
152 let event = StateChangeEvent::RealityRatioChanged {
153 workspace_id: workspace_id.to_string(),
154 ratio,
155 };
156
157 drop(states);
158
159 self.broadcast_event(&event).await;
160 debug!("Set reality ratio {} for workspace {}", ratio, workspace_id);
161 Ok(())
162 }
163
164 pub async fn register_entity(&self, workspace_id: &str, entity: EntityState) -> Result<()> {
170 let mut states = self.states.write().await;
171 let state = states
172 .entry(workspace_id.to_string())
173 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
174
175 let is_new = !state
176 .entity_state
177 .contains_key(&UnifiedState::entity_key(&entity.entity_type, &entity.entity_id));
178
179 #[cfg(feature = "persona-graph")]
181 if let Some(ref persona_id) = entity.persona_id {
182 let graph = state.get_or_create_persona_graph();
183 graph.get_or_create_node_with_links(persona_id, &entity.entity_type, None, None);
184
185 #[cfg(feature = "persona-graph")]
187 if let Some(user_id) = entity.data.get("user_id").or_else(|| entity.data.get("userId"))
188 {
189 if let Some(user_id_str) = user_id.as_str() {
190 let user_persona_id = format!("user:{}", user_id_str);
191 graph.link_entity_types(
192 &user_persona_id,
193 "user",
194 persona_id,
195 &entity.entity_type,
196 );
197 }
198 }
199
200 #[cfg(feature = "persona-graph")]
202 if entity.entity_type == "payment" {
203 if let Some(order_id) =
204 entity.data.get("order_id").or_else(|| entity.data.get("orderId"))
205 {
206 if let Some(order_id_str) = order_id.as_str() {
207 let order_persona_id = format!("order:{}", order_id_str);
208 graph.link_entity_types(&order_persona_id, "order", persona_id, "payment");
209 }
210 }
211 }
212 }
213
214 let entity_clone = entity.clone();
215 state.register_entity(entity_clone.clone());
216
217 let event = if is_new {
218 StateChangeEvent::EntityCreated {
219 workspace_id: workspace_id.to_string(),
220 entity: entity_clone,
221 }
222 } else {
223 StateChangeEvent::EntityUpdated {
224 workspace_id: workspace_id.to_string(),
225 entity: entity_clone,
226 }
227 };
228
229 drop(states);
230
231 self.broadcast_event(&event).await;
232 debug!(
233 "Registered entity {}:{} for workspace {}",
234 entity.entity_type, entity.entity_id, workspace_id
235 );
236 Ok(())
237 }
238
239 pub async fn get_entity(
241 &self,
242 workspace_id: &str,
243 entity_type: &str,
244 entity_id: &str,
245 ) -> Option<EntityState> {
246 let states = self.states.read().await;
247 states.get(workspace_id)?.get_entity(entity_type, entity_id).cloned()
248 }
249
250 pub async fn find_related_entities(
264 &self,
265 workspace_id: &str,
266 persona_id: &str,
267 target_entity_type: &str,
268 relationship_type: Option<&str>,
269 ) -> Vec<EntityState> {
270 let states = self.states.read().await;
271 let state = match states.get(workspace_id) {
272 Some(s) => s,
273 None => return Vec::new(),
274 };
275
276 #[cfg(feature = "persona-graph")]
277 let graph = match state.persona_graph() {
278 Some(g) => g,
279 None => return Vec::new(),
280 };
281
282 #[cfg(feature = "persona-graph")]
284 let related_persona_ids =
285 graph.find_related_by_entity_type(persona_id, target_entity_type, relationship_type);
286
287 #[cfg(not(feature = "persona-graph"))]
288 let related_persona_ids: Vec<String> = Vec::new();
289
290 let mut related_entities = Vec::new();
292 for related_persona_id in related_persona_ids {
293 if let Some((_, entity_id)) = related_persona_id.split_once(':') {
295 if let Some(entity) = state.get_entity(target_entity_type, entity_id) {
296 related_entities.push(entity.clone());
297 }
298 }
299 }
300
301 related_entities
302 }
303
304 pub async fn activate_chaos_rule(&self, workspace_id: &str, rule: ChaosScenario) -> Result<()> {
306 let mut states = self.states.write().await;
307 let state = states
308 .entry(workspace_id.to_string())
309 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
310
311 if let Some(rule_name) = rule.get("name").and_then(|v| v.as_str()) {
314 state
315 .active_chaos_rules
316 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
317 }
318 state.active_chaos_rules.push(rule.clone());
319 state.increment_version();
320
321 let event = StateChangeEvent::ChaosRuleActivated {
322 workspace_id: workspace_id.to_string(),
323 rule,
324 };
325
326 drop(states);
327
328 self.broadcast_event(&event).await;
329 info!("Activated chaos rule for workspace {}", workspace_id);
330 Ok(())
331 }
332
333 pub async fn deactivate_chaos_rule(&self, workspace_id: &str, rule_name: &str) -> Result<()> {
335 let mut states = self.states.write().await;
336 let state = states
337 .entry(workspace_id.to_string())
338 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
339
340 let removed = state
342 .active_chaos_rules
343 .iter()
344 .any(|r| r.get("name").and_then(|v| v.as_str()) == Some(rule_name));
345
346 if removed {
347 state
348 .active_chaos_rules
349 .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
350 state.increment_version();
351
352 let event = StateChangeEvent::ChaosRuleDeactivated {
353 workspace_id: workspace_id.to_string(),
354 rule_name: rule_name.to_string(),
355 };
356
357 drop(states);
358
359 self.broadcast_event(&event).await;
360 info!("Deactivated chaos rule {} for workspace {}", rule_name, workspace_id);
361 }
362
363 Ok(())
364 }
365
366 pub async fn update_protocol_state(
368 &self,
369 workspace_id: &str,
370 protocol: Protocol,
371 protocol_state: ProtocolState,
372 ) -> Result<()> {
373 let mut states = self.states.write().await;
374 let state = states
375 .entry(workspace_id.to_string())
376 .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
377
378 state.protocol_states.insert(protocol, protocol_state);
379 state.increment_version();
380
381 Ok(())
382 }
383
384 pub async fn get_protocol_state(
386 &self,
387 workspace_id: &str,
388 protocol: Protocol,
389 ) -> Option<ProtocolState> {
390 let states = self.states.read().await;
391 states.get(workspace_id)?.protocol_states.get(&protocol).cloned()
392 }
393
394 pub fn subscribe_to_events(
399 &self,
400 _workspace_id: Option<&str>,
401 ) -> broadcast::Receiver<StateChangeEvent> {
402 self.event_tx.subscribe()
403 }
404
405 pub async fn restore_state(&self, state: UnifiedState) -> Result<()> {
410 let workspace_id = state.workspace_id.clone();
411 let mut states = self.states.write().await;
412 states.insert(workspace_id.clone(), state.clone());
413 drop(states);
414
415 if let Some(ref persona) = state.active_persona {
417 self.broadcast_event(&StateChangeEvent::PersonaChanged {
418 workspace_id: workspace_id.clone(),
419 persona: persona.clone(),
420 })
421 .await;
422 }
423
424 if let Some(ref scenario_id) = state.active_scenario {
425 self.broadcast_event(&StateChangeEvent::ScenarioChanged {
426 workspace_id: workspace_id.clone(),
427 scenario_id: scenario_id.clone(),
428 })
429 .await;
430 }
431
432 self.broadcast_event(&StateChangeEvent::RealityLevelChanged {
433 workspace_id: workspace_id.clone(),
434 level: state.reality_level,
435 })
436 .await;
437
438 self.broadcast_event(&StateChangeEvent::RealityRatioChanged {
439 workspace_id: workspace_id.clone(),
440 ratio: state.reality_continuum_ratio,
441 })
442 .await;
443
444 for entity in state.entity_state.values() {
446 self.broadcast_event(&StateChangeEvent::EntityCreated {
447 workspace_id: workspace_id.clone(),
448 entity: entity.clone(),
449 })
450 .await;
451 }
452
453 for rule in &state.active_chaos_rules {
455 self.broadcast_event(&StateChangeEvent::ChaosRuleActivated {
456 workspace_id: workspace_id.clone(),
457 rule: rule.clone(),
458 })
459 .await;
460 }
461
462 info!("Restored state for workspace {}", workspace_id);
463 Ok(())
464 }
465
466 async fn broadcast_event(&self, event: &StateChangeEvent) {
468 if let Err(e) = self.event_tx.send(event.clone()) {
470 warn!("Failed to broadcast state change event: {}", e);
471 }
472
473 let adapters = self.adapters.read().await;
475 for adapter in adapters.iter() {
476 if let Err(e) = adapter.on_state_change(event).await {
477 error!(
478 "Protocol adapter {:?} failed to handle state change: {}",
479 adapter.protocol(),
480 e
481 );
482 }
483 }
484 }
485}
486
487impl Default for ConsistencyEngine {
488 fn default() -> Self {
489 Self::new()
490 }
491}