Skip to main content

weixin_agent/messaging/
inbound.rs

1//! Inbound message parsing and context token management.
2
3use std::collections::HashMap;
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use dashmap::DashMap;
8
9use crate::error::Result;
10use crate::types::{
11    CdnMedia, MediaType, MessageItem, MessageItemType, MessageState, MessageType,
12    SendTypingRequest, TypingStatus, WeixinMessage, build_base_info,
13};
14use crate::util::random::generate_id;
15
16/// High-level media information extracted from an inbound message.
17#[derive(Debug, Clone)]
18pub struct MediaInfo {
19    /// Media type category.
20    pub media_type: MediaType,
21    /// CDN media reference (for download).
22    pub cdn_media: Option<CdnMedia>,
23    /// Direct URL (if available).
24    pub url: Option<String>,
25    /// Original file name.
26    pub file_name: Option<String>,
27    /// Plaintext file size.
28    pub file_size: Option<u64>,
29    /// AES key for decryption (base64).
30    pub aes_key_base64: Option<String>,
31}
32
/// Summary of the message that an inbound message quotes (references).
#[derive(Clone, Debug)]
pub struct RefMessageInfo {
    /// Title line summarising the quoted message.
    pub title: Option<String>,
    /// Text content of the quoted message.
    pub body: Option<String>,
}
41
/// Outcome of a successful send operation.
#[derive(Clone, Debug)]
pub struct SendResult {
    /// Message ID generated on the client side for the sent message.
    pub message_id: String,
}
48
49/// Internal sender handle for replying from a [`MessageContext`].
50pub struct MessageSender {
51    pub(crate) api: Arc<crate::api::client::HttpApiClient>,
52    pub(crate) cdn_base_url: String,
53    pub(crate) config_cache: Arc<crate::api::config_cache::ConfigCache>,
54}
55
56/// Inbound message context passed to the handler.
57pub struct MessageContext {
58    /// SDK-generated message session ID.
59    pub message_id: String,
60    /// Server-assigned message ID.
61    pub server_message_id: Option<i64>,
62    /// Sender user ID.
63    pub from: String,
64    /// Recipient user ID.
65    pub to: String,
66    /// Creation timestamp (ms).
67    pub timestamp: i64,
68    /// Session ID.
69    pub session_id: Option<String>,
70    /// Context token for replies.
71    pub context_token: Option<String>,
72    /// Text body (including quoted text).
73    pub body: Option<String>,
74    /// Media attachment info.
75    pub media: Option<MediaInfo>,
76    /// Referenced (quoted) message info.
77    pub ref_message: Option<RefMessageInfo>,
78    /// Internal sender.
79    pub(crate) sender: Arc<MessageSender>,
80}
81
82impl MessageContext {
83    /// Reply with a text message.
84    pub async fn reply_text(&self, text: &str) -> Result<SendResult> {
85        crate::messaging::send::send_text(
86            &self.sender.api,
87            &self.from,
88            text,
89            self.context_token.as_deref(),
90        )
91        .await
92    }
93
94    /// Reply with a media file.
95    pub async fn reply_media(&self, file_path: &Path) -> Result<SendResult> {
96        crate::messaging::send_media::send_media_file(
97            &self.sender.api,
98            &self.sender.cdn_base_url,
99            &self.from,
100            file_path,
101            "",
102            self.context_token.as_deref(),
103        )
104        .await
105    }
106
107    /// Download media from this message to a destination path.
108    pub async fn download_media(&self, media: &MediaInfo, dest: &Path) -> Result<PathBuf> {
109        let data = if let Some(aes_key) = &media.aes_key_base64 {
110            let cdn_media = media
111                .cdn_media
112                .as_ref()
113                .ok_or_else(|| crate::error::Error::CdnUpload("no cdn_media".into()))?;
114            crate::cdn::download::download_and_decrypt(
115                &self.sender.cdn_base_url,
116                cdn_media,
117                aes_key,
118            )
119            .await?
120        } else if let Some(cdn_media) = &media.cdn_media {
121            crate::cdn::download::download_plain(&self.sender.cdn_base_url, cdn_media).await?
122        } else {
123            return Err(crate::error::Error::CdnUpload(
124                "no media source available".into(),
125            ));
126        };
127        tokio::fs::write(dest, &data).await?;
128        Ok(dest.to_path_buf())
129    }
130
131    /// Send a typing indicator.
132    pub async fn send_typing(&self) -> Result<()> {
133        let ticket = self
134            .sender
135            .config_cache
136            .get_typing_ticket(&self.from, self.context_token.as_deref())
137            .await;
138        let req = SendTypingRequest {
139            ilink_user_id: self.from.clone(),
140            typing_ticket: ticket,
141            status: TypingStatus::Typing,
142            base_info: build_base_info(),
143        };
144        self.sender.api.send_typing(&req).await
145    }
146
147    /// Cancel the typing indicator.
148    pub async fn cancel_typing(&self) -> Result<()> {
149        let ticket = self
150            .sender
151            .config_cache
152            .get_typing_ticket(&self.from, self.context_token.as_deref())
153            .await;
154        let req = SendTypingRequest {
155            ilink_user_id: self.from.clone(),
156            typing_ticket: ticket,
157            status: TypingStatus::Cancel,
158            base_info: build_base_info(),
159        };
160        self.sender.api.send_typing(&req).await
161    }
162}
163
164// ── Context token store ─────────────────────────────────────────────
165
166/// In-memory context token store with export/import for persistence.
167#[derive(Default)]
168pub struct ContextTokenStore {
169    tokens: DashMap<String, String>,
170}
171
172impl ContextTokenStore {
173    /// Create a new empty store.
174    pub fn new() -> Self {
175        Self::default()
176    }
177
178    /// Set a context token for a user.
179    pub fn set(&self, user_id: &str, token: &str) {
180        self.tokens.insert(user_id.to_owned(), token.to_owned());
181    }
182
183    /// Get the context token for a user.
184    pub fn get(&self, user_id: &str) -> Option<String> {
185        self.tokens.get(user_id).map(|v| v.value().clone())
186    }
187
188    /// Export all tokens for caller persistence.
189    pub fn export_all(&self) -> HashMap<String, String> {
190        self.tokens
191            .iter()
192            .map(|e| (e.key().clone(), e.value().clone()))
193            .collect()
194    }
195
196    /// Import tokens (e.g. on startup restore).
197    pub fn import(&self, tokens: HashMap<String, String>) {
198        for (k, v) in tokens {
199            self.tokens.insert(k, v);
200        }
201    }
202}
203
204// ── Message parsing ─────────────────────────────────────────────────
205
206/// Returns `true` if the message item is a media type.
207fn is_media_item(item: &MessageItem) -> bool {
208    matches!(
209        item.item_type,
210        Some(
211            MessageItemType::Image
212                | MessageItemType::Video
213                | MessageItemType::File
214                | MessageItemType::Voice
215        )
216    )
217}
218
219/// Extract text body from item list (with quoted message handling).
220fn body_from_item_list(items: &[MessageItem]) -> String {
221    for item in items {
222        if item.item_type == Some(MessageItemType::Text) {
223            if let Some(text) = item.text_item.as_ref().and_then(|t| t.text.as_deref()) {
224                let text = text.to_owned();
225                if let Some(ref_msg) = &item.ref_msg {
226                    if let Some(ref_item) = &ref_msg.message_item {
227                        if is_media_item(ref_item) {
228                            return text;
229                        }
230                    }
231                    let mut parts = Vec::new();
232                    if let Some(title) = &ref_msg.title {
233                        parts.push(title.clone());
234                    }
235                    if let Some(ref_item) = &ref_msg.message_item {
236                        let ref_body = body_from_item_list(&[*ref_item.clone()]);
237                        if !ref_body.is_empty() {
238                            parts.push(ref_body);
239                        }
240                    }
241                    if parts.is_empty() {
242                        return text;
243                    }
244                    return format!("[引用: {}]\n{text}", parts.join(" | "));
245                }
246                return text;
247            }
248        }
249        // Voice-to-text
250        if item.item_type == Some(MessageItemType::Voice) {
251            if let Some(voice_text) = item.voice_item.as_ref().and_then(|v| v.text.as_deref()) {
252                return voice_text.to_owned();
253            }
254        }
255    }
256    String::new()
257}
258
259/// Extract media info from item list. Priority: image > video > file > voice.
260fn extract_media(items: &[MessageItem]) -> Option<MediaInfo> {
261    // First pass: direct items
262    for item in items {
263        if let Some(info) = extract_media_from_item(item) {
264            return Some(info);
265        }
266    }
267    // Second pass: check ref_msg for media fallback
268    for item in items {
269        if let Some(ref_msg) = &item.ref_msg {
270            if let Some(ref_item) = &ref_msg.message_item {
271                if let Some(info) = extract_media_from_item(ref_item) {
272                    return Some(info);
273                }
274            }
275        }
276    }
277    None
278}
279
280fn extract_media_from_item(item: &MessageItem) -> Option<MediaInfo> {
281    match item.item_type? {
282        MessageItemType::Image => {
283            let img = item.image_item.as_ref()?;
284            let aes_key = if let Some(hex_key) = &img.aeskey {
285                // Image: aeskey is hex string → decode to raw bytes → base64
286                use base64::Engine;
287                let bytes = crate::cdn::aes_ecb::hex_to_bytes(hex_key).ok()?;
288                if bytes.len() == 16 {
289                    Some(base64::engine::general_purpose::STANDARD.encode(&bytes))
290                } else {
291                    None
292                }
293            } else {
294                img.media.as_ref().and_then(|m| m.aes_key.clone())
295            };
296            Some(MediaInfo {
297                media_type: MediaType::Image,
298                cdn_media: img.media.clone(),
299                url: img.url.clone(),
300                file_name: None,
301                file_size: None,
302                aes_key_base64: aes_key,
303            })
304        }
305        MessageItemType::Video => {
306            let vid = item.video_item.as_ref()?;
307            Some(MediaInfo {
308                media_type: MediaType::Video,
309                cdn_media: vid.media.clone(),
310                url: None,
311                file_name: None,
312                file_size: vid.video_size.and_then(|s| u64::try_from(s).ok()),
313                aes_key_base64: vid.media.as_ref().and_then(|m| m.aes_key.clone()),
314            })
315        }
316        MessageItemType::File => {
317            let f = item.file_item.as_ref()?;
318            Some(MediaInfo {
319                media_type: MediaType::File,
320                cdn_media: f.media.clone(),
321                url: None,
322                file_name: f.file_name.clone(),
323                file_size: f.len.as_deref().and_then(|s| s.parse().ok()),
324                aes_key_base64: f.media.as_ref().and_then(|m| m.aes_key.clone()),
325            })
326        }
327        MessageItemType::Voice => {
328            let v = item.voice_item.as_ref()?;
329            // Skip media download if voice has text transcription
330            if v.text.is_some() {
331                return None;
332            }
333            Some(MediaInfo {
334                media_type: MediaType::Voice,
335                cdn_media: v.media.clone(),
336                url: None,
337                file_name: None,
338                file_size: None,
339                aes_key_base64: v.media.as_ref().and_then(|m| m.aes_key.clone()),
340            })
341        }
342        _ => None,
343    }
344}
345
346/// Extract ref message info.
347fn extract_ref_message(items: &[MessageItem]) -> Option<RefMessageInfo> {
348    for item in items {
349        if let Some(ref_msg) = &item.ref_msg {
350            return Some(RefMessageInfo {
351                title: ref_msg.title.clone(),
352                body: ref_msg
353                    .message_item
354                    .as_ref()
355                    .map(|ri| body_from_item_list(&[*ri.clone()])),
356            });
357        }
358    }
359    None
360}
361
362/// Returns `true` if this message should be processed by the handler.
363pub fn should_process(msg: &WeixinMessage) -> bool {
364    // Only USER messages
365    if msg.message_type != Some(MessageType::User) {
366        return false;
367    }
368    // Skip recalled messages
369    if msg.delete_time_ms.unwrap_or(0) > 0 {
370        return false;
371    }
372    // Skip GENERATING state
373    if msg.message_state == Some(MessageState::Generating) {
374        return false;
375    }
376    true
377}
378
379/// Parse a raw `WeixinMessage` into a `MessageContext`.
380pub fn parse_inbound_message(msg: &WeixinMessage, sender: Arc<MessageSender>) -> MessageContext {
381    let items = msg.item_list.as_deref().unwrap_or(&[]);
382    let body = body_from_item_list(items);
383
384    MessageContext {
385        message_id: generate_id("weixin-agent"),
386        server_message_id: msg.message_id,
387        from: msg.from_user_id.clone().unwrap_or_default(),
388        to: msg.to_user_id.clone().unwrap_or_default(),
389        timestamp: msg.create_time_ms.unwrap_or(0),
390        session_id: msg.session_id.clone(),
391        context_token: msg.context_token.clone(),
392        body: if body.is_empty() { None } else { Some(body) },
393        media: extract_media(items),
394        ref_message: extract_ref_message(items),
395        sender,
396    }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::*;

    /// Message of the given type with every other field defaulted.
    fn make_msg(msg_type: MessageType) -> WeixinMessage {
        WeixinMessage {
            message_type: Some(msg_type),
            ..Default::default()
        }
    }

    /// Text item carrying `text`.
    fn text_item(text: &str) -> MessageItem {
        MessageItem {
            item_type: Some(MessageItemType::Text),
            text_item: Some(TextItem {
                text: Some(text.into()),
            }),
            ..Default::default()
        }
    }

    /// Voice item whose transcription is `text`.
    fn transcribed_voice_item(text: &str) -> MessageItem {
        MessageItem {
            item_type: Some(MessageItemType::Voice),
            voice_item: Some(VoiceItem {
                text: Some(text.into()),
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    // ── should_process ──────────────────────────────────────────────

    #[test]
    fn should_process_user_message() {
        let msg = make_msg(MessageType::User);
        assert!(should_process(&msg));
    }

    #[test]
    fn should_process_rejects_bot() {
        let msg = make_msg(MessageType::Bot);
        assert!(!should_process(&msg));
    }

    #[test]
    fn should_process_rejects_deleted() {
        let msg = WeixinMessage {
            delete_time_ms: Some(1000),
            ..make_msg(MessageType::User)
        };
        assert!(!should_process(&msg));
    }

    #[test]
    fn should_process_rejects_generating() {
        let msg = WeixinMessage {
            message_state: Some(MessageState::Generating),
            ..make_msg(MessageType::User)
        };
        assert!(!should_process(&msg));
    }

    // ── body_from_item_list ─────────────────────────────────────────

    #[test]
    fn body_from_text_item() {
        assert_eq!(body_from_item_list(&[text_item("hello")]), "hello");
    }

    #[test]
    fn body_from_voice_item() {
        let items = [transcribed_voice_item("voice text")];
        assert_eq!(body_from_item_list(&items), "voice text");
    }

    #[test]
    fn body_from_ref_message() {
        let reply = MessageItem {
            ref_msg: Some(RefMessage {
                title: Some("quoted title".into()),
                message_item: Some(Box::new(text_item("original"))),
            }),
            ..text_item("reply")
        };
        let body = body_from_item_list(&[reply]);
        assert!(body.contains("引用"));
        assert!(body.contains("reply"));
    }

    // ── extract_media ───────────────────────────────────────────────

    #[test]
    fn extract_media_image() {
        let image = MessageItem {
            item_type: Some(MessageItemType::Image),
            image_item: Some(ImageItem {
                url: Some("https://img.example.com/1.jpg".into()),
                aeskey: Some("0123456789abcdef0123456789abcdef".into()),
                ..Default::default()
            }),
            ..Default::default()
        };
        let media = extract_media(&[image]).unwrap();
        assert_eq!(media.media_type, MediaType::Image);
        assert!(media.aes_key_base64.is_some());
    }

    #[test]
    fn extract_media_video() {
        let video = MessageItem {
            item_type: Some(MessageItemType::Video),
            video_item: Some(VideoItem {
                video_size: Some(1024),
                ..Default::default()
            }),
            ..Default::default()
        };
        let media = extract_media(&[video]).unwrap();
        assert_eq!(media.media_type, MediaType::Video);
        assert_eq!(media.file_size, Some(1024));
    }

    #[test]
    fn extract_media_file() {
        let file = MessageItem {
            item_type: Some(MessageItemType::File),
            file_item: Some(FileItem {
                file_name: Some("doc.pdf".into()),
                len: Some("2048".into()),
                ..Default::default()
            }),
            ..Default::default()
        };
        let media = extract_media(&[file]).unwrap();
        assert_eq!(media.media_type, MediaType::File);
        assert_eq!(media.file_name.as_deref(), Some("doc.pdf"));
        assert_eq!(media.file_size, Some(2048));
    }

    #[test]
    fn extract_media_voice_with_text_returns_none() {
        let items = [transcribed_voice_item("transcribed")];
        assert!(extract_media(&items).is_none());
    }

    // ── ContextTokenStore ───────────────────────────────────────────

    #[test]
    fn context_token_store_set_get() {
        let store = ContextTokenStore::new();
        store.set("user1", "token_a");
        assert_eq!(store.get("user1"), Some("token_a".into()));
        assert_eq!(store.get("user2"), None);
    }

    #[test]
    fn context_token_store_export_import() {
        let source = ContextTokenStore::new();
        source.set("u1", "t1");
        source.set("u2", "t2");
        let exported = source.export_all();
        assert_eq!(exported.len(), 2);

        let restored = ContextTokenStore::new();
        restored.import(exported);
        assert_eq!(restored.get("u1"), Some("t1".into()));
        assert_eq!(restored.get("u2"), Some("t2".into()));
    }
}