use crate::activity::ActivityType;
use crate::state::AppState;
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Which parts of a data subject's footprint a deletion request covers.
///
/// Serialized as an internally tagged object keyed by `type`, with
/// snake_case variant names (for example `{"type": "all"}`).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum DeletionScope {
    /// Default scope, used when a request does not specify one.
    #[default]
    All,
    /// Limits the document and SQL passes to the named collections/tables.
    SpecificCollections { collections: Vec<String> },
    /// NOTE(review): the deletion path in this file does not special-case this
    /// variant; it is currently handled the same way as `All` — confirm intent.
    ExcludeAuditLogs,
}
/// Output encoding for a data-subject export.
///
/// Serialized as a lowercase string (`"json"` or `"csv"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExportFormat {
    /// Default format, used when a request does not specify one.
    #[default]
    Json,
    Csv,
}
/// Which stores an export request covers.
///
/// Serialized as an internally tagged object keyed by `type`, with
/// snake_case variant names.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExportScope {
/// Default scope: no collection/table filter is applied.
#[default]
All,
/// Limits the document and SQL passes to the named collections/tables.
SpecificCollections { collections: Vec<String> },
}
/// Optional time window used to filter exported items.
///
/// Both bounds are strings that the export path parses as RFC 3339. A bound
/// that is absent or does not parse is simply not applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DateRange {
/// Inclusive lower bound; items with an earlier timestamp are excluded.
#[serde(default)]
pub start: Option<String>,
/// Inclusive upper bound; items with a later timestamp are excluded.
#[serde(default)]
pub end: Option<String>,
}
/// Request body for exporting everything held about one data subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportRequest {
/// Identifier the stores are searched for.
pub subject_id: String,
/// Output encoding; defaults to JSON when omitted.
#[serde(default)]
pub format: ExportFormat,
/// Store filter; defaults to `All` when omitted.
#[serde(default)]
pub scope: ExportScope,
/// Optional time window applied to items that carry a timestamp.
#[serde(default)]
pub date_range: Option<DateRange>,
/// Field names to match against; when empty the service's default list is used.
#[serde(default)]
pub search_fields: Vec<String>,
}
/// One record found for a data subject during an export.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportedItem {
/// Store the item came from (`"kv"`, `"document"`, `"sql"` or `"graph"`).
pub store_type: String,
/// Collection or table name, or a fixed store label for KV/graph items.
pub location: String,
/// Identifier of the item within its store.
pub item_id: String,
/// The exported payload as JSON.
pub data: serde_json::Value,
/// Last-update timestamp when the store provides one; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
}
/// Result of a data-subject export.
#[derive(Debug, Serialize)]
pub struct ExportResponse {
/// Identifier generated for this export run.
pub export_id: String,
/// Subject the export was produced for.
pub subject_id: String,
/// Encoding used for `data`.
pub format: ExportFormat,
/// RFC 3339 time at which the export was generated.
pub generated_at: String,
/// Number of items found across all stores.
pub total_items: usize,
/// Export payload: a JSON object for the JSON format, a string holding CSV text for CSV.
pub data: serde_json::Value,
/// Scope echoed back from the request.
pub scope: ExportScope,
/// Date range echoed back from the request; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub date_range: Option<DateRange>,
}
/// Request body for erasing everything held about one data subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionRequest {
/// Identifier the stores are searched for.
pub subject_id: String,
/// Store filter; defaults to `All` when omitted.
#[serde(default)]
pub scope: DeletionScope,
/// Who asked for the deletion; recorded as the actor on audit entries.
pub requestor: String,
/// Free-text justification, copied onto the certificate.
pub reason: String,
/// Field names to match against; when empty the service's default list is used.
#[serde(default)]
pub search_fields: Vec<String>,
/// Whether a secure-erase pass should be attempted after deletion.
#[serde(default)]
pub secure_erase: bool,
}
/// One record removed during a deletion run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletedItem {
/// Store the item was removed from (`"kv"`, `"document"`, `"sql"` or `"graph"`).
pub store_type: String,
/// Collection or table name, or a fixed store label for KV/graph items.
pub location: String,
/// Identifier of the removed item within its store.
pub item_id: String,
/// Length of the item's JSON serialization, when one was computed (`None` for SQL rows).
pub size_bytes: Option<u64>,
/// RFC 3339 time at which the item was removed.
pub deleted_at: String,
}
/// Record describing a completed deletion run, sealed with a SHA-256 hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionCertificate {
/// Certificate identifier of the form `DEL-<timestamp digits>-<subject prefix>`.
pub id: String,
/// Subject whose data was deleted.
pub subject_id: String,
/// RFC 3339 creation time of the certificate.
pub timestamp: String,
/// Every item that was removed.
pub items_deleted: Vec<DeletedItem>,
/// Number of entries in `items_deleted`.
pub total_items: usize,
/// Sum of `size_bytes` over the items that report a size.
pub total_bytes: u64,
/// Scope copied from the request.
pub scope: DeletionScope,
/// Requestor copied from the request.
pub requestor: String,
/// Reason copied from the request.
pub reason: String,
/// Hex-encoded hash over selected certificate fields.
pub verification_hash: String,
/// Identifier of the node that issued the certificate.
pub verified_by: String,
/// Whether a secure-erase pass was requested and reported success.
pub secure_erase_performed: bool,
}
impl DeletionCertificate {
    /// Builds a certificate for a finished deletion run and seals it with a
    /// verification hash.
    ///
    /// The `id` is `DEL-<ts>-<subject>`, where `<ts>` is the first 14
    /// characters of the creation timestamp after dropping `:`, `-`, `T`, `Z`
    /// and `.`, and `<subject>` is the first 8 characters of `subject_id`.
    /// `total_bytes` sums `size_bytes` over the items that report a size.
    pub fn new(
        subject_id: String,
        items: Vec<DeletedItem>,
        scope: DeletionScope,
        requestor: String,
        reason: String,
        node_id: String,
        secure_erase: bool,
    ) -> Self {
        let timestamp = Utc::now().to_rfc3339();
        let total_items = items.len();
        let total_bytes = items.iter().filter_map(|item| item.size_bytes).sum();

        let compact_timestamp: String = timestamp
            .chars()
            .filter(|c| !matches!(c, ':' | '-' | 'T' | 'Z' | '.'))
            .take(14)
            .collect();
        let subject_prefix: String = subject_id.chars().take(8).collect();
        let id = format!("DEL-{}-{}", compact_timestamp, subject_prefix);

        let mut cert = Self {
            id,
            subject_id,
            timestamp,
            items_deleted: items,
            total_items,
            total_bytes,
            scope,
            requestor,
            reason,
            // Filled in below, once every hashed field is in place.
            verification_hash: String::new(),
            verified_by: node_id,
            secure_erase_performed: secure_erase,
        };
        cert.verification_hash = cert.compute_hash();
        cert
    }

    /// Computes the hex-encoded SHA-256 digest that seals this certificate.
    ///
    /// The digest covers, in order: `id`, `subject_id`, `timestamp`,
    /// `total_items`, `total_bytes`, `requestor`, `reason`, `verified_by`, and
    /// for each deleted item its `store_type`, `location` and `item_id`.
    /// `verification_hash` itself is not an input. `scope`,
    /// `secure_erase_performed` and the per-item `size_bytes`/`deleted_at`
    /// are not covered either, so changes to those fields are not detected.
    fn compute_hash(&self) -> String {
        let mut hasher = Sha256::new();
        hasher.update(self.id.as_bytes());
        hasher.update(self.subject_id.as_bytes());
        hasher.update(self.timestamp.as_bytes());
        hasher.update(self.total_items.to_string().as_bytes());
        hasher.update(self.total_bytes.to_string().as_bytes());
        hasher.update(self.requestor.as_bytes());
        hasher.update(self.reason.as_bytes());
        hasher.update(self.verified_by.as_bytes());
        for item in &self.items_deleted {
            hasher.update(item.store_type.as_bytes());
            hasher.update(item.location.as_bytes());
            hasher.update(item.item_id.as_bytes());
        }
        let result = hasher.finalize();
        hex_encode(&result)
    }

    /// Returns `true` when the stored `verification_hash` matches a freshly
    /// computed digest of the hashed fields.
    pub fn verify(&self) -> bool {
        // `compute_hash` never reads `verification_hash`, so there is no need
        // to clone the certificate (and its item list) just to blank it.
        self.compute_hash() == self.verification_hash
    }
}
/// One entry in the hash-chained GDPR audit log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeletionAuditEntry {
/// Sequential identifier of the form `gdpr-<8-digit counter>`.
pub id: String,
/// What happened.
pub event_type: DeletionEventType,
/// Data subject the event relates to.
pub subject_id: String,
/// RFC 3339 time at which the entry was recorded.
pub timestamp: String,
/// Who triggered the event.
pub actor: String,
/// Event-specific JSON payload.
pub details: serde_json::Value,
/// Hash of the preceding entry (`"genesis"` for the first one).
pub prev_hash: String,
/// Hex-encoded SHA-256 over this entry's fields and `prev_hash`.
pub hash: String,
}
/// Kinds of events recorded in the GDPR audit log.
///
/// Serialized in snake_case. The audit hash chain feeds the `Debug` rendering
/// of the variant into each entry hash, so renaming a variant changes the
/// hashes of new entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeletionEventType {
RequestReceived,
DeletionStarted,
ItemDeleted,
SecureErasePerformed,
DeletionCompleted,
DeletionFailed,
CertificateGenerated,
ExportRequestReceived,
ExportCompleted,
ExportFailed,
}
/// HTTP response body for a deletion request.
#[derive(Debug, Serialize)]
pub struct DeletionResponse {
/// Whether the deletion run completed.
pub success: bool,
/// Certificate for the run; `None` on failure.
pub certificate: Option<DeletionCertificate>,
/// Human-readable summary.
pub message: String,
/// Error text on failure; omitted from output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// HTTP response body listing the stored deletion certificates.
#[derive(Debug, Serialize)]
pub struct ListCertificatesResponse {
/// The stored certificates.
pub certificates: Vec<DeletionCertificate>,
/// Number of entries in `certificates`.
pub total: usize,
}
/// HTTP response body for a certificate verification request.
#[derive(Debug, Serialize)]
pub struct VerifyCertificateResponse {
/// Whether the certificate exists and its hash matches.
pub valid: bool,
/// The certificate that was checked; `None` when it was not found.
pub certificate: Option<DeletionCertificate>,
/// Human-readable outcome.
pub message: String,
}
/// In-memory, hash-chained audit log plus the certificates issued so far.
pub struct DeletionAuditLog {
// Audit entries in append order.
entries: RwLock<Vec<DeletionAuditEntry>>,
// Issued certificates keyed by certificate id.
certificates: RwLock<HashMap<String, DeletionCertificate>>,
// Counter used to build the next entry id.
next_id: AtomicU64,
// Hash of the most recently appended entry ("genesis" before the first one).
last_hash: RwLock<String>,
}
impl DeletionAuditLog {
    /// Creates an empty log whose chain starts from the `"genesis"` hash.
    pub fn new() -> Self {
        Self {
            entries: RwLock::new(Vec::new()),
            certificates: RwLock::new(HashMap::new()),
            next_id: AtomicU64::new(1),
            last_hash: RwLock::new("genesis".to_string()),
        }
    }

    /// Computes the hex-encoded SHA-256 of one audit entry.
    ///
    /// Shared by `log_event` and `verify_integrity` so both always hash the
    /// same fields in the same order. `details` is hashed via its JSON string
    /// form and skipped if it cannot be serialized.
    fn compute_entry_hash(
        id: &str,
        event_type: DeletionEventType,
        subject_id: &str,
        timestamp: &str,
        actor: &str,
        details: &serde_json::Value,
        prev_hash: &str,
    ) -> String {
        let mut hasher = Sha256::new();
        hasher.update(id.as_bytes());
        hasher.update(format!("{:?}", event_type).as_bytes());
        hasher.update(subject_id.as_bytes());
        hasher.update(timestamp.as_bytes());
        hasher.update(actor.as_bytes());
        if let Ok(json) = serde_json::to_string(details) {
            hasher.update(json.as_bytes());
        }
        hasher.update(prev_hash.as_bytes());
        hex_encode(&hasher.finalize())
    }

    /// Appends an event to the chain and returns the new entry's id.
    ///
    /// Both write locks are held for the whole operation (always `entries`
    /// first, then `last_hash`), so id allocation, the `prev_hash` read, the
    /// `last_hash` update and the append happen as one step. Without this,
    /// concurrent callers could chain to the same `prev_hash` or append out of
    /// order, and `verify_integrity` would report a broken chain.
    pub fn log_event(
        &self,
        event_type: DeletionEventType,
        subject_id: &str,
        actor: &str,
        details: serde_json::Value,
    ) -> String {
        let mut entries = self.entries.write();
        let mut last_hash = self.last_hash.write();

        let id = format!("gdpr-{:08}", self.next_id.fetch_add(1, Ordering::SeqCst));
        let timestamp = Utc::now().to_rfc3339();
        let prev_hash = last_hash.clone();
        let hash = Self::compute_entry_hash(
            &id, event_type, subject_id, &timestamp, actor, &details, &prev_hash,
        );

        *last_hash = hash.clone();
        entries.push(DeletionAuditEntry {
            id: id.clone(),
            event_type,
            subject_id: subject_id.to_string(),
            timestamp,
            actor: actor.to_string(),
            details,
            prev_hash,
            hash,
        });
        id
    }

    /// Stores a certificate under its id, replacing any existing one with the same id.
    pub fn store_certificate(&self, cert: DeletionCertificate) {
        self.certificates.write().insert(cert.id.clone(), cert);
    }

    /// Returns a copy of the certificate with the given id, if present.
    pub fn get_certificate(&self, id: &str) -> Option<DeletionCertificate> {
        self.certificates.read().get(id).cloned()
    }

    /// Returns copies of all stored certificates, in no particular order.
    pub fn list_certificates(&self) -> Vec<DeletionCertificate> {
        self.certificates.read().values().cloned().collect()
    }

    /// Returns copies of every entry recorded for `subject_id`, in append order.
    pub fn get_entries_for_subject(&self, subject_id: &str) -> Vec<DeletionAuditEntry> {
        self.entries
            .read()
            .iter()
            .filter(|e| e.subject_id == subject_id)
            .cloned()
            .collect()
    }

    /// Walks the chain from `"genesis"` and checks every link and entry hash.
    ///
    /// Returns the number of entries verified, or a message describing the
    /// first entry whose `prev_hash` or `hash` does not match.
    pub fn verify_integrity(&self) -> Result<usize, String> {
        let entries = self.entries.read();
        let mut last_hash = "genesis".to_string();
        for (idx, entry) in entries.iter().enumerate() {
            if entry.prev_hash != last_hash {
                return Err(format!(
                    "Hash chain broken at entry {}: expected prev_hash '{}', got '{}'",
                    idx, last_hash, entry.prev_hash
                ));
            }
            let computed_hash = Self::compute_entry_hash(
                &entry.id,
                entry.event_type,
                &entry.subject_id,
                &entry.timestamp,
                &entry.actor,
                &entry.details,
                &entry.prev_hash,
            );
            if entry.hash != computed_hash {
                return Err(format!(
                    "Hash mismatch at entry {}: computed '{}', stored '{}'",
                    idx, computed_hash, entry.hash
                ));
            }
            last_hash = entry.hash.clone();
        }
        Ok(entries.len())
    }
}
impl Default for DeletionAuditLog {
fn default() -> Self {
Self::new()
}
}
/// Service that performs data-subject deletions and exports and records them
/// in a shared audit log.
pub struct GdprService {
// Audit log shared with callers via `audit_log()`.
audit_log: Arc<DeletionAuditLog>,
}
impl GdprService {
/// Creates a service backed by a fresh, empty audit log.
pub fn new() -> Self {
    let audit_log = Arc::new(DeletionAuditLog::new());
    Self { audit_log }
}
/// Returns the shared handle to the audit log backing this service.
pub fn audit_log(&self) -> &Arc<DeletionAuditLog> {
&self.audit_log
}
/// Field names searched when a request does not supply its own list.
fn default_search_fields() -> Vec<String> {
    const FIELD_NAMES: [&str; 9] = [
        "email",
        "user_id",
        "customer_id",
        "subject_id",
        "id",
        "userId",
        "customerId",
        "user",
        "owner",
    ];
    FIELD_NAMES.iter().map(|name| name.to_string()).collect()
}
/// Deletes every KV entry that belongs to `subject_id`.
///
/// An entry matches when its value contains the subject under one of
/// `search_fields` (searched recursively) or when its key contains
/// `subject_id` as a substring. Returns one `DeletedItem` per entry the store
/// reports as removed; `size_bytes` is the length of the value's JSON form.
///
/// An empty `subject_id` matches nothing and returns an empty list:
/// `str::contains("")` is always true, so without this guard an empty
/// identifier would delete every key in the store.
pub fn delete_from_kv(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
) -> Vec<DeletedItem> {
    if subject_id.is_empty() {
        return Vec::new();
    }

    let mut deleted = Vec::new();
    let entries = state.kv_store.list(None, usize::MAX);
    for entry in entries {
        let belongs_to_subject = self
            .value_contains_subject(&entry.value, subject_id, search_fields)
            || entry.key.contains(subject_id);
        if !belongs_to_subject {
            continue;
        }

        // Measure before deleting so the certificate can report bytes removed.
        let size = serde_json::to_string(&entry.value)
            .map(|s| s.len() as u64)
            .ok();
        if state.kv_store.delete(&entry.key).is_some() {
            deleted.push(DeletedItem {
                store_type: "kv".to_string(),
                location: "kv_store".to_string(),
                item_id: entry.key,
                size_bytes: size,
                deleted_at: Utc::now().to_rfc3339(),
            });
        }
    }
    deleted
}
/// Deletes every document that belongs to `subject_id`.
///
/// When `specific_collections` is given, only those collections are scanned.
/// Collections whose lookup fails and documents whose delete fails are
/// skipped silently. Returns one `DeletedItem` per removed document.
pub fn delete_from_documents(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
    specific_collections: Option<&[String]>,
) -> Vec<DeletedItem> {
    let mut removed = Vec::new();
    let collections = state.document_engine.list_collections();
    for collection_name in collections {
        let in_scope =
            specific_collections.map_or(true, |wanted| wanted.contains(&collection_name));
        if !in_scope {
            continue;
        }

        let query = aegis_document::Query::new();
        let result = match state.document_engine.find(&collection_name, &query) {
            Ok(found) => found,
            Err(_) => continue,
        };

        for doc in &result.documents {
            if !self.document_belongs_to_subject(doc, subject_id, search_fields) {
                continue;
            }
            let size = serde_json::to_string(&doc.data)
                .map(|s| s.len() as u64)
                .ok();
            if state
                .document_engine
                .delete(&collection_name, &doc.id)
                .is_err()
            {
                continue;
            }
            removed.push(DeletedItem {
                store_type: "document".to_string(),
                location: collection_name.clone(),
                item_id: doc.id.to_string(),
                size_bytes: size,
                deleted_at: Utc::now().to_rfc3339(),
            });
        }
    }
    removed
}
/// Deletes SQL rows that reference `subject_id`.
///
/// For every table (optionally limited to `specific_tables`), each column
/// whose name matches a search field case-insensitively gets a
/// `DELETE ... WHERE column = '<subject>'` statement. Single quotes in the
/// subject are doubled before being embedded. One `DeletedItem` is recorded
/// per statement that affected at least one row; failed statements are
/// skipped silently.
pub fn delete_from_sql(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
    specific_tables: Option<&[String]>,
) -> Vec<DeletedItem> {
    let escaped_subject = subject_id.replace('\'', "''");
    let mut removed = Vec::new();
    let tables = state.query_engine.list_tables(None);
    for table_name in tables {
        let in_scope = specific_tables.map_or(true, |wanted| wanted.contains(&table_name));
        if !in_scope {
            continue;
        }
        let table_info = match state.query_engine.get_table_info(&table_name, None) {
            Some(info) => info,
            None => continue,
        };

        let searchable_columns: Vec<&String> = table_info
            .columns
            .iter()
            .filter(|col| {
                search_fields
                    .iter()
                    .any(|field| col.name.to_lowercase() == field.to_lowercase())
            })
            .map(|col| &col.name)
            .collect();

        for column in searchable_columns {
            let delete_sql = format!(
                "DELETE FROM {} WHERE {} = '{}'",
                table_name, column, escaped_subject
            );
            let rows_affected = match state.query_engine.execute(&delete_sql, None) {
                Ok(outcome) => outcome.rows_affected,
                Err(_) => continue,
            };
            if rows_affected > 0 {
                removed.push(DeletedItem {
                    store_type: "sql".to_string(),
                    location: table_name.clone(),
                    item_id: format!("{}={}", column, subject_id),
                    size_bytes: None,
                    deleted_at: Utc::now().to_rfc3339(),
                });
            }
        }
    }
    removed
}
/// Deletes every graph node that belongs to `subject_id`.
///
/// A node matches when its properties contain the subject under one of
/// `search_fields` (searched recursively) or when its id or label contains
/// `subject_id` as a substring. Returns one `DeletedItem` per node the store
/// removed successfully.
///
/// An empty `subject_id` matches nothing and returns an empty list:
/// `str::contains("")` is always true, so without this guard an empty
/// identifier would delete every node in the graph.
pub fn delete_from_graph(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
) -> Vec<DeletedItem> {
    if subject_id.is_empty() {
        return Vec::new();
    }

    let mut deleted = Vec::new();
    let nodes = state.graph_store.list_nodes();
    for node in nodes {
        let belongs_to_subject = self
            .value_contains_subject(&node.properties, subject_id, search_fields)
            || node.id.contains(subject_id)
            || node.label.contains(subject_id);
        if !belongs_to_subject {
            continue;
        }

        // Measure before deleting so the certificate can report bytes removed.
        let size = serde_json::to_string(&node.properties)
            .map(|s| s.len() as u64)
            .ok();
        if state.graph_store.delete_node(&node.id).is_ok() {
            deleted.push(DeletedItem {
                store_type: "graph".to_string(),
                location: "graph_store".to_string(),
                item_id: node.id,
                size_bytes: size,
                deleted_at: Utc::now().to_rfc3339(),
            });
        }
    }
    deleted
}
/// Recursively checks whether a JSON value references `subject_id`.
///
/// In an object, each of `search_fields` present as a direct key is compared
/// against the subject; if none matches, every member value is searched
/// recursively. Arrays are searched element by element. Scalars on their own
/// never match.
fn value_contains_subject(
    &self,
    value: &serde_json::Value,
    subject_id: &str,
    search_fields: &[String],
) -> bool {
    match value {
        serde_json::Value::Object(map) => {
            let direct_hit = search_fields
                .iter()
                .filter_map(|field| map.get(field))
                .any(|candidate| self.value_matches_subject(candidate, subject_id));
            direct_hit
                || map
                    .values()
                    .any(|nested| self.value_contains_subject(nested, subject_id, search_fields))
        }
        serde_json::Value::Array(elements) => elements
            .iter()
            .any(|element| self.value_contains_subject(element, subject_id, search_fields)),
        _ => false,
    }
}
/// Returns `true` when a JSON string, or the decimal rendering of a JSON
/// number, equals `subject_id` exactly. All other value kinds never match.
fn value_matches_subject(&self, value: &serde_json::Value, subject_id: &str) -> bool {
    if let serde_json::Value::String(text) = value {
        return text == subject_id;
    }
    if let serde_json::Value::Number(number) = value {
        return number.to_string() == subject_id;
    }
    false
}
/// Returns `true` when the document's id equals `subject_id`, or when any of
/// `search_fields` is present on the document with a matching value.
fn document_belongs_to_subject(
    &self,
    doc: &aegis_document::Document,
    subject_id: &str,
    search_fields: &[String],
) -> bool {
    doc.id.to_string() == subject_id
        || search_fields
            .iter()
            .filter_map(|field| doc.get(field))
            .any(|value| self.doc_value_matches_subject(value, subject_id))
}
/// Returns `true` when a document string, or the textual rendering of an
/// integer or float, equals `subject_id`. Other value kinds never match.
fn doc_value_matches_subject(&self, value: &aegis_document::Value, subject_id: &str) -> bool {
    let rendered = match value {
        aegis_document::Value::String(text) => return text == subject_id,
        aegis_document::Value::Int(int) => int.to_string(),
        aegis_document::Value::Float(float) => float.to_string(),
        _ => return false,
    };
    rendered == subject_id
}
/// Hook for a secure-erase pass over the underlying storage.
///
/// NOTE(review): this body performs no work and always returns `true`, so a
/// certificate can report `secure_erase_performed = true` without any
/// erasure having happened — confirm whether a real implementation is pending.
pub fn secure_erase(&self, _state: &AppState) -> bool {
true
}
pub fn execute_deletion(
&self,
state: &AppState,
request: DeletionRequest,
) -> Result<DeletionCertificate, String> {
let node_id = state.config.node_id.clone();
self.audit_log.log_event(
DeletionEventType::RequestReceived,
&request.subject_id,
&request.requestor,
serde_json::json!({
"scope": request.scope,
"reason": request.reason,
"secure_erase": request.secure_erase,
}),
);
self.audit_log.log_event(
DeletionEventType::DeletionStarted,
&request.subject_id,
&request.requestor,
serde_json::json!({"timestamp": Utc::now().to_rfc3339()}),
);
let search_fields = if request.search_fields.is_empty() {
Self::default_search_fields()
} else {
request.search_fields.clone()
};
let mut all_deleted = Vec::new();
let specific_collections = match &request.scope {
DeletionScope::SpecificCollections { collections } => Some(collections.as_slice()),
_ => None,
};
let kv_deleted = self.delete_from_kv(state, &request.subject_id, &search_fields);
for item in &kv_deleted {
self.audit_log.log_event(
DeletionEventType::ItemDeleted,
&request.subject_id,
&request.requestor,
serde_json::json!({
"store": "kv",
"item_id": item.item_id,
}),
);
}
all_deleted.extend(kv_deleted);
let doc_deleted = self.delete_from_documents(
state,
&request.subject_id,
&search_fields,
specific_collections,
);
for item in &doc_deleted {
self.audit_log.log_event(
DeletionEventType::ItemDeleted,
&request.subject_id,
&request.requestor,
serde_json::json!({
"store": "document",
"collection": item.location,
"item_id": item.item_id,
}),
);
}
all_deleted.extend(doc_deleted);
let sql_deleted = self.delete_from_sql(
state,
&request.subject_id,
&search_fields,
specific_collections,
);
for item in &sql_deleted {
self.audit_log.log_event(
DeletionEventType::ItemDeleted,
&request.subject_id,
&request.requestor,
serde_json::json!({
"store": "sql",
"table": item.location,
"item_id": item.item_id,
}),
);
}
all_deleted.extend(sql_deleted);
let graph_deleted = self.delete_from_graph(state, &request.subject_id, &search_fields);
for item in &graph_deleted {
self.audit_log.log_event(
DeletionEventType::ItemDeleted,
&request.subject_id,
&request.requestor,
serde_json::json!({
"store": "graph",
"item_id": item.item_id,
}),
);
}
all_deleted.extend(graph_deleted);
let secure_erase_performed = if request.secure_erase {
let success = self.secure_erase(state);
self.audit_log.log_event(
DeletionEventType::SecureErasePerformed,
&request.subject_id,
&request.requestor,
serde_json::json!({
"success": success,
"timestamp": Utc::now().to_rfc3339(),
}),
);
success
} else {
false
};
if let Err(e) = state.save_to_disk() {
tracing::warn!("Failed to persist deletions to disk: {}", e);
}
self.audit_log.log_event(
DeletionEventType::DeletionCompleted,
&request.subject_id,
&request.requestor,
serde_json::json!({
"items_deleted": all_deleted.len(),
"timestamp": Utc::now().to_rfc3339(),
}),
);
let certificate = DeletionCertificate::new(
request.subject_id.clone(),
all_deleted,
request.scope,
request.requestor.clone(),
request.reason,
node_id,
secure_erase_performed,
);
self.audit_log.log_event(
DeletionEventType::CertificateGenerated,
&request.subject_id,
&request.requestor,
serde_json::json!({
"certificate_id": certificate.id,
"verification_hash": certificate.verification_hash,
}),
);
self.audit_log.store_certificate(certificate.clone());
Ok(certificate)
}
/// Collects every KV entry that belongs to `subject_id`.
///
/// Matching follows the same rule as KV deletion (a search-field hit inside
/// the value, or `subject_id` as a substring of the key). When `date_range`
/// is given, entries whose `updated_at` falls outside it are dropped.
pub fn export_from_kv(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
    date_range: Option<&DateRange>,
) -> Vec<ExportedItem> {
    let mut items = Vec::new();
    let entries = state.kv_store.list(None, usize::MAX);
    for entry in entries {
        let owned_by_subject = self
            .value_contains_subject(&entry.value, subject_id, search_fields)
            || entry.key.contains(subject_id);
        if !owned_by_subject {
            continue;
        }

        let updated_at = entry.updated_at.to_rfc3339();
        let in_range =
            date_range.map_or(true, |range| self.is_within_date_range(&updated_at, range));
        if !in_range {
            continue;
        }

        let data = serde_json::json!({
            "key": entry.key,
            "value": entry.value,
            "ttl": entry.ttl,
        });
        items.push(ExportedItem {
            store_type: "kv".to_string(),
            location: "kv_store".to_string(),
            item_id: entry.key.clone(),
            data,
            timestamp: Some(updated_at),
        });
    }
    items
}
/// Collects every document that belongs to `subject_id`.
///
/// When `specific_collections` is given, only those collections are scanned.
/// The date filter applies only to documents that carry a string
/// `updated_at` field; documents without one are always included.
pub fn export_from_documents(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
    specific_collections: Option<&[String]>,
    date_range: Option<&DateRange>,
) -> Vec<ExportedItem> {
    let mut items = Vec::new();
    let collections = state.document_engine.list_collections();
    for collection_name in collections {
        let in_scope =
            specific_collections.map_or(true, |wanted| wanted.contains(&collection_name));
        if !in_scope {
            continue;
        }

        let query = aegis_document::Query::new();
        let result = match state.document_engine.find(&collection_name, &query) {
            Ok(found) => found,
            Err(_) => continue,
        };

        for doc in &result.documents {
            if !self.document_belongs_to_subject(doc, subject_id, search_fields) {
                continue;
            }

            let updated_at = match doc.get("updated_at") {
                Some(aegis_document::Value::String(text)) => Some(text.clone()),
                _ => None,
            };
            if let (Some(range), Some(stamp)) = (date_range, updated_at.as_ref()) {
                if !self.is_within_date_range(stamp, range) {
                    continue;
                }
            }

            items.push(ExportedItem {
                store_type: "document".to_string(),
                location: collection_name.clone(),
                item_id: doc.id.to_string(),
                data: self.document_to_json(doc),
                timestamp: updated_at,
            });
        }
    }
    items
}
/// Collects SQL rows that reference `subject_id`.
///
/// For every table (optionally limited to `specific_tables`), each column
/// whose name matches a search field case-insensitively is queried with
/// `SELECT * ... WHERE column = '<subject>'`. Single quotes in the subject
/// are doubled before being embedded. The date range is accepted but not
/// applied to SQL rows.
pub fn export_from_sql(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
    specific_tables: Option<&[String]>,
    _date_range: Option<&DateRange>,
) -> Vec<ExportedItem> {
    let escaped_subject = subject_id.replace('\'', "''");
    let mut items = Vec::new();
    let tables = state.query_engine.list_tables(None);
    for table_name in tables {
        let in_scope = specific_tables.map_or(true, |wanted| wanted.contains(&table_name));
        if !in_scope {
            continue;
        }
        let table_info = match state.query_engine.get_table_info(&table_name, None) {
            Some(info) => info,
            None => continue,
        };

        let searchable_columns: Vec<&String> = table_info
            .columns
            .iter()
            .filter(|col| {
                search_fields
                    .iter()
                    .any(|field| col.name.to_lowercase() == field.to_lowercase())
            })
            .map(|col| &col.name)
            .collect();

        for column in searchable_columns {
            let select_sql = format!(
                "SELECT * FROM {} WHERE {} = '{}'",
                table_name, column, escaped_subject
            );
            let result = match state.query_engine.execute(&select_sql, None) {
                Ok(outcome) => outcome,
                Err(_) => continue,
            };
            for (idx, row) in result.rows.iter().enumerate() {
                // Pair each column name with the cell at the same position.
                let row_data: serde_json::Map<String, serde_json::Value> = result
                    .columns
                    .iter()
                    .enumerate()
                    .filter_map(|(col_idx, col_name)| {
                        row.get(col_idx)
                            .map(|value| (col_name.clone(), value.clone()))
                    })
                    .collect();
                items.push(ExportedItem {
                    store_type: "sql".to_string(),
                    location: table_name.clone(),
                    item_id: format!("{}={}/row-{}", column, subject_id, idx),
                    data: serde_json::Value::Object(row_data),
                    timestamp: None,
                });
            }
        }
    }
    items
}
/// Collects every graph node that belongs to `subject_id`, together with the
/// edges attached to it.
///
/// Matching follows the same rule as graph deletion (a search-field hit in
/// the properties, or `subject_id` as a substring of the id or label).
pub fn export_from_graph(
    &self,
    state: &AppState,
    subject_id: &str,
    search_fields: &[String],
) -> Vec<ExportedItem> {
    let mut items = Vec::new();
    let nodes = state.graph_store.list_nodes();
    for node in nodes {
        let owned_by_subject = self
            .value_contains_subject(&node.properties, subject_id, search_fields)
            || node.id.contains(subject_id)
            || node.label.contains(subject_id);
        if !owned_by_subject {
            continue;
        }

        let edges = state.graph_store.get_edges_for_node(&node.id);
        let mut edge_data: Vec<serde_json::Value> = Vec::new();
        for edge in edges.iter() {
            edge_data.push(serde_json::json!({
                "id": edge.id,
                "source": edge.source,
                "target": edge.target,
                "relationship": edge.relationship,
            }));
        }

        let data = serde_json::json!({
            "id": node.id,
            "label": node.label,
            "properties": node.properties,
            "edges": edge_data,
        });
        items.push(ExportedItem {
            store_type: "graph".to_string(),
            location: "graph_store".to_string(),
            item_id: node.id.clone(),
            data,
            timestamp: None,
        });
    }
    items
}
/// Checks whether an RFC 3339 `timestamp` lies inside `range` (bounds inclusive).
///
/// The check is lenient: a timestamp that does not parse counts as inside
/// the range, and a bound that is missing or does not parse is ignored.
fn is_within_date_range(&self, timestamp: &str, range: &DateRange) -> bool {
    let instant = match DateTime::parse_from_rfc3339(timestamp) {
        Ok(parsed) => parsed.with_timezone(&Utc),
        Err(_) => return true,
    };
    let parse_bound = |bound: &Option<String>| {
        bound
            .as_deref()
            .and_then(|raw| DateTime::parse_from_rfc3339(raw).ok())
            .map(|parsed| parsed.with_timezone(&Utc))
    };
    let not_before_start = parse_bound(&range.start).map_or(true, |start| instant >= start);
    let not_after_end = parse_bound(&range.end).map_or(true, |end| instant <= end);
    not_before_start && not_after_end
}
/// Converts a document into a JSON object holding its id under `_id`
/// followed by all of its data fields.
fn document_to_json(&self, doc: &aegis_document::Document) -> serde_json::Value {
    let id_entry = std::iter::once((
        "_id".to_string(),
        serde_json::Value::String(doc.id.to_string()),
    ));
    let field_entries = (&doc.data)
        .into_iter()
        .map(|(key, value)| (key.clone(), self.doc_value_to_json(value)));
    let object: serde_json::Map<String, serde_json::Value> =
        id_entry.chain(field_entries).collect();
    serde_json::Value::Object(object)
}
fn doc_value_to_json(&self, value: &aegis_document::Value) -> serde_json::Value {
match value {
aegis_document::Value::Null => serde_json::Value::Null,
aegis_document::Value::Bool(b) => serde_json::Value::Bool(*b),
aegis_document::Value::Int(i) => serde_json::Value::Number((*i).into()),
aegis_document::Value::Float(f) => serde_json::Number::from_f64(*f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
aegis_document::Value::String(s) => serde_json::Value::String(s.clone()),
aegis_document::Value::Array(arr) => {
serde_json::Value::Array(arr.iter().map(|v| self.doc_value_to_json(v)).collect())
}
aegis_document::Value::Object(obj) => {
let map: serde_json::Map<String, serde_json::Value> = obj
.iter()
.map(|(k, v)| (k.clone(), self.doc_value_to_json(v)))
.collect();
serde_json::Value::Object(map)
}
}
}
/// Renders exported items as CSV text.
///
/// The header is the four fixed columns (`store_type`, `location`,
/// `item_id`, `timestamp`) followed by the sorted union of all top-level keys
/// found in the items' `data` objects. Each row leaves a data column empty
/// when the item has no such key or its `data` is not an object. An empty
/// input produces a header-only document with a single `data` column.
///
/// Data-key header cells go through the same CSV escaping as row cells, so a
/// key containing a comma, quote or line break cannot shift the columns.
fn items_to_csv(&self, items: &[ExportedItem]) -> String {
    if items.is_empty() {
        return "store_type,location,item_id,timestamp,data\n".to_string();
    }

    // A sorted set gives de-duplication and ordering in a single pass.
    let data_keys: std::collections::BTreeSet<&str> = items
        .iter()
        .filter_map(|item| item.data.as_object())
        .flat_map(|map| map.keys().map(String::as_str))
        .collect();

    let mut header_parts = vec![
        "store_type".to_string(),
        "location".to_string(),
        "item_id".to_string(),
        "timestamp".to_string(),
    ];
    header_parts.extend(data_keys.iter().map(|key| escape_csv_field(key)));

    let mut csv = String::new();
    csv.push_str(&header_parts.join(","));
    csv.push('\n');

    for item in items {
        let fields = item.data.as_object();
        let mut row_parts = vec![
            escape_csv_field(&item.store_type),
            escape_csv_field(&item.location),
            escape_csv_field(&item.item_id),
            escape_csv_field(item.timestamp.as_deref().unwrap_or("")),
        ];
        for key in &data_keys {
            let value = fields
                .and_then(|map| map.get(*key))
                .map(json_value_to_csv_string)
                .unwrap_or_default();
            row_parts.push(escape_csv_field(&value));
        }
        csv.push_str(&row_parts.join(","));
        csv.push('\n');
    }
    csv
}
pub fn execute_export(
&self,
state: &AppState,
request: ExportRequest,
) -> Result<ExportResponse, String> {
let timestamp = Utc::now();
let export_id = format!(
"EXP-{}-{}",
timestamp.format("%Y%m%d%H%M%S"),
&request.subject_id.chars().take(8).collect::<String>()
);
self.audit_log.log_event(
DeletionEventType::ExportRequestReceived,
&request.subject_id,
"system",
serde_json::json!({
"export_id": export_id,
"format": request.format,
"scope": request.scope,
"date_range": request.date_range,
}),
);
let search_fields = if request.search_fields.is_empty() {
Self::default_search_fields()
} else {
request.search_fields.clone()
};
let mut all_exported = Vec::new();
let specific_collections = match &request.scope {
ExportScope::SpecificCollections { collections } => Some(collections.as_slice()),
_ => None,
};
let kv_exported = self.export_from_kv(
state,
&request.subject_id,
&search_fields,
request.date_range.as_ref(),
);
all_exported.extend(kv_exported);
let doc_exported = self.export_from_documents(
state,
&request.subject_id,
&search_fields,
specific_collections,
request.date_range.as_ref(),
);
all_exported.extend(doc_exported);
let sql_exported = self.export_from_sql(
state,
&request.subject_id,
&search_fields,
specific_collections,
request.date_range.as_ref(),
);
all_exported.extend(sql_exported);
let graph_exported = self.export_from_graph(state, &request.subject_id, &search_fields);
all_exported.extend(graph_exported);
let total_items = all_exported.len();
let data = match request.format {
ExportFormat::Json => {
let mut grouped: HashMap<String, Vec<&ExportedItem>> = HashMap::new();
for item in &all_exported {
grouped
.entry(item.store_type.clone())
.or_default()
.push(item);
}
let mut result = serde_json::Map::new();
result.insert(
"subject_id".to_string(),
serde_json::Value::String(request.subject_id.clone()),
);
result.insert(
"export_id".to_string(),
serde_json::Value::String(export_id.clone()),
);
result.insert(
"generated_at".to_string(),
serde_json::Value::String(timestamp.to_rfc3339()),
);
result.insert(
"total_items".to_string(),
serde_json::Value::Number(total_items.into()),
);
let mut data_section = serde_json::Map::new();
for (store_type, items) in grouped {
let items_json: Vec<serde_json::Value> = items
.iter()
.map(|item| {
serde_json::json!({
"location": item.location,
"item_id": item.item_id,
"data": item.data,
"timestamp": item.timestamp,
})
})
.collect();
data_section.insert(store_type, serde_json::Value::Array(items_json));
}
result.insert("data".to_string(), serde_json::Value::Object(data_section));
serde_json::Value::Object(result)
}
ExportFormat::Csv => {
let csv_content = self.items_to_csv(&all_exported);
serde_json::Value::String(csv_content)
}
};
self.audit_log.log_event(
DeletionEventType::ExportCompleted,
&request.subject_id,
"system",
serde_json::json!({
"export_id": export_id,
"total_items": total_items,
"format": request.format,
"timestamp": timestamp.to_rfc3339(),
}),
);
Ok(ExportResponse {
export_id,
subject_id: request.subject_id,
format: request.format,
generated_at: timestamp.to_rfc3339(),
total_items,
data,
scope: request.scope,
date_range: request.date_range,
})
}
}
impl Default for GdprService {
fn default() -> Self {
Self::new()
}
}
/// Quotes a CSV field when it contains a comma, double quote, or line break.
///
/// Quoted fields are wrapped in double quotes with every embedded double
/// quote doubled; all other fields are returned unchanged.
fn escape_csv_field(field: &str) -> String {
    let needs_quoting = field
        .chars()
        .any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return field.to_owned();
    }

    let mut quoted = String::with_capacity(field.len() + 2);
    quoted.push('"');
    for ch in field.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Renders a JSON value as the text of a single CSV cell (before escaping).
///
/// `null` becomes an empty string, scalars use their plain textual form, and
/// arrays/objects are embedded as compact JSON.
fn json_value_to_csv_string(value: &serde_json::Value) -> String {
    match value {
        serde_json::Value::String(text) => text.clone(),
        serde_json::Value::Null => String::new(),
        serde_json::Value::Bool(flag) => flag.to_string(),
        serde_json::Value::Number(number) => number.to_string(),
        composite @ serde_json::Value::Array(_) | composite @ serde_json::Value::Object(_) => {
            serde_json::to_string(composite).unwrap_or_default()
        }
    }
}
/// HTTP handler: deletes all data for a subject and returns the certificate.
///
/// The subject comes from the request body; the path identifier is used only
/// when the body's `subject_id` is empty. Responds with 200 and the
/// certificate on success, or 500 and the error text on failure. Both the
/// request and its outcome are written to the activity log.
pub async fn delete_data_subject(
    State(state): State<AppState>,
    Path(identifier): Path<String>,
    Json(mut request): Json<DeletionRequest>,
) -> impl IntoResponse {
    if request.subject_id.is_empty() {
        request.subject_id = identifier;
    }

    let request_summary = format!("GDPR deletion request for subject: {}", request.subject_id);
    state.activity.log_with_details(
        ActivityType::Delete,
        &request_summary,
        None,
        Some(&request.requestor),
        Some("gdpr"),
        Some(serde_json::json!({
            "scope": request.scope,
            "reason": request.reason,
        })),
    );

    let outcome = state.gdpr.execute_deletion(&state, request);
    let (status, body) = match outcome {
        Ok(certificate) => {
            let completion_summary =
                format!("GDPR deletion completed. Certificate: {}", certificate.id);
            state.activity.log_with_details(
                ActivityType::System,
                &completion_summary,
                None,
                None,
                Some("gdpr"),
                Some(serde_json::json!({
                    "items_deleted": certificate.total_items,
                    "bytes_deleted": certificate.total_bytes,
                })),
            );
            let body = DeletionResponse {
                success: true,
                certificate: Some(certificate),
                message: "Data deletion completed successfully".to_string(),
                error: None,
            };
            (StatusCode::OK, body)
        }
        Err(e) => {
            let failure_summary = format!("GDPR deletion failed: {}", e);
            state.activity.log_with_details(
                ActivityType::System,
                &failure_summary,
                None,
                None,
                Some("gdpr"),
                None,
            );
            let body = DeletionResponse {
                success: false,
                certificate: None,
                message: "Data deletion failed".to_string(),
                error: Some(e),
            };
            (StatusCode::INTERNAL_SERVER_ERROR, body)
        }
    };
    (status, Json(body))
}
/// HTTP handler: returns every stored deletion certificate and their count.
pub async fn list_deletion_certificates(
    State(state): State<AppState>,
) -> Json<ListCertificatesResponse> {
    state
        .activity
        .log(ActivityType::Query, "List GDPR deletion certificates");

    let certificates = state.gdpr.audit_log().list_certificates();
    let response = ListCertificatesResponse {
        total: certificates.len(),
        certificates,
    };
    Json(response)
}
/// HTTP handler: fetches one certificate by id.
///
/// Responds with 200 and the certificate, or 404 with a JSON `null` body
/// when no certificate has that id.
pub async fn get_deletion_certificate(
    State(state): State<AppState>,
    Path(cert_id): Path<String>,
) -> impl IntoResponse {
    let summary = format!("Get GDPR certificate: {}", cert_id);
    state.activity.log(ActivityType::Query, &summary);

    let found = state.gdpr.audit_log().get_certificate(&cert_id);
    let status = if found.is_some() {
        StatusCode::OK
    } else {
        StatusCode::NOT_FOUND
    };
    (status, Json(found))
}
/// HTTP handler: re-computes a certificate's hash and reports whether it
/// still matches.
///
/// An unknown id yields `valid: false` with a "not found" message.
pub async fn verify_deletion_certificate(
    State(state): State<AppState>,
    Path(cert_id): Path<String>,
) -> Json<VerifyCertificateResponse> {
    let summary = format!("Verify GDPR certificate: {}", cert_id);
    state.activity.log(ActivityType::Query, &summary);

    let cert = match state.gdpr.audit_log().get_certificate(&cert_id) {
        Some(found) => found,
        None => {
            return Json(VerifyCertificateResponse {
                valid: false,
                certificate: None,
                message: format!("Certificate '{}' not found", cert_id),
            });
        }
    };

    let valid = cert.verify();
    let message = if valid {
        "Certificate is valid and has not been tampered with"
    } else {
        "Certificate verification failed - hash mismatch"
    };
    Json(VerifyCertificateResponse {
        valid,
        certificate: Some(cert),
        message: message.to_string(),
    })
}
/// Handler that returns the audit-log entries recorded for one data subject.
///
/// Records a `Query` activity entry, then responds with whatever the audit
/// log returns for `subject_id`.
pub async fn get_deletion_audit(
    State(state): State<AppState>,
    Path(subject_id): Path<String>,
) -> Json<Vec<DeletionAuditEntry>> {
    let description = format!("Get GDPR audit for: {}", subject_id);
    state.activity.log(ActivityType::Query, &description);

    let entries = state.gdpr.audit_log().get_entries_for_subject(&subject_id);
    Json(entries)
}
/// Handler that runs the audit log's integrity check and reports the outcome.
///
/// Records a `Query` activity entry. The HTTP status is `200 OK` in both
/// cases; success or failure is conveyed by the `valid` field of the JSON
/// body, alongside the verified entry count and a message.
pub async fn verify_audit_integrity(State(state): State<AppState>) -> impl IntoResponse {
    state
        .activity
        .log(ActivityType::Query, "Verify GDPR audit log integrity");

    let report = match state.gdpr.audit_log().verify_integrity() {
        Ok(count) => serde_json::json!({
            "valid": true,
            "entries_verified": count,
            "message": "Audit log integrity verified successfully",
        }),
        // On failure the error itself is surfaced as the message.
        Err(e) => serde_json::json!({
            "valid": false,
            "entries_verified": 0,
            "message": e,
        }),
    };
    (StatusCode::OK, Json(report))
}
/// Handler for a GDPR data-export request.
///
/// Flow:
/// 1. Reject an empty `subject_id` with `400 Bad Request`.
/// 2. Record the incoming request (format, scope, date range) in the activity log.
/// 3. Run the export through the GDPR service and log the outcome.
///
/// Responds with `200 OK` and the export payload on success, or
/// `500 Internal Server Error` with the error on failure. Every response body
/// is a JSON object carrying a `success` flag.
pub async fn export_data_subject(
    State(state): State<AppState>,
    Json(request): Json<ExportRequest>,
) -> impl IntoResponse {
    if request.subject_id.is_empty() {
        let rejection = serde_json::json!({
            "success": false,
            "error": "subject_id is required",
        });
        return (StatusCode::BAD_REQUEST, Json(rejection));
    }

    // Capture the request parameters before `request` is handed to the service.
    let request_details = serde_json::json!({
        "format": request.format,
        "scope": request.scope,
        "date_range": request.date_range,
    });
    state.activity.log_with_details(
        ActivityType::Query,
        &format!(
            "GDPR data export request for subject: {}",
            request.subject_id
        ),
        None,
        None,
        Some("gdpr"),
        Some(request_details),
    );

    let (status, body) = match state.gdpr.execute_export(&state, request) {
        Ok(response) => {
            let summary = serde_json::json!({
                "export_id": response.export_id,
                "total_items": response.total_items,
                "format": response.format,
            });
            state.activity.log_with_details(
                ActivityType::System,
                &format!(
                    "GDPR data export completed. Export ID: {}",
                    response.export_id
                ),
                None,
                None,
                Some("gdpr"),
                Some(summary),
            );
            let payload = serde_json::json!({
                "success": true,
                "export_id": response.export_id,
                "subject_id": response.subject_id,
                "format": response.format,
                "generated_at": response.generated_at,
                "total_items": response.total_items,
                "data": response.data,
                "scope": response.scope,
                "date_range": response.date_range,
            });
            (StatusCode::OK, payload)
        }
        Err(e) => {
            state.activity.log_with_details(
                ActivityType::System,
                &format!("GDPR data export failed: {}", e),
                None,
                None,
                Some("gdpr"),
                None,
            );
            let failure = serde_json::json!({
                "success": false,
                "error": e,
            });
            (StatusCode::INTERNAL_SERVER_ERROR, failure)
        }
    };
    (status, Json(body))
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice produces an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
    ];

    let mut encoded = String::with_capacity(bytes.len() * 2);
    // High nibble first, then low nibble, for every input byte.
    encoded.extend(bytes.iter().flat_map(|&byte| {
        [
            DIGITS[usize::from(byte >> 4)],
            DIGITS[usize::from(byte & 0x0f)],
        ]
    }));
    encoded
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the single-item certificate shared by the certificate tests.
    fn sample_certificate() -> DeletionCertificate {
        let items = vec![DeletedItem {
            store_type: "kv".to_string(),
            location: "kv_store".to_string(),
            item_id: "user:123".to_string(),
            size_bytes: Some(256),
            deleted_at: "2025-01-26T12:00:00Z".to_string(),
        }];
        DeletionCertificate::new(
            "user@example.com".to_string(),
            items,
            DeletionScope::All,
            "admin".to_string(),
            "GDPR request".to_string(),
            "node-1".to_string(),
            false,
        )
    }

    #[test]
    fn test_deletion_scope_default() {
        assert_eq!(DeletionScope::default(), DeletionScope::All);
    }

    // A freshly built certificate carries a hash and verifies against it.
    #[test]
    fn test_deletion_certificate_hash() {
        let cert = sample_certificate();
        assert!(!cert.verification_hash.is_empty());
        assert!(cert.verify());
    }

    // Changing a field after construction must make verification fail.
    #[test]
    fn test_deletion_certificate_tamper_detection() {
        let mut cert = sample_certificate();
        cert.total_items = 999;
        assert!(!cert.verify());
    }

    #[test]
    fn test_audit_log_integrity() {
        let audit_log = DeletionAuditLog::new();
        let events = vec![
            (
                DeletionEventType::RequestReceived,
                serde_json::json!({"test": true}),
            ),
            (DeletionEventType::DeletionStarted, serde_json::json!({})),
            (
                DeletionEventType::DeletionCompleted,
                serde_json::json!({"items": 5}),
            ),
        ];
        for (event_type, details) in events {
            audit_log.log_event(event_type, "user@example.com", "admin", details);
        }

        let result = audit_log.verify_integrity();
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 3);
    }

    #[test]
    fn test_audit_log_get_entries_for_subject() {
        let audit_log = DeletionAuditLog::new();
        let events = vec![
            (DeletionEventType::RequestReceived, "user1@example.com"),
            (DeletionEventType::RequestReceived, "user2@example.com"),
            (DeletionEventType::DeletionCompleted, "user1@example.com"),
        ];
        for (event_type, subject) in events {
            audit_log.log_event(event_type, subject, "admin", serde_json::json!({}));
        }

        let user1_entries = audit_log.get_entries_for_subject("user1@example.com");
        assert_eq!(user1_entries.len(), 2);
        let user2_entries = audit_log.get_entries_for_subject("user2@example.com");
        assert_eq!(user2_entries.len(), 1);
    }

    #[test]
    fn test_gdpr_service_default_search_fields() {
        let fields = GdprService::default_search_fields();
        for name in ["email", "user_id", "customer_id"] {
            assert!(fields.contains(&name.to_string()));
        }
    }

    #[test]
    fn test_hex_encode() {
        let data = [0x00, 0x01, 0x0a, 0xff, 0xab];
        assert_eq!(hex_encode(&data), "00010affab");
    }

    // Round-trips the tagged `SpecificCollections` variant through JSON.
    #[test]
    fn test_deletion_scope_serialization() {
        let scope = DeletionScope::SpecificCollections {
            collections: vec!["users".to_string(), "orders".to_string()],
        };
        let json = serde_json::to_string(&scope).unwrap();
        assert!(json.contains("specific_collections"));
        assert!(json.contains("users"));

        let deserialized: DeletionScope = serde_json::from_str(&json).unwrap();
        if let DeletionScope::SpecificCollections { collections } = deserialized {
            assert_eq!(collections.len(), 2);
            assert!(collections.contains(&"users".to_string()));
        } else {
            panic!("Wrong variant deserialized");
        }
    }

    #[test]
    fn test_export_format_default() {
        assert_eq!(ExportFormat::default(), ExportFormat::Json);
    }

    // Each format serializes to its lowercase name and parses back unchanged.
    #[test]
    fn test_export_format_serialization() {
        let cases = vec![
            (ExportFormat::Json, "\"json\""),
            (ExportFormat::Csv, "\"csv\""),
        ];
        for (format, expected) in cases {
            let json = serde_json::to_string(&format).unwrap();
            assert_eq!(json, expected);
            let deserialized: ExportFormat = serde_json::from_str(&json).unwrap();
            assert_eq!(deserialized, format);
        }
    }

    #[test]
    fn test_export_scope_default() {
        assert!(
            matches!(ExportScope::default(), ExportScope::All),
            "Expected ExportScope::All as default"
        );
    }

    // Round-trips the tagged `SpecificCollections` variant through JSON.
    #[test]
    fn test_export_scope_serialization() {
        let scope = ExportScope::SpecificCollections {
            collections: vec!["users".to_string(), "orders".to_string()],
        };
        let json = serde_json::to_string(&scope).unwrap();
        assert!(json.contains("specific_collections"));
        assert!(json.contains("users"));

        let deserialized: ExportScope = serde_json::from_str(&json).unwrap();
        if let ExportScope::SpecificCollections { collections } = deserialized {
            assert_eq!(collections.len(), 2);
            assert!(collections.contains(&"users".to_string()));
        } else {
            panic!("Wrong variant deserialized");
        }
    }

    #[test]
    fn test_export_request_deserialization() {
        // Minimal request: only the subject id is given, the rest defaults.
        let minimal = r#"{"subject_id": "user@example.com"}"#;
        let request: ExportRequest = serde_json::from_str(minimal).unwrap();
        assert_eq!(request.subject_id, "user@example.com");
        assert_eq!(request.format, ExportFormat::Json);
        assert!(request.search_fields.is_empty());

        // Fully specified request.
        let full = r#"{
"subject_id": "user@example.com",
"format": "csv",
"scope": {"type": "specific_collections", "collections": ["users"]},
"date_range": {"start": "2024-01-01T00:00:00Z", "end": "2024-12-31T23:59:59Z"},
"search_fields": ["email", "user_id"]
}"#;
        let request: ExportRequest = serde_json::from_str(full).unwrap();
        assert_eq!(request.subject_id, "user@example.com");
        assert_eq!(request.format, ExportFormat::Csv);
        assert_eq!(request.search_fields, vec!["email", "user_id"]);
        assert!(request.date_range.is_some());
    }

    #[test]
    fn test_date_range() {
        let range = DateRange {
            start: Some("2024-01-01T00:00:00Z".to_string()),
            end: Some("2024-12-31T23:59:59Z".to_string()),
        };
        let json = serde_json::to_string(&range).unwrap();
        for fragment in ["2024-01-01", "2024-12-31"] {
            assert!(json.contains(fragment));
        }
    }

    // Plain fields pass through; commas, quotes and newlines force quoting.
    #[test]
    fn test_escape_csv_field() {
        let cases = vec![
            ("hello", "hello"),
            ("hello,world", "\"hello,world\""),
            ("hello\"world", "\"hello\"\"world\""),
            ("hello\nworld", "\"hello\nworld\""),
        ];
        for (input, expected) in cases {
            assert_eq!(escape_csv_field(input), expected);
        }
    }

    #[test]
    fn test_json_value_to_csv_string() {
        let cases = vec![
            (serde_json::Value::Null, ""),
            (serde_json::Value::Bool(true), "true"),
            (serde_json::json!(42), "42"),
            (serde_json::json!("hello"), "hello"),
            (serde_json::json!([1, 2, 3]), "[1,2,3]"),
        ];
        for (value, expected) in cases {
            assert_eq!(json_value_to_csv_string(&value), expected);
        }
    }

    #[test]
    fn test_exported_item_structure() {
        let item = ExportedItem {
            store_type: "kv".to_string(),
            location: "kv_store".to_string(),
            item_id: "user:123".to_string(),
            data: serde_json::json!({"key": "user:123", "value": {"name": "John"}}),
            timestamp: Some("2024-01-26T12:00:00Z".to_string()),
        };
        let json = serde_json::to_string(&item).unwrap();
        for fragment in ["kv_store", "user:123", "John"] {
            assert!(json.contains(fragment));
        }
    }

    // Export-related audit events serialize to snake_case names.
    #[test]
    fn test_export_event_types() {
        let cases = vec![
            (
                DeletionEventType::ExportRequestReceived,
                "\"export_request_received\"",
            ),
            (DeletionEventType::ExportCompleted, "\"export_completed\""),
            (DeletionEventType::ExportFailed, "\"export_failed\""),
        ];
        for (event, expected) in cases {
            let json = serde_json::to_string(&event).unwrap();
            assert_eq!(json, expected);
        }
    }
}