use crate::graphql::{
context::from_context,
types::*,
};
use async_graphql::{Context, Result, Subscription, ErrorExtensions};
use futures::{stream, Stream};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::broadcast;
use serde::{Serialize, Deserialize};
use async_stream;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChangeEventInternal {
pub id: String,
pub event_type: String, pub table_name: String,
pub database_name: String,
pub record_id: String,
pub old_data: Option<async_graphql::Json<serde_json::Value>>,
pub new_data: Option<async_graphql::Json<serde_json::Value>>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub user_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SecurityEventInternal {
pub event_type: String,
pub severity: String, pub user_id: Option<String>,
pub resource: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub details: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkIO {
pub bytes_in: u64,
pub bytes_out: u64,
pub connections: u32,
}
pub struct EventBus {
data_changes: broadcast::Sender<DataChangeEventInternal>,
security_events: broadcast::Sender<SecurityEventInternal>,
performance_metrics: broadcast::Sender<PerformanceMetrics>,
}
impl EventBus {
pub fn new() -> Self {
let (data_tx, _) = broadcast::channel(1000);
let (security_tx, _) = broadcast::channel(1000);
let (perf_tx, _) = broadcast::channel(1000);
Self {
data_changes: data_tx,
security_events: security_tx,
performance_metrics: perf_tx,
}
}
pub fn publish_data_change(&self, event: DataChangeEventInternal) -> Result<(), broadcast::error::SendError<DataChangeEventInternal>> {
self.data_changes.send(event).map(|_| ())
}
pub fn publish_security_event(&self, event: SecurityEventInternal) -> Result<(), broadcast::error::SendError<SecurityEventInternal>> {
self.security_events.send(event).map(|_| ())
}
pub fn publish_performance_metrics(&self, metrics: PerformanceMetrics) -> Result<(), broadcast::error::SendError<PerformanceMetrics>> {
self.performance_metrics.send(metrics).map(|_| ())
}
pub fn subscribe_data_changes(&self) -> broadcast::Receiver<DataChangeEventInternal> {
self.data_changes.subscribe()
}
pub fn subscribe_security_events(&self) -> broadcast::Receiver<SecurityEventInternal> {
self.security_events.subscribe()
}
pub fn subscribe_performance_metrics(&self) -> broadcast::Receiver<PerformanceMetrics> {
self.performance_metrics.subscribe()
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
pub struct Subscription;
#[Subscription]
impl Subscription {
async fn data_changes(
&self,
ctx: &Context<'_>,
database: String,
table: String,
#[graphql(desc = "Filter by operation type")] operation: Option<DataChangeEventType>,
#[graphql(desc = "Filter by user ID")] user_id: Option<String>
) -> Result<impl Stream<Item = DataChangeEvent>> {
let graphql_ctx = from_context(ctx)?;
if !self::check_table_access(&graphql_ctx, &database, &table)? {
return Err(async_graphql::Error::new("Access denied to table"));
}
let event_bus = ctx.data::<Arc<EventBus>>()
.map_err(|_| async_graphql::Error::new("Event bus not available"))?;
let mut receiver = event_bus.subscribe_data_changes();
let filtered_stream = async_stream::stream! {
while let Ok(event) = receiver.recv().await {
let graphql_event = DataChangeEvent {
id: event.id,
event_type: match event.event_type.as_str() {
"Inserted" => DataChangeEventType::Inserted,
"Updated" => DataChangeEventType::Updated,
"Deleted" => DataChangeEventType::Deleted,
_ => DataChangeEventType::Inserted, },
table_name: event.table_name,
database_name: event.database_name,
record_id: event.record_id,
old_data: event.old_data,
new_data: event.new_data,
timestamp: event.timestamp,
user_id: event.user_id,
};
if graphql_event.database_name == database && graphql_event.table_name == table {
if let Some(ref op_filter) = operation {
if graphql_event.event_type != *op_filter {
continue;
}
}
if let Some(ref user_filter) = user_id {
if graphql_event.user_id.as_ref() != Some(user_filter) {
continue;
}
}
yield graphql_event;
}
}
};
Ok(filtered_stream)
}
async fn security_events(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Filter by severity")] severity: Option<SecuritySeverity>,
#[graphql(desc = "Filter by event type")] event_type: Option<String>
) -> Result<impl Stream<Item = SecurityEvent>> {
let graphql_ctx = from_context(ctx)?;
if !self::check_security_permissions(&graphql_ctx)? {
return Err(async_graphql::Error::new("Insufficient permissions for security monitoring"));
}
let event_bus = ctx.data::<Arc<EventBus>>()
.map_err(|_| async_graphql::Error::new("Event bus not available"))?;
let mut receiver = event_bus.subscribe_security_events();
let filtered_stream = async_stream::stream! {
while let Ok(event) = receiver.recv().await {
let graphql_event = SecurityEvent {
event_type: event.event_type,
severity: match event.severity.as_str() {
"Low" => SecuritySeverity::Low,
"Medium" => SecuritySeverity::Medium,
"High" => SecuritySeverity::High,
"Critical" => SecuritySeverity::Critical,
_ => SecuritySeverity::Low, },
user_id: event.user_id,
resource: event.resource,
timestamp: event.timestamp,
details: event.details,
};
if let Some(ref severity_filter) = severity {
if graphql_event.severity != *severity_filter {
continue;
}
}
if let Some(ref type_filter) = event_type {
if graphql_event.event_type != *type_filter {
continue;
}
}
yield graphql_event;
}
};
Ok(filtered_stream)
}
async fn performance_metrics(
&self,
ctx: &Context<'_>,
#[graphql(desc = "Update interval in seconds")] interval: Option<i32>
) -> Result<impl Stream<Item = PerformanceMetrics>> {
let graphql_ctx = from_context(ctx)?;
if !self::check_monitoring_permissions(&graphql_ctx)? {
return Err(async_graphql::Error::new("Insufficient permissions for system monitoring"));
}
let update_interval = interval.unwrap_or(5);
let stream = async_stream::stream! {
let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(update_interval as u64));
loop {
interval_timer.tick().await;
yield PerformanceMetrics {
timestamp: Utc::now(),
cpu_usage_percent: 45.2,
memory_usage_percent: 67.8,
disk_usage_percent: 23.4,
network_io_bytes_per_second: (1024 * 1024) as i64,
database_connections: 15,
active_queries: 8,
cache_hit_rate: 85.5,
average_response_time_ms: 25.3,
requests_per_second: 125.7,
};
}
};
Ok(stream)
}
async fn health_events(&self, ctx: &Context<'_>) -> Result<impl Stream<Item = HealthEvent>> {
let graphql_ctx = from_context(ctx)?;
graphql_ctx.require_role("admin")?;
let stream = stream::iter(vec![
HealthEvent {
id: "550e8400-e29b-41d4-a716-446655440010".to_string(),
event_type: HealthEventType::ServiceHealthy,
service_name: "database".to_string(),
status: "healthy".to_string(),
message: Some("Database service is running normally".to_string()),
details: std::collections::HashMap::from([
("response_time_ms".to_string(), "5".to_string()),
("connections".to_string(), "10".to_string()),
]),
timestamp: Utc::now(),
},
HealthEvent {
id: "550e8400-e29b-41d4-a716-446655440011".to_string(),
event_type: HealthEventType::ServiceWarning,
service_name: "cache".to_string(),
status: "warning".to_string(),
message: Some("Cache memory usage above 80%".to_string()),
details: std::collections::HashMap::from([
("memory_usage_percent".to_string(), "85".to_string()),
("cache_size_mb".to_string(), "512".to_string()),
]),
timestamp: Utc::now(),
},
]);
Ok(stream)
}
async fn key_rotation_events(&self, ctx: &Context<'_>, database: Option<String>) -> Result<impl Stream<Item = KeyRotationEvent>> {
let graphql_ctx = from_context(ctx)?;
graphql_ctx.require_role("admin")?;
let db_name = database.as_ref().map(|s| s.as_str()).unwrap_or("example_db");
let stream = stream::iter(vec![
KeyRotationEvent {
id: "550e8400-e29b-41d4-a716-446655440012".to_string(),
rotation_id: "550e8400-e29b-41d4-a716-446655440013".to_string(),
event_type: KeyRotationEventType::RotationStarted,
database_name: db_name.to_string(),
table_name: "users".to_string(),
algorithm: Some(EncryptionAlgorithm::Aegis256),
progress_percentage: 0.0,
records_processed: 0,
total_records: 100,
message: Some("Key rotation started".to_string()),
timestamp: Utc::now(),
},
KeyRotationEvent {
id: "550e8400-e29b-41d4-a716-446655440014".to_string(),
rotation_id: "550e8400-e29b-41d4-a716-446655440013".to_string(),
event_type: KeyRotationEventType::RotationProgress,
database_name: db_name.to_string(),
table_name: "users".to_string(),
algorithm: Some(EncryptionAlgorithm::Aegis256),
progress_percentage: 50.0,
records_processed: 50,
total_records: 100,
message: Some("Key rotation 50% complete".to_string()),
timestamp: Utc::now(),
},
KeyRotationEvent {
id: "550e8400-e29b-41d4-a716-446655440015".to_string(),
rotation_id: "550e8400-e29b-41d4-a716-446655440013".to_string(),
event_type: KeyRotationEventType::RotationCompleted,
database_name: db_name.to_string(),
table_name: "users".to_string(),
algorithm: Some(EncryptionAlgorithm::Aegis256),
progress_percentage: 100.0,
records_processed: 100,
total_records: 100,
message: Some("Key rotation completed successfully".to_string()),
timestamp: Utc::now(),
},
]);
Ok(stream)
}
async fn audit_events(&self, ctx: &Context<'_>, user_id: Option<String>) -> Result<impl Stream<Item = AuditEvent>> {
let graphql_ctx = from_context(ctx)?;
if let Some(current_user) = &graphql_ctx.user {
if current_user.roles.contains(&"admin".to_string()) ||
user_id.as_ref() == Some(¤t_user.id) {
} else {
return Err(async_graphql::Error::new("Insufficient permissions for audit access")
.extend_with(|_, e| e.set("code", "INSUFFICIENT_PERMISSIONS")));
}
} else {
return Err(async_graphql::Error::new("Authentication required")
.extend_with(|_, e| e.set("code", "AUTH_REQUIRED")));
}
let user_name = user_id.as_ref().map(|s| s.as_str()).unwrap_or("current_user");
let stream = stream::iter(vec![
AuditEvent {
id: "550e8400-e29b-41d4-a716-446655440016".to_string(),
event_type: AuditEventType::UserLogin,
user_id: user_name.to_string(),
action: "login".to_string(),
resource: Some("auth".to_string()),
details: std::collections::HashMap::from([
("ip_address".to_string(), "192.168.1.100".to_string()),
("user_agent".to_string(), "Mozilla/5.0".to_string()),
]),
timestamp: Utc::now(),
success: true,
error_message: None,
},
AuditEvent {
id: "550e8400-e29b-41d4-a716-446655440017".to_string(),
event_type: AuditEventType::DataAccess,
user_id: user_name.to_string(),
action: "select".to_string(),
resource: Some("example_db.users".to_string()),
details: std::collections::HashMap::from([
("query".to_string(), "SELECT * FROM users".to_string()),
("records_returned".to_string(), "10".to_string()),
]),
timestamp: Utc::now(),
success: true,
error_message: None,
},
]);
Ok(stream)
}
async fn system_performance(&self, ctx: &Context<'_>) -> Result<impl Stream<Item = PerformanceMetrics>> {
let graphql_ctx = from_context(ctx)?;
graphql_ctx.require_any_role(&["admin", "monitoring"])?;
let stream = stream::iter(vec![
PerformanceMetrics {
timestamp: Utc::now(),
cpu_usage_percent: 25.5,
memory_usage_percent: 45.2,
disk_usage_percent: 30.8,
network_io_bytes_per_second: (1024 * 1024) as i64, database_connections: 15,
active_queries: 3,
cache_hit_rate: 85.5,
average_response_time_ms: 25.3,
requests_per_second: 125.7,
},
PerformanceMetrics {
timestamp: Utc::now(),
cpu_usage_percent: 28.1,
memory_usage_percent: 47.8,
disk_usage_percent: 31.2,
network_io_bytes_per_second: (2 * 1024 * 1024) as i64, database_connections: 18,
active_queries: 5,
cache_hit_rate: 87.2,
average_response_time_ms: 28.9,
requests_per_second: 142.3,
},
]);
Ok(stream)
}
}
fn check_table_access(
_graphql_ctx: &crate::graphql::context::GraphQLContext,
_database: &str,
_table: &str,
) -> Result<bool, async_graphql::Error> {
Ok(true)
}
fn check_security_permissions(
graphql_ctx: &crate::graphql::context::GraphQLContext,
) -> Result<bool, async_graphql::Error> {
if let Some(user) = &graphql_ctx.user {
Ok(user.roles.contains(&"admin".to_string()) ||
user.roles.contains(&"security".to_string()))
} else {
Ok(false)
}
}
fn check_monitoring_permissions(
graphql_ctx: &crate::graphql::context::GraphQLContext,
) -> Result<bool, async_graphql::Error> {
if let Some(user) = &graphql_ctx.user {
Ok(user.roles.contains(&"admin".to_string()) ||
user.roles.contains(&"monitoring".to_string()))
} else {
Ok(false)
}
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum DataChangeEventType {
Inserted,
Updated,
Deleted,
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum SecuritySeverity {
Low,
Medium,
High,
Critical,
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum DatabaseEventType {
DatabaseCreated,
DatabaseDeleted,
TableCreated,
TableDropped,
SchemaModified,
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum HealthEventType {
ServiceHealthy,
ServiceWarning,
ServiceUnhealthy,
ServiceRecovered,
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum KeyRotationEventType {
RotationStarted,
RotationProgress,
RotationCompleted,
RotationFailed,
}
#[derive(async_graphql::Enum, Clone, Debug, Copy, PartialEq, Eq)]
pub enum AuditEventType {
UserLogin,
UserLogout,
DataAccess,
DataModification,
ConfigurationChange,
SecurityEvent,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct DataChangeEvent {
pub id: String,
pub event_type: DataChangeEventType,
pub table_name: String,
pub database_name: String,
pub record_id: String,
pub old_data: Option<async_graphql::Json<serde_json::Value>>,
pub new_data: Option<async_graphql::Json<serde_json::Value>>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub user_id: Option<String>,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct SecurityEvent {
pub event_type: String,
pub severity: SecuritySeverity,
pub user_id: Option<String>,
pub resource: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub details: serde_json::Value,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct HealthEvent {
pub id: String,
pub event_type: HealthEventType,
pub service_name: String,
pub status: String,
pub message: Option<String>,
pub details: std::collections::HashMap<String, String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct KeyRotationEvent {
pub id: String,
pub rotation_id: String,
pub event_type: KeyRotationEventType,
pub database_name: String,
pub table_name: String,
pub algorithm: Option<EncryptionAlgorithm>,
pub progress_percentage: f64,
pub records_processed: i32,
pub total_records: i32,
pub message: Option<String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct AuditEvent {
pub id: String,
pub event_type: AuditEventType,
pub user_id: String,
pub action: String,
pub resource: Option<String>,
pub details: std::collections::HashMap<String, String>,
pub timestamp: chrono::DateTime<chrono::Utc>,
pub success: bool,
pub error_message: Option<String>,
}
#[derive(async_graphql::SimpleObject, Clone, Debug)]
pub struct PerformanceMetrics {
pub timestamp: chrono::DateTime<chrono::Utc>,
pub cpu_usage_percent: f64,
pub memory_usage_percent: f64,
pub disk_usage_percent: f64,
pub network_io_bytes_per_second: i64,
pub database_connections: i32,
pub active_queries: i32,
pub cache_hit_rate: f64,
pub average_response_time_ms: f64,
pub requests_per_second: f64,
}