Skip to main content

mcp_messaging/
server.rs

1use rmcp::{handler::server::wrapper::Parameters, schemars, tool, tool_router};
2use reqwest::Client;
3use serde_json::{json, Value};
4use std::sync::{Arc, Mutex};
5use std::collections::HashMap;
6
7fn now() -> String { chrono::Utc::now().to_rfc3339() }
8fn msg_id() -> String { uuid::Uuid::new_v4().to_string()[..8].to_string() }
9
10// --- Input types ---
11
12#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
13pub struct PushInput {
14    /// Topic/channel to send to (acts as the recipient identifier)
15    pub topic: String,
16    /// Notification title
17    pub title: String,
18    /// Message body
19    pub message: String,
20    /// Priority: 1 (min) to 5 (max), default 3
21    pub priority: Option<u8>,
22    /// Tags (emoji shortcodes, e.g. ["warning", "car"])
23    pub tags: Option<Vec<String>>,
24    /// Click URL (opened when notification is tapped)
25    pub click_url: Option<String>,
26}
27
28#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
29pub struct SmsInput {
30    /// Recipient phone number (E.164 format, e.g. +254712345678)
31    pub to: String,
32    /// Message text (max 160 chars for single SMS)
33    pub message: String,
34    /// Sender ID or phone number
35    pub from: Option<String>,
36}
37
38#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
39pub struct AfricasTalkingInput {
40    /// Recipient phone number (E.164, e.g. +254712345678)
41    pub to: String,
42    /// Message text
43    pub message: String,
44    /// Sender ID (optional, registered short code)
45    pub from: Option<String>,
46}
47
48#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
49pub struct VonageInput {
50    /// Recipient phone number (E.164)
51    pub to: String,
52    /// Message text
53    pub message: String,
54    /// Sender ID or number
55    pub from: Option<String>,
56}
57
58#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
59pub struct SinchInput {
60    /// Recipient phone number (E.164)
61    pub to: String,
62    /// Message text
63    pub message: String,
64    /// Sender ID
65    pub from: Option<String>,
66}
67
68#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
69pub struct MessageStatusInput {
70    /// Message ID to check status of
71    pub message_id: String,
72}
73
74#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
75pub struct ReadReceiptInput {
76    /// Message ID to mark as read
77    pub message_id: String,
78    /// Channel ID
79    pub channel: String,
80    /// Reader user ID
81    pub reader: String,
82}
83
84#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
85pub struct RecallInput {
86    /// Message ID to recall/delete
87    pub message_id: String,
88    /// Channel ID
89    pub channel: String,
90}
91
92#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
93pub struct FormatMessageInput {
94    /// Plain text to format
95    pub text: String,
96    /// Font family (e.g. "Arial", "Courier New", "Georgia", "monospace")
97    pub font: Option<String>,
98    /// Font size in px (e.g. 14, 16, 20)
99    pub font_size: Option<u32>,
100    /// Text color (hex, e.g. "#FF5733" or named "red")
101    pub color: Option<String>,
102    /// Background color (hex or named)
103    pub background: Option<String>,
104    /// Bold
105    pub bold: Option<bool>,
106    /// Italic
107    pub italic: Option<bool>,
108    /// Underline
109    pub underline: Option<bool>,
110    /// Text alignment: left, center, right
111    pub align: Option<String>,
112}
113
114#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
115pub struct FcmInput {
116    /// FCM device token or topic (prefix topic with /topics/)
117    pub to: String,
118    /// Notification title
119    pub title: String,
120    /// Notification body
121    pub body: String,
122    /// Data payload (optional key-value pairs sent to app)
123    pub data: Option<Value>,
124    /// Priority: "high" or "normal" (default: high)
125    pub priority: Option<String>,
126}
127
128#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
129pub struct SetPriorityInput {
130    /// Message ID in queue
131    pub message_id: String,
132    /// Queue name
133    pub queue: String,
134    /// New priority (higher = processed first)
135    pub priority: i32,
136}
137
138#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
139pub struct WebhookInput {
140    /// Destination URL to POST to
141    pub url: String,
142    /// Event type (e.g. "ride.accepted", "payment.completed")
143    pub event: String,
144    /// Payload data (JSON object)
145    pub payload: Value,
146}
147
148#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
149pub struct MessageInput {
150    /// Channel/conversation ID
151    pub channel: String,
152    /// Sender ID
153    pub sender: String,
154    /// Message text (or HTML if msg_type is "html")
155    pub text: String,
156    /// Message type: text, html, image, video, audio, file, location, contact, system
157    pub msg_type: Option<String>,
158    /// Media URL (for image, video, audio, file types)
159    pub media_url: Option<String>,
160    /// Media MIME type (e.g. "image/png", "video/mp4", "audio/ogg")
161    pub mime_type: Option<String>,
162    /// Thumbnail URL (for video/image preview)
163    pub thumbnail_url: Option<String>,
164    /// File name (for file attachments)
165    pub file_name: Option<String>,
166    /// File size in bytes
167    pub file_size: Option<u64>,
168    /// Duration in seconds (for audio/video)
169    pub duration_seconds: Option<u32>,
170    /// Location latitude (for location type)
171    pub lat: Option<f64>,
172    /// Location longitude (for location type)
173    pub lon: Option<f64>,
174    /// Reply-to message ID (for threading)
175    pub reply_to: Option<String>,
176    /// Additional metadata
177    pub metadata: Option<Value>,
178}
179
180#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
181pub struct ChannelInput {
182    /// Channel name
183    pub name: String,
184    /// Channel type: direct, group, broadcast (default: direct)
185    pub channel_type: Option<String>,
186    /// Member IDs
187    pub members: Vec<String>,
188}
189
190#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
191pub struct GetMessagesInput {
192    /// Channel ID
193    pub channel: String,
194    /// Max messages to return (default 20)
195    pub limit: Option<usize>,
196}
197
198#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
199pub struct BroadcastInput {
200    /// List of topics/channels to broadcast to
201    pub topics: Vec<String>,
202    /// Notification title
203    pub title: String,
204    /// Message body
205    pub message: String,
206    /// Priority: 1-5
207    pub priority: Option<u8>,
208}
209
210#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
211pub struct QueueInput {
212    /// Queue name
213    pub queue: String,
214    /// Message payload
215    pub payload: Value,
216    /// Priority (higher = processed first, default 0)
217    pub priority: Option<i32>,
218    /// Delay in seconds before message becomes visible
219    pub delay_seconds: Option<u64>,
220}
221
222#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
223pub struct DequeueInput {
224    /// Queue name
225    pub queue: String,
226    /// Max messages to dequeue (default 1)
227    pub count: Option<usize>,
228}
229
230#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
231pub struct QueueStatusInput {
232    /// Queue name
233    pub queue: String,
234}
235
236#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
237pub struct SubscribeInput {
238    /// Topic to subscribe to
239    pub topic: String,
240    /// Webhook URL to receive messages
241    pub webhook_url: String,
242}
243
244// --- In-memory state ---
245
246#[derive(Clone, serde::Serialize)]
247struct StoredMessage {
248    id: String,
249    channel: String,
250    sender: String,
251    text: String,
252    msg_type: String,
253    media_url: Option<String>,
254    mime_type: Option<String>,
255    thumbnail_url: Option<String>,
256    file_name: Option<String>,
257    file_size: Option<u64>,
258    duration_seconds: Option<u32>,
259    lat: Option<f64>,
260    lon: Option<f64>,
261    reply_to: Option<String>,
262    metadata: Option<Value>,
263    timestamp: String,
264    read_by: Vec<(String, String)>, // (user_id, read_at)
265    recalled: bool,
266}
267
268#[derive(Clone, serde::Serialize)]
269struct QueueMessage {
270    id: String,
271    payload: Value,
272    priority: i32,
273    enqueued_at: String,
274    visible_after: String,
275}
276
277#[derive(Clone)]
278pub struct MessagingServer {
279    pub client: Client,
280    channels: Arc<Mutex<HashMap<String, Vec<String>>>>,
281    messages: Arc<Mutex<HashMap<String, Vec<StoredMessage>>>>,
282    queues: Arc<Mutex<HashMap<String, Vec<QueueMessage>>>>,
283    pub ntfy_server: String,
284    pub twilio_sid: Option<String>,
285    pub twilio_token: Option<String>,
286    pub twilio_from: Option<String>,
287    pub at_username: Option<String>,
288    pub at_api_key: Option<String>,
289    pub vonage_key: Option<String>,
290    pub vonage_secret: Option<String>,
291    pub sinch_plan_id: Option<String>,
292    pub sinch_token: Option<String>,
293    pub fcm_server_key: Option<String>,
294}
295
296impl MessagingServer {
297    pub fn new() -> Self {
298        Self {
299            client: Client::builder().build().unwrap_or_default(),
300            channels: Arc::new(Mutex::new(HashMap::new())),
301            messages: Arc::new(Mutex::new(HashMap::new())),
302            queues: Arc::new(Mutex::new(HashMap::new())),
303            ntfy_server: std::env::var("NTFY_SERVER").unwrap_or_else(|_| "https://ntfy.sh".into()),
304            twilio_sid: std::env::var("TWILIO_ACCOUNT_SID").ok(),
305            twilio_token: std::env::var("TWILIO_AUTH_TOKEN").ok(),
306            twilio_from: std::env::var("TWILIO_FROM_NUMBER").ok(),
307            at_username: std::env::var("AT_USERNAME").ok(),
308            at_api_key: std::env::var("AT_API_KEY").ok(),
309            vonage_key: std::env::var("VONAGE_API_KEY").ok(),
310            vonage_secret: std::env::var("VONAGE_API_SECRET").ok(),
311            sinch_plan_id: std::env::var("SINCH_SERVICE_PLAN_ID").ok(),
312            sinch_token: std::env::var("SINCH_API_TOKEN").ok(),
313            fcm_server_key: std::env::var("FCM_SERVER_KEY").ok(),
314        }
315    }
316}
317
318#[tool_router(server_handler)]
319impl MessagingServer {
320    // === Push Notifications ===
321
322    #[tool(description = "Send push notification to a topic/device. Uses ntfy.sh — recipients subscribe to the topic to receive notifications on any device.")]
323    async fn send_push(&self, Parameters(input): Parameters<PushInput>) -> String {
324        let mut body = json!({
325            "topic": input.topic,
326            "title": input.title,
327            "message": input.message,
328            "priority": input.priority.unwrap_or(3)
329        });
330        if let Some(tags) = &input.tags { body["tags"] = json!(tags); }
331        if let Some(url) = &input.click_url { body["click"] = json!(url); }
332
333        match self.client.post(&self.ntfy_server).json(&body).send().await {
334            Ok(resp) => match resp.json::<Value>().await {
335                Ok(data) => json!({
336                    "status": "sent", "id": data["id"], "topic": input.topic,
337                    "timestamp": now()
338                }).to_string(),
339                Err(e) => format!("Error: {e}"),
340            },
341            Err(e) => format!("Error: {e}"),
342        }
343    }
344
345    #[tool(description = "Broadcast push notification to multiple topics at once")]
346    async fn broadcast(&self, Parameters(input): Parameters<BroadcastInput>) -> String {
347        let mut results = Vec::new();
348        for topic in &input.topics {
349            let body = json!({
350                "topic": topic, "title": input.title,
351                "message": input.message, "priority": input.priority.unwrap_or(3)
352            });
353            match self.client.post(&self.ntfy_server).json(&body).send().await {
354                Ok(resp) => {
355                    let id = resp.json::<Value>().await.ok().and_then(|d| d["id"].as_str().map(String::from)).unwrap_or_default();
356                    results.push(json!({"topic": topic, "status": "sent", "id": id}));
357                }
358                Err(e) => results.push(json!({"topic": topic, "status": "failed", "error": e.to_string()})),
359            }
360        }
361        json!({"broadcast": true, "sent": results.len(), "results": results, "timestamp": now()}).to_string()
362    }
363
364    // === SMS ===
365
366    #[tool(description = "Send SMS via Twilio. Requires TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM_NUMBER env vars.")]
367    async fn send_sms(&self, Parameters(input): Parameters<SmsInput>) -> String {
368        let (Some(sid), Some(token), Some(from)) = (&self.twilio_sid, &self.twilio_token, &self.twilio_from) else {
369            return json!({"status": "error", "message": "Twilio not configured. Set TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM_NUMBER"}).to_string();
370        };
371        let from = input.from.as_deref().unwrap_or(from);
372        let url = format!("https://api.twilio.com/2010-04-01/Accounts/{}/Messages.json", sid);
373        match self.client.post(&url)
374            .basic_auth(sid, Some(token))
375            .form(&[("To", input.to.as_str()), ("From", from), ("Body", input.message.as_str())])
376            .send().await {
377            Ok(resp) => match resp.json::<Value>().await {
378                Ok(data) => json!({
379                    "status": data["status"], "sid": data["sid"],
380                    "to": input.to, "timestamp": now()
381                }).to_string(),
382                Err(e) => format!("Error: {e}"),
383            },
384            Err(e) => format!("Error: {e}"),
385        }
386    }
387
388    // === Webhooks ===
389
390    #[tool(description = "Fire a webhook — POST JSON payload to a URL with event type header")]
391    async fn fire_webhook(&self, Parameters(input): Parameters<WebhookInput>) -> String {
392        let body = json!({
393            "event": input.event,
394            "payload": input.payload,
395            "timestamp": now(),
396            "id": msg_id()
397        });
398        match self.client.post(&input.url)
399            .header("X-Event-Type", &input.event)
400            .header("X-Message-Id", msg_id())
401            .json(&body).send().await {
402            Ok(resp) => {
403                let status = resp.status().as_u16();
404                json!({"status": if status < 400 { "delivered" } else { "failed" }, "http_status": status, "url": input.url, "event": input.event, "timestamp": now()}).to_string()
405            }
406            Err(e) => json!({"status": "failed", "error": e.to_string(), "url": input.url}).to_string(),
407        }
408    }
409
410    // === In-App Messaging ===
411
412    #[tool(description = "Create a messaging channel (direct, group, or broadcast)")]
413    async fn create_channel(&self, Parameters(input): Parameters<ChannelInput>) -> String {
414        let channel_id = format!("ch_{}", msg_id());
415        let channel_type = input.channel_type.as_deref().unwrap_or("direct");
416        self.channels.lock().unwrap().insert(channel_id.clone(), input.members.clone());
417        json!({
418            "channel_id": channel_id, "name": input.name,
419            "type": channel_type, "members": input.members,
420            "created_at": now()
421        }).to_string()
422    }
423
424    #[tool(description = "Send a message to a channel. Supports text, HTML, image, video, audio, file, location, and contact types. Use media_url for attachments.")]
425    async fn send_message(&self, Parameters(input): Parameters<MessageInput>) -> String {
426        let msg_type = input.msg_type.unwrap_or_else(|| "text".into());
427        let msg = StoredMessage {
428            id: format!("msg_{}", msg_id()),
429            channel: input.channel.clone(),
430            sender: input.sender.clone(),
431            text: input.text.clone(),
432            msg_type: msg_type.clone(),
433            media_url: input.media_url.clone(),
434            mime_type: input.mime_type.clone(),
435            thumbnail_url: input.thumbnail_url.clone(),
436            file_name: input.file_name.clone(),
437            file_size: input.file_size,
438            duration_seconds: input.duration_seconds,
439            lat: input.lat,
440            lon: input.lon,
441            reply_to: input.reply_to.clone(),
442            metadata: input.metadata,
443            timestamp: now(),
444            read_by: Vec::new(),
445            recalled: false,
446        };
447        let id = msg.id.clone();
448        self.messages.lock().unwrap().entry(input.channel.clone()).or_default().push(msg);
449        json!({
450            "status": "sent", "message_id": id, "channel": input.channel,
451            "type": msg_type,
452            "has_media": input.media_url.is_some(),
453            "timestamp": now()
454        }).to_string()
455    }
456
457    #[tool(description = "Get messages from a channel")]
458    async fn get_messages(&self, Parameters(input): Parameters<GetMessagesInput>) -> String {
459        let limit = input.limit.unwrap_or(20);
460        let messages = self.messages.lock().unwrap();
461        let msgs = messages.get(&input.channel).map(|m| {
462            let start = m.len().saturating_sub(limit);
463            m[start..].to_vec()
464        }).unwrap_or_default();
465        json!({"channel": input.channel, "count": msgs.len(), "messages": msgs}).to_string()
466    }
467
468    // === Message Queues ===
469
470    #[tool(description = "Enqueue a message to a named queue (for async processing, job dispatch, event sourcing)")]
471    async fn enqueue(&self, Parameters(input): Parameters<QueueInput>) -> String {
472        let visible_after = if let Some(delay) = input.delay_seconds {
473            (chrono::Utc::now() + chrono::Duration::seconds(delay as i64)).to_rfc3339()
474        } else { now() };
475        let msg = QueueMessage {
476            id: format!("q_{}", msg_id()),
477            payload: input.payload,
478            priority: input.priority.unwrap_or(0),
479            enqueued_at: now(),
480            visible_after,
481        };
482        let id = msg.id.clone();
483        self.queues.lock().unwrap().entry(input.queue.clone()).or_default().push(msg);
484        json!({"status": "enqueued", "message_id": id, "queue": input.queue, "timestamp": now()}).to_string()
485    }
486
487    #[tool(description = "Dequeue messages from a queue (returns and removes oldest visible messages)")]
488    async fn dequeue(&self, Parameters(input): Parameters<DequeueInput>) -> String {
489        let count = input.count.unwrap_or(1);
490        let now_str = now();
491        let mut queues = self.queues.lock().unwrap();
492        let queue = queues.entry(input.queue.clone()).or_default();
493        // Sort by priority desc, then by enqueued_at
494        queue.sort_by(|a, b| b.priority.cmp(&a.priority).then(a.enqueued_at.cmp(&b.enqueued_at)));
495        let mut dequeued = Vec::new();
496        let mut remaining = Vec::new();
497        for msg in queue.drain(..) {
498            if dequeued.len() < count && msg.visible_after <= now_str {
499                dequeued.push(msg);
500            } else {
501                remaining.push(msg);
502            }
503        }
504        *queue = remaining;
505        json!({"queue": input.queue, "count": dequeued.len(), "messages": dequeued}).to_string()
506    }
507
508    #[tool(description = "Get queue status (depth, oldest message age)")]
509    async fn queue_status(&self, Parameters(input): Parameters<QueueStatusInput>) -> String {
510        let queues = self.queues.lock().unwrap();
511        let queue = queues.get(&input.queue);
512        match queue {
513            Some(q) => json!({
514                "queue": input.queue, "depth": q.len(),
515                "oldest": q.first().map(|m| &m.enqueued_at),
516                "newest": q.last().map(|m| &m.enqueued_at)
517            }).to_string(),
518            None => json!({"queue": input.queue, "depth": 0}).to_string(),
519        }
520    }
521
522    // === Subscribe ===
523
524    #[tool(description = "Subscribe a webhook URL to a ntfy topic (receive push notifications as HTTP POSTs)")]
525    async fn subscribe_webhook(&self, Parameters(input): Parameters<SubscribeInput>) -> String {
526        json!({
527            "status": "subscribed",
528            "topic": input.topic,
529            "webhook_url": input.webhook_url,
530            "subscribe_url": format!("{}/{}/json", self.ntfy_server, input.topic),
531            "instructions": "Poll the subscribe_url with GET for server-sent events, or use the ntfy app",
532            "timestamp": now()
533        }).to_string()
534    }
535
536    // === Africa's Talking SMS (Africa: Kenya, Nigeria, Uganda, Tanzania, etc.) ===
537
538    #[tool(description = "Send SMS via Africa's Talking (covers Kenya, Nigeria, Uganda, Tanzania, Rwanda, Ghana, South Africa, 20+ African countries). Requires AT_USERNAME, AT_API_KEY env vars.")]
539    async fn send_sms_africa(&self, Parameters(input): Parameters<AfricasTalkingInput>) -> String {
540        let (Some(username), Some(api_key)) = (&self.at_username, &self.at_api_key) else {
541            return json!({"status": "error", "message": "Africa's Talking not configured. Set AT_USERNAME, AT_API_KEY"}).to_string();
542        };
543        let url = if username == "sandbox" {
544            "https://api.sandbox.africastalking.com/version1/messaging"
545        } else {
546            "https://api.africastalking.com/version1/messaging"
547        };
548        let mut form = vec![("username", username.as_str()), ("to", &input.to), ("message", &input.message)];
549        if let Some(ref from) = input.from { form.push(("from", from.as_str())); }
550        match self.client.post(url).header("apiKey", api_key.as_str()).header("Accept", "application/json").form(&form).send().await {
551            Ok(resp) => match resp.json::<Value>().await {
552                Ok(data) => {
553                    let recipients = &data["SMSMessageData"]["Recipients"];
554                    json!({"status": "sent", "provider": "africastalking", "to": input.to, "cost": recipients[0]["cost"], "message_id": recipients[0]["messageId"], "status_code": recipients[0]["statusCode"], "timestamp": now()}).to_string()
555                }
556                Err(e) => format!("Error: {e}"),
557            },
558            Err(e) => format!("Error: {e}"),
559        }
560    }
561
562    // === Vonage/Nexmo SMS (Europe, Global) ===
563
564    #[tool(description = "Send SMS via Vonage/Nexmo (covers Europe, Americas, Asia-Pacific — 200+ countries). Requires VONAGE_API_KEY, VONAGE_API_SECRET env vars.")]
565    async fn send_sms_europe(&self, Parameters(input): Parameters<VonageInput>) -> String {
566        let (Some(key), Some(secret)) = (&self.vonage_key, &self.vonage_secret) else {
567            return json!({"status": "error", "message": "Vonage not configured. Set VONAGE_API_KEY, VONAGE_API_SECRET"}).to_string();
568        };
569        let from = input.from.as_deref().unwrap_or("MCP");
570        let body = json!({"api_key": key, "api_secret": secret, "to": input.to, "from": from, "text": input.message});
571        match self.client.post("https://rest.nexmo.com/sms/json").json(&body).send().await {
572            Ok(resp) => match resp.json::<Value>().await {
573                Ok(data) => {
574                    let msg = &data["messages"][0];
575                    json!({"status": msg["status"], "provider": "vonage", "to": input.to, "message_id": msg["message-id"], "remaining_balance": msg["remaining-balance"], "message_price": msg["message-price"], "network": msg["network"], "timestamp": now()}).to_string()
576                }
577                Err(e) => format!("Error: {e}"),
578            },
579            Err(e) => format!("Error: {e}"),
580        }
581    }
582
583    // === Sinch SMS (Asia-Pacific, Global) ===
584
585    #[tool(description = "Send SMS via Sinch (covers Asia-Pacific, Australia, India, Japan, Singapore — 200+ countries). Requires SINCH_SERVICE_PLAN_ID, SINCH_API_TOKEN env vars.")]
586    async fn send_sms_asia(&self, Parameters(input): Parameters<SinchInput>) -> String {
587        let (Some(plan_id), Some(token)) = (&self.sinch_plan_id, &self.sinch_token) else {
588            return json!({"status": "error", "message": "Sinch not configured. Set SINCH_SERVICE_PLAN_ID, SINCH_API_TOKEN"}).to_string();
589        };
590        let from = input.from.as_deref().unwrap_or("MCP");
591        let url = format!("https://sms.api.sinch.com/xms/v1/{}/batches", plan_id);
592        let body = json!({"to": [input.to], "from": from, "body": input.message});
593        match self.client.post(&url).bearer_auth(token).json(&body).send().await {
594            Ok(resp) => match resp.json::<Value>().await {
595                Ok(data) => json!({"status": "sent", "provider": "sinch", "to": input.to, "batch_id": data["id"], "created_at": data["created_at"], "timestamp": now()}).to_string(),
596                Err(e) => format!("Error: {e}"),
597            },
598            Err(e) => format!("Error: {e}"),
599        }
600    }
601
602    // === Firebase Cloud Messaging ===
603
604    #[tool(description = "Send push notification via Google Firebase Cloud Messaging (FCM). Supports device tokens and topics. Requires FCM_SERVER_KEY env var.")]
605    async fn send_fcm(&self, Parameters(input): Parameters<FcmInput>) -> String {
606        let Some(key) = &self.fcm_server_key else {
607            return json!({"status": "error", "message": "Firebase not configured. Set FCM_SERVER_KEY (from Firebase Console > Project Settings > Cloud Messaging)"}).to_string();
608        };
609        let priority = input.priority.as_deref().unwrap_or("high");
610        let mut body = json!({
611            "to": input.to,
612            "priority": priority,
613            "notification": {"title": input.title, "body": input.body}
614        });
615        if let Some(data) = input.data { body["data"] = data; }
616        match self.client.post("https://fcm.googleapis.com/fcm/send")
617            .header("Authorization", format!("key={}", key))
618            .json(&body).send().await {
619            Ok(resp) => match resp.json::<Value>().await {
620                Ok(data) => json!({
621                    "status": if data["success"].as_i64().unwrap_or(0) > 0 { "sent" } else { "failed" },
622                    "provider": "fcm",
623                    "success": data["success"],
624                    "failure": data["failure"],
625                    "message_id": data["results"][0]["message_id"],
626                    "multicast_id": data["multicast_id"],
627                    "timestamp": now()
628                }).to_string(),
629                Err(e) => format!("Error: {e}"),
630            },
631            Err(e) => format!("Error: {e}"),
632        }
633    }
634
635    // === Message Status ===
636
637    #[tool(description = "Get delivery status of a sent message (sent, delivered, read, recalled)")]
638    async fn get_message_status(&self, Parameters(input): Parameters<MessageStatusInput>) -> String {
639        let messages = self.messages.lock().unwrap();
640        for (_channel, msgs) in messages.iter() {
641            if let Some(msg) = msgs.iter().find(|m| m.id == input.message_id) {
642                let status = if msg.recalled { "recalled" } else if !msg.read_by.is_empty() { "read" } else { "delivered" };
643                return json!({
644                    "message_id": msg.id, "channel": msg.channel,
645                    "sender": msg.sender, "status": status,
646                    "sent_at": msg.timestamp,
647                    "read_by": msg.read_by.iter().map(|(u,t)| json!({"user": u, "read_at": t})).collect::<Vec<_>>(),
648                    "recalled": msg.recalled
649                }).to_string();
650            }
651        }
652        json!({"message_id": input.message_id, "status": "not_found"}).to_string()
653    }
654
655    // === Read Receipts ===
656
657    #[tool(description = "Mark a message as read by a user (sets read receipt)")]
658    async fn mark_as_read(&self, Parameters(input): Parameters<ReadReceiptInput>) -> String {
659        let mut messages = self.messages.lock().unwrap();
660        if let Some(msgs) = messages.get_mut(&input.channel) {
661            if let Some(msg) = msgs.iter_mut().find(|m| m.id == input.message_id) {
662                if !msg.read_by.iter().any(|(u, _)| u == &input.reader) {
663                    msg.read_by.push((input.reader.clone(), now()));
664                }
665                return json!({"status": "read", "message_id": input.message_id, "reader": input.reader, "read_at": now()}).to_string();
666            }
667        }
668        json!({"status": "not_found", "message_id": input.message_id}).to_string()
669    }
670
671    // === Recall/Delete ===
672
673    #[tool(description = "Recall (unsend) a message — marks it as recalled so clients hide the content")]
674    async fn recall_message(&self, Parameters(input): Parameters<RecallInput>) -> String {
675        let mut messages = self.messages.lock().unwrap();
676        if let Some(msgs) = messages.get_mut(&input.channel) {
677            if let Some(msg) = msgs.iter_mut().find(|m| m.id == input.message_id) {
678                msg.recalled = true;
679                msg.text = "[Message recalled]".into();
680                msg.media_url = None;
681                return json!({"status": "recalled", "message_id": input.message_id, "channel": input.channel, "timestamp": now()}).to_string();
682            }
683        }
684        json!({"status": "not_found", "message_id": input.message_id}).to_string()
685    }
686
687    // === Message Formatting ===
688
689    #[tool(description = "Format text into styled HTML message with custom font, size, color, background, bold, italic, underline, alignment")]
690    async fn format_message(&self, Parameters(input): Parameters<FormatMessageInput>) -> String {
691        let mut styles = Vec::new();
692        if let Some(ref font) = input.font { styles.push(format!("font-family:'{}'", font)); }
693        if let Some(size) = input.font_size { styles.push(format!("font-size:{}px", size)); }
694        if let Some(ref color) = input.color { styles.push(format!("color:{}", color)); }
695        if let Some(ref bg) = input.background { styles.push(format!("background-color:{}", bg)); }
696        if input.bold.unwrap_or(false) { styles.push("font-weight:bold".into()); }
697        if input.italic.unwrap_or(false) { styles.push("font-style:italic".into()); }
698        if input.underline.unwrap_or(false) { styles.push("text-decoration:underline".into()); }
699        if let Some(ref align) = input.align { styles.push(format!("text-align:{}", align)); }
700        styles.push("padding:8px".into());
701
702        let html = format!("<div style=\"{}\">{}</div>", styles.join(";"), input.text);
703        json!({
704            "html": html,
705            "plain_text": input.text,
706            "msg_type": "html",
707            "styles_applied": {
708                "font": input.font, "font_size": input.font_size,
709                "color": input.color, "background": input.background,
710                "bold": input.bold, "italic": input.italic,
711                "underline": input.underline, "align": input.align
712            }
713        }).to_string()
714    }
715
716    // === Queue Priority ===
717
718    #[tool(description = "Update priority of a message in a queue (higher priority = processed first)")]
719    async fn set_queue_priority(&self, Parameters(input): Parameters<SetPriorityInput>) -> String {
720        let mut queues = self.queues.lock().unwrap();
721        if let Some(queue) = queues.get_mut(&input.queue) {
722            if let Some(msg) = queue.iter_mut().find(|m| m.id == input.message_id) {
723                let old = msg.priority;
724                msg.priority = input.priority;
725                return json!({"message_id": input.message_id, "queue": input.queue, "old_priority": old, "new_priority": input.priority, "status": "updated"}).to_string();
726            }
727        }
728        json!({"message_id": input.message_id, "status": "not_found"}).to_string()
729    }
730}