mockforge_core/consistency/
engine.rs

1//! Consistency engine implementation
2//!
3//! The consistency engine coordinates state across all protocols, ensuring
4//! that persona, scenario, reality level, and entity state are synchronized.
5
6use crate::consistency::adapters::ProtocolAdapter;
7use crate::consistency::types::{EntityState, ProtocolState, StateChangeEvent, UnifiedState};
8use crate::protocol_abstraction::Protocol;
9use crate::reality::RealityLevel;
10use crate::Result;
11// ChaosScenario is defined in mockforge-chaos, but we use serde_json::Value to avoid circular dependency
12type ChaosScenario = serde_json::Value;
13use mockforge_data::PersonaProfile;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, error, info, warn};
18
19/// Consistency engine for coordinating state across all protocols
20///
21/// The engine maintains unified state per workspace and broadcasts state
22/// changes to all registered protocol adapters. This ensures that all
23/// protocols reflect the same persona, scenario, reality level, and entity state.
24pub struct ConsistencyEngine {
25    /// Workspace state storage (workspace_id -> UnifiedState)
26    states: Arc<RwLock<HashMap<String, UnifiedState>>>,
27    /// Event broadcaster for state change notifications
28    event_tx: broadcast::Sender<StateChangeEvent>,
29    /// Registered protocol adapters
30    adapters: Arc<RwLock<Vec<Arc<dyn ProtocolAdapter + Send + Sync>>>>,
31}
32
33impl ConsistencyEngine {
34    /// Create a new consistency engine
35    pub fn new() -> Self {
36        let (event_tx, _) = broadcast::channel(1000);
37        Self {
38            states: Arc::new(RwLock::new(HashMap::new())),
39            event_tx,
40            adapters: Arc::new(RwLock::new(Vec::new())),
41        }
42    }
43
44    /// Register a protocol adapter
45    ///
46    /// Adapters are notified of all state changes for their protocol.
47    pub async fn register_adapter(&self, adapter: Arc<dyn ProtocolAdapter + Send + Sync>) {
48        let mut adapters = self.adapters.write().await;
49        adapters.push(adapter);
50        info!("Registered protocol adapter");
51    }
52
53    /// Get or create unified state for a workspace
54    pub async fn get_or_create_state(&self, workspace_id: &str) -> UnifiedState {
55        let mut states = self.states.write().await;
56        states
57            .entry(workspace_id.to_string())
58            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()))
59            .clone()
60    }
61
62    /// Get unified state for a workspace (returns None if not found)
63    pub async fn get_state(&self, workspace_id: &str) -> Option<UnifiedState> {
64        let states = self.states.read().await;
65        states.get(workspace_id).cloned()
66    }
67
68    /// Set active persona for a workspace
69    ///
70    /// This updates the unified state and broadcasts the change to all
71    /// protocol adapters, ensuring all protocols use the new persona.
72    pub async fn set_active_persona(
73        &self,
74        workspace_id: &str,
75        persona: PersonaProfile,
76    ) -> Result<()> {
77        let mut states = self.states.write().await;
78        let state = states
79            .entry(workspace_id.to_string())
80            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
81
82        state.active_persona = Some(persona.clone());
83        state.increment_version();
84
85        let event = StateChangeEvent::PersonaChanged {
86            workspace_id: workspace_id.to_string(),
87            persona,
88        };
89
90        drop(states); // Release lock before broadcasting
91
92        self.broadcast_event(&event).await;
93        info!("Set active persona for workspace {}", workspace_id);
94        Ok(())
95    }
96
97    /// Set active scenario for a workspace
98    pub async fn set_active_scenario(&self, workspace_id: &str, scenario_id: String) -> Result<()> {
99        let mut states = self.states.write().await;
100        let state = states
101            .entry(workspace_id.to_string())
102            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
103
104        state.active_scenario = Some(scenario_id.clone());
105        state.increment_version();
106
107        let event = StateChangeEvent::ScenarioChanged {
108            workspace_id: workspace_id.to_string(),
109            scenario_id,
110        };
111
112        drop(states);
113
114        self.broadcast_event(&event).await;
115        info!("Set active scenario for workspace {}", workspace_id);
116        Ok(())
117    }
118
119    /// Set reality level for a workspace
120    pub async fn set_reality_level(&self, workspace_id: &str, level: RealityLevel) -> Result<()> {
121        let mut states = self.states.write().await;
122        let state = states
123            .entry(workspace_id.to_string())
124            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
125
126        state.reality_level = level;
127        state.increment_version();
128
129        let event = StateChangeEvent::RealityLevelChanged {
130            workspace_id: workspace_id.to_string(),
131            level,
132        };
133
134        drop(states);
135
136        self.broadcast_event(&event).await;
137        debug!("Set reality level {:?} for workspace {}", level, workspace_id);
138        Ok(())
139    }
140
141    /// Set reality continuum ratio for a workspace
142    pub async fn set_reality_ratio(&self, workspace_id: &str, ratio: f64) -> Result<()> {
143        let ratio = ratio.clamp(0.0, 1.0);
144        let mut states = self.states.write().await;
145        let state = states
146            .entry(workspace_id.to_string())
147            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
148
149        state.reality_continuum_ratio = ratio;
150        state.increment_version();
151
152        let event = StateChangeEvent::RealityRatioChanged {
153            workspace_id: workspace_id.to_string(),
154            ratio,
155        };
156
157        drop(states);
158
159        self.broadcast_event(&event).await;
160        debug!("Set reality ratio {} for workspace {}", ratio, workspace_id);
161        Ok(())
162    }
163
164    /// Register or update an entity
165    ///
166    /// Entities are tracked across all protocols. When an entity is created
167    /// via HTTP, it becomes immediately available in GraphQL, gRPC, etc.
168    /// Also automatically adds the entity to the persona graph if a persona_id is present.
169    pub async fn register_entity(&self, workspace_id: &str, entity: EntityState) -> Result<()> {
170        let mut states = self.states.write().await;
171        let state = states
172            .entry(workspace_id.to_string())
173            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
174
175        let is_new = !state
176            .entity_state
177            .contains_key(&UnifiedState::entity_key(&entity.entity_type, &entity.entity_id));
178
179        // Add entity to persona graph if persona_id is present
180        #[cfg(feature = "persona-graph")]
181        if let Some(ref persona_id) = entity.persona_id {
182            let graph = state.get_or_create_persona_graph();
183            graph.get_or_create_node_with_links(persona_id, &entity.entity_type, None, None);
184
185            // If entity data contains related entity IDs, link them in the graph
186            #[cfg(feature = "persona-graph")]
187            if let Some(user_id) = entity.data.get("user_id").or_else(|| entity.data.get("userId"))
188            {
189                if let Some(user_id_str) = user_id.as_str() {
190                    let user_persona_id = format!("user:{}", user_id_str);
191                    graph.link_entity_types(
192                        &user_persona_id,
193                        "user",
194                        persona_id,
195                        &entity.entity_type,
196                    );
197                }
198            }
199
200            // Link orders to payments
201            #[cfg(feature = "persona-graph")]
202            if entity.entity_type == "payment" {
203                if let Some(order_id) =
204                    entity.data.get("order_id").or_else(|| entity.data.get("orderId"))
205                {
206                    if let Some(order_id_str) = order_id.as_str() {
207                        let order_persona_id = format!("order:{}", order_id_str);
208                        graph.link_entity_types(&order_persona_id, "order", persona_id, "payment");
209                    }
210                }
211            }
212        }
213
214        let entity_clone = entity.clone();
215        state.register_entity(entity_clone.clone());
216
217        let event = if is_new {
218            StateChangeEvent::EntityCreated {
219                workspace_id: workspace_id.to_string(),
220                entity: entity_clone,
221            }
222        } else {
223            StateChangeEvent::EntityUpdated {
224                workspace_id: workspace_id.to_string(),
225                entity: entity_clone,
226            }
227        };
228
229        drop(states);
230
231        self.broadcast_event(&event).await;
232        debug!(
233            "Registered entity {}:{} for workspace {}",
234            entity.entity_type, entity.entity_id, workspace_id
235        );
236        Ok(())
237    }
238
239    /// Get entity by type and ID
240    pub async fn get_entity(
241        &self,
242        workspace_id: &str,
243        entity_type: &str,
244        entity_id: &str,
245    ) -> Option<EntityState> {
246        let states = self.states.read().await;
247        states.get(workspace_id)?.get_entity(entity_type, entity_id).cloned()
248    }
249
250    /// Find related entities using the persona graph
251    ///
252    /// Given a persona ID and entity type, finds all related entities of the target type
253    /// by traversing the persona graph.
254    ///
255    /// # Arguments
256    /// * `workspace_id` - Workspace identifier
257    /// * `persona_id` - Starting persona ID
258    /// * `target_entity_type` - Entity type to find (e.g., "order", "payment")
259    /// * `relationship_type` - Optional relationship type filter (e.g., "has_orders")
260    ///
261    /// # Returns
262    /// Vector of entity states matching the criteria
263    pub async fn find_related_entities(
264        &self,
265        workspace_id: &str,
266        persona_id: &str,
267        target_entity_type: &str,
268        relationship_type: Option<&str>,
269    ) -> Vec<EntityState> {
270        let states = self.states.read().await;
271        let state = match states.get(workspace_id) {
272            Some(s) => s,
273            None => return Vec::new(),
274        };
275
276        #[cfg(feature = "persona-graph")]
277        let graph = match state.persona_graph() {
278            Some(g) => g,
279            None => return Vec::new(),
280        };
281
282        // Find related persona IDs in the graph
283        #[cfg(feature = "persona-graph")]
284        let related_persona_ids =
285            graph.find_related_by_entity_type(persona_id, target_entity_type, relationship_type);
286
287        #[cfg(not(feature = "persona-graph"))]
288        let related_persona_ids: Vec<String> = Vec::new();
289
290        // Convert persona IDs to entity states
291        let mut related_entities = Vec::new();
292        for related_persona_id in related_persona_ids {
293            // Extract entity ID from persona ID (format: "entity_type:entity_id")
294            if let Some((_, entity_id)) = related_persona_id.split_once(':') {
295                if let Some(entity) = state.get_entity(target_entity_type, entity_id) {
296                    related_entities.push(entity.clone());
297                }
298            }
299        }
300
301        related_entities
302    }
303
304    /// Activate a chaos rule
305    pub async fn activate_chaos_rule(&self, workspace_id: &str, rule: ChaosScenario) -> Result<()> {
306        let mut states = self.states.write().await;
307        let state = states
308            .entry(workspace_id.to_string())
309            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
310
311        // Remove existing rule with same name if present
312        // Note: ChaosScenario is serde_json::Value, so we compare by serializing
313        if let Some(rule_name) = rule.get("name").and_then(|v| v.as_str()) {
314            state
315                .active_chaos_rules
316                .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
317        }
318        state.active_chaos_rules.push(rule.clone());
319        state.increment_version();
320
321        let event = StateChangeEvent::ChaosRuleActivated {
322            workspace_id: workspace_id.to_string(),
323            rule,
324        };
325
326        drop(states);
327
328        self.broadcast_event(&event).await;
329        info!("Activated chaos rule for workspace {}", workspace_id);
330        Ok(())
331    }
332
333    /// Deactivate a chaos rule
334    pub async fn deactivate_chaos_rule(&self, workspace_id: &str, rule_name: &str) -> Result<()> {
335        let mut states = self.states.write().await;
336        let state = states
337            .entry(workspace_id.to_string())
338            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
339
340        // Note: ChaosScenario is serde_json::Value, so we compare by serializing
341        let removed = state
342            .active_chaos_rules
343            .iter()
344            .any(|r| r.get("name").and_then(|v| v.as_str()) == Some(rule_name));
345
346        if removed {
347            state
348                .active_chaos_rules
349                .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
350            state.increment_version();
351
352            let event = StateChangeEvent::ChaosRuleDeactivated {
353                workspace_id: workspace_id.to_string(),
354                rule_name: rule_name.to_string(),
355            };
356
357            drop(states);
358
359            self.broadcast_event(&event).await;
360            info!("Deactivated chaos rule {} for workspace {}", rule_name, workspace_id);
361        }
362
363        Ok(())
364    }
365
366    /// Update protocol state
367    pub async fn update_protocol_state(
368        &self,
369        workspace_id: &str,
370        protocol: Protocol,
371        protocol_state: ProtocolState,
372    ) -> Result<()> {
373        let mut states = self.states.write().await;
374        let state = states
375            .entry(workspace_id.to_string())
376            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
377
378        state.protocol_states.insert(protocol, protocol_state);
379        state.increment_version();
380
381        Ok(())
382    }
383
384    /// Get protocol state
385    pub async fn get_protocol_state(
386        &self,
387        workspace_id: &str,
388        protocol: Protocol,
389    ) -> Option<ProtocolState> {
390        let states = self.states.read().await;
391        states.get(workspace_id)?.protocol_states.get(&protocol).cloned()
392    }
393
394    /// Subscribe to state change events
395    ///
396    /// Returns a receiver that will receive all state change events for
397    /// the specified workspace (or all workspaces if None).
398    pub fn subscribe_to_events(
399        &self,
400        _workspace_id: Option<&str>,
401    ) -> broadcast::Receiver<StateChangeEvent> {
402        self.event_tx.subscribe()
403    }
404
405    /// Restore unified state from a snapshot
406    ///
407    /// This replaces the current state for a workspace with the provided state.
408    /// All protocol adapters will be notified of the state changes.
409    pub async fn restore_state(&self, state: UnifiedState) -> Result<()> {
410        let workspace_id = state.workspace_id.clone();
411        let mut states = self.states.write().await;
412        states.insert(workspace_id.clone(), state.clone());
413        drop(states);
414
415        // Broadcast state change events for all components
416        if let Some(ref persona) = state.active_persona {
417            self.broadcast_event(&StateChangeEvent::PersonaChanged {
418                workspace_id: workspace_id.clone(),
419                persona: persona.clone(),
420            })
421            .await;
422        }
423
424        if let Some(ref scenario_id) = state.active_scenario {
425            self.broadcast_event(&StateChangeEvent::ScenarioChanged {
426                workspace_id: workspace_id.clone(),
427                scenario_id: scenario_id.clone(),
428            })
429            .await;
430        }
431
432        self.broadcast_event(&StateChangeEvent::RealityLevelChanged {
433            workspace_id: workspace_id.clone(),
434            level: state.reality_level,
435        })
436        .await;
437
438        self.broadcast_event(&StateChangeEvent::RealityRatioChanged {
439            workspace_id: workspace_id.clone(),
440            ratio: state.reality_continuum_ratio,
441        })
442        .await;
443
444        // Broadcast entity events
445        for entity in state.entity_state.values() {
446            self.broadcast_event(&StateChangeEvent::EntityCreated {
447                workspace_id: workspace_id.clone(),
448                entity: entity.clone(),
449            })
450            .await;
451        }
452
453        // Broadcast chaos rule events
454        for rule in &state.active_chaos_rules {
455            self.broadcast_event(&StateChangeEvent::ChaosRuleActivated {
456                workspace_id: workspace_id.clone(),
457                rule: rule.clone(),
458            })
459            .await;
460        }
461
462        info!("Restored state for workspace {}", workspace_id);
463        Ok(())
464    }
465
466    /// Broadcast a state change event to all adapters
467    async fn broadcast_event(&self, event: &StateChangeEvent) {
468        // Send to event subscribers
469        if let Err(e) = self.event_tx.send(event.clone()) {
470            warn!("Failed to broadcast state change event: {}", e);
471        }
472
473        // Notify all protocol adapters
474        let adapters = self.adapters.read().await;
475        for adapter in adapters.iter() {
476            if let Err(e) = adapter.on_state_change(event).await {
477                error!(
478                    "Protocol adapter {:?} failed to handle state change: {}",
479                    adapter.protocol(),
480                    e
481                );
482            }
483        }
484    }
485}
486
487impl Default for ConsistencyEngine {
488    fn default() -> Self {
489        Self::new()
490    }
491}