use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tracing::{info, warn};
use punch_types::{PunchError, PunchResult};
use crate::{ChannelAdapter, ChannelPlatform, ChannelStatus, IncomingMessage};
const SESSION_CHANNEL_CAPACITY: usize = 256;
#[derive(Debug, Clone)]
pub struct WebChatSession {
pub session_id: String,
pub display_name: String,
pub connected_at: DateTime<Utc>,
pub active: bool,
}
pub struct WebChatAdapter {
sessions: Arc<DashMap<String, WebChatSession>>,
senders: Arc<DashMap<String, broadcast::Sender<String>>>,
running: AtomicBool,
started_at: RwLock<Option<DateTime<Utc>>>,
messages_received: AtomicU64,
messages_sent: AtomicU64,
}
impl WebChatAdapter {
pub fn new() -> Self {
Self {
sessions: Arc::new(DashMap::new()),
senders: Arc::new(DashMap::new()),
running: AtomicBool::new(false),
started_at: RwLock::new(None),
messages_received: AtomicU64::new(0),
messages_sent: AtomicU64::new(0),
}
}
pub fn register_session(
&self,
session_id: String,
display_name: String,
) -> broadcast::Receiver<String> {
let (tx, rx) = broadcast::channel(SESSION_CHANNEL_CAPACITY);
let session = WebChatSession {
session_id: session_id.clone(),
display_name,
connected_at: Utc::now(),
active: true,
};
self.sessions.insert(session_id.clone(), session);
self.senders.insert(session_id, tx);
rx
}
pub fn remove_session(&self, session_id: &str) {
self.sessions.remove(session_id);
self.senders.remove(session_id);
}
pub fn active_sessions(&self) -> Vec<String> {
self.sessions
.iter()
.filter(|entry| entry.value().active)
.map(|entry| entry.key().clone())
.collect()
}
pub fn session_count(&self) -> usize {
self.sessions.len()
}
pub fn create_message_from_ws(
&self,
session_id: &str,
payload: &serde_json::Value,
) -> Option<IncomingMessage> {
let text = payload["text"].as_str()?;
if text.is_empty() {
return None;
}
let session = self.sessions.get(session_id)?;
let display_name = session.display_name.clone();
let message_id = payload["message_id"].as_str().unwrap_or("").to_string();
self.messages_received.fetch_add(1, Ordering::Relaxed);
Some(IncomingMessage {
channel_id: session_id.to_string(),
user_id: session_id.to_string(),
display_name,
text: text.to_string(),
timestamp: Utc::now(),
platform: ChannelPlatform::WebChat,
platform_message_id: message_id,
is_group: false,
metadata: HashMap::new(),
})
}
fn send_to_session(&self, session_id: &str, message: &str) -> PunchResult<()> {
let sender = self
.senders
.get(session_id)
.ok_or_else(|| PunchError::Channel {
channel: "webchat".to_string(),
message: format!("session {session_id} not found"),
})?;
sender.send(message.to_string()).map_err(|e| {
warn!("Failed to send to webchat session {session_id}: {e}");
PunchError::Channel {
channel: "webchat".to_string(),
message: format!("broadcast send failed for {session_id}: {e}"),
}
})?;
Ok(())
}
}
impl Default for WebChatAdapter {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ChannelAdapter for WebChatAdapter {
fn name(&self) -> &str {
"webchat"
}
fn platform(&self) -> ChannelPlatform {
ChannelPlatform::WebChat
}
async fn start(&self) -> PunchResult<()> {
self.running.store(true, Ordering::Relaxed);
*self.started_at.write().await = Some(Utc::now());
info!("WebChat adapter started (WebSocket mode)");
Ok(())
}
async fn stop(&self) -> PunchResult<()> {
self.running.store(false, Ordering::Relaxed);
self.sessions.clear();
self.senders.clear();
info!("WebChat adapter stopped");
Ok(())
}
async fn send_response(&self, channel_id: &str, message: &str) -> PunchResult<()> {
self.send_to_session(channel_id, message)?;
self.messages_sent.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn status(&self) -> ChannelStatus {
ChannelStatus {
connected: self.running.load(Ordering::Relaxed),
started_at: self.started_at.try_read().ok().and_then(|g| *g),
messages_received: self.messages_received.load(Ordering::Relaxed),
messages_sent: self.messages_sent.load(Ordering::Relaxed),
last_error: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_webchat_adapter_creation() {
let adapter = WebChatAdapter::new();
assert_eq!(adapter.name(), "webchat");
assert_eq!(adapter.platform(), ChannelPlatform::WebChat);
}
#[test]
fn test_register_and_remove_session() {
let adapter = WebChatAdapter::new();
let _rx = adapter.register_session("sess-1".to_string(), "Alice".to_string());
assert_eq!(adapter.session_count(), 1);
assert_eq!(adapter.active_sessions(), vec!["sess-1"]);
adapter.remove_session("sess-1");
assert_eq!(adapter.session_count(), 0);
}
#[test]
fn test_create_message_from_ws() {
let adapter = WebChatAdapter::new();
let _rx = adapter.register_session("sess-1".to_string(), "Alice".to_string());
let payload = serde_json::json!({
"text": "Hello from browser!",
"message_id": "client-msg-1"
});
let msg = adapter.create_message_from_ws("sess-1", &payload).unwrap();
assert_eq!(msg.platform, ChannelPlatform::WebChat);
assert_eq!(msg.user_id, "sess-1");
assert_eq!(msg.display_name, "Alice");
assert_eq!(msg.text, "Hello from browser!");
assert_eq!(msg.platform_message_id, "client-msg-1");
assert!(!msg.is_group);
}
#[test]
fn test_create_message_from_ws_empty_text() {
let adapter = WebChatAdapter::new();
let _rx = adapter.register_session("sess-1".to_string(), "Alice".to_string());
let payload = serde_json::json!({ "text": "" });
let msg = adapter.create_message_from_ws("sess-1", &payload);
assert!(msg.is_none());
}
#[test]
fn test_create_message_from_ws_unknown_session() {
let adapter = WebChatAdapter::new();
let payload = serde_json::json!({ "text": "Hello" });
let msg = adapter.create_message_from_ws("nonexistent", &payload);
assert!(msg.is_none());
}
#[test]
fn test_send_to_session() {
let adapter = WebChatAdapter::new();
let mut rx = adapter.register_session("sess-1".to_string(), "Alice".to_string());
adapter
.send_to_session("sess-1", "Response message")
.unwrap();
let received = rx.try_recv().unwrap();
assert_eq!(received, "Response message");
}
#[test]
fn test_send_to_unknown_session() {
let adapter = WebChatAdapter::new();
let result = adapter.send_to_session("nonexistent", "Hello");
assert!(result.is_err());
}
#[tokio::test]
async fn test_webchat_adapter_start_stop() {
let adapter = WebChatAdapter::new();
assert!(!adapter.status().connected);
adapter.start().await.unwrap();
assert!(adapter.status().connected);
let _rx = adapter.register_session("sess-1".to_string(), "Alice".to_string());
assert_eq!(adapter.session_count(), 1);
adapter.stop().await.unwrap();
assert!(!adapter.status().connected);
assert_eq!(adapter.session_count(), 0);
}
#[test]
fn test_register_multiple_sessions() {
let adapter = WebChatAdapter::new();
let _rx1 = adapter.register_session("s1".to_string(), "Alice".to_string());
let _rx2 = adapter.register_session("s2".to_string(), "Bob".to_string());
let _rx3 = adapter.register_session("s3".to_string(), "Charlie".to_string());
assert_eq!(adapter.session_count(), 3);
assert_eq!(adapter.active_sessions().len(), 3);
}
#[test]
fn test_remove_nonexistent_session() {
let adapter = WebChatAdapter::new();
adapter.remove_session("nonexistent");
assert_eq!(adapter.session_count(), 0);
}
#[test]
fn test_create_message_no_text_field() {
let adapter = WebChatAdapter::new();
let _rx = adapter.register_session("s1".to_string(), "Alice".to_string());
let payload = serde_json::json!({ "other": "data" });
assert!(adapter.create_message_from_ws("s1", &payload).is_none());
}
#[test]
fn test_create_message_no_message_id() {
let adapter = WebChatAdapter::new();
let _rx = adapter.register_session("s1".to_string(), "Alice".to_string());
let payload = serde_json::json!({ "text": "Hello" });
let msg = adapter.create_message_from_ws("s1", &payload).unwrap();
assert_eq!(msg.platform_message_id, "");
}
#[test]
fn test_send_multiple_messages_to_session() {
let adapter = WebChatAdapter::new();
let mut rx = adapter.register_session("s1".to_string(), "Alice".to_string());
adapter.send_to_session("s1", "msg1").unwrap();
adapter.send_to_session("s1", "msg2").unwrap();
assert_eq!(rx.try_recv().unwrap(), "msg1");
assert_eq!(rx.try_recv().unwrap(), "msg2");
}
#[test]
fn test_webchat_default() {
let adapter = WebChatAdapter::default();
assert_eq!(adapter.name(), "webchat");
assert_eq!(adapter.session_count(), 0);
}
}