// aimdb_core/graph.rs
//! Dependency graph for AimDB record topology
//!
//! This module provides the `DependencyGraph` and related types that represent
//! the entire database topology — sources, links, transforms, taps, and their
//! relationships.
//!
//! The graph is constructed once during `build()` and is immutable thereafter.
//! It enables:
//! - Build-time validation (cycle detection, missing inputs)
//! - Spawn ordering (topological sort)
//! - Runtime introspection (AimX protocol, MCP tools, CLI)
12
13extern crate alloc;
14use alloc::{
15 string::{String, ToString},
16 vec::Vec,
17};
18
19// ============================================================================
20// Record Origin
21// ============================================================================
22
/// How a record gets its values — part of the dependency graph.
///
/// Values compare by variant and payload (`PartialEq`/`Eq`), which lets callers
/// and tests assert on a record's origin directly. With the `std` feature the
/// enum is also (de)serializable through serde using `snake_case` variant names.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum RecordOrigin {
    /// Autonomous producer via `.source()`
    Source,
    /// Inbound connector via `.link_from()`
    Link { protocol: String, address: String },
    /// Single-input reactive derivation via `.transform()`; `input` is the key
    /// of the record it derives from.
    Transform { input: String },
    /// Multi-input reactive join via `.transform_join()`; `inputs` are the keys
    /// of the records it joins.
    TransformJoin { inputs: Vec<String> },
    /// No registered producer (writable via `record.set` / `db.produce()`)
    Passive,
}
39
40// ============================================================================
41// Graph Node
42// ============================================================================
43
44/// Metadata for one node in the dependency graph.
45#[derive(Clone, Debug)]
46#[cfg_attr(feature = "std", derive(serde::Serialize))]
47pub struct GraphNode {
48 /// Record key (e.g. "temp.vienna")
49 pub key: String,
50 /// How this record gets its values
51 pub origin: RecordOrigin,
52 /// Buffer type ("spmc_ring", "single_latest", "mailbox", "none")
53 pub buffer_type: String,
54 /// Buffer capacity (None for unbounded or non-ring buffers)
55 pub buffer_capacity: Option<usize>,
56 /// Number of taps attached
57 pub tap_count: usize,
58 /// Whether an outbound link is configured
59 pub has_outbound_link: bool,
60}
61
62// ============================================================================
63// Graph Edge
64// ============================================================================
65
66/// One directed edge in the dependency graph.
67#[derive(Clone, Debug)]
68#[cfg_attr(feature = "std", derive(serde::Serialize))]
69pub struct GraphEdge {
70 /// Source record key (None for external origins like source/link)
71 pub from: Option<String>,
72 /// Target record key (None for side-effects like taps/link_out)
73 pub to: Option<String>,
74 /// Classification of this edge
75 pub edge_type: EdgeType,
76}
77
/// Classification of a dependency graph edge.
///
/// Values compare by variant and payload (`PartialEq`/`Eq`), so edges can be
/// filtered or asserted on by type. With the `std` feature the enum serializes
/// through serde using `snake_case` variant names.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum EdgeType {
    /// External producer feeding a record (origin `RecordOrigin::Source`).
    Source,
    /// Inbound link feeding a record over `protocol`.
    Link { protocol: String },
    /// Input record feeding a single-input transform.
    Transform,
    /// One of the input records feeding a multi-input join.
    TransformJoin,
    /// Tap attached to a record; `index` is the tap's position on that record.
    Tap { index: usize },
    /// Outbound link leaving a record over `protocol`.
    LinkOut { protocol: String },
}
90
91// ============================================================================
92// Record Graph Info (builder input)
93// ============================================================================
94
95/// Information needed to build a GraphNode for one record.
96///
97/// Collected from each record during graph construction.
98#[derive(Clone, Debug)]
99pub struct RecordGraphInfo {
100 /// Record key
101 pub key: String,
102 /// How this record gets its values
103 pub origin: RecordOrigin,
104 /// Buffer type name
105 pub buffer_type: String,
106 /// Buffer capacity (if applicable)
107 pub buffer_capacity: Option<usize>,
108 /// Number of taps attached
109 pub tap_count: usize,
110 /// Whether an outbound link is configured
111 pub has_outbound_link: bool,
112}
113
114// ============================================================================
115// Dependency Graph
116// ============================================================================
117
118/// The dependency graph, constructed once during `build()` and immutable thereafter.
119#[derive(Clone, Debug)]
120pub struct DependencyGraph {
121 /// All nodes indexed by record key.
122 pub nodes: Vec<GraphNode>,
123 /// All edges (both internal and external).
124 pub edges: Vec<GraphEdge>,
125 /// Topological order of record keys (transforms come after their inputs).
126 pub topo_order: Vec<String>,
127}
128
129impl DependencyGraph {
130 /// Construct and validate the dependency graph from registered records.
131 ///
132 /// This builds a complete `DependencyGraph` with nodes, edges, and topological order.
133 /// The graph is immutable after construction and can be used for introspection.
134 ///
135 /// # Arguments
136 /// * `record_infos` - Information about each registered record (origin, buffer, etc.)
137 ///
138 /// # Returns
139 /// * `Ok(DependencyGraph)` - The complete, validated graph
140 /// * `Err(DbError::TransformInputNotFound)` - If a transform references a missing record
141 /// * `Err(DbError::CyclicDependency)` - If the transform edges form a cycle
142 pub fn build_and_validate(record_infos: &[RecordGraphInfo]) -> crate::DbResult<Self> {
143 use alloc::collections::VecDeque;
144
145 // Build set of all keys for validation
146 let all_keys: hashbrown::HashSet<&str> =
147 record_infos.iter().map(|info| info.key.as_str()).collect();
148
149 // Extract transform inputs for validation and edge construction
150 let mut transform_inputs: Vec<(&str, Vec<&str>)> = Vec::new();
151 for info in record_infos {
152 match &info.origin {
153 RecordOrigin::Transform { input } => {
154 transform_inputs.push((info.key.as_str(), alloc::vec![input.as_str()]));
155 }
156 RecordOrigin::TransformJoin { inputs } => {
157 let input_refs: Vec<&str> = inputs.iter().map(|s| s.as_str()).collect();
158 transform_inputs.push((info.key.as_str(), input_refs));
159 }
160 _ => {}
161 }
162 }
163
164 // Validate: check all transform input keys exist
165 #[allow(unused_variables)] // output_key, input_key used only in std error messages
166 for (output_key, input_keys) in &transform_inputs {
167 for input_key in input_keys {
168 if !all_keys.contains(input_key) {
169 #[cfg(feature = "std")]
170 return Err(crate::DbError::TransformInputNotFound {
171 output_key: output_key.to_string(),
172 input_key: input_key.to_string(),
173 });
174 #[cfg(not(feature = "std"))]
175 return Err(crate::DbError::TransformInputNotFound {
176 _output_key: (),
177 _input_key: (),
178 });
179 }
180 }
181 }
182
183 // Build adjacency list and in-degree map for Kahn's algorithm
184 let mut in_degree: hashbrown::HashMap<&str, usize> = hashbrown::HashMap::new();
185 let mut adjacency: hashbrown::HashMap<&str, Vec<&str>> = hashbrown::HashMap::new();
186
187 // Initialize all keys with in-degree 0
188 for key in &all_keys {
189 in_degree.entry(*key).or_insert(0);
190 adjacency.entry(*key).or_default();
191 }
192
193 // Add transform edges: input → output
194 for (output_key, input_keys) in &transform_inputs {
195 for input_key in input_keys {
196 adjacency.entry(*input_key).or_default().push(*output_key);
197 *in_degree.entry(*output_key).or_insert(0) += 1;
198 }
199 }
200
201 // Kahn's algorithm — topological sort
202 let mut queue: VecDeque<&str> = in_degree
203 .iter()
204 .filter(|(_, °)| deg == 0)
205 .map(|(&node, _)| node)
206 .collect();
207 let mut topo_order = Vec::new();
208
209 while let Some(node) = queue.pop_front() {
210 topo_order.push(node.to_string());
211 if let Some(neighbors) = adjacency.get(node) {
212 for &neighbor in neighbors {
213 if let Some(deg) = in_degree.get_mut(neighbor) {
214 *deg -= 1;
215 if *deg == 0 {
216 queue.push_back(neighbor);
217 }
218 }
219 }
220 }
221 }
222
223 if topo_order.len() != all_keys.len() {
224 #[cfg(feature = "std")]
225 {
226 // Find the cycle participants for a helpful error message
227 let cycle_records: Vec<String> = in_degree
228 .iter()
229 .filter(|(_, °)| deg > 0)
230 .map(|(&k, _)| k.to_string())
231 .collect();
232
233 return Err(crate::DbError::CyclicDependency {
234 records: cycle_records,
235 });
236 }
237 #[cfg(not(feature = "std"))]
238 return Err(crate::DbError::CyclicDependency { _records: () });
239 }
240
241 // Build nodes from record infos
242 let nodes: Vec<GraphNode> = record_infos
243 .iter()
244 .map(|info| GraphNode {
245 key: info.key.clone(),
246 origin: info.origin.clone(),
247 buffer_type: info.buffer_type.clone(),
248 buffer_capacity: info.buffer_capacity,
249 tap_count: info.tap_count,
250 has_outbound_link: info.has_outbound_link,
251 })
252 .collect();
253
254 // Build edges from origins
255 let mut edges: Vec<GraphEdge> = Vec::new();
256 for info in record_infos {
257 match &info.origin {
258 RecordOrigin::Source => {
259 edges.push(GraphEdge {
260 from: None,
261 to: Some(info.key.clone()),
262 edge_type: EdgeType::Source,
263 });
264 }
265 RecordOrigin::Link { protocol, .. } => {
266 edges.push(GraphEdge {
267 from: None,
268 to: Some(info.key.clone()),
269 edge_type: EdgeType::Link {
270 protocol: protocol.clone(),
271 },
272 });
273 }
274 RecordOrigin::Transform { input } => {
275 edges.push(GraphEdge {
276 from: Some(input.clone()),
277 to: Some(info.key.clone()),
278 edge_type: EdgeType::Transform,
279 });
280 }
281 RecordOrigin::TransformJoin { inputs } => {
282 for input in inputs {
283 edges.push(GraphEdge {
284 from: Some(input.clone()),
285 to: Some(info.key.clone()),
286 edge_type: EdgeType::TransformJoin,
287 });
288 }
289 }
290 RecordOrigin::Passive => {
291 // No inbound edge for passive records
292 }
293 }
294
295 // Add outbound link edges if present
296 if info.has_outbound_link {
297 edges.push(GraphEdge {
298 from: Some(info.key.clone()),
299 to: None,
300 edge_type: EdgeType::LinkOut {
301 protocol: "unknown".to_string(), // Protocol info not available here
302 },
303 });
304 }
305
306 // Add tap edges
307 for tap_idx in 0..info.tap_count {
308 edges.push(GraphEdge {
309 from: Some(info.key.clone()),
310 to: None,
311 edge_type: EdgeType::Tap { index: tap_idx },
312 });
313 }
314 }
315
316 Ok(DependencyGraph {
317 nodes,
318 edges,
319 topo_order,
320 })
321 }
322
323 /// Get a node by key
324 pub fn node(&self, key: &str) -> Option<&GraphNode> {
325 self.nodes.iter().find(|n| n.key == key)
326 }
327
328 /// Get all nodes
329 pub fn nodes(&self) -> &[GraphNode] {
330 &self.nodes
331 }
332
333 /// Get all edges
334 pub fn edges(&self) -> &[GraphEdge] {
335 &self.edges
336 }
337
338 /// Get the topological order
339 pub fn topo_order(&self) -> &[String] {
340 &self.topo_order
341 }
342}