mockforge_world_state/
engine.rs1use crate::aggregators::StateAggregator;
7use crate::model::{StateLayer, WorldStateSnapshot};
8use crate::query::WorldStateQuery;
9use anyhow::Result;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13use tracing::{debug, info, warn};
14
15pub struct WorldStateEngine {
20 aggregators: HashMap<StateLayer, Arc<dyn StateAggregator>>,
22 snapshots: Arc<RwLock<Vec<WorldStateSnapshot>>>,
24 max_snapshots: usize,
26}
27
28impl WorldStateEngine {
29 pub fn new() -> Self {
31 Self {
32 aggregators: HashMap::new(),
33 snapshots: Arc::new(RwLock::new(Vec::new())),
34 max_snapshots: 100,
35 }
36 }
37
38 pub fn register_aggregator(&mut self, aggregator: Arc<dyn StateAggregator>) {
40 let layer = aggregator.layer();
41 self.aggregators.insert(layer, aggregator);
42 info!("Registered aggregator for layer: {:?}", layer);
43 }
44
45 pub async fn create_snapshot(&self) -> Result<WorldStateSnapshot> {
47 debug!("Creating world state snapshot");
48
49 let mut snapshot = WorldStateSnapshot::new();
50 let mut all_nodes = Vec::new();
51 let mut all_edges = Vec::new();
52
53 for (layer, aggregator) in &self.aggregators {
55 match aggregator.aggregate().await {
56 Ok((nodes, edges)) => {
57 debug!(
58 "Aggregated {} nodes and {} edges from layer: {:?}",
59 nodes.len(),
60 edges.len(),
61 layer
62 );
63 all_nodes.extend(nodes);
64 all_edges.extend(edges);
65 snapshot.layers.insert(*layer, true);
66 }
67 Err(e) => {
68 warn!("Failed to aggregate state from layer {:?}: {}", layer, e);
69 snapshot.layers.insert(*layer, false);
70 }
71 }
72 }
73
74 snapshot.nodes = all_nodes;
75 snapshot.edges = all_edges;
76
77 let mut snapshots = self.snapshots.write().await;
79 snapshots.push(snapshot.clone());
80
81 if snapshots.len() > self.max_snapshots {
83 snapshots.remove(0);
84 }
85
86 info!(
87 "Created world state snapshot with {} nodes and {} edges",
88 snapshot.nodes.len(),
89 snapshot.edges.len()
90 );
91
92 Ok(snapshot)
93 }
94
95 pub async fn get_current_snapshot(&self) -> Result<WorldStateSnapshot> {
97 self.create_snapshot().await
98 }
99
100 pub async fn get_snapshot(&self, snapshot_id: &str) -> Option<WorldStateSnapshot> {
102 let snapshots = self.snapshots.read().await;
103 snapshots.iter().find(|s| s.id == snapshot_id).cloned()
104 }
105
106 pub async fn get_all_snapshots(&self) -> Vec<WorldStateSnapshot> {
108 let snapshots = self.snapshots.read().await;
109 snapshots.clone()
110 }
111
112 pub async fn query(&self, query: &WorldStateQuery) -> Result<WorldStateSnapshot> {
114 let snapshot = self.create_snapshot().await?;
115
116 let filtered_nodes: Vec<_> =
118 snapshot.nodes.iter().filter(|node| query.matches_node(node)).cloned().collect();
119
120 let filtered_edges: Vec<_> = if query.include_edges {
122 snapshot.edges.iter().filter(|edge| query.matches_edge(edge)).cloned().collect()
123 } else {
124 Vec::new()
125 };
126
127 let mut filtered_snapshot = snapshot;
129 filtered_snapshot.nodes = filtered_nodes;
130 filtered_snapshot.edges = filtered_edges;
131
132 Ok(filtered_snapshot)
133 }
134
135 pub fn get_layers(&self) -> Vec<StateLayer> {
137 self.aggregators.keys().copied().collect()
138 }
139
140 pub fn set_max_snapshots(&mut self, max: usize) {
142 self.max_snapshots = max;
143 }
144}
145
146impl Default for WorldStateEngine {
147 fn default() -> Self {
148 Self::new()
149 }
150}