1use crate::{memory::InMemoryRebacManager, CheckRequest, RelationTuple, Result, Subject};
38use serde::{Deserialize, Serialize};
39use std::collections::HashMap;
40use std::sync::Arc;
41use std::time::{SystemTime, UNIX_EPOCH};
42use tokio::sync::RwLock;
43use tokio::time::{interval, Duration};
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct EdgeConfig {
48 pub central_db_url: String,
50 pub sync_interval_secs: u64,
52 pub sync_config: SyncConfig,
54}
55
56impl Default for EdgeConfig {
57 fn default() -> Self {
58 Self {
59 central_db_url: "postgres://localhost/authz".to_string(),
60 sync_interval_secs: 30,
61 sync_config: SyncConfig::All,
62 }
63 }
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub enum SyncConfig {
69 All,
71 Namespaces(Vec<String>),
73 Tenants(Vec<String>),
75 NamespacesAndTenants {
77 namespaces: Vec<String>,
78 tenants: Vec<String>,
79 },
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
84pub struct CrdtTuple {
85 pub tuple: RelationTuple,
87 pub timestamp: u64,
89 pub node_id: String,
91 pub is_tombstone: bool,
93}
94
95impl CrdtTuple {
96 pub fn new(tuple: RelationTuple, node_id: String) -> Self {
98 Self {
99 tuple,
100 timestamp: Self::current_timestamp(),
101 node_id,
102 is_tombstone: false,
103 }
104 }
105
106 pub fn tombstone(tuple: RelationTuple, node_id: String) -> Self {
108 Self {
109 tuple,
110 timestamp: Self::current_timestamp(),
111 node_id,
112 is_tombstone: true,
113 }
114 }
115
116 fn current_timestamp() -> u64 {
117 SystemTime::now()
118 .duration_since(UNIX_EPOCH)
119 .unwrap()
120 .as_micros() as u64
121 }
122}
123
124#[derive(Debug)]
126pub struct CrdtResolver {
127 store: Arc<RwLock<HashMap<String, CrdtTuple>>>,
129}
130
131impl CrdtResolver {
132 pub fn new() -> Self {
134 Self {
135 store: Arc::new(RwLock::new(HashMap::new())),
136 }
137 }
138
139 fn tuple_key(tuple: &RelationTuple) -> String {
141 format!(
142 "{}:{}:{}:{}",
143 tuple.namespace,
144 tuple.object_id,
145 tuple.relation,
146 match &tuple.subject {
147 Subject::User(id) => format!("user:{}", id),
148 Subject::UserSet {
149 namespace,
150 object_id,
151 relation,
152 } => format!("set:{}:{}:{}", namespace, object_id, relation),
153 }
154 )
155 }
156
157 pub async fn merge(&self, crdt_tuple: CrdtTuple) -> bool {
159 let key = Self::tuple_key(&crdt_tuple.tuple);
160 let mut store = self.store.write().await;
161
162 match store.get(&key) {
163 Some(existing) => {
164 if crdt_tuple.timestamp > existing.timestamp
166 || (crdt_tuple.timestamp == existing.timestamp
167 && crdt_tuple.node_id > existing.node_id)
168 {
169 store.insert(key, crdt_tuple);
170 true
171 } else {
172 false
173 }
174 }
175 None => {
176 store.insert(key, crdt_tuple);
177 true
178 }
179 }
180 }
181
182 pub async fn active_tuples(&self) -> Vec<RelationTuple> {
184 let store = self.store.read().await;
185 store
186 .values()
187 .filter(|ct| !ct.is_tombstone)
188 .map(|ct| ct.tuple.clone())
189 .collect()
190 }
191
192 pub async fn get_crdt(&self, tuple: &RelationTuple) -> Option<CrdtTuple> {
194 let key = Self::tuple_key(tuple);
195 let store = self.store.read().await;
196 store.get(&key).cloned()
197 }
198
199 pub async fn gc_tombstones(&self, retention_micros: u64) {
201 let mut store = self.store.write().await;
202 let cutoff = CrdtTuple::current_timestamp() - retention_micros;
203
204 store.retain(|_, ct| !ct.is_tombstone || ct.timestamp > cutoff);
205 }
206}
207
208impl Default for CrdtResolver {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, Default)]
216pub struct EdgeStats {
217 pub tuples_synced: u64,
219 pub conflicts_resolved: u64,
221 pub checks_performed: u64,
223 pub last_sync_timestamp: Option<u64>,
225 pub sync_failures: u64,
227}
228
229pub struct EdgeEngine {
231 manager: InMemoryRebacManager,
233 crdt: CrdtResolver,
235 config: EdgeConfig,
237 node_id: String,
239 stats: Arc<RwLock<EdgeStats>>,
241 sync_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
243}
244
245impl EdgeEngine {
246 pub async fn new(config: EdgeConfig) -> Result<Self> {
248 let node_id = uuid::Uuid::new_v4().to_string();
249
250 Ok(Self {
251 manager: InMemoryRebacManager::new(),
252 crdt: CrdtResolver::new(),
253 config,
254 node_id,
255 stats: Arc::new(RwLock::new(EdgeStats::default())),
256 sync_handle: Arc::new(RwLock::new(None)),
257 })
258 }
259
260 pub fn node_id(&self) -> &str {
262 &self.node_id
263 }
264
265 pub async fn check(
267 &self,
268 namespace: &str,
269 object_id: &str,
270 relation: &str,
271 subject_id: &str,
272 ) -> Result<bool> {
273 let request = CheckRequest {
274 namespace: namespace.to_string(),
275 object_id: object_id.to_string(),
276 relation: relation.to_string(),
277 subject: Subject::User(subject_id.to_string()),
278 context: None,
279 };
280
281 let response = self.manager.check(&request).await?;
282
283 let mut stats = self.stats.write().await;
285 stats.checks_performed += 1;
286
287 Ok(response.allowed)
288 }
289
290 pub async fn write_tuple(&self, tuple: RelationTuple) -> Result<()> {
292 let crdt_tuple = CrdtTuple::new(tuple.clone(), self.node_id.clone());
293
294 self.crdt.merge(crdt_tuple).await;
296
297 self.manager.add_tuple(tuple).await
299 }
300
301 pub async fn delete_tuple(&self, tuple: RelationTuple) -> Result<()> {
303 let tombstone = CrdtTuple::tombstone(tuple.clone(), self.node_id.clone());
304
305 self.crdt.merge(tombstone).await;
307
308 self.manager.remove_tuple(&tuple).await
310 }
311
312 pub async fn sync_from_central(&self) -> Result<()> {
314 let mut stats = self.stats.write().await;
322 stats.last_sync_timestamp = Some(CrdtTuple::current_timestamp());
323
324 Ok(())
325 }
326
327 pub async fn start_sync(&self) -> Result<()> {
329 let mut handle_guard = self.sync_handle.write().await;
330
331 if let Some(handle) = handle_guard.take() {
333 handle.abort();
334 }
335
336 let interval_secs = self.config.sync_interval_secs;
337 let stats = self.stats.clone();
338
339 let handle = tokio::spawn(async move {
341 let mut ticker = interval(Duration::from_secs(interval_secs));
342
343 loop {
344 ticker.tick().await;
345
346 let mut s = stats.write().await;
350 s.last_sync_timestamp = Some(CrdtTuple::current_timestamp());
351 }
352 });
353
354 *handle_guard = Some(handle);
355
356 Ok(())
357 }
358
359 pub async fn stop_sync(&self) {
361 let mut handle_guard = self.sync_handle.write().await;
362
363 if let Some(handle) = handle_guard.take() {
364 handle.abort();
365 }
366 }
367
368 pub async fn stats(&self) -> EdgeStats {
370 self.stats.read().await.clone()
371 }
372
373 pub async fn merge_remote_tuples(&self, remote_tuples: Vec<CrdtTuple>) -> Result<u64> {
375 let mut conflicts = 0;
376
377 for crdt_tuple in remote_tuples {
378 let merged = self.crdt.merge(crdt_tuple.clone()).await;
379
380 if merged {
381 if crdt_tuple.is_tombstone {
382 let _ = self.manager.remove_tuple(&crdt_tuple.tuple).await;
384 } else {
385 let _ = self.manager.add_tuple(crdt_tuple.tuple.clone()).await;
387 }
388 conflicts += 1;
389 }
390 }
391
392 let mut stats = self.stats.write().await;
394 stats.conflicts_resolved += conflicts;
395
396 Ok(conflicts)
397 }
398
399 pub async fn gc_tombstones(&self, retention_secs: u64) -> Result<()> {
401 self.crdt.gc_tombstones(retention_secs * 1_000_000).await;
402 Ok(())
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409
410 #[tokio::test]
411 async fn test_edge_config_default() {
412 let config = EdgeConfig::default();
413 assert_eq!(config.sync_interval_secs, 30);
414 assert!(matches!(config.sync_config, SyncConfig::All));
415 }
416
417 #[tokio::test]
418 async fn test_crdt_tuple_creation() {
419 let tuple = RelationTuple::new(
420 "document",
421 "viewer",
422 "123",
423 Subject::User("alice".to_string()),
424 );
425
426 let crdt = CrdtTuple::new(tuple.clone(), "node1".to_string());
427
428 assert_eq!(crdt.tuple, tuple);
429 assert_eq!(crdt.node_id, "node1");
430 assert!(!crdt.is_tombstone);
431 assert!(crdt.timestamp > 0);
432 }
433
434 #[tokio::test]
435 async fn test_crdt_tombstone() {
436 let tuple = RelationTuple::new(
437 "document",
438 "viewer",
439 "123",
440 Subject::User("alice".to_string()),
441 );
442
443 let tombstone = CrdtTuple::tombstone(tuple.clone(), "node1".to_string());
444
445 assert_eq!(tombstone.tuple, tuple);
446 assert!(tombstone.is_tombstone);
447 }
448
449 #[tokio::test]
450 async fn test_crdt_resolver_lww() {
451 let resolver = CrdtResolver::new();
452
453 let tuple = RelationTuple::new(
454 "document",
455 "viewer",
456 "123",
457 Subject::User("alice".to_string()),
458 );
459
460 let mut crdt1 = CrdtTuple::new(tuple.clone(), "node1".to_string());
462 crdt1.timestamp = 100;
463 assert!(resolver.merge(crdt1.clone()).await);
464
465 let mut crdt2 = CrdtTuple::new(tuple.clone(), "node2".to_string());
467 crdt2.timestamp = 200;
468 assert!(resolver.merge(crdt2.clone()).await);
469
470 let mut crdt3 = CrdtTuple::new(tuple.clone(), "node3".to_string());
472 crdt3.timestamp = 150;
473 assert!(!resolver.merge(crdt3).await);
474
475 let stored = resolver.get_crdt(&tuple).await.unwrap();
477 assert_eq!(stored.timestamp, 200);
478 assert_eq!(stored.node_id, "node2");
479 }
480
481 #[tokio::test]
482 async fn test_crdt_resolver_tie_breaking() {
483 let resolver = CrdtResolver::new();
484
485 let tuple = RelationTuple::new(
486 "document",
487 "viewer",
488 "123",
489 Subject::User("alice".to_string()),
490 );
491
492 let mut crdt1 = CrdtTuple::new(tuple.clone(), "node_a".to_string());
494 crdt1.timestamp = 100;
495 resolver.merge(crdt1).await;
496
497 let mut crdt2 = CrdtTuple::new(tuple.clone(), "node_z".to_string());
498 crdt2.timestamp = 100; assert!(resolver.merge(crdt2).await); let stored = resolver.get_crdt(&tuple).await.unwrap();
502 assert_eq!(stored.node_id, "node_z");
503 }
504
505 #[tokio::test]
506 async fn test_edge_engine_creation() {
507 let config = EdgeConfig::default();
508 let engine = EdgeEngine::new(config).await.unwrap();
509
510 assert!(!engine.node_id().is_empty());
511
512 let stats = engine.stats().await;
513 assert_eq!(stats.checks_performed, 0);
514 assert_eq!(stats.tuples_synced, 0);
515 }
516
517 #[tokio::test]
518 async fn test_edge_engine_write_and_check() {
519 let config = EdgeConfig::default();
520 let engine = EdgeEngine::new(config).await.unwrap();
521
522 let tuple = RelationTuple::new(
524 "document",
525 "viewer",
526 "123",
527 Subject::User("alice".to_string()),
528 );
529 engine.write_tuple(tuple).await.unwrap();
530
531 let allowed = engine
533 .check("document", "123", "viewer", "alice")
534 .await
535 .unwrap();
536 assert!(allowed);
537
538 let stats = engine.stats().await;
539 assert_eq!(stats.checks_performed, 1);
540 }
541
542 #[tokio::test]
543 async fn test_edge_engine_delete() {
544 let config = EdgeConfig::default();
545 let engine = EdgeEngine::new(config).await.unwrap();
546
547 let tuple = RelationTuple::new(
549 "document",
550 "viewer",
551 "123",
552 Subject::User("alice".to_string()),
553 );
554 engine.write_tuple(tuple.clone()).await.unwrap();
555 engine.delete_tuple(tuple.clone()).await.unwrap();
556
557 let allowed = engine
559 .check("document", "123", "viewer", "alice")
560 .await
561 .unwrap();
562 assert!(!allowed);
563
564 let crdt = engine.crdt.get_crdt(&tuple).await.unwrap();
566 assert!(crdt.is_tombstone);
567 }
568
569 #[tokio::test]
570 async fn test_merge_remote_tuples() {
571 let config = EdgeConfig::default();
572 let engine = EdgeEngine::new(config).await.unwrap();
573
574 let tuple = RelationTuple::new(
575 "document",
576 "viewer",
577 "123",
578 Subject::User("bob".to_string()),
579 );
580
581 let remote_crdt = CrdtTuple::new(tuple.clone(), "remote-node".to_string());
582
583 let conflicts = engine.merge_remote_tuples(vec![remote_crdt]).await.unwrap();
584
585 assert_eq!(conflicts, 1);
586
587 let allowed = engine
589 .check("document", "123", "viewer", "bob")
590 .await
591 .unwrap();
592 assert!(allowed);
593
594 let stats = engine.stats().await;
595 assert_eq!(stats.conflicts_resolved, 1);
596 }
597
598 #[tokio::test]
599 async fn test_gc_tombstones() {
600 let resolver = CrdtResolver::new();
601
602 let tuple = RelationTuple::new(
603 "document",
604 "viewer",
605 "123",
606 Subject::User("alice".to_string()),
607 );
608
609 let mut old_tombstone = CrdtTuple::tombstone(tuple.clone(), "node1".to_string());
611 old_tombstone.timestamp = 100; resolver.merge(old_tombstone).await;
613
614 resolver.gc_tombstones(1_000_000).await; let stored = resolver.get_crdt(&tuple).await;
619 assert!(stored.is_none());
620 }
621}