Skip to main content

rustant_core/nodes/
manager.rs

1//! Node manager — registers, finds capable nodes, and executes tasks.
2
3use super::{
4    Node,
5    consent::ConsentStore,
6    types::{Capability, NodeCapability, NodeHealth, NodeId, NodeMessage, NodeResult, NodeTask},
7};
8use crate::error::{NodeError, RustantError};
9use std::collections::HashMap;
10
11/// Manages registered nodes and dispatches tasks.
12pub struct NodeManager {
13    nodes: HashMap<NodeId, Box<dyn Node>>,
14    consent: ConsentStore,
15}
16
17impl NodeManager {
18    pub fn new() -> Self {
19        Self {
20            nodes: HashMap::new(),
21            consent: ConsentStore::new(),
22        }
23    }
24
25    /// Register a node.
26    pub fn register_node(&mut self, node: Box<dyn Node>) {
27        let id = node.node_id().clone();
28        self.nodes.insert(id, node);
29    }
30
31    /// Remove a node by ID.
32    pub fn remove_node(&mut self, node_id: &NodeId) -> bool {
33        self.consent.revoke_all(node_id);
34        self.nodes.remove(node_id).is_some()
35    }
36
37    /// Number of registered nodes.
38    pub fn node_count(&self) -> usize {
39        self.nodes.len()
40    }
41
42    /// Get a reference to the consent store.
43    pub fn consent(&self) -> &ConsentStore {
44        &self.consent
45    }
46
47    /// Get a mutable reference to the consent store.
48    pub fn consent_mut(&mut self) -> &mut ConsentStore {
49        &mut self.consent
50    }
51
52    /// Find all nodes that have a given capability and are healthy.
53    pub fn find_capable(&self, capability: &Capability) -> Vec<&NodeId> {
54        self.nodes
55            .iter()
56            .filter(|(_, node)| {
57                node.capabilities().contains(capability) && node.health() == NodeHealth::Healthy
58            })
59            .map(|(id, _)| id)
60            .collect()
61    }
62
63    /// Execute a task on the best available node (healthy + consented).
64    pub async fn execute_on_best(&self, task: NodeTask) -> Result<NodeResult, RustantError> {
65        let capable = self.find_capable(&task.capability);
66        if capable.is_empty() {
67            return Err(RustantError::Node(NodeError::NoCapableNode {
68                capability: task.capability.to_string(),
69            }));
70        }
71
72        // Find first consented healthy node
73        for node_id in &capable {
74            if self.consent.is_granted(node_id, &task.capability)
75                && let Some(node) = self.nodes.get(node_id)
76            {
77                return node.execute(task).await;
78            }
79        }
80
81        Err(RustantError::Node(NodeError::ConsentDenied {
82            capability: task.capability.to_string(),
83        }))
84    }
85
86    /// Get node IDs.
87    pub fn node_ids(&self) -> Vec<&NodeId> {
88        self.nodes.keys().collect()
89    }
90
91    /// Find all healthy nodes with a given capability that also have consent granted.
92    pub fn find_capable_with_consent(
93        &self,
94        capability: &Capability,
95        consent_store: &ConsentStore,
96    ) -> Vec<&NodeId> {
97        self.nodes
98            .iter()
99            .filter(|(id, node)| {
100                node.capabilities().contains(capability)
101                    && node.health() == NodeHealth::Healthy
102                    && consent_store.is_granted(id, capability)
103            })
104            .map(|(id, _)| id)
105            .collect()
106    }
107
108    /// Broadcast a message to all registered nodes. Returns responses from each node.
109    pub async fn broadcast_message(&self, msg: &NodeMessage) -> Vec<(NodeId, Option<NodeMessage>)> {
110        let mut results = Vec::new();
111        for (id, node) in &self.nodes {
112            let response = node.handle_message(msg.clone()).await.unwrap_or(None);
113            results.push((id.clone(), response));
114        }
115        results
116    }
117
118    /// Aggregate rich capabilities from all registered nodes.
119    pub fn node_capabilities_map(&self) -> HashMap<NodeId, Vec<NodeCapability>> {
120        self.nodes
121            .iter()
122            .map(|(id, node)| (id.clone(), node.rich_capabilities()))
123            .collect()
124    }
125}
126
127impl Default for NodeManager {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::super::types::{NodeInfo, Platform};
    use super::*;
    use async_trait::async_trait;
    use chrono::Utc;

    /// Minimal in-memory [`Node`] whose `execute` always succeeds.
    struct MockNode {
        id: NodeId,
        info: NodeInfo,
        capabilities: Vec<Capability>,
        health: NodeHealth,
    }

    impl MockNode {
        /// Build a healthy mock whose ID and display name are both `name`.
        fn new(name: &str, capabilities: Vec<Capability>) -> Self {
            let id = NodeId::new(name);
            Self {
                id: id.clone(),
                info: NodeInfo {
                    node_id: id,
                    name: name.to_string(),
                    platform: Platform::MacOS,
                    hostname: "test".into(),
                    registered_at: Utc::now(),
                    os_version: None,
                    agent_version: "0.1.0".into(),
                    uptime_secs: 0,
                },
                capabilities,
                health: NodeHealth::Healthy,
            }
        }
    }

    #[async_trait]
    impl Node for MockNode {
        fn node_id(&self) -> &NodeId {
            &self.id
        }
        fn info(&self) -> &NodeInfo {
            &self.info
        }
        fn capabilities(&self) -> &[Capability] {
            &self.capabilities
        }
        async fn execute(&self, task: NodeTask) -> Result<NodeResult, RustantError> {
            Ok(NodeResult {
                task_id: task.task_id,
                success: true,
                output: "mock output".into(),
                exit_code: Some(0),
                duration_ms: 1,
            })
        }
        fn health(&self) -> NodeHealth {
            self.health
        }
        async fn heartbeat(&self) -> Result<NodeHealth, RustantError> {
            Ok(self.health)
        }
    }

    #[test]
    fn test_manager_new() {
        let mgr = NodeManager::new();
        assert_eq!(mgr.node_count(), 0);
    }

    #[test]
    fn test_manager_register() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        assert_eq!(mgr.node_count(), 1);
    }

    #[test]
    fn test_manager_remove() {
        let mut mgr = NodeManager::new();
        let id = NodeId::new("n1");
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        assert!(mgr.remove_node(&id));
        assert_eq!(mgr.node_count(), 0);
        // A second removal finds nothing and must report that.
        assert!(!mgr.remove_node(&id));
    }

    #[test]
    fn test_manager_find_capable() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new(
            "n1",
            vec![Capability::Shell, Capability::FileSystem],
        )));
        mgr.register_node(Box::new(MockNode::new("n2", vec![Capability::Shell])));
        mgr.register_node(Box::new(MockNode::new("n3", vec![Capability::Screenshot])));

        let shell_nodes = mgr.find_capable(&Capability::Shell);
        assert_eq!(shell_nodes.len(), 2);

        let screenshot_nodes = mgr.find_capable(&Capability::Screenshot);
        assert_eq!(screenshot_nodes.len(), 1);

        let apple_nodes = mgr.find_capable(&Capability::AppleScript);
        assert!(apple_nodes.is_empty());
    }

    #[tokio::test]
    async fn test_manager_execute_on_best() {
        let mut mgr = NodeManager::new();
        let node_id = NodeId::new("n1");
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        mgr.consent_mut().grant(&node_id, Capability::Shell);

        let task = NodeTask::new(Capability::Shell, "echo hello");
        let result = mgr.execute_on_best(task).await.unwrap();
        assert!(result.success);
        assert_eq!(result.exit_code, Some(0));
    }

    #[tokio::test]
    async fn test_manager_execute_no_capable() {
        let mgr = NodeManager::new();
        let task = NodeTask::new(Capability::Shell, "echo hello");
        let result = mgr.execute_on_best(task).await;
        // Pin the variant so this cannot be confused with the consent failure.
        assert!(matches!(
            result,
            Err(RustantError::Node(NodeError::NoCapableNode { .. }))
        ));
    }

    #[tokio::test]
    async fn test_manager_execute_no_consent() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        // No consent granted

        let task = NodeTask::new(Capability::Shell, "echo hello");
        let result = mgr.execute_on_best(task).await;
        // A capable node exists, so the failure must be the consent one.
        assert!(matches!(
            result,
            Err(RustantError::Node(NodeError::ConsentDenied { .. }))
        ));
    }

    #[test]
    fn test_manager_find_with_consent() {
        let mut mgr = NodeManager::new();
        let n1 = NodeId::new("n1");
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        mgr.register_node(Box::new(MockNode::new("n2", vec![Capability::Shell])));

        let mut consent = ConsentStore::new();
        consent.grant(&n1, Capability::Shell);
        // n2 not consented

        let found = mgr.find_capable_with_consent(&Capability::Shell, &consent);
        assert_eq!(found.len(), 1);
        assert_eq!(*found[0], n1);
    }

    #[test]
    fn test_manager_find_without_consent_excluded() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));

        let consent = ConsentStore::new(); // No grants at all
        let found = mgr.find_capable_with_consent(&Capability::Shell, &consent);
        assert!(found.is_empty());
    }

    #[tokio::test]
    async fn test_manager_broadcast_message() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new("n1", vec![Capability::Shell])));
        mgr.register_node(Box::new(MockNode::new("n2", vec![Capability::FileSystem])));

        // NOTE(review): MockNode does not override `handle_message`, so this
        // relies on the `Node` trait's default answering `Ping` with `Pong`,
        // presumably carrying `info().uptime_secs` — confirm against the trait.
        let results = mgr.broadcast_message(&NodeMessage::Ping).await;
        assert_eq!(results.len(), 2);
        for (_, response) in &results {
            match response {
                Some(NodeMessage::Pong { uptime_secs }) => assert_eq!(*uptime_secs, 0),
                _ => panic!("Expected Pong"),
            }
        }
    }

    #[test]
    fn test_manager_capabilities_map() {
        let mut mgr = NodeManager::new();
        mgr.register_node(Box::new(MockNode::new(
            "n1",
            vec![Capability::Shell, Capability::FileSystem],
        )));
        mgr.register_node(Box::new(MockNode::new("n2", vec![Capability::Screenshot])));

        let map = mgr.node_capabilities_map();
        assert_eq!(map.len(), 2);
        assert_eq!(map[&NodeId::new("n1")].len(), 2);
        assert_eq!(map[&NodeId::new("n2")].len(), 1);
    }
}