use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use uuid::Uuid;
/// Kind of channel a [`PlatoEvent`] is tagged with (see `PlatoEvent::channel`).
///
/// The variants carry no data; the type is `Copy` and comparable for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChannelType {
    Tell,
    Channel,
    Mission,
    Constraint,
}
/// Body of a [`PlatoEvent`].
///
/// Serialized with serde's internally tagged representation: the variant name
/// is written into a `"type"` field alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum EventPayload {
    /// A text message together with its [`Urgency`].
    Tell { message: String, urgency: Urgency },
    /// A named field together with its old and new values, all as strings.
    StateChange {
        field: String,
        old: String,
        new: String,
    },
    /// A skill name plus arbitrary JSON parameters.
    SkillInvocation {
        skill: String,
        params: serde_json::Value,
    },
    /// A constraint name and the action that was attempted.
    ConstraintViolation {
        constraint: String,
        attempted_action: String,
    },
}
/// Urgency level attached to an `EventPayload::Tell` message.
///
/// Only equality is derived; no `PartialOrd`/`Ord` is derived here, so levels
/// cannot be compared with `<` / `>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Urgency {
    Low,
    Normal,
    High,
    Critical,
}
/// A single event routed through the [`EventBus`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlatoEvent {
    /// Unique identifier of the event.
    pub id: Uuid,
    /// UTC timestamp carried by the event.
    pub timestamp: DateTime<Utc>,
    /// Identity of the sender.
    pub sender: String,
    /// Identity of the recipient; the bus and the dead-letter queue match on
    /// this string.
    pub recipient: String,
    /// Channel kind the event is tagged with.
    pub channel: ChannelType,
    /// Event body.
    pub payload: EventPayload,
    /// Additional context attached to the event.
    pub context: EventContext,
}
/// Context attached to every [`PlatoEvent`] (see `PlatoEvent::context`).
///
/// `repo` is always present; `branch` and `room` are optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventContext {
    pub repo: String,
    pub branch: Option<String>,
    pub room: Option<String>,
}
/// In-memory store of events pushed to it, kept in insertion order.
///
/// The backing `Vec` sits behind an async `RwLock` inside an `Arc`.
pub struct DeadLetterQueue {
    /// Stored events, oldest first.
    events: Arc<RwLock<Vec<PlatoEvent>>>,
}
impl DeadLetterQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        let events = Arc::new(RwLock::new(Vec::new()));
        Self { events }
    }

    /// Appends `event` to the end of the queue.
    pub async fn push(&self, event: PlatoEvent) {
        self.events.write().await.push(event);
    }

    /// Returns clones of every stored event whose `recipient` equals
    /// `recipient`, in insertion order. The queue itself is left untouched.
    pub async fn get_for(&self, recipient: &str) -> Vec<PlatoEvent> {
        let guard = self.events.read().await;
        let mut matching = Vec::new();
        for stored in guard.iter() {
            if stored.recipient == recipient {
                matching.push(stored.clone());
            }
        }
        matching
    }

    /// Removes every stored event whose `recipient` equals `recipient`.
    pub async fn clear_for(&self, recipient: &str) {
        self.events
            .write()
            .await
            .retain(|stored| stored.recipient != recipient);
    }
}
/// Routes [`PlatoEvent`]s to per-identity broadcast channels and keeps a
/// [`DeadLetterQueue`] for events whose direct send failed.
pub struct EventBus {
    /// Broadcast sender registered for each subscriber identity.
    subscribers: Arc<RwLock<HashMap<String, broadcast::Sender<PlatoEvent>>>>,
    /// Queue holding events that could not be sent to their recipient.
    dlq: Arc<DeadLetterQueue>,
}
impl EventBus {
pub fn new() -> Self {
Self {
subscribers: Arc::new(RwLock::new(HashMap::new())),
dlq: Arc::new(DeadLetterQueue::new()),
}
}
pub async fn subscribe(&self, identity: &str, _room: &str) -> broadcast::Receiver<PlatoEvent> {
let mut subscribers = self.subscribers.write().await;
let (sender, receiver) = broadcast::channel(100);
subscribers.insert(identity.to_string(), sender);
receiver
}
pub async fn publish(&self, event: PlatoEvent) {
let subscribers = self.subscribers.read().await;
if let Some(sender) = subscribers.get(&event.recipient) {
if sender.send(event.clone()).is_err() {
self.dlq.push(event).await;
}
return;
}
for (identity, sender) in subscribers.iter() {
if identity.starts_with("@") || identity == "broadcast" {
let _ = sender.send(event.clone());
}
}
}
pub async fn get_dlq_for(&self, recipient: &str) -> Vec<PlatoEvent> {
self.dlq.get_for(recipient).await
}
pub async fn clear_dlq_for(&self, recipient: &str) {
self.dlq.clear_for(recipient).await;
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl Default for DeadLetterQueue {
fn default() -> Self {
Self::new()
}
}