mecha10_core/
auth.rs

1//! Authorization and Access Control
2//!
3//! Provides role-based access control (RBAC) for pub/sub topics.
4//! Enables controlling which nodes can publish/subscribe to specific topics.
5//!
6//! # Architecture
7//!
8//! - Simple role-based permissions
9//! - Wildcard pattern matching for topics
10//! - Policies stored in Redis
11//! - Optional enforcement (can be disabled for development)
12//!
13//! # Example
14//!
15//! ```rust
16//! use mecha10::prelude::*;
17//! use mecha10::auth::{AuthExt, Policy, Permission};
18//!
19//! # async fn example(ctx: &Context) -> Result<()> {
20//! // Define a policy
21//! let policy = Policy::new("camera_driver")
22//!     .allow(Permission::Publish, "/sensors/camera/*")
23//!     .allow(Permission::Subscribe, "/control/camera/*");
24//!
25//! ctx.register_policy(&policy).await?;
26//!
27//! // Assign role to node
28//! ctx.assign_role("camera-1", "camera_driver").await?;
29//!
30//! // Check permission (automatically done in pub/sub)
31//! let allowed = ctx.check_permission(
32//!     "camera-1",
33//!     Permission::Publish,
34//!     "/sensors/camera/rgb"
35//! ).await?;
36//!
37//! # Ok(())
38//! # }
39//! ```
40
41use crate::context::Context;
42use crate::error::{Mecha10Error, Result};
43use serde::{Deserialize, Serialize};
44
45/// Permission type
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47#[serde(rename_all = "lowercase")]
48pub enum Permission {
49    /// Permission to publish to a topic
50    Publish,
51    /// Permission to subscribe to a topic
52    Subscribe,
53}
54
55impl Permission {
56    #[allow(dead_code)]
57    fn as_str(&self) -> &str {
58        match self {
59            Permission::Publish => "publish",
60            Permission::Subscribe => "subscribe",
61        }
62    }
63}
64
65/// Access control rule
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct Rule {
68    /// Permission type
69    pub permission: Permission,
70
71    /// Topic pattern (supports wildcards: /sensors/*)
72    pub topic_pattern: String,
73}
74
75impl Rule {
76    /// Create a new rule
77    pub fn new(permission: Permission, topic_pattern: impl Into<String>) -> Self {
78        Self {
79            permission,
80            topic_pattern: topic_pattern.into(),
81        }
82    }
83
84    /// Check if a topic matches this rule's pattern
85    pub fn matches(&self, topic: &str) -> bool {
86        matches_pattern(&self.topic_pattern, topic)
87    }
88}
89
90/// Policy defining permissions for a role
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct Policy {
93    /// Role name
94    pub role: String,
95
96    /// Access control rules
97    pub rules: Vec<Rule>,
98
99    /// Policy description
100    #[serde(default)]
101    pub description: String,
102}
103
104impl Policy {
105    /// Create a new policy for a role
106    pub fn new(role: impl Into<String>) -> Self {
107        Self {
108            role: role.into(),
109            rules: Vec::new(),
110            description: String::new(),
111        }
112    }
113
114    /// Set description
115    pub fn with_description(mut self, description: impl Into<String>) -> Self {
116        self.description = description.into();
117        self
118    }
119
120    /// Add a permission rule
121    pub fn allow(mut self, permission: Permission, topic_pattern: impl Into<String>) -> Self {
122        self.rules.push(Rule::new(permission, topic_pattern));
123        self
124    }
125
126    /// Add publish permission
127    pub fn allow_publish(self, topic_pattern: impl Into<String>) -> Self {
128        self.allow(Permission::Publish, topic_pattern)
129    }
130
131    /// Add subscribe permission
132    pub fn allow_subscribe(self, topic_pattern: impl Into<String>) -> Self {
133        self.allow(Permission::Subscribe, topic_pattern)
134    }
135
136    /// Check if this policy allows a specific action
137    pub fn allows(&self, permission: Permission, topic: &str) -> bool {
138        self.rules
139            .iter()
140            .any(|rule| rule.permission == permission && rule.matches(topic))
141    }
142
143    /// Get Redis key for this policy
144    fn redis_key(&self) -> String {
145        format!("mecha10:auth:policies:{}", self.role)
146    }
147
148    /// Get Redis key for policies list
149    fn redis_list_key() -> String {
150        "mecha10:auth:policies:list".to_string()
151    }
152
153    /// Get Redis key for node role assignment
154    fn redis_role_key(node_id: &str) -> String {
155        format!("mecha10:auth:roles:{}", node_id)
156    }
157}
158
159/// Authorization extension trait for Context
160pub trait AuthExt {
161    /// Register a policy
162    ///
163    /// # Arguments
164    ///
165    /// * `policy` - Policy to register
166    ///
167    /// # Example
168    ///
169    /// ```rust
170    /// use mecha10::prelude::*;
171    /// use mecha10::auth::{AuthExt, Policy, Permission};
172    ///
173    /// # async fn example(ctx: &Context) -> Result<()> {
174    /// let policy = Policy::new("camera_driver")
175    ///     .allow_publish("/sensors/camera/*")
176    ///     .allow_subscribe("/control/camera/*");
177    ///
178    /// ctx.register_policy(&policy).await?;
179    /// # Ok(())
180    /// # }
181    /// ```
182    fn register_policy(&self, policy: &Policy) -> impl std::future::Future<Output = Result<()>> + Send;
183
184    /// Get a policy by role name
185    ///
186    /// # Arguments
187    ///
188    /// * `role` - Role name
189    fn get_policy(&self, role: &str) -> impl std::future::Future<Output = Result<Option<Policy>>> + Send;
190
191    /// List all registered policies
192    fn list_policies(&self) -> impl std::future::Future<Output = Result<Vec<Policy>>> + Send;
193
194    /// Assign a role to a node
195    ///
196    /// # Arguments
197    ///
198    /// * `node_id` - Node identifier
199    /// * `role` - Role to assign
200    fn assign_role(&self, node_id: &str, role: &str) -> impl std::future::Future<Output = Result<()>> + Send;
201
202    /// Get the role assigned to a node
203    ///
204    /// # Arguments
205    ///
206    /// * `node_id` - Node identifier
207    fn get_role(&self, node_id: &str) -> impl std::future::Future<Output = Result<Option<String>>> + Send;
208
209    /// Check if a node has permission to perform an action on a topic
210    ///
211    /// # Arguments
212    ///
213    /// * `node_id` - Node identifier
214    /// * `permission` - Permission type (Publish/Subscribe)
215    /// * `topic` - Topic path
216    ///
217    /// # Returns
218    ///
219    /// `true` if permission is granted, `false` otherwise
220    fn check_permission(
221        &self,
222        node_id: &str,
223        permission: Permission,
224        topic: &str,
225    ) -> impl std::future::Future<Output = Result<bool>> + Send;
226
227    /// Delete a policy
228    ///
229    /// # Arguments
230    ///
231    /// * `role` - Role name
232    fn delete_policy(&self, role: &str) -> impl std::future::Future<Output = Result<()>> + Send;
233
234    /// Remove role assignment from a node
235    ///
236    /// # Arguments
237    ///
238    /// * `node_id` - Node identifier
239    fn unassign_role(&self, node_id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
240}
241
242impl AuthExt for Context {
243    async fn register_policy(&self, policy: &Policy) -> Result<()> {
244        #[cfg(feature = "messaging")]
245        {
246            use redis::AsyncCommands;
247
248            let redis_url = Context::get_redis_url()?;
249            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
250                message: format!("Failed to connect to Redis: {}", e),
251                suggestion: "Ensure Redis is running".to_string(),
252            })?;
253
254            let mut conn =
255                client
256                    .get_multiplexed_async_connection()
257                    .await
258                    .map_err(|e| Mecha10Error::MessagingError {
259                        message: format!("Failed to get Redis connection: {}", e),
260                        suggestion: "Ensure Redis is running".to_string(),
261                    })?;
262
263            // Serialize policy
264            let policy_json = serde_json::to_string(&policy)
265                .map_err(|e| Mecha10Error::Other(format!("Failed to serialize policy: {}", e)))?;
266
267            // Store policy
268            let key = policy.redis_key();
269            conn.set::<_, _, ()>(&key, &policy_json)
270                .await
271                .map_err(|e| Mecha10Error::MessagingError {
272                    message: format!("Failed to register policy: {}", e),
273                    suggestion: "Check Redis connection".to_string(),
274                })?;
275
276            // Add to policies list
277            let list_key = Policy::redis_list_key();
278            conn.sadd::<_, _, ()>(&list_key, &policy.role)
279                .await
280                .map_err(|e| Mecha10Error::MessagingError {
281                    message: format!("Failed to update policy list: {}", e),
282                    suggestion: "Check Redis connection".to_string(),
283                })?;
284
285            Ok(())
286        }
287
288        #[cfg(not(feature = "messaging"))]
289        {
290            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
291        }
292    }
293
294    async fn get_policy(&self, role: &str) -> Result<Option<Policy>> {
295        #[cfg(feature = "messaging")]
296        {
297            use redis::AsyncCommands;
298
299            let redis_url = Context::get_redis_url()?;
300            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
301                message: format!("Failed to connect to Redis: {}", e),
302                suggestion: "Ensure Redis is running".to_string(),
303            })?;
304
305            let mut conn =
306                client
307                    .get_multiplexed_async_connection()
308                    .await
309                    .map_err(|e| Mecha10Error::MessagingError {
310                        message: format!("Failed to get Redis connection: {}", e),
311                        suggestion: "Ensure Redis is running".to_string(),
312                    })?;
313
314            let key = format!("mecha10:auth:policies:{}", role);
315            let json: Option<String> = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
316                message: format!("Failed to get policy: {}", e),
317                suggestion: "Check Redis connection".to_string(),
318            })?;
319
320            if let Some(json) = json {
321                let policy = serde_json::from_str::<Policy>(&json)
322                    .map_err(|e| Mecha10Error::Other(format!("Failed to parse policy: {}", e)))?;
323                Ok(Some(policy))
324            } else {
325                Ok(None)
326            }
327        }
328
329        #[cfg(not(feature = "messaging"))]
330        {
331            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
332        }
333    }
334
335    async fn list_policies(&self) -> Result<Vec<Policy>> {
336        #[cfg(feature = "messaging")]
337        {
338            use redis::AsyncCommands;
339
340            let redis_url = Context::get_redis_url()?;
341            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
342                message: format!("Failed to connect to Redis: {}", e),
343                suggestion: "Ensure Redis is running".to_string(),
344            })?;
345
346            let mut conn =
347                client
348                    .get_multiplexed_async_connection()
349                    .await
350                    .map_err(|e| Mecha10Error::MessagingError {
351                        message: format!("Failed to get Redis connection: {}", e),
352                        suggestion: "Ensure Redis is running".to_string(),
353                    })?;
354
355            // Get all role names
356            let list_key = Policy::redis_list_key();
357            let roles: Vec<String> = conn
358                .smembers(&list_key)
359                .await
360                .map_err(|e| Mecha10Error::MessagingError {
361                    message: format!("Failed to get policy list: {}", e),
362                    suggestion: "Check Redis connection".to_string(),
363                })?;
364
365            // Get each policy
366            let mut policies = Vec::new();
367            for role in roles {
368                if let Some(policy) = self.get_policy(&role).await? {
369                    policies.push(policy);
370                }
371            }
372
373            Ok(policies)
374        }
375
376        #[cfg(not(feature = "messaging"))]
377        {
378            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
379        }
380    }
381
382    async fn assign_role(&self, node_id: &str, role: &str) -> Result<()> {
383        #[cfg(feature = "messaging")]
384        {
385            use redis::AsyncCommands;
386
387            let redis_url = Context::get_redis_url()?;
388            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
389                message: format!("Failed to connect to Redis: {}", e),
390                suggestion: "Ensure Redis is running".to_string(),
391            })?;
392
393            let mut conn =
394                client
395                    .get_multiplexed_async_connection()
396                    .await
397                    .map_err(|e| Mecha10Error::MessagingError {
398                        message: format!("Failed to get Redis connection: {}", e),
399                        suggestion: "Ensure Redis is running".to_string(),
400                    })?;
401
402            let key = Policy::redis_role_key(node_id);
403            conn.set::<_, _, ()>(&key, role)
404                .await
405                .map_err(|e| Mecha10Error::MessagingError {
406                    message: format!("Failed to assign role: {}", e),
407                    suggestion: "Check Redis connection".to_string(),
408                })?;
409
410            Ok(())
411        }
412
413        #[cfg(not(feature = "messaging"))]
414        {
415            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
416        }
417    }
418
419    async fn get_role(&self, node_id: &str) -> Result<Option<String>> {
420        #[cfg(feature = "messaging")]
421        {
422            use redis::AsyncCommands;
423
424            let redis_url = Context::get_redis_url()?;
425            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
426                message: format!("Failed to connect to Redis: {}", e),
427                suggestion: "Ensure Redis is running".to_string(),
428            })?;
429
430            let mut conn =
431                client
432                    .get_multiplexed_async_connection()
433                    .await
434                    .map_err(|e| Mecha10Error::MessagingError {
435                        message: format!("Failed to get Redis connection: {}", e),
436                        suggestion: "Ensure Redis is running".to_string(),
437                    })?;
438
439            let key = Policy::redis_role_key(node_id);
440            let role: Option<String> = conn.get(&key).await.map_err(|e| Mecha10Error::MessagingError {
441                message: format!("Failed to get role: {}", e),
442                suggestion: "Check Redis connection".to_string(),
443            })?;
444
445            Ok(role)
446        }
447
448        #[cfg(not(feature = "messaging"))]
449        {
450            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
451        }
452    }
453
454    async fn check_permission(&self, node_id: &str, permission: Permission, topic: &str) -> Result<bool> {
455        // Get node's role
456        let role = match self.get_role(node_id).await? {
457            Some(role) => role,
458            None => return Ok(false), // No role = no permissions
459        };
460
461        // Get policy for role
462        let policy = match self.get_policy(&role).await? {
463            Some(policy) => policy,
464            None => return Ok(false), // No policy = no permissions
465        };
466
467        // Check if policy allows this action
468        Ok(policy.allows(permission, topic))
469    }
470
471    async fn delete_policy(&self, role: &str) -> Result<()> {
472        #[cfg(feature = "messaging")]
473        {
474            use redis::AsyncCommands;
475
476            let redis_url = Context::get_redis_url()?;
477            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
478                message: format!("Failed to connect to Redis: {}", e),
479                suggestion: "Ensure Redis is running".to_string(),
480            })?;
481
482            let mut conn =
483                client
484                    .get_multiplexed_async_connection()
485                    .await
486                    .map_err(|e| Mecha10Error::MessagingError {
487                        message: format!("Failed to get Redis connection: {}", e),
488                        suggestion: "Ensure Redis is running".to_string(),
489                    })?;
490
491            // Delete policy
492            let key = format!("mecha10:auth:policies:{}", role);
493            conn.del::<_, ()>(&key)
494                .await
495                .map_err(|e| Mecha10Error::MessagingError {
496                    message: format!("Failed to delete policy: {}", e),
497                    suggestion: "Check Redis connection".to_string(),
498                })?;
499
500            // Remove from list
501            let list_key = Policy::redis_list_key();
502            conn.srem::<_, _, ()>(&list_key, role)
503                .await
504                .map_err(|e| Mecha10Error::MessagingError {
505                    message: format!("Failed to update policy list: {}", e),
506                    suggestion: "Check Redis connection".to_string(),
507                })?;
508
509            Ok(())
510        }
511
512        #[cfg(not(feature = "messaging"))]
513        {
514            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
515        }
516    }
517
518    async fn unassign_role(&self, node_id: &str) -> Result<()> {
519        #[cfg(feature = "messaging")]
520        {
521            use redis::AsyncCommands;
522
523            let redis_url = Context::get_redis_url()?;
524            let client = redis::Client::open(redis_url.as_str()).map_err(|e| Mecha10Error::MessagingError {
525                message: format!("Failed to connect to Redis: {}", e),
526                suggestion: "Ensure Redis is running".to_string(),
527            })?;
528
529            let mut conn =
530                client
531                    .get_multiplexed_async_connection()
532                    .await
533                    .map_err(|e| Mecha10Error::MessagingError {
534                        message: format!("Failed to get Redis connection: {}", e),
535                        suggestion: "Ensure Redis is running".to_string(),
536                    })?;
537
538            let key = Policy::redis_role_key(node_id);
539            conn.del::<_, ()>(&key)
540                .await
541                .map_err(|e| Mecha10Error::MessagingError {
542                    message: format!("Failed to unassign role: {}", e),
543                    suggestion: "Check Redis connection".to_string(),
544                })?;
545
546            Ok(())
547        }
548
549        #[cfg(not(feature = "messaging"))]
550        {
551            Err(Mecha10Error::Other("Messaging feature not enabled".to_string()))
552        }
553    }
554}
555
556/// Check if a topic matches a pattern
557///
558/// Uses the same glob matching implementation as aggregated topics,
559/// supporting full glob syntax including *, **, ?, [abc], {a,b}, etc.
560///
561/// # Examples
562///
563/// ```
564/// # use mecha10_core::auth::matches_pattern;
565/// assert!(matches_pattern("/sensors/*", "/sensors/camera"));
566/// assert!(matches_pattern("/sensors/*/rgb", "/sensors/camera/rgb"));
567/// assert!(!matches_pattern("/sensors/*", "/control/motor"));
568///
569/// // Advanced patterns
570/// assert!(matches_pattern("/sensors/**", "/sensors/camera/left/rgb"));
571/// assert!(matches_pattern("/sensors/camera?", "/sensors/camera1"));
572/// assert!(matches_pattern("/sensors/{left,right}", "/sensors/left"));
573/// ```
574pub fn matches_pattern(pattern: &str, topic: &str) -> bool {
575    // Use the centralized glob matching from aggregated module
576    crate::aggregated::pattern::matches_pattern(topic, pattern)
577}