use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use tracing::{info, warn};
use punch_types::{PunchError, PunchResult};
use crate::{ChannelAdapter, ChannelPlatform, ChannelStatus, IncomingMessage};
pub struct ZulipAdapter {
server_url: String,
bot_email: String,
api_key: String,
client: reqwest::Client,
running: AtomicBool,
started_at: RwLock<Option<DateTime<Utc>>>,
messages_received: AtomicU64,
messages_sent: AtomicU64,
}
impl ZulipAdapter {
pub fn new(server_url: String, bot_email: String, api_key: String) -> Self {
let server_url = server_url.trim_end_matches('/').to_string();
Self {
server_url,
bot_email,
api_key,
client: reqwest::Client::new(),
running: AtomicBool::new(false),
started_at: RwLock::new(None),
messages_received: AtomicU64::new(0),
messages_sent: AtomicU64::new(0),
}
}
pub async fn send_stream_message(
&self,
stream: &str,
topic: &str,
content: &str,
) -> PunchResult<()> {
let url = format!("{}/api/v1/messages", self.server_url);
let params = [
("type", "stream"),
("to", stream),
("topic", topic),
("content", content),
];
let resp = self
.client
.post(&url)
.basic_auth(&self.bot_email, Some(&self.api_key))
.form(¶ms)
.send()
.await
.map_err(|e| PunchError::Channel {
channel: "zulip".to_string(),
message: format!("failed to send message: {e}"),
})?;
let status = resp.status();
if !status.is_success() {
let body_text = resp.text().await.unwrap_or_default();
warn!("Zulip send failed ({status}): {body_text}");
}
self.messages_sent.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub async fn send_private_message(
&self,
user_emails: &[&str],
content: &str,
) -> PunchResult<()> {
let url = format!("{}/api/v1/messages", self.server_url);
let to_json = serde_json::json!(user_emails).to_string();
let params = [("type", "private"), ("to", &to_json), ("content", content)];
let resp = self
.client
.post(&url)
.basic_auth(&self.bot_email, Some(&self.api_key))
.form(¶ms)
.send()
.await
.map_err(|e| PunchError::Channel {
channel: "zulip".to_string(),
message: format!("failed to send private message: {e}"),
})?;
let status = resp.status();
if !status.is_success() {
let body_text = resp.text().await.unwrap_or_default();
warn!("Zulip private message failed ({status}): {body_text}");
}
self.messages_sent.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn parse_webhook_payload(&self, payload: &serde_json::Value) -> Option<IncomingMessage> {
let message = payload.get("message")?;
let content = message.get("content")?.as_str()?;
if content.is_empty() {
return None;
}
let msg_id = message.get("id")?.as_u64()?;
let sender_id = message
.get("sender_id")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let sender_name = message
.get("sender_full_name")
.and_then(|v| v.as_str())
.unwrap_or("Unknown");
let sender_email = message
.get("sender_email")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let msg_type = message
.get("type")
.and_then(|v| v.as_str())
.unwrap_or("stream");
let is_stream = msg_type == "stream";
let timestamp = message
.get("timestamp")
.and_then(|v| v.as_i64())
.and_then(|ts| DateTime::from_timestamp(ts, 0))
.unwrap_or_else(Utc::now);
let mut metadata = HashMap::new();
metadata.insert(
"sender_email".to_string(),
serde_json::Value::String(sender_email.to_string()),
);
metadata.insert(
"message_type".to_string(),
serde_json::Value::String(msg_type.to_string()),
);
if is_stream {
if let Some(stream) = message.get("display_recipient").and_then(|v| v.as_str()) {
metadata.insert(
"stream".to_string(),
serde_json::Value::String(stream.to_string()),
);
}
if let Some(topic) = message.get("subject").and_then(|v| v.as_str()) {
metadata.insert(
"topic".to_string(),
serde_json::Value::String(topic.to_string()),
);
}
}
let channel_id = if is_stream {
let stream = message
.get("display_recipient")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let topic = message
.get("subject")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
format!("{stream}/{topic}")
} else {
sender_email.to_string()
};
self.messages_received.fetch_add(1, Ordering::Relaxed);
Some(IncomingMessage {
channel_id,
user_id: sender_id.to_string(),
display_name: sender_name.to_string(),
text: content.to_string(),
timestamp,
platform: ChannelPlatform::Zulip,
platform_message_id: msg_id.to_string(),
is_group: is_stream,
metadata,
})
}
}
#[async_trait]
impl ChannelAdapter for ZulipAdapter {
fn name(&self) -> &str {
"zulip"
}
fn platform(&self) -> ChannelPlatform {
ChannelPlatform::Zulip
}
async fn start(&self) -> PunchResult<()> {
self.running.store(true, Ordering::Relaxed);
*self.started_at.write().await = Some(Utc::now());
info!(server = %self.server_url, "Zulip adapter started");
Ok(())
}
async fn stop(&self) -> PunchResult<()> {
self.running.store(false, Ordering::Relaxed);
info!("Zulip adapter stopped");
Ok(())
}
async fn send_response(&self, channel_id: &str, message: &str) -> PunchResult<()> {
if let Some((stream, topic)) = channel_id.split_once('/') {
self.send_stream_message(stream, topic, message).await
} else {
self.send_private_message(&[channel_id], message).await
}
}
fn status(&self) -> ChannelStatus {
ChannelStatus {
connected: self.running.load(Ordering::Relaxed),
started_at: self.started_at.try_read().ok().and_then(|g| *g),
messages_received: self.messages_received.load(Ordering::Relaxed),
messages_sent: self.messages_sent.load(Ordering::Relaxed),
last_error: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_adapter() -> ZulipAdapter {
ZulipAdapter::new(
"https://myorg.zulipchat.com".to_string(),
"bot@myorg.zulipchat.com".to_string(),
"test-api-key".to_string(),
)
}
#[test]
fn test_zulip_adapter_creation() {
let adapter = make_adapter();
assert_eq!(adapter.name(), "zulip");
assert_eq!(adapter.platform(), ChannelPlatform::Zulip);
}
#[test]
fn test_parse_stream_message() {
let adapter = make_adapter();
let payload = serde_json::json!({
"message": {
"id": 12345,
"sender_id": 67890,
"sender_full_name": "Alice",
"sender_email": "alice@example.com",
"content": "Hello @bot",
"timestamp": 1705320000,
"type": "stream",
"display_recipient": "general",
"subject": "greetings"
},
"trigger": "mention"
});
let msg = adapter.parse_webhook_payload(&payload).unwrap();
assert_eq!(msg.platform, ChannelPlatform::Zulip);
assert_eq!(msg.text, "Hello @bot");
assert_eq!(msg.channel_id, "general/greetings");
assert_eq!(msg.display_name, "Alice");
assert!(msg.is_group);
assert_eq!(
msg.metadata.get("stream").unwrap(),
&serde_json::Value::String("general".to_string())
);
assert_eq!(
msg.metadata.get("topic").unwrap(),
&serde_json::Value::String("greetings".to_string())
);
}
#[test]
fn test_parse_private_message() {
let adapter = make_adapter();
let payload = serde_json::json!({
"message": {
"id": 99999,
"sender_id": 11111,
"sender_full_name": "Bob",
"sender_email": "bob@example.com",
"content": "Private hello",
"timestamp": 1705320000,
"type": "private",
"display_recipient": [
{"email": "bot@myorg.zulipchat.com"},
{"email": "bob@example.com"}
]
},
"trigger": "private_message"
});
let msg = adapter.parse_webhook_payload(&payload).unwrap();
assert_eq!(msg.channel_id, "bob@example.com");
assert!(!msg.is_group);
}
#[test]
fn test_parse_webhook_empty_content() {
let adapter = make_adapter();
let payload = serde_json::json!({
"message": {
"id": 1,
"content": "",
"type": "stream"
}
});
assert!(adapter.parse_webhook_payload(&payload).is_none());
}
#[tokio::test]
async fn test_zulip_start_stop() {
let adapter = make_adapter();
assert!(!adapter.status().connected);
adapter.start().await.unwrap();
assert!(adapter.status().connected);
adapter.stop().await.unwrap();
assert!(!adapter.status().connected);
}
}