pulseengine_mcp_auth/consent/
manager.rs

1//! Consent management operations and storage
2//!
3//! This module provides the main ConsentManager for handling consent
4//! operations, storage, and audit trails.
5
6use super::{
7    ConsentAuditEntry, ConsentError, ConsentRecord, ConsentStatus, ConsentSummary, ConsentType,
8    LegalBasis,
9};
10use async_trait::async_trait;
11use chrono::Utc;
12use serde_json;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, info, warn};
17use uuid::Uuid;
18
19/// Parameters for requesting consent
20#[derive(Debug, Clone)]
21pub struct ConsentRequest {
22    pub subject_id: String,
23    pub consent_type: ConsentType,
24    pub legal_basis: LegalBasis,
25    pub purpose: String,
26    pub data_categories: Vec<String>,
27    pub consent_source: String,
28    pub expires_in_days: Option<u32>,
29}
30
31/// Simple key-value storage trait for consent data
32#[async_trait]
33pub trait ConsentStorage: Send + Sync {
34    async fn get(&self, key: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
35    async fn set(
36        &self,
37        key: &str,
38        value: &str,
39    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
40    async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
41    async fn list(&self) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
42}
43
44/// Simple in-memory storage implementation for consent data
45pub struct MemoryConsentStorage {
46    data: Arc<RwLock<HashMap<String, String>>>,
47}
48
49impl Default for MemoryConsentStorage {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl MemoryConsentStorage {
56    pub fn new() -> Self {
57        Self {
58            data: Arc::new(RwLock::new(HashMap::new())),
59        }
60    }
61}
62
63#[async_trait]
64impl ConsentStorage for MemoryConsentStorage {
65    async fn get(&self, key: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
66        let data = self.data.read().await;
67        data.get(key).cloned().ok_or_else(|| "Key not found".into())
68    }
69
70    async fn set(
71        &self,
72        key: &str,
73        value: &str,
74    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
75        let mut data = self.data.write().await;
76        data.insert(key.to_string(), value.to_string());
77        Ok(())
78    }
79
80    async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
81        let mut data = self.data.write().await;
82        data.remove(key);
83        Ok(())
84    }
85
86    async fn list(&self) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
87        let data = self.data.read().await;
88        Ok(data.keys().cloned().collect())
89    }
90}
91
92/// Consent manager configuration
93#[derive(Debug, Clone)]
94pub struct ConsentConfig {
95    /// Enable consent management
96    pub enabled: bool,
97
98    /// Default consent expiration in days (None = no expiration)
99    pub default_expiration_days: Option<u32>,
100
101    /// Require explicit consent for all operations
102    pub require_explicit_consent: bool,
103
104    /// Enable consent audit logging
105    pub enable_audit_log: bool,
106
107    /// Path for consent audit log
108    pub audit_log_path: Option<std::path::PathBuf>,
109
110    /// Automatic cleanup of expired consents after days
111    pub cleanup_expired_after_days: u32,
112}
113
114impl Default for ConsentConfig {
115    fn default() -> Self {
116        Self {
117            enabled: true,
118            default_expiration_days: Some(365), // 1 year default
119            require_explicit_consent: true,
120            enable_audit_log: true,
121            audit_log_path: None,
122            cleanup_expired_after_days: 90,
123        }
124    }
125}
126
127/// Main consent manager
128pub struct ConsentManager {
129    config: ConsentConfig,
130    storage: Arc<dyn ConsentStorage>,
131    audit_entries: Arc<RwLock<Vec<ConsentAuditEntry>>>,
132    consent_cache: Arc<RwLock<HashMap<String, ConsentRecord>>>,
133}
134
135impl ConsentManager {
136    /// Create a new consent manager
137    pub fn new(config: ConsentConfig, storage: Arc<dyn ConsentStorage>) -> Self {
138        Self {
139            config,
140            storage,
141            audit_entries: Arc::new(RwLock::new(Vec::new())),
142            consent_cache: Arc::new(RwLock::new(HashMap::new())),
143        }
144    }
145
146    /// Request consent from a subject with individual parameters
147    #[allow(clippy::too_many_arguments)]
148    pub async fn request_consent_individual(
149        &self,
150        subject_id: String,
151        consent_type: ConsentType,
152        legal_basis: LegalBasis,
153        purpose: String,
154        data_categories: Vec<String>,
155        consent_source: String,
156        expires_in_days: Option<u32>,
157    ) -> Result<ConsentRecord, ConsentError> {
158        let request = ConsentRequest {
159            subject_id,
160            consent_type,
161            legal_basis,
162            purpose,
163            data_categories,
164            consent_source,
165            expires_in_days,
166        };
167        self.request_consent(request).await
168    }
169
170    /// Request consent from a subject
171    pub async fn request_consent(
172        &self,
173        request: ConsentRequest,
174    ) -> Result<ConsentRecord, ConsentError> {
175        if !self.config.enabled {
176            return Err(ConsentError::InvalidData(
177                "Consent management is disabled".to_string(),
178            ));
179        }
180
181        // Check if consent already exists
182        let existing_key = format!(
183            "consent:{}:{}",
184            request.subject_id,
185            self.consent_type_key(&request.consent_type)
186        );
187        if self.storage.get(&existing_key).await.is_ok() {
188            return Err(ConsentError::ConsentExists(format!(
189                "{}:{:?}",
190                request.subject_id, request.consent_type
191            )));
192        }
193
194        // Create consent record
195        let mut record = ConsentRecord::new(
196            request.subject_id.clone(),
197            request.consent_type.clone(),
198            request.legal_basis,
199            request.purpose,
200            request.consent_source.clone(),
201        );
202
203        // Add data categories
204        for category in request.data_categories {
205            record.add_data_category(category);
206        }
207
208        // Set expiration
209        if let Some(days) = request
210            .expires_in_days
211            .or(self.config.default_expiration_days)
212        {
213            let expires_at = Utc::now() + chrono::Duration::days(days as i64);
214            record.set_expiration(expires_at);
215        }
216
217        // Store consent record
218        let consent_data =
219            serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
220
221        self.storage
222            .set(&existing_key, &consent_data)
223            .await
224            .map_err(|e| ConsentError::StorageError(e.to_string()))?;
225
226        // Update cache
227        {
228            let mut cache = self.consent_cache.write().await;
229            cache.insert(record.id.clone(), record.clone());
230        }
231
232        // Create audit entry
233        self.create_audit_entry(
234            &record,
235            "consent_requested".to_string(),
236            None,
237            ConsentStatus::Pending,
238            request.consent_source,
239            None,
240            HashMap::new(),
241        )
242        .await?;
243
244        info!(
245            "Consent requested for subject {} with type {:?}",
246            request.subject_id, request.consent_type
247        );
248        Ok(record)
249    }
250
251    /// Grant consent
252    pub async fn grant_consent(
253        &self,
254        subject_id: &str,
255        consent_type: &ConsentType,
256        source_ip: Option<String>,
257        action_source: String,
258    ) -> Result<ConsentRecord, ConsentError> {
259        let consent_key = format!(
260            "consent:{}:{}",
261            subject_id,
262            self.consent_type_key(consent_type)
263        );
264
265        // Load existing consent record
266        let consent_data =
267            self.storage.get(&consent_key).await.map_err(|_| {
268                ConsentError::ConsentNotFound(format!("{subject_id}:{consent_type:?}"))
269            })?;
270
271        let mut record: ConsentRecord =
272            serde_json::from_str(&consent_data).map_err(ConsentError::SerializationError)?;
273
274        let previous_status = record.status.clone();
275
276        // Grant consent
277        record.grant(source_ip.clone());
278
279        // Update storage
280        let updated_data =
281            serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
282
283        self.storage
284            .set(&consent_key, &updated_data)
285            .await
286            .map_err(|e| ConsentError::StorageError(e.to_string()))?;
287
288        // Update cache
289        {
290            let mut cache = self.consent_cache.write().await;
291            cache.insert(record.id.clone(), record.clone());
292        }
293
294        // Create audit entry
295        self.create_audit_entry(
296            &record,
297            "consent_granted".to_string(),
298            Some(previous_status),
299            record.status.clone(),
300            action_source,
301            source_ip,
302            HashMap::new(),
303        )
304        .await?;
305
306        info!(
307            "Consent granted for subject {} with type {:?}",
308            subject_id, consent_type
309        );
310        Ok(record)
311    }
312
313    /// Withdraw consent
314    pub async fn withdraw_consent(
315        &self,
316        subject_id: &str,
317        consent_type: &ConsentType,
318        source_ip: Option<String>,
319        action_source: String,
320    ) -> Result<ConsentRecord, ConsentError> {
321        let consent_key = format!(
322            "consent:{}:{}",
323            subject_id,
324            self.consent_type_key(consent_type)
325        );
326
327        // Load existing consent record
328        let consent_data =
329            self.storage.get(&consent_key).await.map_err(|_| {
330                ConsentError::ConsentNotFound(format!("{subject_id}:{consent_type:?}"))
331            })?;
332
333        let mut record: ConsentRecord =
334            serde_json::from_str(&consent_data).map_err(ConsentError::SerializationError)?;
335
336        let previous_status = record.status.clone();
337
338        // Withdraw consent
339        record.withdraw(source_ip.clone());
340
341        // Update storage
342        let updated_data =
343            serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
344
345        self.storage
346            .set(&consent_key, &updated_data)
347            .await
348            .map_err(|e| ConsentError::StorageError(e.to_string()))?;
349
350        // Update cache
351        {
352            let mut cache = self.consent_cache.write().await;
353            cache.insert(record.id.clone(), record.clone());
354        }
355
356        // Create audit entry
357        self.create_audit_entry(
358            &record,
359            "consent_withdrawn".to_string(),
360            Some(previous_status),
361            record.status.clone(),
362            action_source,
363            source_ip,
364            HashMap::new(),
365        )
366        .await?;
367
368        warn!(
369            "Consent withdrawn for subject {} with type {:?}",
370            subject_id, consent_type
371        );
372        Ok(record)
373    }
374
375    /// Check if consent is valid for a subject and type
376    pub async fn check_consent(
377        &self,
378        subject_id: &str,
379        consent_type: &ConsentType,
380    ) -> Result<bool, ConsentError> {
381        if !self.config.enabled {
382            // If consent management is disabled, assume consent
383            return Ok(true);
384        }
385
386        let consent_key = format!(
387            "consent:{}:{}",
388            subject_id,
389            self.consent_type_key(consent_type)
390        );
391
392        // Try cache first
393        {
394            let cache = self.consent_cache.read().await;
395            if let Some(record) = cache
396                .values()
397                .find(|r| r.subject_id == subject_id && &r.consent_type == consent_type)
398            {
399                return Ok(record.is_valid());
400            }
401        }
402
403        // Load from storage
404        match self.storage.get(&consent_key).await {
405            Ok(consent_data) => {
406                let record: ConsentRecord = serde_json::from_str(&consent_data)
407                    .map_err(ConsentError::SerializationError)?;
408
409                // Update cache
410                {
411                    let mut cache = self.consent_cache.write().await;
412                    cache.insert(record.id.clone(), record.clone());
413                }
414
415                Ok(record.is_valid())
416            }
417            Err(_) => {
418                if self.config.require_explicit_consent {
419                    Ok(false) // No consent found and explicit consent required
420                } else {
421                    Ok(true) // No consent found but explicit consent not required
422                }
423            }
424        }
425    }
426
427    /// Get consent summary for a subject
428    pub async fn get_consent_summary(
429        &self,
430        subject_id: &str,
431    ) -> Result<ConsentSummary, ConsentError> {
432        let mut consents = HashMap::new();
433        let mut pending_requests = 0;
434        let mut expired_consents = 0;
435        let mut last_updated = Utc::now();
436
437        // Search for all consent records for this subject
438        // This is simplified - in a real implementation you'd want indexed lookups
439        let all_keys = self
440            .storage
441            .list()
442            .await
443            .map_err(|e| ConsentError::StorageError(e.to_string()))?;
444
445        let subject_prefix = format!("consent:{subject_id}:");
446
447        for key in all_keys {
448            if key.starts_with(&subject_prefix) {
449                if let Ok(consent_data) = self.storage.get(&key).await {
450                    if let Ok(record) = serde_json::from_str::<ConsentRecord>(&consent_data) {
451                        consents.insert(record.consent_type.clone(), record.status.clone());
452
453                        if record.status == ConsentStatus::Pending {
454                            pending_requests += 1;
455                        }
456
457                        if record.is_expired() {
458                            expired_consents += 1;
459                        }
460
461                        if record.updated_at > last_updated {
462                            last_updated = record.updated_at;
463                        }
464                    }
465                }
466            }
467        }
468
469        let is_valid = consents
470            .iter()
471            .all(|(_, status)| *status == ConsentStatus::Granted);
472
473        Ok(ConsentSummary {
474            subject_id: subject_id.to_string(),
475            consents,
476            is_valid,
477            last_updated,
478            pending_requests,
479            expired_consents,
480        })
481    }
482
483    /// Clean up expired consents
484    pub async fn cleanup_expired_consents(&self) -> Result<usize, ConsentError> {
485        let cutoff_date =
486            Utc::now() - chrono::Duration::days(self.config.cleanup_expired_after_days as i64);
487        let mut cleaned_count = 0;
488
489        let all_keys = self
490            .storage
491            .list()
492            .await
493            .map_err(|e| ConsentError::StorageError(e.to_string()))?;
494
495        for key in all_keys {
496            if key.starts_with("consent:") {
497                if let Ok(consent_data) = self.storage.get(&key).await {
498                    if let Ok(record) = serde_json::from_str::<ConsentRecord>(&consent_data) {
499                        if record.is_expired() && record.updated_at < cutoff_date {
500                            self.storage
501                                .delete(&key)
502                                .await
503                                .map_err(|e| ConsentError::StorageError(e.to_string()))?;
504
505                            // Remove from cache
506                            {
507                                let mut cache = self.consent_cache.write().await;
508                                cache.remove(&record.id);
509                            }
510
511                            cleaned_count += 1;
512                            debug!("Cleaned up expired consent record: {}", record.id);
513                        }
514                    }
515                }
516            }
517        }
518
519        info!("Cleaned up {} expired consent records", cleaned_count);
520        Ok(cleaned_count)
521    }
522
523    /// Get audit trail for a subject
524    pub async fn get_audit_trail(&self, subject_id: &str) -> Vec<ConsentAuditEntry> {
525        let audit_entries = self.audit_entries.read().await;
526        audit_entries
527            .iter()
528            .filter(|entry| entry.subject_id == subject_id)
529            .cloned()
530            .collect()
531    }
532
533    /// Create an audit entry
534    #[allow(clippy::too_many_arguments)]
535    async fn create_audit_entry(
536        &self,
537        record: &ConsentRecord,
538        action: String,
539        previous_status: Option<ConsentStatus>,
540        new_status: ConsentStatus,
541        action_source: String,
542        source_ip: Option<String>,
543        details: HashMap<String, String>,
544    ) -> Result<(), ConsentError> {
545        if !self.config.enable_audit_log {
546            return Ok(());
547        }
548
549        let audit_entry = ConsentAuditEntry {
550            id: Uuid::new_v4().to_string(),
551            consent_id: record.id.clone(),
552            subject_id: record.subject_id.clone(),
553            action,
554            previous_status,
555            new_status,
556            action_source,
557            source_ip,
558            details,
559            timestamp: Utc::now(),
560        };
561
562        // Add to in-memory audit log
563        {
564            let mut audit_entries = self.audit_entries.write().await;
565            audit_entries.push(audit_entry.clone());
566
567            // Keep only last 10000 entries to prevent memory bloat
568            if audit_entries.len() > 10000 {
569                audit_entries.drain(0..1000);
570            }
571        }
572
573        // Write to persistent audit log file if configured
574        if let Some(log_path) = &self.config.audit_log_path {
575            let log_entry = serde_json::to_string(&audit_entry)
576                .unwrap_or_else(|_| "Failed to serialize audit entry".to_string());
577            let log_line = format!("{}\n", log_entry);
578
579            match tokio::fs::OpenOptions::new()
580                .create(true)
581                .append(true)
582                .open(log_path)
583                .await
584            {
585                Ok(mut file) => {
586                    use tokio::io::AsyncWriteExt;
587                    if let Err(e) = file.write_all(log_line.as_bytes()).await {
588                        tracing::error!("Failed to write audit log to file: {}", e);
589                    } else if let Err(e) = file.flush().await {
590                        tracing::error!("Failed to flush audit log file: {}", e);
591                    }
592                }
593                Err(e) => {
594                    tracing::error!("Failed to open audit log file: {}", e);
595                }
596            }
597        }
598
599        Ok(())
600    }
601
602    /// Convert consent type to storage key
603    fn consent_type_key(&self, consent_type: &ConsentType) -> String {
604        match consent_type {
605            ConsentType::DataProcessing => "data_processing".to_string(),
606            ConsentType::Marketing => "marketing".to_string(),
607            ConsentType::Analytics => "analytics".to_string(),
608            ConsentType::DataSharing => "data_sharing".to_string(),
609            ConsentType::AutomatedDecisionMaking => "automated_decision_making".to_string(),
610            ConsentType::SessionStorage => "session_storage".to_string(),
611            ConsentType::AuditLogging => "audit_logging".to_string(),
612            ConsentType::Custom(name) => {
613                format!("custom_{}", name.to_lowercase().replace(' ', "_"))
614            }
615        }
616    }
617}
618
619#[cfg(test)]
620mod tests {
621    use super::*;
622
623    #[tokio::test]
624    async fn test_consent_manager_creation() {
625        let config = ConsentConfig::default();
626        let storage = Arc::new(MemoryConsentStorage::new());
627        let manager = ConsentManager::new(config, storage);
628
629        // Manager should be created successfully
630        assert!(manager.config.enabled);
631    }
632
633    #[tokio::test]
634    async fn test_consent_request_and_grant() {
635        let config = ConsentConfig::default();
636        let storage = Arc::new(MemoryConsentStorage::new());
637        let manager = ConsentManager::new(config, storage);
638
639        // Request consent
640        let request = ConsentRequest {
641            subject_id: "user123".to_string(),
642            consent_type: ConsentType::DataProcessing,
643            legal_basis: LegalBasis::Consent,
644            purpose: "Process authentication data".to_string(),
645            data_categories: vec!["personal_identifiers".to_string()],
646            consent_source: "test".to_string(),
647            expires_in_days: None,
648        };
649        let record = manager.request_consent(request).await.unwrap();
650
651        assert_eq!(record.status, ConsentStatus::Pending);
652
653        // Grant consent
654        let granted_record = manager
655            .grant_consent(
656                "user123",
657                &ConsentType::DataProcessing,
658                Some("127.0.0.1".to_string()),
659                "test".to_string(),
660            )
661            .await
662            .unwrap();
663
664        assert_eq!(granted_record.status, ConsentStatus::Granted);
665
666        // Check consent
667        let is_valid = manager
668            .check_consent("user123", &ConsentType::DataProcessing)
669            .await
670            .unwrap();
671        assert!(is_valid);
672    }
673
674    #[tokio::test]
675    async fn test_consent_withdrawal() {
676        let config = ConsentConfig::default();
677        let storage = Arc::new(MemoryConsentStorage::new());
678        let manager = ConsentManager::new(config, storage);
679
680        // Request and grant consent
681        let request = ConsentRequest {
682            subject_id: "user123".to_string(),
683            consent_type: ConsentType::Analytics,
684            legal_basis: LegalBasis::Consent,
685            purpose: "Analytics tracking".to_string(),
686            data_categories: vec![],
687            consent_source: "test".to_string(),
688            expires_in_days: None,
689        };
690        manager.request_consent(request).await.unwrap();
691
692        manager
693            .grant_consent("user123", &ConsentType::Analytics, None, "test".to_string())
694            .await
695            .unwrap();
696
697        // Withdraw consent
698        let withdrawn_record = manager
699            .withdraw_consent("user123", &ConsentType::Analytics, None, "test".to_string())
700            .await
701            .unwrap();
702
703        assert_eq!(withdrawn_record.status, ConsentStatus::Withdrawn);
704
705        // Check consent is no longer valid
706        let is_valid = manager
707            .check_consent("user123", &ConsentType::Analytics)
708            .await
709            .unwrap();
710        assert!(!is_valid);
711    }
712}