// aimdb_core/graph.rs

//! Dependency graph for AimDB record topology.
//!
//! This module provides the `DependencyGraph` and related types that represent
//! the entire database topology — sources, links, transforms, taps, and their
//! relationships.
//!
//! The graph is constructed once during `build()` and is immutable thereafter.
//! It enables:
//! - Build-time validation (cycle detection, missing inputs)
//! - Spawn ordering (topological sort)
//! - Runtime introspection (AimX protocol, MCP tools, CLI)

13extern crate alloc;
14use alloc::{
15    string::{String, ToString},
16    vec::Vec,
17};
18
// ============================================================================
// Record Origin
// ============================================================================

/// How a record gets its values — part of the dependency graph.
///
/// Implements `PartialEq`/`Eq` so origins can be compared directly,
/// e.g. `info.origin == RecordOrigin::Passive`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum RecordOrigin {
    /// Autonomous producer registered via `.source()`.
    Source,
    /// Inbound connector registered via `.link_from()`.
    Link {
        /// Protocol identifier of the inbound connector.
        protocol: String,
        /// Address of the inbound connector.
        address: String,
    },
    /// Single-input reactive derivation registered via `.transform()`.
    Transform {
        /// Key of the record this transform reads from.
        input: String,
    },
    /// Multi-input reactive join registered via `.transform_join()`.
    TransformJoin {
        /// Keys of the records this join reads from.
        inputs: Vec<String>,
    },
    /// No registered producer (writable via `record.set` / `db.produce()`).
    Passive,
}

40// ============================================================================
41// Graph Node
42// ============================================================================
43
44/// Metadata for one node in the dependency graph.
45#[derive(Clone, Debug)]
46#[cfg_attr(feature = "std", derive(serde::Serialize))]
47pub struct GraphNode {
48    /// Record key (e.g. "temp.vienna")
49    pub key: String,
50    /// How this record gets its values
51    pub origin: RecordOrigin,
52    /// Buffer type ("spmc_ring", "single_latest", "mailbox", "none")
53    pub buffer_type: String,
54    /// Buffer capacity (None for unbounded or non-ring buffers)
55    pub buffer_capacity: Option<usize>,
56    /// Number of taps attached
57    pub tap_count: usize,
58    /// Whether an outbound link is configured
59    pub has_outbound_link: bool,
60}
61
62// ============================================================================
63// Graph Edge
64// ============================================================================
65
66/// One directed edge in the dependency graph.
67#[derive(Clone, Debug)]
68#[cfg_attr(feature = "std", derive(serde::Serialize))]
69pub struct GraphEdge {
70    /// Source record key (None for external origins like source/link)
71    pub from: Option<String>,
72    /// Target record key (None for side-effects like taps/link_out)
73    pub to: Option<String>,
74    /// Classification of this edge
75    pub edge_type: EdgeType,
76}
77
/// Classification of a dependency graph edge.
///
/// Implements `PartialEq`/`Eq` so edge kinds can be compared directly,
/// e.g. `edge.edge_type == EdgeType::Transform`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(serde::Serialize))]
#[cfg_attr(feature = "std", serde(rename_all = "snake_case"))]
pub enum EdgeType {
    /// Inbound edge from an autonomous `.source()` producer (no `from` record).
    Source,
    /// Inbound edge from a `.link_from()` connector (no `from` record).
    Link {
        /// Protocol identifier of the inbound connector.
        protocol: String,
    },
    /// Input record → output record of a single-input `.transform()`.
    Transform,
    /// One input record → output record of a `.transform_join()`
    /// (one such edge per join input).
    TransformJoin,
    /// Side-effect edge for the `index`-th tap attached to a record (no `to` record).
    Tap {
        /// Zero-based position of the tap on its record.
        index: usize,
    },
    /// Side-effect edge for a record's outbound link (no `to` record).
    LinkOut {
        /// Protocol identifier; may be `"unknown"` when the protocol is not
        /// available at graph-construction time.
        protocol: String,
    },
}

91// ============================================================================
92// Record Graph Info (builder input)
93// ============================================================================
94
95/// Information needed to build a GraphNode for one record.
96///
97/// Collected from each record during graph construction.
98#[derive(Clone, Debug)]
99pub struct RecordGraphInfo {
100    /// Record key
101    pub key: String,
102    /// How this record gets its values
103    pub origin: RecordOrigin,
104    /// Buffer type name
105    pub buffer_type: String,
106    /// Buffer capacity (if applicable)
107    pub buffer_capacity: Option<usize>,
108    /// Number of taps attached
109    pub tap_count: usize,
110    /// Whether an outbound link is configured
111    pub has_outbound_link: bool,
112}
113
114// ============================================================================
115// Dependency Graph
116// ============================================================================
117
118/// The dependency graph, constructed once during `build()` and immutable thereafter.
119#[derive(Clone, Debug)]
120pub struct DependencyGraph {
121    /// All nodes indexed by record key.
122    pub nodes: Vec<GraphNode>,
123    /// All edges (both internal and external).
124    pub edges: Vec<GraphEdge>,
125    /// Topological order of record keys (transforms come after their inputs).
126    pub topo_order: Vec<String>,
127}
128
129impl DependencyGraph {
130    /// Construct and validate the dependency graph from registered records.
131    ///
132    /// This builds a complete `DependencyGraph` with nodes, edges, and topological order.
133    /// The graph is immutable after construction and can be used for introspection.
134    ///
135    /// # Arguments
136    /// * `record_infos` - Information about each registered record (origin, buffer, etc.)
137    ///
138    /// # Returns
139    /// * `Ok(DependencyGraph)` - The complete, validated graph
140    /// * `Err(DbError::TransformInputNotFound)` - If a transform references a missing record
141    /// * `Err(DbError::CyclicDependency)` - If the transform edges form a cycle
142    pub fn build_and_validate(record_infos: &[RecordGraphInfo]) -> crate::DbResult<Self> {
143        use alloc::collections::VecDeque;
144
145        // Build set of all keys for validation
146        let all_keys: hashbrown::HashSet<&str> =
147            record_infos.iter().map(|info| info.key.as_str()).collect();
148
149        // Extract transform inputs for validation and edge construction
150        let mut transform_inputs: Vec<(&str, Vec<&str>)> = Vec::new();
151        for info in record_infos {
152            match &info.origin {
153                RecordOrigin::Transform { input } => {
154                    transform_inputs.push((info.key.as_str(), alloc::vec![input.as_str()]));
155                }
156                RecordOrigin::TransformJoin { inputs } => {
157                    let input_refs: Vec<&str> = inputs.iter().map(|s| s.as_str()).collect();
158                    transform_inputs.push((info.key.as_str(), input_refs));
159                }
160                _ => {}
161            }
162        }
163
164        // Validate: check all transform input keys exist
165        #[allow(unused_variables)] // output_key, input_key used only in std error messages
166        for (output_key, input_keys) in &transform_inputs {
167            for input_key in input_keys {
168                if !all_keys.contains(input_key) {
169                    #[cfg(feature = "std")]
170                    return Err(crate::DbError::TransformInputNotFound {
171                        output_key: output_key.to_string(),
172                        input_key: input_key.to_string(),
173                    });
174                    #[cfg(not(feature = "std"))]
175                    return Err(crate::DbError::TransformInputNotFound {
176                        _output_key: (),
177                        _input_key: (),
178                    });
179                }
180            }
181        }
182
183        // Build adjacency list and in-degree map for Kahn's algorithm
184        let mut in_degree: hashbrown::HashMap<&str, usize> = hashbrown::HashMap::new();
185        let mut adjacency: hashbrown::HashMap<&str, Vec<&str>> = hashbrown::HashMap::new();
186
187        // Initialize all keys with in-degree 0
188        for key in &all_keys {
189            in_degree.entry(*key).or_insert(0);
190            adjacency.entry(*key).or_default();
191        }
192
193        // Add transform edges: input → output
194        for (output_key, input_keys) in &transform_inputs {
195            for input_key in input_keys {
196                adjacency.entry(*input_key).or_default().push(*output_key);
197                *in_degree.entry(*output_key).or_insert(0) += 1;
198            }
199        }
200
201        // Kahn's algorithm — topological sort
202        let mut queue: VecDeque<&str> = in_degree
203            .iter()
204            .filter(|(_, &deg)| deg == 0)
205            .map(|(&node, _)| node)
206            .collect();
207        let mut topo_order = Vec::new();
208
209        while let Some(node) = queue.pop_front() {
210            topo_order.push(node.to_string());
211            if let Some(neighbors) = adjacency.get(node) {
212                for &neighbor in neighbors {
213                    if let Some(deg) = in_degree.get_mut(neighbor) {
214                        *deg -= 1;
215                        if *deg == 0 {
216                            queue.push_back(neighbor);
217                        }
218                    }
219                }
220            }
221        }
222
223        if topo_order.len() != all_keys.len() {
224            #[cfg(feature = "std")]
225            {
226                // Find the cycle participants for a helpful error message
227                let cycle_records: Vec<String> = in_degree
228                    .iter()
229                    .filter(|(_, &deg)| deg > 0)
230                    .map(|(&k, _)| k.to_string())
231                    .collect();
232
233                return Err(crate::DbError::CyclicDependency {
234                    records: cycle_records,
235                });
236            }
237            #[cfg(not(feature = "std"))]
238            return Err(crate::DbError::CyclicDependency { _records: () });
239        }
240
241        // Build nodes from record infos
242        let nodes: Vec<GraphNode> = record_infos
243            .iter()
244            .map(|info| GraphNode {
245                key: info.key.clone(),
246                origin: info.origin.clone(),
247                buffer_type: info.buffer_type.clone(),
248                buffer_capacity: info.buffer_capacity,
249                tap_count: info.tap_count,
250                has_outbound_link: info.has_outbound_link,
251            })
252            .collect();
253
254        // Build edges from origins
255        let mut edges: Vec<GraphEdge> = Vec::new();
256        for info in record_infos {
257            match &info.origin {
258                RecordOrigin::Source => {
259                    edges.push(GraphEdge {
260                        from: None,
261                        to: Some(info.key.clone()),
262                        edge_type: EdgeType::Source,
263                    });
264                }
265                RecordOrigin::Link { protocol, .. } => {
266                    edges.push(GraphEdge {
267                        from: None,
268                        to: Some(info.key.clone()),
269                        edge_type: EdgeType::Link {
270                            protocol: protocol.clone(),
271                        },
272                    });
273                }
274                RecordOrigin::Transform { input } => {
275                    edges.push(GraphEdge {
276                        from: Some(input.clone()),
277                        to: Some(info.key.clone()),
278                        edge_type: EdgeType::Transform,
279                    });
280                }
281                RecordOrigin::TransformJoin { inputs } => {
282                    for input in inputs {
283                        edges.push(GraphEdge {
284                            from: Some(input.clone()),
285                            to: Some(info.key.clone()),
286                            edge_type: EdgeType::TransformJoin,
287                        });
288                    }
289                }
290                RecordOrigin::Passive => {
291                    // No inbound edge for passive records
292                }
293            }
294
295            // Add outbound link edges if present
296            if info.has_outbound_link {
297                edges.push(GraphEdge {
298                    from: Some(info.key.clone()),
299                    to: None,
300                    edge_type: EdgeType::LinkOut {
301                        protocol: "unknown".to_string(), // Protocol info not available here
302                    },
303                });
304            }
305
306            // Add tap edges
307            for tap_idx in 0..info.tap_count {
308                edges.push(GraphEdge {
309                    from: Some(info.key.clone()),
310                    to: None,
311                    edge_type: EdgeType::Tap { index: tap_idx },
312                });
313            }
314        }
315
316        Ok(DependencyGraph {
317            nodes,
318            edges,
319            topo_order,
320        })
321    }
322
323    /// Get a node by key
324    pub fn node(&self, key: &str) -> Option<&GraphNode> {
325        self.nodes.iter().find(|n| n.key == key)
326    }
327
328    /// Get all nodes
329    pub fn nodes(&self) -> &[GraphNode] {
330        &self.nodes
331    }
332
333    /// Get all edges
334    pub fn edges(&self) -> &[GraphEdge] {
335        &self.edges
336    }
337
338    /// Get the topological order
339    pub fn topo_order(&self) -> &[String] {
340        &self.topo_order
341    }
342}