1use super::{
7 ConsentAuditEntry, ConsentError, ConsentRecord, ConsentStatus, ConsentSummary, ConsentType,
8 LegalBasis,
9};
10use async_trait::async_trait;
11use chrono::Utc;
12use serde_json;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16use tracing::{debug, info, warn};
17use uuid::Uuid;
18
19#[derive(Debug, Clone)]
21pub struct ConsentRequest {
22 pub subject_id: String,
23 pub consent_type: ConsentType,
24 pub legal_basis: LegalBasis,
25 pub purpose: String,
26 pub data_categories: Vec<String>,
27 pub consent_source: String,
28 pub expires_in_days: Option<u32>,
29}
30
31#[async_trait]
33pub trait ConsentStorage: Send + Sync {
34 async fn get(&self, key: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
35 async fn set(
36 &self,
37 key: &str,
38 value: &str,
39 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
40 async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
41 async fn list(&self) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>>;
42}
43
44pub struct MemoryConsentStorage {
46 data: Arc<RwLock<HashMap<String, String>>>,
47}
48
49impl Default for MemoryConsentStorage {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl MemoryConsentStorage {
56 pub fn new() -> Self {
57 Self {
58 data: Arc::new(RwLock::new(HashMap::new())),
59 }
60 }
61}
62
63#[async_trait]
64impl ConsentStorage for MemoryConsentStorage {
65 async fn get(&self, key: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
66 let data = self.data.read().await;
67 data.get(key).cloned().ok_or_else(|| "Key not found".into())
68 }
69
70 async fn set(
71 &self,
72 key: &str,
73 value: &str,
74 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
75 let mut data = self.data.write().await;
76 data.insert(key.to_string(), value.to_string());
77 Ok(())
78 }
79
80 async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
81 let mut data = self.data.write().await;
82 data.remove(key);
83 Ok(())
84 }
85
86 async fn list(&self) -> Result<Vec<String>, Box<dyn std::error::Error + Send + Sync>> {
87 let data = self.data.read().await;
88 Ok(data.keys().cloned().collect())
89 }
90}
91
/// Settings that control how `ConsentManager` behaves.
#[derive(Clone, Debug)]
pub struct ConsentConfig {
    /// Master switch. When `false`, `request_consent` is rejected and
    /// `check_consent` reports every check as allowed.
    pub enabled: bool,

    /// Lifetime in days applied to new records when the request does not
    /// carry its own `expires_in_days`; `None` means no default expiry.
    pub default_expiration_days: Option<u32>,

    /// When `true`, `check_consent` answers `false` if no stored record can
    /// be read for the subject/type; when `false`, it answers `true`.
    pub require_explicit_consent: bool,

    /// When `false`, no audit entries are recorded at all.
    pub enable_audit_log: bool,

    /// Optional file to which each audit entry is appended as one JSON line.
    pub audit_log_path: Option<std::path::PathBuf>,

    /// Expired records are removed by `cleanup_expired_consents` once their
    /// last update is older than this many days.
    pub cleanup_expired_after_days: u32,
}
113
114impl Default for ConsentConfig {
115 fn default() -> Self {
116 Self {
117 enabled: true,
118 default_expiration_days: Some(365), require_explicit_consent: true,
120 enable_audit_log: true,
121 audit_log_path: None,
122 cleanup_expired_after_days: 90,
123 }
124 }
125}
126
127pub struct ConsentManager {
129 config: ConsentConfig,
130 storage: Arc<dyn ConsentStorage>,
131 audit_entries: Arc<RwLock<Vec<ConsentAuditEntry>>>,
132 consent_cache: Arc<RwLock<HashMap<String, ConsentRecord>>>,
133}
134
135impl ConsentManager {
136 pub fn new(config: ConsentConfig, storage: Arc<dyn ConsentStorage>) -> Self {
138 Self {
139 config,
140 storage,
141 audit_entries: Arc::new(RwLock::new(Vec::new())),
142 consent_cache: Arc::new(RwLock::new(HashMap::new())),
143 }
144 }
145
146 #[allow(clippy::too_many_arguments)]
148 pub async fn request_consent_individual(
149 &self,
150 subject_id: String,
151 consent_type: ConsentType,
152 legal_basis: LegalBasis,
153 purpose: String,
154 data_categories: Vec<String>,
155 consent_source: String,
156 expires_in_days: Option<u32>,
157 ) -> Result<ConsentRecord, ConsentError> {
158 let request = ConsentRequest {
159 subject_id,
160 consent_type,
161 legal_basis,
162 purpose,
163 data_categories,
164 consent_source,
165 expires_in_days,
166 };
167 self.request_consent(request).await
168 }
169
170 pub async fn request_consent(
172 &self,
173 request: ConsentRequest,
174 ) -> Result<ConsentRecord, ConsentError> {
175 if !self.config.enabled {
176 return Err(ConsentError::InvalidData(
177 "Consent management is disabled".to_string(),
178 ));
179 }
180
181 let existing_key = format!(
183 "consent:{}:{}",
184 request.subject_id,
185 self.consent_type_key(&request.consent_type)
186 );
187 if self.storage.get(&existing_key).await.is_ok() {
188 return Err(ConsentError::ConsentExists(format!(
189 "{}:{:?}",
190 request.subject_id, request.consent_type
191 )));
192 }
193
194 let mut record = ConsentRecord::new(
196 request.subject_id.clone(),
197 request.consent_type.clone(),
198 request.legal_basis,
199 request.purpose,
200 request.consent_source.clone(),
201 );
202
203 for category in request.data_categories {
205 record.add_data_category(category);
206 }
207
208 if let Some(days) = request
210 .expires_in_days
211 .or(self.config.default_expiration_days)
212 {
213 let expires_at = Utc::now() + chrono::Duration::days(days as i64);
214 record.set_expiration(expires_at);
215 }
216
217 let consent_data =
219 serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
220
221 self.storage
222 .set(&existing_key, &consent_data)
223 .await
224 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
225
226 {
228 let mut cache = self.consent_cache.write().await;
229 cache.insert(record.id.clone(), record.clone());
230 }
231
232 self.create_audit_entry(
234 &record,
235 "consent_requested".to_string(),
236 None,
237 ConsentStatus::Pending,
238 request.consent_source,
239 None,
240 HashMap::new(),
241 )
242 .await?;
243
244 info!(
245 "Consent requested for subject {} with type {:?}",
246 request.subject_id, request.consent_type
247 );
248 Ok(record)
249 }
250
251 pub async fn grant_consent(
253 &self,
254 subject_id: &str,
255 consent_type: &ConsentType,
256 source_ip: Option<String>,
257 action_source: String,
258 ) -> Result<ConsentRecord, ConsentError> {
259 let consent_key = format!(
260 "consent:{}:{}",
261 subject_id,
262 self.consent_type_key(consent_type)
263 );
264
265 let consent_data =
267 self.storage.get(&consent_key).await.map_err(|_| {
268 ConsentError::ConsentNotFound(format!("{subject_id}:{consent_type:?}"))
269 })?;
270
271 let mut record: ConsentRecord =
272 serde_json::from_str(&consent_data).map_err(ConsentError::SerializationError)?;
273
274 let previous_status = record.status.clone();
275
276 record.grant(source_ip.clone());
278
279 let updated_data =
281 serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
282
283 self.storage
284 .set(&consent_key, &updated_data)
285 .await
286 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
287
288 {
290 let mut cache = self.consent_cache.write().await;
291 cache.insert(record.id.clone(), record.clone());
292 }
293
294 self.create_audit_entry(
296 &record,
297 "consent_granted".to_string(),
298 Some(previous_status),
299 record.status.clone(),
300 action_source,
301 source_ip,
302 HashMap::new(),
303 )
304 .await?;
305
306 info!(
307 "Consent granted for subject {} with type {:?}",
308 subject_id, consent_type
309 );
310 Ok(record)
311 }
312
313 pub async fn withdraw_consent(
315 &self,
316 subject_id: &str,
317 consent_type: &ConsentType,
318 source_ip: Option<String>,
319 action_source: String,
320 ) -> Result<ConsentRecord, ConsentError> {
321 let consent_key = format!(
322 "consent:{}:{}",
323 subject_id,
324 self.consent_type_key(consent_type)
325 );
326
327 let consent_data =
329 self.storage.get(&consent_key).await.map_err(|_| {
330 ConsentError::ConsentNotFound(format!("{subject_id}:{consent_type:?}"))
331 })?;
332
333 let mut record: ConsentRecord =
334 serde_json::from_str(&consent_data).map_err(ConsentError::SerializationError)?;
335
336 let previous_status = record.status.clone();
337
338 record.withdraw(source_ip.clone());
340
341 let updated_data =
343 serde_json::to_string(&record).map_err(ConsentError::SerializationError)?;
344
345 self.storage
346 .set(&consent_key, &updated_data)
347 .await
348 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
349
350 {
352 let mut cache = self.consent_cache.write().await;
353 cache.insert(record.id.clone(), record.clone());
354 }
355
356 self.create_audit_entry(
358 &record,
359 "consent_withdrawn".to_string(),
360 Some(previous_status),
361 record.status.clone(),
362 action_source,
363 source_ip,
364 HashMap::new(),
365 )
366 .await?;
367
368 warn!(
369 "Consent withdrawn for subject {} with type {:?}",
370 subject_id, consent_type
371 );
372 Ok(record)
373 }
374
375 pub async fn check_consent(
377 &self,
378 subject_id: &str,
379 consent_type: &ConsentType,
380 ) -> Result<bool, ConsentError> {
381 if !self.config.enabled {
382 return Ok(true);
384 }
385
386 let consent_key = format!(
387 "consent:{}:{}",
388 subject_id,
389 self.consent_type_key(consent_type)
390 );
391
392 {
394 let cache = self.consent_cache.read().await;
395 if let Some(record) = cache
396 .values()
397 .find(|r| r.subject_id == subject_id && &r.consent_type == consent_type)
398 {
399 return Ok(record.is_valid());
400 }
401 }
402
403 match self.storage.get(&consent_key).await {
405 Ok(consent_data) => {
406 let record: ConsentRecord = serde_json::from_str(&consent_data)
407 .map_err(ConsentError::SerializationError)?;
408
409 {
411 let mut cache = self.consent_cache.write().await;
412 cache.insert(record.id.clone(), record.clone());
413 }
414
415 Ok(record.is_valid())
416 }
417 Err(_) => {
418 if self.config.require_explicit_consent {
419 Ok(false) } else {
421 Ok(true) }
423 }
424 }
425 }
426
427 pub async fn get_consent_summary(
429 &self,
430 subject_id: &str,
431 ) -> Result<ConsentSummary, ConsentError> {
432 let mut consents = HashMap::new();
433 let mut pending_requests = 0;
434 let mut expired_consents = 0;
435 let mut last_updated = Utc::now();
436
437 let all_keys = self
440 .storage
441 .list()
442 .await
443 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
444
445 let subject_prefix = format!("consent:{subject_id}:");
446
447 for key in all_keys {
448 if key.starts_with(&subject_prefix) {
449 if let Ok(consent_data) = self.storage.get(&key).await {
450 if let Ok(record) = serde_json::from_str::<ConsentRecord>(&consent_data) {
451 consents.insert(record.consent_type.clone(), record.status.clone());
452
453 if record.status == ConsentStatus::Pending {
454 pending_requests += 1;
455 }
456
457 if record.is_expired() {
458 expired_consents += 1;
459 }
460
461 if record.updated_at > last_updated {
462 last_updated = record.updated_at;
463 }
464 }
465 }
466 }
467 }
468
469 let is_valid = consents
470 .iter()
471 .all(|(_, status)| *status == ConsentStatus::Granted);
472
473 Ok(ConsentSummary {
474 subject_id: subject_id.to_string(),
475 consents,
476 is_valid,
477 last_updated,
478 pending_requests,
479 expired_consents,
480 })
481 }
482
483 pub async fn cleanup_expired_consents(&self) -> Result<usize, ConsentError> {
485 let cutoff_date =
486 Utc::now() - chrono::Duration::days(self.config.cleanup_expired_after_days as i64);
487 let mut cleaned_count = 0;
488
489 let all_keys = self
490 .storage
491 .list()
492 .await
493 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
494
495 for key in all_keys {
496 if key.starts_with("consent:") {
497 if let Ok(consent_data) = self.storage.get(&key).await {
498 if let Ok(record) = serde_json::from_str::<ConsentRecord>(&consent_data) {
499 if record.is_expired() && record.updated_at < cutoff_date {
500 self.storage
501 .delete(&key)
502 .await
503 .map_err(|e| ConsentError::StorageError(e.to_string()))?;
504
505 {
507 let mut cache = self.consent_cache.write().await;
508 cache.remove(&record.id);
509 }
510
511 cleaned_count += 1;
512 debug!("Cleaned up expired consent record: {}", record.id);
513 }
514 }
515 }
516 }
517 }
518
519 info!("Cleaned up {} expired consent records", cleaned_count);
520 Ok(cleaned_count)
521 }
522
523 pub async fn get_audit_trail(&self, subject_id: &str) -> Vec<ConsentAuditEntry> {
525 let audit_entries = self.audit_entries.read().await;
526 audit_entries
527 .iter()
528 .filter(|entry| entry.subject_id == subject_id)
529 .cloned()
530 .collect()
531 }
532
533 #[allow(clippy::too_many_arguments)]
535 async fn create_audit_entry(
536 &self,
537 record: &ConsentRecord,
538 action: String,
539 previous_status: Option<ConsentStatus>,
540 new_status: ConsentStatus,
541 action_source: String,
542 source_ip: Option<String>,
543 details: HashMap<String, String>,
544 ) -> Result<(), ConsentError> {
545 if !self.config.enable_audit_log {
546 return Ok(());
547 }
548
549 let audit_entry = ConsentAuditEntry {
550 id: Uuid::new_v4().to_string(),
551 consent_id: record.id.clone(),
552 subject_id: record.subject_id.clone(),
553 action,
554 previous_status,
555 new_status,
556 action_source,
557 source_ip,
558 details,
559 timestamp: Utc::now(),
560 };
561
562 {
564 let mut audit_entries = self.audit_entries.write().await;
565 audit_entries.push(audit_entry.clone());
566
567 if audit_entries.len() > 10000 {
569 audit_entries.drain(0..1000);
570 }
571 }
572
573 if let Some(log_path) = &self.config.audit_log_path {
575 let log_entry = serde_json::to_string(&audit_entry)
576 .unwrap_or_else(|_| "Failed to serialize audit entry".to_string());
577 let log_line = format!("{}\n", log_entry);
578
579 match tokio::fs::OpenOptions::new()
580 .create(true)
581 .append(true)
582 .open(log_path)
583 .await
584 {
585 Ok(mut file) => {
586 use tokio::io::AsyncWriteExt;
587 if let Err(e) = file.write_all(log_line.as_bytes()).await {
588 tracing::error!("Failed to write audit log to file: {}", e);
589 } else if let Err(e) = file.flush().await {
590 tracing::error!("Failed to flush audit log file: {}", e);
591 }
592 }
593 Err(e) => {
594 tracing::error!("Failed to open audit log file: {}", e);
595 }
596 }
597 }
598
599 Ok(())
600 }
601
602 fn consent_type_key(&self, consent_type: &ConsentType) -> String {
604 match consent_type {
605 ConsentType::DataProcessing => "data_processing".to_string(),
606 ConsentType::Marketing => "marketing".to_string(),
607 ConsentType::Analytics => "analytics".to_string(),
608 ConsentType::DataSharing => "data_sharing".to_string(),
609 ConsentType::AutomatedDecisionMaking => "automated_decision_making".to_string(),
610 ConsentType::SessionStorage => "session_storage".to_string(),
611 ConsentType::AuditLogging => "audit_logging".to_string(),
612 ConsentType::Custom(name) => {
613 format!("custom_{}", name.to_lowercase().replace(' ', "_"))
614 }
615 }
616 }
617}
618
#[cfg(test)]
mod tests {
    use super::*;

    /// Subject id shared by the request/grant/withdraw tests.
    const SUBJECT: &str = "user123";

    /// Manager with default configuration over a fresh in-memory store.
    fn new_manager() -> ConsentManager {
        ConsentManager::new(
            ConsentConfig::default(),
            Arc::new(MemoryConsentStorage::new()),
        )
    }

    /// Consent request for [`SUBJECT`] with the fields the tests vary.
    fn request_for(
        consent_type: ConsentType,
        purpose: &str,
        data_categories: Vec<String>,
    ) -> ConsentRequest {
        ConsentRequest {
            subject_id: SUBJECT.to_string(),
            consent_type,
            legal_basis: LegalBasis::Consent,
            purpose: purpose.to_string(),
            data_categories,
            consent_source: "test".to_string(),
            expires_in_days: None,
        }
    }

    #[tokio::test]
    async fn test_consent_manager_creation() {
        let manager = new_manager();

        assert!(manager.config.enabled);
    }

    #[tokio::test]
    async fn test_consent_request_and_grant() {
        let manager = new_manager();

        // A fresh request starts out pending.
        let request = request_for(
            ConsentType::DataProcessing,
            "Process authentication data",
            vec!["personal_identifiers".to_string()],
        );
        let requested = manager.request_consent(request).await.unwrap();
        assert_eq!(requested.status, ConsentStatus::Pending);

        // Granting flips the status ...
        let granted = manager
            .grant_consent(
                SUBJECT,
                &ConsentType::DataProcessing,
                Some("127.0.0.1".to_string()),
                "test".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(granted.status, ConsentStatus::Granted);

        // ... and makes the consent check pass.
        let allowed = manager
            .check_consent(SUBJECT, &ConsentType::DataProcessing)
            .await
            .unwrap();
        assert!(allowed);
    }

    #[tokio::test]
    async fn test_consent_withdrawal() {
        let manager = new_manager();

        let request = request_for(ConsentType::Analytics, "Analytics tracking", vec![]);
        manager.request_consent(request).await.unwrap();

        manager
            .grant_consent(SUBJECT, &ConsentType::Analytics, None, "test".to_string())
            .await
            .unwrap();

        // Withdrawing flips the status ...
        let withdrawn = manager
            .withdraw_consent(SUBJECT, &ConsentType::Analytics, None, "test".to_string())
            .await
            .unwrap();
        assert_eq!(withdrawn.status, ConsentStatus::Withdrawn);

        // ... and makes the consent check fail.
        let allowed = manager
            .check_consent(SUBJECT, &ConsentType::Analytics)
            .await
            .unwrap();
        assert!(!allowed);
    }
}