oxify_authz/
edge.rs

1//! # Edge Computing for Authorization
2//!
3//! Lightweight authorization engine designed for edge deployment.
4//! Enables low-latency authorization checks at the edge by maintaining
5//! a synchronized subset of authorization data.
6//!
7//! ## Features
8//!
9//! - **Lightweight Engine**: In-memory engine optimized for edge workers
10//! - **Tuple Synchronization**: Automatic sync from central database
11//! - **CRDT Conflict Resolution**: Handles concurrent updates across edge nodes
12//! - **Selective Sync**: Only sync relevant namespaces/tenants
13//!
14//! ## Example
15//!
16//! ```rust,no_run
17//! use oxify_authz::edge::{EdgeEngine, EdgeConfig, SyncConfig};
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//!     let config = EdgeConfig {
22//!         central_db_url: "postgres://central-db/authz".to_string(),
23//!         sync_interval_secs: 30,
24//!         sync_config: SyncConfig::Namespaces(vec!["document".to_string()]),
25//!     };
26//!
27//!     let engine = EdgeEngine::new(config).await?;
28//!     engine.start_sync().await?;
29//!
30//!     // Perform fast authorization checks at the edge
31//!     let allowed = engine.check("document", "123", "viewer", "user:alice").await?;
32//!
33//!     Ok(())
34//! }
35//! ```
36
37use crate::{memory::InMemoryRebacManager, CheckRequest, RelationTuple, Result, Subject};
38use serde::{Deserialize, Serialize};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::time::{SystemTime, UNIX_EPOCH};
42use tokio::sync::RwLock;
43use tokio::time::{interval, Duration};
44
45/// Configuration for edge engine
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct EdgeConfig {
48    /// URL of the central database to sync from
49    pub central_db_url: String,
50    /// Sync interval in seconds
51    pub sync_interval_secs: u64,
52    /// What to sync from central database
53    pub sync_config: SyncConfig,
54}
55
56impl Default for EdgeConfig {
57    fn default() -> Self {
58        Self {
59            central_db_url: "postgres://localhost/authz".to_string(),
60            sync_interval_secs: 30,
61            sync_config: SyncConfig::All,
62        }
63    }
64}
65
66/// Defines what data to sync from central database
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub enum SyncConfig {
69    /// Sync all tuples
70    All,
71    /// Sync specific namespaces only
72    Namespaces(Vec<String>),
73    /// Sync specific tenants only
74    Tenants(Vec<String>),
75    /// Sync specific namespaces for specific tenants
76    NamespacesAndTenants {
77        namespaces: Vec<String>,
78        tenants: Vec<String>,
79    },
80}
81
82/// A tuple with CRDT metadata for conflict resolution
83#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
84pub struct CrdtTuple {
85    /// The actual relation tuple
86    pub tuple: RelationTuple,
87    /// Lamport timestamp for ordering
88    pub timestamp: u64,
89    /// Node ID that created this tuple
90    pub node_id: String,
91    /// Whether this tuple is a tombstone (deleted)
92    pub is_tombstone: bool,
93}
94
95impl CrdtTuple {
96    /// Create a new CRDT tuple
97    pub fn new(tuple: RelationTuple, node_id: String) -> Self {
98        Self {
99            tuple,
100            timestamp: Self::current_timestamp(),
101            node_id,
102            is_tombstone: false,
103        }
104    }
105
106    /// Create a tombstone for deletion
107    pub fn tombstone(tuple: RelationTuple, node_id: String) -> Self {
108        Self {
109            tuple,
110            timestamp: Self::current_timestamp(),
111            node_id,
112            is_tombstone: true,
113        }
114    }
115
116    fn current_timestamp() -> u64 {
117        SystemTime::now()
118            .duration_since(UNIX_EPOCH)
119            .unwrap()
120            .as_micros() as u64
121    }
122}
123
124/// CRDT-based conflict resolver using Last-Write-Wins (LWW) strategy
125#[derive(Debug)]
126pub struct CrdtResolver {
127    /// Store of CRDT tuples indexed by tuple key
128    store: Arc<RwLock<HashMap<String, CrdtTuple>>>,
129}
130
131impl CrdtResolver {
132    /// Create a new CRDT resolver
133    pub fn new() -> Self {
134        Self {
135            store: Arc::new(RwLock::new(HashMap::new())),
136        }
137    }
138
139    /// Get a unique key for a tuple
140    fn tuple_key(tuple: &RelationTuple) -> String {
141        format!(
142            "{}:{}:{}:{}",
143            tuple.namespace,
144            tuple.object_id,
145            tuple.relation,
146            match &tuple.subject {
147                Subject::User(id) => format!("user:{}", id),
148                Subject::UserSet {
149                    namespace,
150                    object_id,
151                    relation,
152                } => format!("set:{}:{}:{}", namespace, object_id, relation),
153            }
154        )
155    }
156
157    /// Merge a CRDT tuple using LWW strategy
158    pub async fn merge(&self, crdt_tuple: CrdtTuple) -> bool {
159        let key = Self::tuple_key(&crdt_tuple.tuple);
160        let mut store = self.store.write().await;
161
162        match store.get(&key) {
163            Some(existing) => {
164                // Last-Write-Wins: Compare timestamps
165                if crdt_tuple.timestamp > existing.timestamp
166                    || (crdt_tuple.timestamp == existing.timestamp
167                        && crdt_tuple.node_id > existing.node_id)
168                {
169                    store.insert(key, crdt_tuple);
170                    true
171                } else {
172                    false
173                }
174            }
175            None => {
176                store.insert(key, crdt_tuple);
177                true
178            }
179        }
180    }
181
182    /// Get all active (non-tombstone) tuples
183    pub async fn active_tuples(&self) -> Vec<RelationTuple> {
184        let store = self.store.read().await;
185        store
186            .values()
187            .filter(|ct| !ct.is_tombstone)
188            .map(|ct| ct.tuple.clone())
189            .collect()
190    }
191
192    /// Get CRDT metadata for a tuple
193    pub async fn get_crdt(&self, tuple: &RelationTuple) -> Option<CrdtTuple> {
194        let key = Self::tuple_key(tuple);
195        let store = self.store.read().await;
196        store.get(&key).cloned()
197    }
198
199    /// Clear all tombstones older than retention period
200    pub async fn gc_tombstones(&self, retention_micros: u64) {
201        let mut store = self.store.write().await;
202        let cutoff = CrdtTuple::current_timestamp() - retention_micros;
203
204        store.retain(|_, ct| !ct.is_tombstone || ct.timestamp > cutoff);
205    }
206}
207
208impl Default for CrdtResolver {
209    fn default() -> Self {
210        Self::new()
211    }
212}
213
214/// Statistics for edge engine
215#[derive(Debug, Clone, Serialize, Deserialize, Default)]
216pub struct EdgeStats {
217    /// Number of tuples synced from central
218    pub tuples_synced: u64,
219    /// Number of conflicts resolved
220    pub conflicts_resolved: u64,
221    /// Number of authorization checks performed
222    pub checks_performed: u64,
223    /// Last successful sync timestamp
224    pub last_sync_timestamp: Option<u64>,
225    /// Number of sync failures
226    pub sync_failures: u64,
227}
228
229/// Lightweight authorization engine for edge deployment
230pub struct EdgeEngine {
231    /// In-memory authorization manager
232    manager: InMemoryRebacManager,
233    /// CRDT resolver for conflict resolution
234    crdt: CrdtResolver,
235    /// Configuration
236    config: EdgeConfig,
237    /// Unique node ID
238    node_id: String,
239    /// Statistics
240    stats: Arc<RwLock<EdgeStats>>,
241    /// Sync task handle
242    sync_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
243}
244
245impl EdgeEngine {
246    /// Create a new edge engine
247    pub async fn new(config: EdgeConfig) -> Result<Self> {
248        let node_id = uuid::Uuid::new_v4().to_string();
249
250        Ok(Self {
251            manager: InMemoryRebacManager::new(),
252            crdt: CrdtResolver::new(),
253            config,
254            node_id,
255            stats: Arc::new(RwLock::new(EdgeStats::default())),
256            sync_handle: Arc::new(RwLock::new(None)),
257        })
258    }
259
260    /// Get the node ID
261    pub fn node_id(&self) -> &str {
262        &self.node_id
263    }
264
265    /// Check authorization (fast, in-memory)
266    pub async fn check(
267        &self,
268        namespace: &str,
269        object_id: &str,
270        relation: &str,
271        subject_id: &str,
272    ) -> Result<bool> {
273        let request = CheckRequest {
274            namespace: namespace.to_string(),
275            object_id: object_id.to_string(),
276            relation: relation.to_string(),
277            subject: Subject::User(subject_id.to_string()),
278            context: None,
279        };
280
281        let response = self.manager.check(&request).await?;
282
283        // Update stats
284        let mut stats = self.stats.write().await;
285        stats.checks_performed += 1;
286
287        Ok(response.allowed)
288    }
289
290    /// Write a tuple (with CRDT metadata)
291    pub async fn write_tuple(&self, tuple: RelationTuple) -> Result<()> {
292        let crdt_tuple = CrdtTuple::new(tuple.clone(), self.node_id.clone());
293
294        // Merge into CRDT store
295        self.crdt.merge(crdt_tuple).await;
296
297        // Write to in-memory manager
298        self.manager.add_tuple(tuple).await
299    }
300
301    /// Delete a tuple (creates tombstone)
302    pub async fn delete_tuple(&self, tuple: RelationTuple) -> Result<()> {
303        let tombstone = CrdtTuple::tombstone(tuple.clone(), self.node_id.clone());
304
305        // Merge tombstone into CRDT store
306        self.crdt.merge(tombstone).await;
307
308        // Remove from in-memory manager
309        self.manager.remove_tuple(&tuple).await
310    }
311
312    /// Sync tuples from central database
313    pub async fn sync_from_central(&self) -> Result<()> {
314        // In a real implementation, this would connect to the central database
315        // and fetch tuples based on sync_config
316        // For now, we'll simulate this
317
318        // TODO: Implement actual database sync using sqlx
319        // This is a placeholder that shows the structure
320
321        let mut stats = self.stats.write().await;
322        stats.last_sync_timestamp = Some(CrdtTuple::current_timestamp());
323
324        Ok(())
325    }
326
327    /// Start background sync task
328    pub async fn start_sync(&self) -> Result<()> {
329        let mut handle_guard = self.sync_handle.write().await;
330
331        // Stop existing sync task if running
332        if let Some(handle) = handle_guard.take() {
333            handle.abort();
334        }
335
336        let interval_secs = self.config.sync_interval_secs;
337        let stats = self.stats.clone();
338
339        // Create sync task
340        let handle = tokio::spawn(async move {
341            let mut ticker = interval(Duration::from_secs(interval_secs));
342
343            loop {
344                ticker.tick().await;
345
346                // Perform sync (placeholder)
347                // In real implementation, call sync_from_central()
348
349                let mut s = stats.write().await;
350                s.last_sync_timestamp = Some(CrdtTuple::current_timestamp());
351            }
352        });
353
354        *handle_guard = Some(handle);
355
356        Ok(())
357    }
358
359    /// Stop background sync task
360    pub async fn stop_sync(&self) {
361        let mut handle_guard = self.sync_handle.write().await;
362
363        if let Some(handle) = handle_guard.take() {
364            handle.abort();
365        }
366    }
367
368    /// Get edge engine statistics
369    pub async fn stats(&self) -> EdgeStats {
370        self.stats.read().await.clone()
371    }
372
373    /// Merge tuples from another edge node (for gossip protocol)
374    pub async fn merge_remote_tuples(&self, remote_tuples: Vec<CrdtTuple>) -> Result<u64> {
375        let mut conflicts = 0;
376
377        for crdt_tuple in remote_tuples {
378            let merged = self.crdt.merge(crdt_tuple.clone()).await;
379
380            if merged {
381                if crdt_tuple.is_tombstone {
382                    // Remove tuple if it's a tombstone
383                    let _ = self.manager.remove_tuple(&crdt_tuple.tuple).await;
384                } else {
385                    // Add tuple if it's active
386                    let _ = self.manager.add_tuple(crdt_tuple.tuple.clone()).await;
387                }
388                conflicts += 1;
389            }
390        }
391
392        // Update stats
393        let mut stats = self.stats.write().await;
394        stats.conflicts_resolved += conflicts;
395
396        Ok(conflicts)
397    }
398
399    /// Garbage collect old tombstones
400    pub async fn gc_tombstones(&self, retention_secs: u64) -> Result<()> {
401        self.crdt.gc_tombstones(retention_secs * 1_000_000).await;
402        Ok(())
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the tuple shared by the tests below, for the given user id.
    fn viewer_tuple(user: &str) -> RelationTuple {
        RelationTuple::new(
            "document",
            "viewer",
            "123",
            Subject::User(user.to_string()),
        )
    }

    /// Builds a live CRDT entry for `tuple` with an explicit timestamp.
    fn entry_at(tuple: &RelationTuple, node: &str, timestamp: u64) -> CrdtTuple {
        let mut entry = CrdtTuple::new(tuple.clone(), node.to_string());
        entry.timestamp = timestamp;
        entry
    }

    /// Builds an engine from the default configuration.
    async fn default_engine() -> EdgeEngine {
        EdgeEngine::new(EdgeConfig::default()).await.unwrap()
    }

    #[tokio::test]
    async fn test_edge_config_default() {
        let config = EdgeConfig::default();

        assert_eq!(config.sync_interval_secs, 30);
        assert!(matches!(config.sync_config, SyncConfig::All));
    }

    #[tokio::test]
    async fn test_crdt_tuple_creation() {
        let tuple = viewer_tuple("alice");

        let crdt = CrdtTuple::new(tuple.clone(), "node1".to_string());

        assert_eq!(crdt.tuple, tuple);
        assert_eq!(crdt.node_id, "node1");
        assert!(!crdt.is_tombstone);
        assert!(crdt.timestamp > 0);
    }

    #[tokio::test]
    async fn test_crdt_tombstone() {
        let tuple = viewer_tuple("alice");

        let tombstone = CrdtTuple::tombstone(tuple.clone(), "node1".to_string());

        assert_eq!(tombstone.tuple, tuple);
        assert!(tombstone.is_tombstone);
    }

    #[tokio::test]
    async fn test_crdt_resolver_lww() {
        let resolver = CrdtResolver::new();
        let tuple = viewer_tuple("alice");

        // The first entry for a tuple is always accepted.
        assert!(resolver.merge(entry_at(&tuple, "node1", 100)).await);

        // A newer entry replaces it.
        assert!(resolver.merge(entry_at(&tuple, "node2", 200)).await);

        // An entry older than the stored one is rejected.
        assert!(!resolver.merge(entry_at(&tuple, "node3", 150)).await);

        // The newest entry is the one that remains.
        let stored = resolver.get_crdt(&tuple).await.unwrap();
        assert_eq!(stored.timestamp, 200);
        assert_eq!(stored.node_id, "node2");
    }

    #[tokio::test]
    async fn test_crdt_resolver_tie_breaking() {
        let resolver = CrdtResolver::new();
        let tuple = viewer_tuple("alice");

        // Equal timestamps: the greater node id wins.
        resolver.merge(entry_at(&tuple, "node_a", 100)).await;
        assert!(resolver.merge(entry_at(&tuple, "node_z", 100)).await); // node_z > node_a

        let stored = resolver.get_crdt(&tuple).await.unwrap();
        assert_eq!(stored.node_id, "node_z");
    }

    #[tokio::test]
    async fn test_edge_engine_creation() {
        let engine = default_engine().await;

        assert!(!engine.node_id().is_empty());

        let stats = engine.stats().await;
        assert_eq!(stats.checks_performed, 0);
        assert_eq!(stats.tuples_synced, 0);
    }

    #[tokio::test]
    async fn test_edge_engine_write_and_check() {
        let engine = default_engine().await;

        engine.write_tuple(viewer_tuple("alice")).await.unwrap();

        // The written tuple grants access.
        let allowed = engine
            .check("document", "123", "viewer", "alice")
            .await
            .unwrap();
        assert!(allowed);

        let stats = engine.stats().await;
        assert_eq!(stats.checks_performed, 1);
    }

    #[tokio::test]
    async fn test_edge_engine_delete() {
        let engine = default_engine().await;
        let tuple = viewer_tuple("alice");

        engine.write_tuple(tuple.clone()).await.unwrap();
        engine.delete_tuple(tuple.clone()).await.unwrap();

        // Access is gone after the delete.
        let allowed = engine
            .check("document", "123", "viewer", "alice")
            .await
            .unwrap();
        assert!(!allowed);

        // The CRDT store keeps a tombstone for the deleted tuple.
        let crdt = engine.crdt.get_crdt(&tuple).await.unwrap();
        assert!(crdt.is_tombstone);
    }

    #[tokio::test]
    async fn test_merge_remote_tuples() {
        let engine = default_engine().await;
        let tuple = viewer_tuple("bob");

        let remote_crdt = CrdtTuple::new(tuple.clone(), "remote-node".to_string());

        let conflicts = engine.merge_remote_tuples(vec![remote_crdt]).await.unwrap();
        assert_eq!(conflicts, 1);

        // The remote tuple is now visible to checks.
        let allowed = engine
            .check("document", "123", "viewer", "bob")
            .await
            .unwrap();
        assert!(allowed);

        let stats = engine.stats().await;
        assert_eq!(stats.conflicts_resolved, 1);
    }

    #[tokio::test]
    async fn test_gc_tombstones() {
        let resolver = CrdtResolver::new();
        let tuple = viewer_tuple("alice");

        // A tombstone with a timestamp far in the past.
        let mut old_tombstone = CrdtTuple::tombstone(tuple.clone(), "node1".to_string());
        old_tombstone.timestamp = 100;
        resolver.merge(old_tombstone).await;

        // Collect with a one-second retention.
        resolver.gc_tombstones(1_000_000).await;

        // The old tombstone is gone.
        let stored = resolver.get_crdt(&tuple).await;
        assert!(stored.is_none());
    }
}
621}