mockforge_core/consistency/
engine.rs

1//! Consistency engine implementation
2//!
3//! The consistency engine coordinates state across all protocols, ensuring
4//! that persona, scenario, reality level, and entity state are synchronized.
5
6use crate::consistency::adapters::ProtocolAdapter;
7use crate::consistency::types::{EntityState, ProtocolState, StateChangeEvent, UnifiedState};
8use crate::protocol_abstraction::Protocol;
9use crate::reality::RealityLevel;
10use crate::Result;
11// ChaosScenario is defined in mockforge-chaos, but we use serde_json::Value to avoid circular dependency
12type ChaosScenario = serde_json::Value;
13use mockforge_data::PersonaProfile;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::{broadcast, RwLock};
17use tracing::{debug, error, info, warn};
18
19/// Default broadcast channel capacity for state change events
20const DEFAULT_BROADCAST_CAPACITY: usize = 1000;
21
22/// Get the broadcast channel capacity from environment or use default
23fn get_broadcast_capacity() -> usize {
24    std::env::var("MOCKFORGE_BROADCAST_CAPACITY")
25        .ok()
26        .and_then(|s| s.parse().ok())
27        .unwrap_or(DEFAULT_BROADCAST_CAPACITY)
28}
29
30/// Consistency engine for coordinating state across all protocols
31///
32/// The engine maintains unified state per workspace and broadcasts state
33/// changes to all registered protocol adapters. This ensures that all
34/// protocols reflect the same persona, scenario, reality level, and entity state.
35pub struct ConsistencyEngine {
36    /// Workspace state storage (workspace_id -> UnifiedState)
37    states: Arc<RwLock<HashMap<String, UnifiedState>>>,
38    /// Event broadcaster for state change notifications
39    event_tx: broadcast::Sender<StateChangeEvent>,
40    /// Registered protocol adapters
41    adapters: Arc<RwLock<Vec<Arc<dyn ProtocolAdapter + Send + Sync>>>>,
42}
43
44impl ConsistencyEngine {
45    /// Create a new consistency engine
46    ///
47    /// The broadcast channel capacity can be configured via the
48    /// `MOCKFORGE_BROADCAST_CAPACITY` environment variable.
49    pub fn new() -> Self {
50        let capacity = get_broadcast_capacity();
51        let (event_tx, _) = broadcast::channel(capacity);
52        Self {
53            states: Arc::new(RwLock::new(HashMap::new())),
54            event_tx,
55            adapters: Arc::new(RwLock::new(Vec::new())),
56        }
57    }
58
59    /// Register a protocol adapter
60    ///
61    /// Adapters are notified of all state changes for their protocol.
62    pub async fn register_adapter(&self, adapter: Arc<dyn ProtocolAdapter + Send + Sync>) {
63        let mut adapters = self.adapters.write().await;
64        adapters.push(adapter);
65        info!("Registered protocol adapter");
66    }
67
68    /// Get or create unified state for a workspace
69    pub async fn get_or_create_state(&self, workspace_id: &str) -> UnifiedState {
70        let mut states = self.states.write().await;
71        states
72            .entry(workspace_id.to_string())
73            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()))
74            .clone()
75    }
76
77    /// Get unified state for a workspace (returns None if not found)
78    pub async fn get_state(&self, workspace_id: &str) -> Option<UnifiedState> {
79        let states = self.states.read().await;
80        states.get(workspace_id).cloned()
81    }
82
83    /// Set active persona for a workspace
84    ///
85    /// This updates the unified state and broadcasts the change to all
86    /// protocol adapters, ensuring all protocols use the new persona.
87    pub async fn set_active_persona(
88        &self,
89        workspace_id: &str,
90        persona: PersonaProfile,
91    ) -> Result<()> {
92        let mut states = self.states.write().await;
93        let state = states
94            .entry(workspace_id.to_string())
95            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
96
97        state.active_persona = Some(persona.clone());
98        state.increment_version();
99
100        let event = StateChangeEvent::PersonaChanged {
101            workspace_id: workspace_id.to_string(),
102            persona,
103        };
104
105        drop(states); // Release lock before broadcasting
106
107        self.broadcast_event(&event).await;
108        info!("Set active persona for workspace {}", workspace_id);
109        Ok(())
110    }
111
112    /// Set active scenario for a workspace
113    pub async fn set_active_scenario(&self, workspace_id: &str, scenario_id: String) -> Result<()> {
114        let mut states = self.states.write().await;
115        let state = states
116            .entry(workspace_id.to_string())
117            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
118
119        state.active_scenario = Some(scenario_id.clone());
120        state.increment_version();
121
122        let event = StateChangeEvent::ScenarioChanged {
123            workspace_id: workspace_id.to_string(),
124            scenario_id,
125        };
126
127        drop(states);
128
129        self.broadcast_event(&event).await;
130        info!("Set active scenario for workspace {}", workspace_id);
131        Ok(())
132    }
133
134    /// Set reality level for a workspace
135    pub async fn set_reality_level(&self, workspace_id: &str, level: RealityLevel) -> Result<()> {
136        let mut states = self.states.write().await;
137        let state = states
138            .entry(workspace_id.to_string())
139            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
140
141        state.reality_level = level;
142        state.increment_version();
143
144        let event = StateChangeEvent::RealityLevelChanged {
145            workspace_id: workspace_id.to_string(),
146            level,
147        };
148
149        drop(states);
150
151        self.broadcast_event(&event).await;
152        debug!("Set reality level {:?} for workspace {}", level, workspace_id);
153        Ok(())
154    }
155
156    /// Set reality continuum ratio for a workspace
157    pub async fn set_reality_ratio(&self, workspace_id: &str, ratio: f64) -> Result<()> {
158        let ratio = ratio.clamp(0.0, 1.0);
159        let mut states = self.states.write().await;
160        let state = states
161            .entry(workspace_id.to_string())
162            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
163
164        state.reality_continuum_ratio = ratio;
165        state.increment_version();
166
167        let event = StateChangeEvent::RealityRatioChanged {
168            workspace_id: workspace_id.to_string(),
169            ratio,
170        };
171
172        drop(states);
173
174        self.broadcast_event(&event).await;
175        debug!("Set reality ratio {} for workspace {}", ratio, workspace_id);
176        Ok(())
177    }
178
179    /// Register or update an entity
180    ///
181    /// Entities are tracked across all protocols. When an entity is created
182    /// via HTTP, it becomes immediately available in GraphQL, gRPC, etc.
183    /// Also automatically adds the entity to the persona graph if a persona_id is present.
184    pub async fn register_entity(&self, workspace_id: &str, entity: EntityState) -> Result<()> {
185        let mut states = self.states.write().await;
186        let state = states
187            .entry(workspace_id.to_string())
188            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
189
190        let is_new = !state
191            .entity_state
192            .contains_key(&UnifiedState::entity_key(&entity.entity_type, &entity.entity_id));
193
194        // Add entity to persona graph if persona_id is present
195        #[cfg(feature = "persona-graph")]
196        if let Some(ref persona_id) = entity.persona_id {
197            let graph = state.get_or_create_persona_graph();
198            graph.get_or_create_node_with_links(persona_id, &entity.entity_type, None, None);
199
200            // If entity data contains related entity IDs, link them in the graph
201            #[cfg(feature = "persona-graph")]
202            if let Some(user_id) = entity.data.get("user_id").or_else(|| entity.data.get("userId"))
203            {
204                if let Some(user_id_str) = user_id.as_str() {
205                    let user_persona_id = format!("user:{}", user_id_str);
206                    graph.link_entity_types(
207                        &user_persona_id,
208                        "user",
209                        persona_id,
210                        &entity.entity_type,
211                    );
212                }
213            }
214
215            // Link orders to payments
216            #[cfg(feature = "persona-graph")]
217            if entity.entity_type == "payment" {
218                if let Some(order_id) =
219                    entity.data.get("order_id").or_else(|| entity.data.get("orderId"))
220                {
221                    if let Some(order_id_str) = order_id.as_str() {
222                        let order_persona_id = format!("order:{}", order_id_str);
223                        graph.link_entity_types(&order_persona_id, "order", persona_id, "payment");
224                    }
225                }
226            }
227        }
228
229        let entity_clone = entity.clone();
230        state.register_entity(entity_clone.clone());
231
232        let event = if is_new {
233            StateChangeEvent::EntityCreated {
234                workspace_id: workspace_id.to_string(),
235                entity: entity_clone,
236            }
237        } else {
238            StateChangeEvent::EntityUpdated {
239                workspace_id: workspace_id.to_string(),
240                entity: entity_clone,
241            }
242        };
243
244        drop(states);
245
246        self.broadcast_event(&event).await;
247        debug!(
248            "Registered entity {}:{} for workspace {}",
249            entity.entity_type, entity.entity_id, workspace_id
250        );
251        Ok(())
252    }
253
254    /// Get entity by type and ID
255    pub async fn get_entity(
256        &self,
257        workspace_id: &str,
258        entity_type: &str,
259        entity_id: &str,
260    ) -> Option<EntityState> {
261        let states = self.states.read().await;
262        states.get(workspace_id)?.get_entity(entity_type, entity_id).cloned()
263    }
264
265    /// Find related entities using the persona graph
266    ///
267    /// Given a persona ID and entity type, finds all related entities of the target type
268    /// by traversing the persona graph.
269    ///
270    /// # Arguments
271    /// * `workspace_id` - Workspace identifier
272    /// * `persona_id` - Starting persona ID
273    /// * `target_entity_type` - Entity type to find (e.g., "order", "payment")
274    /// * `relationship_type` - Optional relationship type filter (e.g., "has_orders")
275    ///
276    /// # Returns
277    /// Vector of entity states matching the criteria
278    pub async fn find_related_entities(
279        &self,
280        workspace_id: &str,
281        persona_id: &str,
282        target_entity_type: &str,
283        relationship_type: Option<&str>,
284    ) -> Vec<EntityState> {
285        let states = self.states.read().await;
286        let state = match states.get(workspace_id) {
287            Some(s) => s,
288            None => return Vec::new(),
289        };
290
291        #[cfg(feature = "persona-graph")]
292        let graph = match state.persona_graph() {
293            Some(g) => g,
294            None => return Vec::new(),
295        };
296
297        // Find related persona IDs in the graph
298        #[cfg(feature = "persona-graph")]
299        let related_persona_ids =
300            graph.find_related_by_entity_type(persona_id, target_entity_type, relationship_type);
301
302        #[cfg(not(feature = "persona-graph"))]
303        let related_persona_ids: Vec<String> = Vec::new();
304
305        // Convert persona IDs to entity states
306        let mut related_entities = Vec::new();
307        for related_persona_id in related_persona_ids {
308            // Extract entity ID from persona ID (format: "entity_type:entity_id")
309            if let Some((_, entity_id)) = related_persona_id.split_once(':') {
310                if let Some(entity) = state.get_entity(target_entity_type, entity_id) {
311                    related_entities.push(entity.clone());
312                }
313            }
314        }
315
316        related_entities
317    }
318
319    /// Activate a chaos rule
320    pub async fn activate_chaos_rule(&self, workspace_id: &str, rule: ChaosScenario) -> Result<()> {
321        let mut states = self.states.write().await;
322        let state = states
323            .entry(workspace_id.to_string())
324            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
325
326        // Remove existing rule with same name if present
327        // Note: ChaosScenario is serde_json::Value, so we compare by serializing
328        if let Some(rule_name) = rule.get("name").and_then(|v| v.as_str()) {
329            state
330                .active_chaos_rules
331                .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
332        }
333        state.active_chaos_rules.push(rule.clone());
334        state.increment_version();
335
336        let event = StateChangeEvent::ChaosRuleActivated {
337            workspace_id: workspace_id.to_string(),
338            rule,
339        };
340
341        drop(states);
342
343        self.broadcast_event(&event).await;
344        info!("Activated chaos rule for workspace {}", workspace_id);
345        Ok(())
346    }
347
348    /// Deactivate a chaos rule
349    pub async fn deactivate_chaos_rule(&self, workspace_id: &str, rule_name: &str) -> Result<()> {
350        let mut states = self.states.write().await;
351        let state = states
352            .entry(workspace_id.to_string())
353            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
354
355        // Note: ChaosScenario is serde_json::Value, so we compare by serializing
356        let removed = state
357            .active_chaos_rules
358            .iter()
359            .any(|r| r.get("name").and_then(|v| v.as_str()) == Some(rule_name));
360
361        if removed {
362            state
363                .active_chaos_rules
364                .retain(|r| r.get("name").and_then(|v| v.as_str()) != Some(rule_name));
365            state.increment_version();
366
367            let event = StateChangeEvent::ChaosRuleDeactivated {
368                workspace_id: workspace_id.to_string(),
369                rule_name: rule_name.to_string(),
370            };
371
372            drop(states);
373
374            self.broadcast_event(&event).await;
375            info!("Deactivated chaos rule {} for workspace {}", rule_name, workspace_id);
376        }
377
378        Ok(())
379    }
380
381    /// Update protocol state
382    pub async fn update_protocol_state(
383        &self,
384        workspace_id: &str,
385        protocol: Protocol,
386        protocol_state: ProtocolState,
387    ) -> Result<()> {
388        let mut states = self.states.write().await;
389        let state = states
390            .entry(workspace_id.to_string())
391            .or_insert_with(|| UnifiedState::new(workspace_id.to_string()));
392
393        state.protocol_states.insert(protocol, protocol_state);
394        state.increment_version();
395
396        Ok(())
397    }
398
399    /// Get protocol state
400    pub async fn get_protocol_state(
401        &self,
402        workspace_id: &str,
403        protocol: Protocol,
404    ) -> Option<ProtocolState> {
405        let states = self.states.read().await;
406        states.get(workspace_id)?.protocol_states.get(&protocol).cloned()
407    }
408
409    /// Subscribe to state change events
410    ///
411    /// Returns a receiver that will receive all state change events for
412    /// the specified workspace (or all workspaces if None).
413    pub fn subscribe_to_events(
414        &self,
415        _workspace_id: Option<&str>,
416    ) -> broadcast::Receiver<StateChangeEvent> {
417        self.event_tx.subscribe()
418    }
419
420    /// Restore unified state from a snapshot
421    ///
422    /// This replaces the current state for a workspace with the provided state.
423    /// All protocol adapters will be notified of the state changes.
424    pub async fn restore_state(&self, state: UnifiedState) -> Result<()> {
425        let workspace_id = state.workspace_id.clone();
426        let mut states = self.states.write().await;
427        states.insert(workspace_id.clone(), state.clone());
428        drop(states);
429
430        // Broadcast state change events for all components
431        if let Some(ref persona) = state.active_persona {
432            self.broadcast_event(&StateChangeEvent::PersonaChanged {
433                workspace_id: workspace_id.clone(),
434                persona: persona.clone(),
435            })
436            .await;
437        }
438
439        if let Some(ref scenario_id) = state.active_scenario {
440            self.broadcast_event(&StateChangeEvent::ScenarioChanged {
441                workspace_id: workspace_id.clone(),
442                scenario_id: scenario_id.clone(),
443            })
444            .await;
445        }
446
447        self.broadcast_event(&StateChangeEvent::RealityLevelChanged {
448            workspace_id: workspace_id.clone(),
449            level: state.reality_level,
450        })
451        .await;
452
453        self.broadcast_event(&StateChangeEvent::RealityRatioChanged {
454            workspace_id: workspace_id.clone(),
455            ratio: state.reality_continuum_ratio,
456        })
457        .await;
458
459        // Broadcast entity events
460        for entity in state.entity_state.values() {
461            self.broadcast_event(&StateChangeEvent::EntityCreated {
462                workspace_id: workspace_id.clone(),
463                entity: entity.clone(),
464            })
465            .await;
466        }
467
468        // Broadcast chaos rule events
469        for rule in &state.active_chaos_rules {
470            self.broadcast_event(&StateChangeEvent::ChaosRuleActivated {
471                workspace_id: workspace_id.clone(),
472                rule: rule.clone(),
473            })
474            .await;
475        }
476
477        info!("Restored state for workspace {}", workspace_id);
478        Ok(())
479    }
480
481    /// Broadcast a state change event to all adapters
482    async fn broadcast_event(&self, event: &StateChangeEvent) {
483        // Send to event subscribers
484        if let Err(e) = self.event_tx.send(event.clone()) {
485            warn!("Failed to broadcast state change event: {}", e);
486        }
487
488        // Notify all protocol adapters
489        let adapters = self.adapters.read().await;
490        for adapter in adapters.iter() {
491            if let Err(e) = adapter.on_state_change(event).await {
492                error!(
493                    "Protocol adapter {:?} failed to handle state change: {}",
494                    adapter.protocol(),
495                    e
496                );
497            }
498        }
499    }
500}
501
502impl Default for ConsistencyEngine {
503    fn default() -> Self {
504        Self::new()
505    }
506}