use std::sync::Arc;
use fraiseql_core::runtime::subscription::{
SubscriptionEvent, SubscriptionManager, SubscriptionOperation,
};
use tokio::sync::mpsc;
use tracing::{debug, info};
/// Settings that control how an event bridge is built.
///
/// The type is `Copy`, so a configuration can be passed around by value.
#[derive(Debug, Clone, Copy)]
pub struct EventBridgeConfig {
    /// Buffer size handed to the bounded channel created by `EventBridge::new`.
    pub channel_capacity: usize,
}

impl EventBridgeConfig {
    /// Capacity used when the caller does not choose one explicitly.
    const DEFAULT_CHANNEL_CAPACITY: usize = 100;

    /// Creates a configuration with a channel capacity of 100.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            channel_capacity: Self::DEFAULT_CHANNEL_CAPACITY,
        }
    }

    /// Returns a copy of this configuration whose channel capacity is
    /// replaced by `capacity`.
    #[must_use]
    pub const fn with_channel_capacity(self, capacity: usize) -> Self {
        let mut updated = self;
        updated.channel_capacity = capacity;
        updated
    }
}

impl Default for EventBridgeConfig {
    /// Equivalent to [`EventBridgeConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A change notification for a single entity, in the form that is sent
/// through the bridge channel before being converted to a subscription event.
#[derive(Debug, Clone)]
pub struct EntityEvent {
    /// Name of the entity type the change belongs to.
    pub entity_type: String,
    /// Identifier of the affected entity.
    pub entity_id: String,
    /// Operation name as free-form text. `EventBridge::convert_event`
    /// recognises `INSERT`, `UPDATE` and `DELETE` after upper-casing.
    pub operation: String,
    /// Payload carried by the event.
    pub data: serde_json::Value,
    /// Optional earlier payload; `None` unless set via [`EntityEvent::with_old_data`].
    pub old_data: Option<serde_json::Value>,
    /// Optional tenant identifier; `None` unless set via [`EntityEvent::with_tenant_id`].
    pub tenant_id: Option<String>,
}

impl EntityEvent {
    /// Builds an event from its mandatory parts.
    ///
    /// `old_data` and `tenant_id` start out as `None`; use the `with_*`
    /// builders to fill them in.
    #[must_use]
    pub fn new(
        entity_type: impl Into<String>,
        entity_id: impl Into<String>,
        operation: impl Into<String>,
        data: serde_json::Value,
    ) -> Self {
        let entity_type = entity_type.into();
        let entity_id = entity_id.into();
        let operation = operation.into();
        Self {
            entity_type,
            entity_id,
            operation,
            data,
            old_data: None,
            tenant_id: None,
        }
    }

    /// Returns the event with `old_data` set to `Some(old_data)`, replacing
    /// any value stored before.
    #[must_use]
    pub fn with_old_data(self, old_data: serde_json::Value) -> Self {
        Self {
            old_data: Some(old_data),
            ..self
        }
    }

    /// Returns the event with `tenant_id` set to `Some(tenant_id.into())`,
    /// replacing any value stored before.
    #[must_use]
    pub fn with_tenant_id(self, tenant_id: impl Into<String>) -> Self {
        Self {
            tenant_id: Some(tenant_id.into()),
            ..self
        }
    }
}
/// Forwards [`EntityEvent`]s received on a bounded channel to a
/// [`SubscriptionManager`].
///
/// Typical use: build the bridge, hand out producers via
/// [`EventBridge::sender`], then drive it with [`EventBridge::run`] or
/// [`EventBridge::spawn`].
pub struct EventBridge {
    // Manager that converted events are published to.
    manager: Arc<SubscriptionManager>,
    // Consuming end of the channel; drained by `run`.
    receiver: mpsc::Receiver<EntityEvent>,
    // Template producer handle; cloned by `sender` and dropped when `run` starts.
    sender: mpsc::Sender<EntityEvent>,
}

impl EventBridge {
    /// Creates a bridge that publishes to `manager`, backed by a bounded
    /// channel sized from `config.channel_capacity`.
    ///
    /// A capacity of zero is raised to one, because
    /// `tokio::sync::mpsc::channel` panics when asked for a zero-sized buffer.
    #[must_use]
    pub fn new(manager: Arc<SubscriptionManager>, config: EventBridgeConfig) -> Self {
        let capacity = config.channel_capacity.max(1);
        let (sender, receiver) = mpsc::channel(capacity);
        Self {
            manager,
            receiver,
            sender,
        }
    }

    /// Returns a new producer handle for feeding events into the bridge.
    ///
    /// Obtain every handle you need before calling [`EventBridge::run`] or
    /// [`EventBridge::spawn`]: both consume the bridge, and the running
    /// bridge stops once all handles have been dropped.
    #[must_use]
    pub fn sender(&self) -> mpsc::Sender<EntityEvent> {
        self.sender.clone()
    }

    /// Converts an [`EntityEvent`] into a [`SubscriptionEvent`].
    ///
    /// The operation name is upper-cased and mapped as `INSERT` → `Create`,
    /// `UPDATE` → `Update`, `DELETE` → `Delete`. Any other name is logged at
    /// debug level and treated as `Create`. `old_data` and `tenant_id` are
    /// carried over only when present.
    pub fn convert_event(entity_event: EntityEvent) -> SubscriptionEvent {
        let operation = match entity_event.operation.to_uppercase().as_str() {
            "INSERT" => SubscriptionOperation::Create,
            "UPDATE" => SubscriptionOperation::Update,
            "DELETE" => SubscriptionOperation::Delete,
            _ => {
                debug!("Unknown operation: {}, defaulting to Create", entity_event.operation);
                SubscriptionOperation::Create
            },
        };

        let mut event = SubscriptionEvent::new(
            entity_event.entity_type,
            entity_event.entity_id,
            operation,
            entity_event.data,
        );
        if let Some(old_data) = entity_event.old_data {
            event = event.with_old_data(old_data);
        }
        if let Some(tenant_id) = entity_event.tenant_id {
            event = event.with_tenant_id(tenant_id);
        }
        event
    }

    /// Runs the bridge until every producer handle has been dropped.
    ///
    /// Each received event is converted with [`EventBridge::convert_event`]
    /// and passed to the manager's `publish_event`; a non-zero result is
    /// logged at debug level as the number of matched subscriptions.
    // NOTE(review): presumably kept because the tracing macros expand into
    // branching code that inflates clippy's complexity score — confirm it is
    // still required.
    #[allow(clippy::cognitive_complexity)]
    pub async fn run(self) {
        let Self {
            manager,
            mut receiver,
            sender,
        } = self;
        // The bridge's own handle would keep the channel open forever, so
        // `recv` could never yield `None`. Release it so the loop ends once
        // all externally held senders are gone.
        drop(sender);

        info!("EventBridge started");
        while let Some(entity_event) = receiver.recv().await {
            debug!("EventBridge received entity event: {}", entity_event.entity_type);
            let subscription_event = Self::convert_event(entity_event);
            let matched = manager.publish_event(subscription_event);
            if matched > 0 {
                debug!("EventBridge matched {} subscriptions", matched);
            }
        }
        info!("EventBridge stopped");
    }

    /// Spawns [`EventBridge::run`] on the Tokio runtime and returns the
    /// task's join handle.
    #[must_use = "dropping the JoinHandle detaches the task; store or abort it to control lifecycle"]
    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
        tokio::spawn(self.run())
    }

    /// Alias of [`EventBridge::sender`], retained for existing callers.
    #[must_use]
    pub fn get_sender(&self) -> mpsc::Sender<EntityEvent> {
        self.sender()
    }

    /// Returns a new shared handle to the subscription manager this bridge
    /// publishes to.
    #[must_use]
    pub fn manager(&self) -> Arc<SubscriptionManager> {
        Arc::clone(&self.manager)
    }
}