use crate::error::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ChangeType {
MockCreated,
MockUpdated,
MockDeleted,
WorkspaceUpdated,
MemberAdded,
MemberRemoved,
RoleChanged,
SnapshotCreated,
CursorMoved,
UserJoined,
UserLeft,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
pub id: Uuid,
pub workspace_id: Uuid,
pub change_type: ChangeType,
pub user_id: Uuid,
pub resource_id: Option<Uuid>,
pub payload: serde_json::Value,
pub timestamp: DateTime<Utc>,
}
impl ChangeEvent {
#[must_use]
pub fn new(
workspace_id: Uuid,
change_type: ChangeType,
user_id: Uuid,
resource_id: Option<Uuid>,
payload: serde_json::Value,
) -> Self {
Self {
id: Uuid::new_v4(),
workspace_id,
change_type,
user_id,
resource_id,
payload,
timestamp: Utc::now(),
}
}
}
#[async_trait::async_trait]
pub trait EventListener: Send + Sync {
async fn on_event(&self, event: ChangeEvent) -> Result<()>;
}
pub struct EventBus {
sender: broadcast::Sender<ChangeEvent>,
}
impl EventBus {
#[must_use]
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender }
}
pub fn publish(&self, event: ChangeEvent) -> Result<()> {
let _ = self.sender.send(event);
Ok(())
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
self.sender.subscribe()
}
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
}
pub struct WorkspaceEventBus {
event_bus: Arc<EventBus>,
workspace_id: Uuid,
}
impl WorkspaceEventBus {
#[must_use]
pub const fn new(event_bus: Arc<EventBus>, workspace_id: Uuid) -> Self {
Self {
event_bus,
workspace_id,
}
}
pub fn publish(
&self,
change_type: ChangeType,
user_id: Uuid,
resource_id: Option<Uuid>,
payload: serde_json::Value,
) -> Result<()> {
let event = ChangeEvent::new(self.workspace_id, change_type, user_id, resource_id, payload);
self.event_bus.publish(event)
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
self.event_bus.subscribe()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_change_event_creation() {
let workspace_id = Uuid::new_v4();
let user_id = Uuid::new_v4();
let event = ChangeEvent::new(
workspace_id,
ChangeType::MockCreated,
user_id,
None,
serde_json::json!({"mock_id": "123"}),
);
assert_eq!(event.workspace_id, workspace_id);
assert_eq!(event.change_type, ChangeType::MockCreated);
assert_eq!(event.user_id, user_id);
}
#[test]
fn test_event_bus() {
let bus = EventBus::new(100);
assert_eq!(bus.subscriber_count(), 0);
let _rx1 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
let _rx2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
}
#[tokio::test]
async fn test_event_publishing() {
let bus = EventBus::new(100);
let mut rx = bus.subscribe();
let workspace_id = Uuid::new_v4();
let user_id = Uuid::new_v4();
let event = ChangeEvent::new(
workspace_id,
ChangeType::MockCreated,
user_id,
None,
serde_json::json!({}),
);
bus.publish(event.clone()).unwrap();
let received = rx.recv().await.unwrap();
assert_eq!(received.workspace_id, workspace_id);
assert_eq!(received.change_type, ChangeType::MockCreated);
}
}