Skip to main content

aingle_cortex/
state.rs

1//! The shared application state for the Córtex API server.
2
3use aingle_graph::GraphDB;
4use aingle_logic::RuleEngine;
5use std::sync::Arc;
6use titans_memory::TitansMemory;
7use tokio::sync::RwLock;
8
9#[cfg(feature = "auth")]
10use crate::auth::UserStore;
11use crate::proofs::ProofStore;
12use crate::rest::audit::AuditLog;
13
14/// The shared state accessible by all API handlers.
15///
16/// This struct uses `Arc` and `RwLock` to provide safe, concurrent access
17/// to the application's core components like the database and logic engine.
18#[derive(Clone)]
19pub struct AppState {
20    /// A thread-safe reference to the graph database.
21    pub graph: Arc<RwLock<GraphDB>>,
22    /// A thread-safe reference to the logic and validation engine.
23    pub logic: Arc<RwLock<RuleEngine>>,
24    /// The Titans dual-memory system (STM + LTM with consolidation).
25    pub memory: Arc<RwLock<TitansMemory>>,
26    /// The event broadcaster for sending real-time updates to WebSocket subscribers.
27    pub broadcaster: Arc<EventBroadcaster>,
28    /// The store for managing and verifying zero-knowledge proofs.
29    pub proof_store: Arc<ProofStore>,
30    /// Manager for temporary sandbox namespaces used by skill verification.
31    pub sandbox_manager: Arc<SandboxManager>,
32    /// Audit log for tracking API actions.
33    pub audit_log: Arc<RwLock<AuditLog>>,
34    /// The user store for authentication and authorization.
35    ///
36    /// This field is only available if the `auth` feature is enabled.
37    #[cfg(feature = "auth")]
38    pub user_store: Arc<UserStore>,
39    /// P2P manager for multi-node triple synchronization.
40    #[cfg(feature = "p2p")]
41    pub p2p: Option<Arc<crate::p2p::manager::P2pManager>>,
42}
43
44impl AppState {
45    /// Creates a new `AppState` with an in-memory graph database.
46    /// This is useful for testing or development environments.
47    pub fn new() -> Self {
48        let graph = GraphDB::memory().expect("Failed to create in-memory graph");
49        let logic = RuleEngine::new();
50        let memory = TitansMemory::agent_mode();
51
52        #[cfg(feature = "auth")]
53        let user_store = {
54            let store = Arc::new(UserStore::new());
55            // Initialize a default admin user for convenience.
56            let _ = store.init_default_admin();
57            store
58        };
59
60        Self {
61            graph: Arc::new(RwLock::new(graph)),
62            logic: Arc::new(RwLock::new(logic)),
63            memory: Arc::new(RwLock::new(memory)),
64            broadcaster: Arc::new(EventBroadcaster::new()),
65            proof_store: Arc::new(ProofStore::new()),
66            sandbox_manager: Arc::new(SandboxManager::new()),
67            audit_log: Arc::new(RwLock::new(AuditLog::default())),
68            #[cfg(feature = "auth")]
69            user_store,
70            #[cfg(feature = "p2p")]
71            p2p: None,
72        }
73    }
74
75    /// Creates a new `AppState` with a pre-configured `GraphDB` instance.
76    pub fn with_graph(graph: GraphDB) -> Self {
77        let logic = RuleEngine::new();
78        let memory = TitansMemory::agent_mode();
79
80        #[cfg(feature = "auth")]
81        let user_store = {
82            let store = Arc::new(UserStore::new());
83            // Initialize a default admin user.
84            let _ = store.init_default_admin();
85            store
86        };
87
88        Self {
89            graph: Arc::new(RwLock::new(graph)),
90            logic: Arc::new(RwLock::new(logic)),
91            memory: Arc::new(RwLock::new(memory)),
92            broadcaster: Arc::new(EventBroadcaster::new()),
93            proof_store: Arc::new(ProofStore::new()),
94            sandbox_manager: Arc::new(SandboxManager::new()),
95            audit_log: Arc::new(RwLock::new(AuditLog::default())),
96            #[cfg(feature = "auth")]
97            user_store,
98            #[cfg(feature = "p2p")]
99            p2p: None,
100        }
101    }
102
103    /// Creates a new `AppState` with a file-backed audit log.
104    pub fn with_audit_path(path: std::path::PathBuf) -> Self {
105        let graph = GraphDB::memory().expect("Failed to create in-memory graph");
106        let logic = RuleEngine::new();
107        let memory = TitansMemory::agent_mode();
108
109        #[cfg(feature = "auth")]
110        let user_store = {
111            let store = Arc::new(UserStore::new());
112            let _ = store.init_default_admin();
113            store
114        };
115
116        Self {
117            graph: Arc::new(RwLock::new(graph)),
118            logic: Arc::new(RwLock::new(logic)),
119            memory: Arc::new(RwLock::new(memory)),
120            broadcaster: Arc::new(EventBroadcaster::new()),
121            proof_store: Arc::new(ProofStore::new()),
122            sandbox_manager: Arc::new(SandboxManager::new()),
123            audit_log: Arc::new(RwLock::new(AuditLog::with_path(10_000, path))),
124            #[cfg(feature = "auth")]
125            user_store,
126            #[cfg(feature = "p2p")]
127            p2p: None,
128        }
129    }
130
131    /// Returns an internal Cortex client configured for same-process access.
132    ///
133    /// This client calls the Cortex REST API and can be used by host functions
134    /// to bridge WASM zome code with the semantic graph.
135    pub fn cortex_client(&self) -> crate::client::CortexInternalClient {
136        crate::client::CortexInternalClient::default_client()
137    }
138
139    /// Gathers and returns statistics about the graph and connected clients.
140    pub async fn stats(&self) -> GraphStats {
141        let graph = self.graph.read().await;
142        let stats = graph.stats();
143        GraphStats {
144            triple_count: stats.triple_count,
145            subject_count: stats.subject_count,
146            predicate_count: stats.predicate_count,
147            object_count: stats.object_count,
148            connected_clients: self.broadcaster.client_count(),
149        }
150    }
151}
152
153impl Default for AppState {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159/// A serializable struct containing statistics about the graph database.
160#[derive(Debug, Clone, serde::Serialize)]
161pub struct GraphStats {
162    /// The total number of triples in the graph.
163    pub triple_count: usize,
164    /// The number of unique subjects.
165    pub subject_count: usize,
166    /// The number of unique predicates.
167    pub predicate_count: usize,
168    /// The number of unique objects.
169    pub object_count: usize,
170    /// The number of currently connected WebSocket clients.
171    pub connected_clients: usize,
172}
173
174/// A broadcaster for sending real-time `Event`s to WebSocket subscribers.
175pub struct EventBroadcaster {
176    /// The underlying `tokio::sync::broadcast` sender.
177    sender: tokio::sync::broadcast::Sender<Event>,
178    /// An atomic counter for the number of connected clients.
179    client_count: std::sync::atomic::AtomicUsize,
180}
181
182impl EventBroadcaster {
183    /// Creates a new `EventBroadcaster`.
184    pub fn new() -> Self {
185        let (sender, _) = tokio::sync::broadcast::channel(1024);
186        Self {
187            sender,
188            client_count: std::sync::atomic::AtomicUsize::new(0),
189        }
190    }
191
192    /// Subscribes to the broadcast channel to receive events.
193    /// This also increments the client count.
194    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Event> {
195        self.client_count
196            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
197        self.sender.subscribe()
198    }
199
200    /// Decrements the client count when a client unsubscribes.
201    pub fn unsubscribe(&self) {
202        // Use fetch_update to prevent underflow wrapping to usize::MAX.
203        let _ = self.client_count.fetch_update(
204            std::sync::atomic::Ordering::SeqCst,
205            std::sync::atomic::Ordering::SeqCst,
206            |current| current.checked_sub(1),
207        );
208    }
209
210    /// Broadcasts an `Event` to all active subscribers.
211    pub fn broadcast(&self, event: Event) {
212        let _ = self.sender.send(event);
213    }
214
215    /// Returns the number of currently connected clients.
216    pub fn client_count(&self) -> usize {
217        self.client_count.load(std::sync::atomic::Ordering::SeqCst)
218    }
219}
220
221impl Default for EventBroadcaster {
222    fn default() -> Self {
223        Self::new()
224    }
225}
226
227/// Defines the types of real-time events sent to WebSocket clients.
228#[derive(Debug, Clone, serde::Serialize)]
229#[serde(tag = "type", content = "data")]
230pub enum Event {
231    /// Sent when a new triple is added to the graph.
232    TripleAdded {
233        hash: String,
234        subject: String,
235        predicate: String,
236        object: serde_json::Value,
237    },
238    /// Sent when a triple is deleted from the graph.
239    TripleDeleted { hash: String },
240    /// Sent after a validation operation is completed.
241    ValidationCompleted {
242        hash: String,
243        valid: bool,
244        proof_hash: Option<String>,
245    },
246    /// Sent to a client immediately after it connects.
247    Connected { client_id: String },
248    /// A heartbeat message to keep the connection alive.
249    Ping,
250}
251
252impl Event {
253    /// Serializes the event to a JSON string.
254    pub fn to_json(&self) -> String {
255        serde_json::to_string(self).unwrap_or_default()
256    }
257}
258
259// ---------------------------------------------------------------------------
260// Sandbox Manager
261// ---------------------------------------------------------------------------
262
263/// Entry for a sandbox namespace with TTL.
264struct SandboxEntry {
265    namespace: String,
266    created_at: std::time::Instant,
267    ttl: std::time::Duration,
268}
269
270/// Manager for temporary sandbox namespaces used by skill verification.
271///
272/// Sandboxes are isolated graph namespaces with a time-to-live (TTL).
273/// After TTL expiration, the sandbox should be cleaned up.
274pub struct SandboxManager {
275    entries: RwLock<std::collections::HashMap<String, SandboxEntry>>,
276}
277
278impl SandboxManager {
279    /// Creates a new, empty `SandboxManager`.
280    pub fn new() -> Self {
281        Self {
282            entries: RwLock::new(std::collections::HashMap::new()),
283        }
284    }
285
286    /// Creates a new sandbox entry with the given ID, namespace, and TTL.
287    pub async fn create(&self, id: String, namespace: String, ttl_seconds: u64) {
288        let entry = SandboxEntry {
289            namespace,
290            created_at: std::time::Instant::now(),
291            ttl: std::time::Duration::from_secs(ttl_seconds),
292        };
293        let mut entries = self.entries.write().await;
294        entries.insert(id, entry);
295    }
296
297    /// Removes a sandbox by ID, returning the namespace if found.
298    pub async fn remove(&self, id: &str) -> Option<String> {
299        let mut entries = self.entries.write().await;
300        entries.remove(id).map(|e| e.namespace)
301    }
302
303    /// Returns the namespace for a sandbox if it exists and hasn't expired.
304    pub async fn get(&self, id: &str) -> Option<String> {
305        let entries = self.entries.read().await;
306        entries.get(id).and_then(|e| {
307            if e.created_at.elapsed() < e.ttl {
308                Some(e.namespace.clone())
309            } else {
310                None
311            }
312        })
313    }
314
315    /// Returns a list of all expired sandbox IDs for cleanup.
316    pub async fn expired(&self) -> Vec<String> {
317        let entries = self.entries.read().await;
318        entries
319            .iter()
320            .filter(|(_, e)| e.created_at.elapsed() >= e.ttl)
321            .map(|(id, _)| id.clone())
322            .collect()
323    }
324}
325
326impl Default for SandboxManager {
327    fn default() -> Self {
328        Self::new()
329    }
330}