Skip to main content

construct/channels/
qq.rs

1use super::traits::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use base64::Engine as _;
4use futures_util::{SinkExt, StreamExt};
5use serde::Deserialize;
6use serde_json::json;
7use sha2::{Digest, Sha256};
8use std::collections::{HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tokio_tungstenite::tungstenite::Message;
13use uuid::Uuid;
14
/// Base URL of the QQ Bot REST API; endpoint paths in this file
/// (`/gateway`, `/v2/{scope}/{id}/files`, `/v2/{scope}/{id}/messages`)
/// are appended to it.
const QQ_API_BASE: &str = "https://api.sgroup.qq.com";
/// Endpoint the app credentials are POSTed to in order to obtain an access token.
const QQ_AUTH_URL: &str = "https://bots.qq.com/app/getAppAccessToken";

/// Maximum upload size for QQ media files (10 MB).
/// Local attachments larger than this are rejected before being read.
const QQ_MAX_UPLOAD_BYTES: u64 = 10 * 1024 * 1024;

/// Maximum entries in the upload cache before eviction.
const UPLOAD_CACHE_CAPACITY: usize = 500;

/// Passive reply limit per msg_id per hour (QQ API restriction).
const REPLY_LIMIT: u32 = 4;

/// Passive reply tracking window in seconds (1 hour).
const REPLY_TTL_SECS: u64 = 3600;

/// Maximum entries in the reply tracker before cleanup.
/// Reaching this size triggers removal of records older than `REPLY_TTL_SECS`.
const REPLY_TRACKER_CAPACITY: usize = 10_000;
32
/// QQ API media file types.
///
/// The explicit discriminants are the numeric values sent as `file_type` in
/// the media upload request body (the enum is cast with `as u8`), and they
/// also form part of the upload cache key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QQMediaFileType {
    /// Image (png, jpg, gif, etc.)
    Image = 1,
    /// Video (mp4, mov, etc.)
    Video = 2,
    /// Voice — only natively supported formats (.wav, .mp3, .silk).
    /// Non-native audio formats degrade to `File` instead.
    /// Note: The TS openclaw-qqbot uses silk-wasm + ffmpeg for full format
    /// transcoding; Rust version avoids heavyweight dependencies and only
    /// passes through natively supported formats.
    Voice = 3,
    /// File (pdf, zip, or any non-native audio format)
    File = 4,
}
49
/// A parsed media attachment from `[TYPE:target]` markers.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QQMediaAttachment {
    /// Media category derived from the marker's `TYPE` part (and, for
    /// audio markers, from the target's file extension).
    kind: QQMediaFileType,
    /// The marker's `target` part, trimmed: either an `http(s)://` URL or a
    /// local file path.
    target: String,
}
56
/// A segment of outbound message content — either plain text or a media attachment.
// NOTE(review): no use of this enum is visible in this part of the file;
// presumably the send path further down splits content into these segments.
#[derive(Debug, Clone, PartialEq, Eq)]
enum QQSendSegment {
    /// A run of plain text.
    Text(String),
    /// A media attachment parsed from a `[TYPE:target]` marker.
    Media(QQMediaAttachment),
}
63
/// Response from QQ media upload API.
#[derive(Debug, Deserialize)]
struct QQUploadResponse {
    /// Handle for the uploaded file; forwarded as `media.file_info` when the
    /// media message is sent.
    file_info: String,
    /// Deserialized when present but not read by the code in this file
    /// section (hence the `dead_code` allowance).
    #[allow(dead_code)]
    file_uuid: Option<String>,
    /// Validity period of `file_info`. It is added to the current unix time
    /// in seconds to compute the cache expiry, so it is treated as seconds.
    ttl: Option<u64>,
}
72
/// Cached upload entry to avoid re-uploading the same file within TTL.
struct UploadCacheEntry {
    /// The `file_info` handle returned by the upload API.
    file_info: String,
    /// Unix timestamp (seconds) after which the entry is considered expired.
    expires_at: u64,
}
78
/// Tracks passive reply count per msg_id for QQ API rate limiting.
struct ReplyRecord {
    /// Number of passive replies already granted for this msg_id.
    count: u32,
    /// Unix timestamp (seconds) at which the first reply was granted; the
    /// reply window is measured from this instant.
    first_reply_at: u64,
}
84
85fn ensure_https(url: &str) -> anyhow::Result<()> {
86    if !url.starts_with("https://") {
87        anyhow::bail!(
88            "Refusing to transmit sensitive data over non-HTTPS URL: URL scheme must be https"
89        );
90    }
91    Ok(())
92}
93
/// Report whether `ext` (without the leading dot) names one of the audio
/// formats sent as native QQ voice: `wav`, `mp3` or `silk`.
///
/// The comparison ignores ASCII case.
fn is_native_voice_ext(ext: &str) -> bool {
    const NATIVE_VOICE_EXTS: [&str; 3] = ["wav", "mp3", "silk"];
    NATIVE_VOICE_EXTS
        .iter()
        .any(|native| ext.eq_ignore_ascii_case(native))
}
98
99/// Map a `[TYPE:target]` marker kind string to `QQMediaFileType`.
100///
101/// For AUDIO/VOICE types, the target's extension determines whether it's
102/// sent as `Voice` (native formats only) or degrades to `File`.
103fn marker_kind_to_qq_file_type(marker: &str, target: &str) -> Option<QQMediaFileType> {
104    match marker.trim().to_ascii_uppercase().as_str() {
105        "IMAGE" | "PHOTO" => Some(QQMediaFileType::Image),
106        "DOCUMENT" | "FILE" => Some(QQMediaFileType::File),
107        "VIDEO" => Some(QQMediaFileType::Video),
108        "AUDIO" | "VOICE" => {
109            let ext = Path::new(target.split('?').next().unwrap_or(target))
110                .extension()
111                .and_then(|e| e.to_str())
112                .unwrap_or("");
113            if is_native_voice_ext(ext) {
114                Some(QQMediaFileType::Voice)
115            } else {
116                Some(QQMediaFileType::File)
117            }
118        }
119        _ => None,
120    }
121}
122
/// Locate the `]` that closes a bracket opened just before `s`.
///
/// `s` is the text immediately after an opening `[`, so the scan starts at
/// nesting depth 1. Nested `[`/`]` pairs are skipped. Returns the byte offset
/// of the matching `]` within `s`, or `None` if the bracket is never closed.
fn find_matching_close(s: &str) -> Option<usize> {
    let mut open_brackets = 1usize;
    s.char_indices().find_map(|(offset, c)| {
        if c == '[' {
            open_brackets += 1;
        } else if c == ']' {
            open_brackets -= 1;
            if open_brackets == 0 {
                return Some(offset);
            }
        }
        None
    })
}
140
141/// Parse `[TYPE:target]` attachment markers from message content.
142///
143/// Returns the cleaned text (markers removed) and a list of parsed attachments.
144/// Uses the same bracket-matching logic as `telegram.rs::parse_attachment_markers`.
145fn parse_qq_attachment_markers(content: &str) -> (String, Vec<QQMediaAttachment>) {
146    let mut cleaned = String::with_capacity(content.len());
147    let mut attachments = Vec::new();
148    let mut cursor = 0;
149
150    while cursor < content.len() {
151        let Some(open_rel) = content[cursor..].find('[') else {
152            cleaned.push_str(&content[cursor..]);
153            break;
154        };
155
156        let open = cursor + open_rel;
157        cleaned.push_str(&content[cursor..open]);
158
159        let Some(close_rel) = find_matching_close(&content[open + 1..]) else {
160            cleaned.push_str(&content[open..]);
161            break;
162        };
163
164        let close = open + 1 + close_rel;
165        let marker = &content[open + 1..close];
166
167        let parsed = marker.split_once(':').and_then(|(kind, target)| {
168            let target = target.trim();
169            if target.is_empty() {
170                return None;
171            }
172            let file_type = marker_kind_to_qq_file_type(kind, target)?;
173            Some(QQMediaAttachment {
174                kind: file_type,
175                target: target.to_string(),
176            })
177        });
178
179        if let Some(attachment) = parsed {
180            attachments.push(attachment);
181        } else {
182            cleaned.push_str(&content[open..=close]);
183        }
184
185        cursor = close + 1;
186    }
187
188    (cleaned.trim().to_string(), attachments)
189}
190
/// Choose the marker type (`IMAGE`, `VOICE`, `VIDEO` or `DOCUMENT`) for an
/// incoming attachment.
///
/// The MIME-like `content_type` takes priority; when it is inconclusive the
/// file extension of `filename` decides. Both are compared ASCII
/// case-insensitively. Anything unrecognised is a `DOCUMENT`.
fn infer_attachment_marker(content_type: &str, filename: &str) -> &'static str {
    const IMAGE_EXTS: &[&str] = &[
        ".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".heic", ".heif", ".svg",
    ];
    const VOICE_EXTS: &[&str] = &[".mp3", ".wav", ".silk", ".ogg", ".flac", ".m4a"];
    const VIDEO_EXTS: &[&str] = &[".mp4", ".mov", ".mkv", ".avi", ".webm"];

    let mime = content_type.to_ascii_lowercase();
    if mime.starts_with("image/") {
        return "IMAGE";
    }
    if mime.starts_with("audio/") || mime.contains("voice") {
        return "VOICE";
    }
    if mime.starts_with("video/") {
        return "VIDEO";
    }

    // Content type was inconclusive: decide by filename suffix.
    let name = filename.to_ascii_lowercase();
    let has_suffix = |exts: &[&str]| exts.iter().any(|ext| name.ends_with(*ext));

    if has_suffix(IMAGE_EXTS) {
        "IMAGE"
    } else if has_suffix(VOICE_EXTS) {
        "VOICE"
    } else if has_suffix(VIDEO_EXTS) {
        "VIDEO"
    } else {
        "DOCUMENT"
    }
}
237
/// Normalise an attachment URL received from QQ.
///
/// Surrounding whitespace is removed, and protocol-relative URLs (those
/// starting with `//`) are given an explicit `https:` scheme. Every other
/// input is returned trimmed but otherwise untouched.
fn fix_qq_url(url: &str) -> String {
    let trimmed = url.trim();
    match trimmed.strip_prefix("//") {
        Some(_) => format!("https:{trimmed}"),
        None => trimmed.to_owned(),
    }
}
247
248/// Generate a message sequence number for QQ API requests.
249/// Based on timestamp low bits XOR random, range 0~65535.
250fn next_msg_seq() -> u32 {
251    #[allow(clippy::cast_possible_truncation)]
252    let time_part = (std::time::SystemTime::now()
253        .duration_since(std::time::UNIX_EPOCH)
254        .unwrap_or_default()
255        .as_millis() as u32)
256        % 100_000_000;
257    let random = u32::from(rand::random::<u16>());
258    (time_part ^ random) % 65536
259}
260
/// Current wall-clock time as whole seconds since the unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
268
/// Deduplication set capacity — evict half of entries when full.
const DEDUP_CAPACITY: usize = 10_000;

/// Maximum number of retry attempts when fetching the access token.
/// This is the total number of tries, including the first one.
const AUTH_RETRY_MAX_ATTEMPTS: u32 = 4;

/// Initial backoff delay for auth token retry (in milliseconds).
/// The delay doubles after every failed attempt.
const AUTH_RETRY_INITIAL_BACKOFF_MS: u64 = 500;

/// Maximum backoff delay for auth token retry (in milliseconds).
/// Caps the base delay before jitter is applied.
const AUTH_RETRY_MAX_BACKOFF_MS: u64 = 8_000;
280
/// QQ Official Bot channel — uses Tencent's official QQ Bot API with
/// OAuth2 authentication and a Discord-like WebSocket gateway protocol.
pub struct QQChannel {
    /// Bot application ID, sent as `appId` when requesting an access token.
    app_id: String,
    /// Bot application secret, sent as `clientSecret` when requesting an access token.
    app_secret: String,
    /// User IDs permitted to interact with the bot; the entry `"*"` allows everyone.
    allowed_users: Vec<String>,
    /// Cached access token + expiry timestamp.
    /// The expiry is a unix time in seconds, already shortened by a 60 s margin.
    token_cache: Arc<RwLock<Option<(String, u64)>>>,
    /// Message deduplication set.
    dedup: Arc<RwLock<HashSet<String>>>,
    /// Workspace directory for saving downloaded attachments.
    /// When `None`, attachments are referenced by URL instead of being downloaded.
    workspace_dir: Option<PathBuf>,
    /// Upload cache: avoids re-uploading the same file within TTL.
    /// Keyed by content hash, scope, target ID and file type.
    upload_cache: Arc<RwLock<HashMap<String, UploadCacheEntry>>>,
    /// Passive reply tracker for QQ API rate limiting.
    /// Keyed by the msg_id being replied to.
    reply_tracker: Arc<RwLock<HashMap<String, ReplyRecord>>>,
    /// Per-channel proxy URL override.
    proxy_url: Option<String>,
    /// Session ID from the last READY event, used for gateway resume (opcode 6).
    session_id: Arc<RwLock<Option<String>>>,
    /// Last sequence number received, used for gateway resume (opcode 6).
    last_sequence: Arc<RwLock<Option<i64>>>,
}
304
305impl QQChannel {
306    pub fn new(app_id: String, app_secret: String, allowed_users: Vec<String>) -> Self {
307        Self {
308            app_id,
309            app_secret,
310            allowed_users,
311            token_cache: Arc::new(RwLock::new(None)),
312            dedup: Arc::new(RwLock::new(HashSet::new())),
313            workspace_dir: None,
314            upload_cache: Arc::new(RwLock::new(HashMap::new())),
315            reply_tracker: Arc::new(RwLock::new(HashMap::new())),
316            proxy_url: None,
317            session_id: Arc::new(RwLock::new(None)),
318            last_sequence: Arc::new(RwLock::new(None)),
319        }
320    }
321
    /// Configure workspace directory for saving downloaded attachments.
    ///
    /// Builder-style: consumes `self` and returns it with the directory set.
    /// Incoming attachments are then downloaded into a `qq_files`
    /// subdirectory of `dir`.
    pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
        self.workspace_dir = Some(dir);
        self
    }
327
    /// Set a per-channel proxy URL that overrides the global proxy config.
    ///
    /// Builder-style: consumes `self` and returns it with the override
    /// replaced. Passing `None` clears any previously set override.
    pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
        self.proxy_url = proxy_url;
        self
    }
333
    /// Obtain the HTTP client used for all QQ REST calls.
    ///
    /// Delegates to `crate::config::build_channel_proxy_client` with the
    /// `"channel.qq"` key and this channel's optional proxy override.
    // NOTE(review): this is called once per request; whether the helper
    // caches clients internally is not visible from here.
    fn http_client(&self) -> reqwest::Client {
        crate::config::build_channel_proxy_client("channel.qq", self.proxy_url.as_deref())
    }
337
338    fn is_user_allowed(&self, user_id: &str) -> bool {
339        self.allowed_users.iter().any(|u| u == "*" || u == user_id)
340    }
341
342    /// Fetch an access token from QQ's OAuth2 endpoint.
343    async fn fetch_access_token(&self) -> anyhow::Result<(String, u64)> {
344        let body = json!({
345            "appId": self.app_id,
346            "clientSecret": self.app_secret,
347        });
348
349        let resp = self
350            .http_client()
351            .post(QQ_AUTH_URL)
352            .json(&body)
353            .send()
354            .await?;
355
356        if !resp.status().is_success() {
357            let status = resp.status();
358            let err = resp.text().await.unwrap_or_default();
359            anyhow::bail!("QQ token request failed ({status}): {err}");
360        }
361
362        let data: serde_json::Value = resp.json().await?;
363        let token = data
364            .get("access_token")
365            .and_then(|t| t.as_str())
366            .ok_or_else(|| anyhow::anyhow!("Missing access_token in QQ response"))?
367            .to_string();
368
369        let expires_in = data
370            .get("expires_in")
371            .and_then(|e| e.as_str())
372            .and_then(|e| e.parse::<u64>().ok())
373            .unwrap_or(7200);
374
375        let now = std::time::SystemTime::now()
376            .duration_since(std::time::UNIX_EPOCH)
377            .unwrap_or_default()
378            .as_secs();
379
380        // Expire 60 seconds early to avoid edge cases
381        let expiry = now + expires_in.saturating_sub(60);
382
383        Ok((token, expiry))
384    }
385
386    /// Fetch an access token with retry and exponential backoff.
387    ///
388    /// Transient failures (network errors, 5xx responses) during reconnection
389    /// can cause the entire recovery loop to fail. This method retries up to
390    /// `AUTH_RETRY_MAX_ATTEMPTS` times with exponential backoff + jitter so
391    /// that a single transient error doesn't permanently break the reconnect
392    /// flow (see issue #4745).
393    async fn fetch_access_token_with_retry(&self) -> anyhow::Result<(String, u64)> {
394        let mut backoff_ms = AUTH_RETRY_INITIAL_BACKOFF_MS;
395        let mut last_err = None;
396
397        for attempt in 1..=AUTH_RETRY_MAX_ATTEMPTS {
398            match self.fetch_access_token().await {
399                Ok(result) => {
400                    if attempt > 1 {
401                        tracing::info!(
402                            "QQ: getAppAccessToken succeeded on attempt {attempt}/{AUTH_RETRY_MAX_ATTEMPTS}"
403                        );
404                    }
405                    return Ok(result);
406                }
407                Err(e) => {
408                    tracing::warn!(
409                        "QQ: getAppAccessToken failed (attempt {attempt}/{AUTH_RETRY_MAX_ATTEMPTS}): {e}"
410                    );
411                    last_err = Some(e);
412
413                    if attempt < AUTH_RETRY_MAX_ATTEMPTS {
414                        // Add jitter: 75%-125% of base backoff
415                        let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
416                        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
417                        let sleep_ms = (backoff_ms as f64 * jitter_factor) as u64;
418                        tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
419                        backoff_ms = (backoff_ms * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
420                    }
421                }
422            }
423        }
424
425        Err(last_err.unwrap_or_else(|| {
426            anyhow::anyhow!("QQ: getAppAccessToken failed after {AUTH_RETRY_MAX_ATTEMPTS} attempts")
427        }))
428    }
429
430    /// Get a valid access token, refreshing if expired.
431    async fn get_token(&self) -> anyhow::Result<String> {
432        let now = std::time::SystemTime::now()
433            .duration_since(std::time::UNIX_EPOCH)
434            .unwrap_or_default()
435            .as_secs();
436
437        {
438            let cache = self.token_cache.read().await;
439            if let Some((ref token, expiry)) = *cache {
440                if now < expiry {
441                    return Ok(token.clone());
442                }
443            }
444        }
445
446        let (token, expiry) = self.fetch_access_token_with_retry().await?;
447        {
448            let mut cache = self.token_cache.write().await;
449            *cache = Some((token.clone(), expiry));
450        }
451        Ok(token)
452    }
453
454    /// Get the WebSocket gateway URL.
455    async fn get_gateway_url(&self, token: &str) -> anyhow::Result<String> {
456        let resp = self
457            .http_client()
458            .get(format!("{QQ_API_BASE}/gateway"))
459            .header("Authorization", format!("QQBot {token}"))
460            .send()
461            .await?;
462
463        if !resp.status().is_success() {
464            let status = resp.status();
465            let err = resp.text().await.unwrap_or_default();
466            anyhow::bail!("QQ gateway request failed ({status}): {err}");
467        }
468
469        let data: serde_json::Value = resp.json().await?;
470        let url = data
471            .get("url")
472            .and_then(|u| u.as_str())
473            .ok_or_else(|| anyhow::anyhow!("Missing gateway URL in QQ response"))?
474            .to_string();
475
476        Ok(url)
477    }
478
479    /// Check and insert message ID for deduplication.
480    async fn is_duplicate(&self, msg_id: &str) -> bool {
481        if msg_id.is_empty() {
482            return false;
483        }
484
485        let mut dedup = self.dedup.write().await;
486
487        if dedup.contains(msg_id) {
488            return true;
489        }
490
491        // Evict oldest half when at capacity
492        if dedup.len() >= DEDUP_CAPACITY {
493            let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
494            for key in to_remove {
495                dedup.remove(&key);
496            }
497        }
498
499        dedup.insert(msg_id.to_string());
500        false
501    }
502
503    /// Build upload cache key from file content hash.
504    fn upload_cache_key(
505        file_data: &[u8],
506        scope: &str,
507        target_id: &str,
508        file_type: QQMediaFileType,
509    ) -> String {
510        let mut hasher = Sha256::new();
511        hasher.update(file_data);
512        let hash = format!("{:x}", hasher.finalize());
513        format!("{hash}:{scope}:{target_id}:{}", file_type as u8)
514    }
515
516    /// Look up a cached file_info, returning it if still valid.
517    async fn get_cached_upload(&self, cache_key: &str) -> Option<String> {
518        let cache = self.upload_cache.read().await;
519        if let Some(entry) = cache.get(cache_key) {
520            // TTL safety margin: expire 60s early (same as TS version)
521            if now_secs() + 60 < entry.expires_at {
522                return Some(entry.file_info.clone());
523            }
524        }
525        None
526    }
527
528    /// Store a file_info in the upload cache with TTL.
529    async fn set_cached_upload(&self, cache_key: String, file_info: String, ttl: u64) {
530        let mut cache = self.upload_cache.write().await;
531
532        // Evict expired entries if at capacity
533        if cache.len() >= UPLOAD_CACHE_CAPACITY {
534            let now = now_secs();
535            cache.retain(|_, v| v.expires_at > now);
536
537            // If still at capacity, evict half
538            if cache.len() >= UPLOAD_CACHE_CAPACITY {
539                let keys_to_remove: Vec<String> = cache
540                    .keys()
541                    .take(UPLOAD_CACHE_CAPACITY / 2)
542                    .cloned()
543                    .collect();
544                for key in keys_to_remove {
545                    cache.remove(&key);
546                }
547            }
548        }
549
550        cache.insert(
551            cache_key,
552            UploadCacheEntry {
553                file_info,
554                expires_at: now_secs() + ttl,
555            },
556        );
557    }
558
559    /// Track passive reply count for a msg_id. Returns true if reply is allowed.
560    async fn check_reply_allowed(&self, msg_id: &str) -> bool {
561        let now = now_secs();
562        let mut tracker = self.reply_tracker.write().await;
563
564        // Cleanup if tracker is too large
565        if tracker.len() >= REPLY_TRACKER_CAPACITY {
566            tracker.retain(|_, v| now - v.first_reply_at < REPLY_TTL_SECS);
567        }
568
569        if let Some(record) = tracker.get_mut(msg_id) {
570            if now - record.first_reply_at >= REPLY_TTL_SECS {
571                // Window expired, cannot use passive reply
572                return false;
573            }
574            if record.count >= REPLY_LIMIT {
575                return false;
576            }
577            record.count += 1;
578            true
579        } else {
580            tracker.insert(
581                msg_id.to_string(),
582                ReplyRecord {
583                    count: 1,
584                    first_reply_at: now,
585                },
586            );
587            true
588        }
589    }
590
591    /// Resolve the API endpoint path components from a recipient string.
592    /// Returns (scope, id) where scope is "groups" or "users".
593    fn resolve_recipient(recipient: &str) -> (&str, String) {
594        if let Some(group_id) = recipient.strip_prefix("group:") {
595            ("groups", group_id.to_string())
596        } else {
597            let raw_uid = recipient.strip_prefix("user:").unwrap_or(recipient);
598            let user_id: String = raw_uid
599                .chars()
600                .filter(|c| c.is_alphanumeric() || *c == '_')
601                .collect();
602            ("users", user_id)
603        }
604    }
605
606    /// Upload media to QQ API and return file_info for sending.
607    ///
608    /// Supports two modes:
609    /// - URL upload: pass `url = Some(...)`, `file_data = None`
610    /// - Base64 upload: pass `file_data = Some(...)`, `url = None`
611    async fn upload_media(
612        &self,
613        recipient: &str,
614        file_type: QQMediaFileType,
615        url: Option<&str>,
616        file_data: Option<&str>,
617        file_name: Option<&str>,
618    ) -> anyhow::Result<(String, Option<u64>)> {
619        let token = self.get_token().await?;
620        let (scope, id) = Self::resolve_recipient(recipient);
621
622        let api_url = format!("{QQ_API_BASE}/v2/{scope}/{id}/files");
623        ensure_https(&api_url)?;
624
625        let mut body = json!({
626            "file_type": file_type as u8,
627            "srv_send_msg": false,
628        });
629
630        if let Some(u) = url {
631            body["url"] = json!(u);
632        }
633        if let Some(d) = file_data {
634            body["file_data"] = json!(d);
635        }
636        // QQ API uses file_name for File type to display the filename in chat
637        if file_type == QQMediaFileType::File {
638            if let Some(name) = file_name {
639                body["file_name"] = json!(name);
640            }
641        }
642
643        let resp = self
644            .http_client()
645            .post(&api_url)
646            .header("Authorization", format!("QQBot {token}"))
647            .json(&body)
648            .send()
649            .await?;
650
651        if !resp.status().is_success() {
652            let status = resp.status();
653            let err = resp.text().await.unwrap_or_default();
654            anyhow::bail!("QQ upload media failed ({status}): {err}");
655        }
656
657        let upload_resp: QQUploadResponse = resp.json().await?;
658        Ok((upload_resp.file_info, upload_resp.ttl))
659    }
660
661    /// Send a media message (msg_type=7) with an already-uploaded file_info.
662    async fn send_media_message(&self, recipient: &str, file_info: &str) -> anyhow::Result<()> {
663        let token = self.get_token().await?;
664        let (scope, id) = Self::resolve_recipient(recipient);
665
666        let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
667        ensure_https(&url)?;
668
669        let body = json!({
670            "msg_type": 7,
671            "media": {
672                "file_info": file_info,
673            },
674            "msg_seq": next_msg_seq(),
675        });
676
677        let resp = self
678            .http_client()
679            .post(&url)
680            .header("Authorization", format!("QQBot {token}"))
681            .json(&body)
682            .send()
683            .await?;
684
685        if !resp.status().is_success() {
686            let status = resp.status();
687            let err = resp.text().await.unwrap_or_default();
688            anyhow::bail!("QQ send media message failed ({status}): {err}");
689        }
690
691        Ok(())
692    }
693
694    /// Send a single attachment: resolve local/URL, upload, then send.
695    async fn send_attachment(
696        &self,
697        recipient: &str,
698        attachment: &QQMediaAttachment,
699    ) -> anyhow::Result<()> {
700        let target = attachment.target.trim();
701
702        // Extract filename from target path/URL for File type display
703        let file_name = Path::new(target.split('?').next().unwrap_or(target))
704            .file_name()
705            .and_then(|n| n.to_str())
706            .map(|s| s.to_string());
707
708        if target.starts_with("http://") || target.starts_with("https://") {
709            // URL upload — no caching (remote content may change)
710            let (file_info, _ttl) = self
711                .upload_media(
712                    recipient,
713                    attachment.kind,
714                    Some(target),
715                    None,
716                    file_name.as_deref(),
717                )
718                .await?;
719            self.send_media_message(recipient, &file_info).await?;
720        } else {
721            // Local file upload
722            let path = Path::new(target);
723            if !path.exists() {
724                anyhow::bail!("QQ attachment path not found: {target}");
725            }
726
727            let metadata = tokio::fs::metadata(path).await?;
728            if metadata.len() > QQ_MAX_UPLOAD_BYTES {
729                anyhow::bail!(
730                    "QQ attachment too large ({} bytes, max {}): {target}",
731                    metadata.len(),
732                    QQ_MAX_UPLOAD_BYTES
733                );
734            }
735
736            let file_bytes = tokio::fs::read(path).await?;
737            let (scope_label, target_id) = Self::resolve_recipient(recipient);
738            let scope = if scope_label == "groups" {
739                "group"
740            } else {
741                "c2c"
742            };
743            let cache_key = Self::upload_cache_key(&file_bytes, scope, &target_id, attachment.kind);
744
745            // Check upload cache
746            if let Some(cached_file_info) = self.get_cached_upload(&cache_key).await {
747                tracing::debug!("QQ: using cached upload for {target}");
748                self.send_media_message(recipient, &cached_file_info)
749                    .await?;
750                return Ok(());
751            }
752
753            let b64 = base64::engine::general_purpose::STANDARD.encode(&file_bytes);
754            let (file_info, ttl) = self
755                .upload_media(
756                    recipient,
757                    attachment.kind,
758                    None,
759                    Some(&b64),
760                    file_name.as_deref(),
761                )
762                .await?;
763
764            // Cache the upload result
765            if let Some(ttl_secs) = ttl {
766                self.set_cached_upload(cache_key, file_info.clone(), ttl_secs)
767                    .await;
768            }
769
770            self.send_media_message(recipient, &file_info).await?;
771        }
772
773        Ok(())
774    }
775
776    /// Compose message content from an incoming QQ event payload.
777    ///
778    /// Handles all attachment types (not just images), downloads to workspace
779    /// if configured, and generates appropriate `[TYPE:path]` markers.
780    async fn compose_message_content(&self, payload: &serde_json::Value) -> Option<String> {
781        let text = payload
782            .get("content")
783            .and_then(|c| c.as_str())
784            .unwrap_or("")
785            .trim();
786
787        let mut markers: Vec<String> = Vec::new();
788        let mut voice_transcripts: Vec<String> = Vec::new();
789
790        if let Some(attachments) = payload.get("attachments").and_then(|a| a.as_array()) {
791            for att in attachments {
792                let url = match att.get("url").and_then(|u| u.as_str()) {
793                    Some(u) if !u.trim().is_empty() => fix_qq_url(u),
794                    _ => continue,
795                };
796
797                let content_type = att
798                    .get("content_type")
799                    .and_then(|ct| ct.as_str())
800                    .unwrap_or("");
801                let filename = att
802                    .get("filename")
803                    .and_then(|f| f.as_str())
804                    .unwrap_or("attachment");
805
806                let marker_type = infer_attachment_marker(content_type, filename);
807
808                // For voice attachments, prefer voice_wav_url (WAV format) over
809                // the default url (AMR/SILK). QQ provides this for direct use
810                // without transcoding. (aligned with openclaw-qqbot behavior)
811                let is_voice = content_type == "voice"
812                    || content_type.starts_with("audio/")
813                    || marker_type == "VOICE";
814                let (download_url, save_filename) = if is_voice {
815                    if let Some(wav_url) = att
816                        .get("voice_wav_url")
817                        .and_then(|u| u.as_str())
818                        .filter(|u| !u.trim().is_empty())
819                    {
820                        let fixed = fix_qq_url(wav_url);
821                        // Extract filename from WAV URL path
822                        let wav_name = Path::new(fixed.split('?').next().unwrap_or(&fixed))
823                            .file_name()
824                            .and_then(|n| n.to_str())
825                            .unwrap_or("voice.wav")
826                            .to_string();
827                        (fixed, wav_name)
828                    } else {
829                        (url.clone(), filename.to_string())
830                    }
831                } else {
832                    (url.clone(), filename.to_string())
833                };
834
835                // Try to download to workspace
836                let location = if let Some(ref ws) = self.workspace_dir {
837                    let dir = ws.join("qq_files");
838                    match self
839                        .download_attachment(&download_url, &dir, &save_filename)
840                        .await
841                    {
842                        Ok(local_path) => local_path.display().to_string(),
843                        Err(e) => {
844                            tracing::warn!("QQ: failed to download attachment: {e}");
845                            url.clone()
846                        }
847                    }
848                } else {
849                    url.clone()
850                };
851
852                if is_voice {
853                    // For voice: include ASR transcription text (aligned with
854                    // openclaw-qqbot format: "[语音消息] transcribed text")
855                    // Also keep the file path marker for future multimodal support
856                    markers.push(format!("[{marker_type}:{location}]"));
857                    if let Some(asr_text) = att
858                        .get("asr_refer_text")
859                        .and_then(|t| t.as_str())
860                        .map(|t| t.trim())
861                        .filter(|t| !t.is_empty())
862                    {
863                        voice_transcripts.push(asr_text.to_string());
864                    }
865                } else {
866                    markers.push(format!("[{marker_type}:{location}]"));
867                }
868            }
869        }
870
871        // Voice ASR transcription uses angle brackets to distinguish from
872        // [TYPE:target] media markers (which use square brackets)
873        let voice_text = match voice_transcripts.len() {
874            0 => String::new(),
875            1 => format!(
876                "<VOICE_TRANSCRIPTION>{}</VOICE_TRANSCRIPTION>",
877                voice_transcripts[0]
878            ),
879            _ => voice_transcripts
880                .iter()
881                .enumerate()
882                .map(|(i, t)| format!("<VOICE_TRANSCRIPTION_{i}>{t}</VOICE_TRANSCRIPTION_{i}>"))
883                .collect::<Vec<_>>()
884                .join("\n"),
885        };
886
887        let mut parts: Vec<&str> = Vec::new();
888        if !text.is_empty() {
889            parts.push(text);
890        }
891        if !voice_text.is_empty() {
892            parts.push(&voice_text);
893        }
894        let markers_joined = markers.join("\n");
895        if !markers_joined.is_empty() {
896            parts.push(&markers_joined);
897        }
898
899        if parts.is_empty() {
900            return None;
901        }
902
903        Some(parts.join("\n"))
904    }
905
906    /// Download an attachment to the local workspace directory.
907    async fn download_attachment(
908        &self,
909        url: &str,
910        dir: &Path,
911        filename: &str,
912    ) -> anyhow::Result<PathBuf> {
913        tokio::fs::create_dir_all(dir).await?;
914
915        // Generate a unique filename to avoid collisions
916        let stem = Path::new(filename)
917            .file_stem()
918            .and_then(|s| s.to_str())
919            .unwrap_or("file");
920        let ext = Path::new(filename)
921            .extension()
922            .and_then(|e| e.to_str())
923            .unwrap_or("");
924        let unique = &Uuid::new_v4().to_string()[..8];
925        let safe_name = if ext.is_empty() {
926            format!("{stem}_{unique}")
927        } else {
928            format!("{stem}_{unique}.{ext}")
929        };
930
931        let dest = dir.join(&safe_name);
932
933        // QQ multimedia URLs carry rkey auth in query params — no Authorization header needed
934        // (consistent with openclaw-qqbot's downloadFile implementation)
935        let resp = self.http_client().get(url).send().await?;
936        if !resp.status().is_success() {
937            anyhow::bail!("Download failed ({}): {url}", resp.status());
938        }
939
940        let bytes = resp.bytes().await?;
941        tokio::fs::write(&dest, &bytes).await?;
942
943        Ok(dest)
944    }
945
946    /// Send a markdown text message (msg_type=2).
947    async fn send_text_markdown(&self, recipient: &str, content: &str) -> anyhow::Result<()> {
948        let token = self.get_token().await?;
949        let (scope, id) = Self::resolve_recipient(recipient);
950
951        let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
952        ensure_https(&url)?;
953
954        let body = json!({
955            "markdown": {
956                "content": content,
957            },
958            "msg_type": 2,
959            "msg_seq": next_msg_seq(),
960        });
961
962        let resp = self
963            .http_client()
964            .post(&url)
965            .header("Authorization", format!("QQBot {token}"))
966            .json(&body)
967            .send()
968            .await?;
969
970        if !resp.status().is_success() {
971            let status = resp.status();
972            let err = resp.text().await.unwrap_or_default();
973            anyhow::bail!("QQ send message failed ({status}): {err}");
974        }
975
976        Ok(())
977    }
978}
979
980#[async_trait]
981impl Channel for QQChannel {
982    fn name(&self) -> &str {
983        "qq"
984    }
985
986    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
987        let (cleaned_text, attachments) = parse_qq_attachment_markers(&message.content);
988
989        if attachments.is_empty() {
990            // No media markers — send as markdown (original path)
991            return self
992                .send_text_markdown(&message.recipient, &message.content)
993                .await;
994        }
995
996        // Send cleaned text first (if non-empty)
997        if !cleaned_text.is_empty() {
998            self.send_text_markdown(&message.recipient, &cleaned_text)
999                .await?;
1000        }
1001
1002        // Send each media attachment
1003        for attachment in &attachments {
1004            if let Err(e) = self.send_attachment(&message.recipient, attachment).await {
1005                tracing::warn!(
1006                    target = attachment.target,
1007                    error = %e,
1008                    "QQ: failed to send media attachment; falling back to text"
1009                );
1010                // Degrade to text fallback
1011                let fallback = format!(
1012                    "{}: {}",
1013                    match attachment.kind {
1014                        QQMediaFileType::Image => "Image",
1015                        QQMediaFileType::Video => "Video",
1016                        QQMediaFileType::Voice => "Voice",
1017                        QQMediaFileType::File => "File",
1018                    },
1019                    attachment.target
1020                );
1021                self.send_text_markdown(&message.recipient, &fallback)
1022                    .await?;
1023            }
1024        }
1025
1026        Ok(())
1027    }
1028
1029    #[allow(clippy::too_many_lines)]
1030    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
1031        tracing::info!("QQ: authenticating...");
1032        let token = self.get_token().await?;
1033
1034        tracing::info!("QQ: fetching gateway URL...");
1035        let gw_url = self.get_gateway_url(&token).await?;
1036
1037        tracing::info!("QQ: connecting to gateway WebSocket...");
1038        let (ws_stream, _) =
1039            crate::config::ws_connect_with_proxy(&gw_url, "channel.qq", self.proxy_url.as_deref())
1040                .await?;
1041        let (mut write, mut read) = ws_stream.split();
1042
1043        // Read Hello (opcode 10)
1044        let hello = read
1045            .next()
1046            .await
1047            .ok_or(anyhow::anyhow!("QQ: no hello frame"))??;
1048        let hello_data: serde_json::Value = serde_json::from_str(&hello.to_string())?;
1049        let heartbeat_interval = hello_data
1050            .get("d")
1051            .and_then(|d| d.get("heartbeat_interval"))
1052            .and_then(serde_json::Value::as_u64)
1053            .unwrap_or(41250);
1054
1055        // Check if we can resume a previous session
1056        let stored_session = self.session_id.read().await.clone();
1057        let stored_seq = *self.last_sequence.read().await;
1058
1059        if let (Some(sid), Some(seq)) = (&stored_session, stored_seq) {
1060            // Attempt Resume (opcode 6)
1061            tracing::info!("QQ: attempting session resume (session_id={sid}, seq={seq})");
1062            let resume = json!({
1063                "op": 6,
1064                "d": {
1065                    "token": format!("QQBot {token}"),
1066                    "session_id": sid,
1067                    "seq": seq,
1068                }
1069            });
1070            write.send(Message::Text(resume.to_string().into())).await?;
1071        } else {
1072            // Send Identify (opcode 2)
1073            // Intents: PUBLIC_GUILD_MESSAGES (1<<30) | C2C_MESSAGE_CREATE & GROUP_AT_MESSAGE_CREATE (1<<25)
1074            let intents: u64 = (1 << 25) | (1 << 30);
1075            let identify = json!({
1076                "op": 2,
1077                "d": {
1078                    "token": format!("QQBot {token}"),
1079                    "intents": intents,
1080                    "properties": {
1081                        "os": "linux",
1082                        "browser": "construct",
1083                        "device": "construct",
1084                    }
1085                }
1086            });
1087            write
1088                .send(Message::Text(identify.to_string().into()))
1089                .await?;
1090            tracing::info!("QQ: connected and sent Identify");
1091        }
1092
1093        let mut sequence: i64 = stored_seq.unwrap_or(-1);
1094
1095        // Track consecutive missed heartbeat ACKs.  The previous logic
1096        // killed the connection on the *first* missed ACK which is overly
1097        // aggressive -- transient network hiccups or brief server-side GC
1098        // pauses can cause a single ACK to be delayed.  We now allow up to
1099        // `MAX_MISSED_ACKS` consecutive misses before declaring the
1100        // connection dead.
1101        const MAX_MISSED_ACKS: u32 = 3;
1102        let mut missed_ack_count: u32 = 0;
1103
1104        // Spawn heartbeat timer.
1105        //
1106        // We add a small grace period (10% of the server-provided interval,
1107        // capped at 5s) so that a slightly-delayed ACK does not immediately
1108        // count as missed.
1109        let hb_interval = heartbeat_interval;
1110        let grace_ms: u64 = (hb_interval / 10).min(5_000);
1111        let effective_interval = hb_interval.saturating_add(grace_ms);
1112
1113        let (hb_tx, mut hb_rx) = tokio::sync::mpsc::channel::<()>(1);
1114        tokio::spawn(async move {
1115            let mut interval =
1116                tokio::time::interval(std::time::Duration::from_millis(effective_interval));
1117            loop {
1118                interval.tick().await;
1119                if hb_tx.send(()).await.is_err() {
1120                    break;
1121                }
1122            }
1123        });
1124
1125        // Reason the loop exited — used to decide error type
1126        enum ExitReason {
1127            Reconnect,
1128            InvalidSession,
1129            Close(Option<tokio_tungstenite::tungstenite::protocol::CloseFrame>),
1130            StreamEnded,
1131            HeartbeatTimeout,
1132            WriteFailed,
1133            ChannelClosed,
1134        }
1135
1136        let exit_reason;
1137
1138        'outer: loop {
1139            tokio::select! {
1140                _ = hb_rx.recv() => {
1141                    // Increment the missed-ACK counter.  Only declare the
1142                    // connection dead after MAX_MISSED_ACKS consecutive
1143                    // heartbeats go un-acknowledged.
1144                    if missed_ack_count > 0 {
1145                        if missed_ack_count >= MAX_MISSED_ACKS {
1146                            tracing::warn!(
1147                                "QQ: {missed_ack_count} consecutive heartbeat ACKs missed \
1148                                 (interval {hb_interval}ms + {grace_ms}ms grace); \
1149                                 connection appears zombied"
1150                            );
1151                            exit_reason = ExitReason::HeartbeatTimeout;
1152                            break;
1153                        }
1154                        tracing::info!(
1155                            "QQ: heartbeat ACK missed ({missed_ack_count}/{MAX_MISSED_ACKS}); \
1156                             tolerating transient delay"
1157                        );
1158                    }
1159                    let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1160                    let hb = json!({"op": 1, "d": d});
1161                    if write
1162                        .send(Message::Text(hb.to_string().into()))
1163                        .await
1164                        .is_err()
1165                    {
1166                        exit_reason = ExitReason::WriteFailed;
1167                        break;
1168                    }
1169                    missed_ack_count += 1;
1170                }
1171                msg = read.next() => {
1172                    let msg = match msg {
1173                        Some(Ok(Message::Text(t))) => t,
1174                        Some(Ok(Message::Ping(payload))) => {
1175                            if write.send(Message::Pong(payload)).await.is_err() {
1176                                exit_reason = ExitReason::WriteFailed;
1177                                break;
1178                            }
1179                            continue;
1180                        }
1181                        Some(Ok(Message::Close(frame))) => {
1182                            exit_reason = ExitReason::Close(frame);
1183                            break;
1184                        }
1185                        None => {
1186                            exit_reason = ExitReason::StreamEnded;
1187                            break;
1188                        }
1189                        _ => continue,
1190                    };
1191
1192                    let event: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
1193                        Ok(e) => e,
1194                        Err(_) => continue,
1195                    };
1196
1197                    // Track sequence number
1198                    if let Some(s) = event.get("s").and_then(serde_json::Value::as_i64) {
1199                        sequence = s;
1200                    }
1201
1202                    let op = event.get("op").and_then(serde_json::Value::as_u64).unwrap_or(0);
1203
1204                    match op {
1205                        // Server requests immediate heartbeat
1206                        1 => {
1207                            let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1208                            let hb = json!({"op": 1, "d": d});
1209                            if write
1210                                .send(Message::Text(hb.to_string().into()))
1211                                .await
1212                                .is_err()
1213                            {
1214                                exit_reason = ExitReason::WriteFailed;
1215                                break;
1216                            }
1217                            missed_ack_count += 1;
1218                            continue;
1219                        }
1220                        // Reconnect
1221                        7 => {
1222                            tracing::warn!("QQ: received Reconnect (op 7); will resume");
1223                            exit_reason = ExitReason::Reconnect;
1224                            break;
1225                        }
1226                        // Invalid Session
1227                        9 => {
1228                            tracing::warn!("QQ: received Invalid Session (op 9); clearing session for fresh auth");
1229                            exit_reason = ExitReason::InvalidSession;
1230                            break;
1231                        }
1232                        // Heartbeat ACK
1233                        11 => {
1234                            missed_ack_count = 0;
1235                            continue;
1236                        }
1237                        _ => {}
1238                    }
1239
1240                    // Only process dispatch events (op 0)
1241                    if op != 0 {
1242                        continue;
1243                    }
1244
1245                    let event_type = event.get("t").and_then(|t| t.as_str()).unwrap_or("");
1246                    let d = match event.get("d") {
1247                        Some(d) => d,
1248                        None => continue,
1249                    };
1250
1251                    // Capture session_id from READY event for future resume
1252                    if event_type == "READY" || event_type == "RESUMED" {
1253                        if let Some(sid) = d.get("session_id").and_then(|s| s.as_str()) {
1254                            *self.session_id.write().await = Some(sid.to_string());
1255                            tracing::info!("QQ: session established (session_id={sid}, event={event_type})");
1256                        }
1257                        continue;
1258                    }
1259
1260                    tracing::debug!("QQ: event_type={event_type} payload={d}");
1261
1262                    match event_type {
1263                        "C2C_MESSAGE_CREATE" => {
1264                            let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1265                            if self.is_duplicate(msg_id).await {
1266                                continue;
1267                            }
1268
1269                            let Some(content) = self.compose_message_content(d).await else {
1270                                continue;
1271                            };
1272
1273                            let author_id = d.get("author").and_then(|a| a.get("id")).and_then(|i| i.as_str()).unwrap_or("unknown");
1274                            // For QQ, user_openid is the identifier
1275                            let user_openid = d.get("author").and_then(|a| a.get("user_openid")).and_then(|u| u.as_str()).unwrap_or(author_id);
1276
1277                            if !self.is_user_allowed(user_openid) {
1278                                tracing::warn!("QQ: ignoring C2C message from unauthorized user: {user_openid}");
1279                                continue;
1280                            }
1281
1282                            let chat_id = format!("user:{user_openid}");
1283
1284                            let channel_msg = ChannelMessage {
1285                                id: Uuid::new_v4().to_string(),
1286                                sender: user_openid.to_string(),
1287                                reply_target: chat_id,
1288                                content,
1289                                channel: "qq".to_string(),
1290                                timestamp: std::time::SystemTime::now()
1291                                    .duration_since(std::time::UNIX_EPOCH)
1292                                    .unwrap_or_default()
1293                                    .as_secs(),
1294                                thread_ts: None,
1295                                interruption_scope_id: None,
1296                    attachments: vec![],
1297                            };
1298
1299                            if tx.send(channel_msg).await.is_err() {
1300                                tracing::warn!("QQ: message channel closed");
1301                                exit_reason = ExitReason::ChannelClosed;
1302                                break 'outer;
1303                            }
1304                        }
1305                        "GROUP_AT_MESSAGE_CREATE" => {
1306                            let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1307                            if self.is_duplicate(msg_id).await {
1308                                continue;
1309                            }
1310
1311                            let Some(content) = self.compose_message_content(d).await else {
1312                                continue;
1313                            };
1314
1315                            let author_id = d.get("author").and_then(|a| a.get("member_openid")).and_then(|m| m.as_str()).unwrap_or("unknown");
1316
1317                            if !self.is_user_allowed(author_id) {
1318                                tracing::warn!("QQ: ignoring group message from unauthorized user: {author_id}");
1319                                continue;
1320                            }
1321
1322                            let group_openid = d.get("group_openid").and_then(|g| g.as_str()).unwrap_or("unknown");
1323                            let chat_id = format!("group:{group_openid}");
1324
1325                            let channel_msg = ChannelMessage {
1326                                id: Uuid::new_v4().to_string(),
1327                                sender: author_id.to_string(),
1328                                reply_target: chat_id,
1329                                content,
1330                                channel: "qq".to_string(),
1331                                timestamp: std::time::SystemTime::now()
1332                                    .duration_since(std::time::UNIX_EPOCH)
1333                                    .unwrap_or_default()
1334                                    .as_secs(),
1335                                thread_ts: None,
1336                                interruption_scope_id: None,
1337                    attachments: vec![],
1338                            };
1339
1340                            if tx.send(channel_msg).await.is_err() {
1341                                tracing::warn!("QQ: message channel closed");
1342                                exit_reason = ExitReason::ChannelClosed;
1343                                break 'outer;
1344                            }
1345                        }
1346                        _ => {}
1347                    }
1348                }
1349            }
1350        }
1351
1352        // Persist sequence number for potential resume on next reconnect
1353        *self.last_sequence.write().await = if sequence >= 0 { Some(sequence) } else { None };
1354
1355        match exit_reason {
1356            ExitReason::InvalidSession => {
1357                // Clear stored session so next reconnect does a fresh Identify
1358                *self.session_id.write().await = None;
1359                *self.last_sequence.write().await = None;
1360                anyhow::bail!(
1361                    "QQ WebSocket connection closed: invalid session (fresh auth required)"
1362                )
1363            }
1364            ExitReason::Reconnect => {
1365                // Session state preserved — supervisor will reconnect and we'll attempt Resume
1366                anyhow::bail!(
1367                    "QQ WebSocket connection closed: server requested reconnect (resume will be attempted)"
1368                )
1369            }
1370            ExitReason::Close(ref frame) => {
1371                let (code, reason) = frame
1372                    .as_ref()
1373                    .map(|f| (f.code.to_string(), f.reason.to_string()))
1374                    .unwrap_or_else(|| ("unknown".into(), "none".into()));
1375                tracing::warn!(
1376                    "QQ: WebSocket closed with code={code}, reason=\"{reason}\"; \
1377                     resume will be attempted on reconnect"
1378                );
1379                anyhow::bail!(
1380                    "QQ WebSocket connection closed: close_code={code}, reason=\"{reason}\""
1381                )
1382            }
1383            ExitReason::StreamEnded => {
1384                tracing::warn!(
1385                    "QQ: WebSocket stream ended unexpectedly; resume will be attempted on reconnect"
1386                );
1387                anyhow::bail!("QQ WebSocket connection closed: stream ended unexpectedly")
1388            }
1389            ExitReason::HeartbeatTimeout => {
1390                tracing::warn!(
1391                    "QQ: heartbeat timeout after {MAX_MISSED_ACKS} consecutive missed ACKs; \
1392                     resume will be attempted on reconnect"
1393                );
1394                anyhow::bail!(
1395                    "QQ WebSocket connection closed: heartbeat ACK timeout \
1396                     ({MAX_MISSED_ACKS} consecutive missed ACKs)"
1397                )
1398            }
1399            ExitReason::WriteFailed => {
1400                tracing::warn!("QQ: WebSocket write failed; resume will be attempted on reconnect");
1401                anyhow::bail!("QQ WebSocket connection closed: write failed")
1402            }
1403            ExitReason::ChannelClosed => {
1404                anyhow::bail!("QQ WebSocket connection closed: internal message channel closed")
1405            }
1406        }
1407    }
1408
1409    async fn health_check(&self) -> bool {
1410        self.fetch_access_token_with_retry().await.is_ok()
1411    }
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use super::*;
1417    use serde_json::json;
1418
1419    fn make_channel() -> QQChannel {
1420        QQChannel::new("id".into(), "secret".into(), vec![])
1421    }
1422
1423    #[test]
1424    fn test_name() {
1425        let ch = make_channel();
1426        assert_eq!(ch.name(), "qq");
1427    }
1428
1429    #[test]
1430    fn test_user_allowed_wildcard() {
1431        let ch = QQChannel::new("id".into(), "secret".into(), vec!["*".into()]);
1432        assert!(ch.is_user_allowed("anyone"));
1433    }
1434
1435    #[test]
1436    fn test_user_allowed_specific() {
1437        let ch = QQChannel::new("id".into(), "secret".into(), vec!["user123".into()]);
1438        assert!(ch.is_user_allowed("user123"));
1439        assert!(!ch.is_user_allowed("other"));
1440    }
1441
1442    #[test]
1443    fn test_user_denied_empty() {
1444        let ch = make_channel();
1445        assert!(!ch.is_user_allowed("anyone"));
1446    }
1447
1448    #[tokio::test]
1449    async fn test_dedup() {
1450        let ch = make_channel();
1451        assert!(!ch.is_duplicate("msg1").await);
1452        assert!(ch.is_duplicate("msg1").await);
1453        assert!(!ch.is_duplicate("msg2").await);
1454    }
1455
1456    #[tokio::test]
1457    async fn test_dedup_empty_id() {
1458        let ch = make_channel();
1459        assert!(!ch.is_duplicate("").await);
1460        assert!(!ch.is_duplicate("").await);
1461    }
1462
1463    #[test]
1464    fn test_config_serde() {
1465        let toml_str = r#"
1466app_id = "12345"
1467app_secret = "secret_abc"
1468allowed_users = ["user1"]
1469"#;
1470        let config: crate::config::schema::QQConfig = toml::from_str(toml_str).unwrap();
1471        assert_eq!(config.app_id, "12345");
1472        assert_eq!(config.app_secret, "secret_abc");
1473        assert_eq!(config.allowed_users, vec!["user1"]);
1474    }
1475
1476    // --- Marker parsing tests ---
1477
1478    #[test]
1479    fn test_parse_qq_markers_single_image() {
1480        let (text, atts) = parse_qq_attachment_markers("Hello [IMAGE:/tmp/a.png] world");
1481        assert_eq!(text, "Hello  world");
1482        assert_eq!(atts.len(), 1);
1483        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1484        assert_eq!(atts[0].target, "/tmp/a.png");
1485    }
1486
1487    #[test]
1488    fn test_parse_qq_markers_multiple() {
1489        let (text, atts) =
1490            parse_qq_attachment_markers("[IMAGE:/a.png] text [VIDEO:https://example.com/v.mp4]");
1491        assert_eq!(text, "text");
1492        assert_eq!(atts.len(), 2);
1493        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1494        assert_eq!(atts[1].kind, QQMediaFileType::Video);
1495    }
1496
1497    #[test]
1498    fn test_parse_qq_markers_no_markers() {
1499        let (text, atts) = parse_qq_attachment_markers("Just plain text");
1500        assert_eq!(text, "Just plain text");
1501        assert!(atts.is_empty());
1502    }
1503
1504    #[test]
1505    fn test_parse_qq_markers_case_insensitive() {
1506        let (_, atts) = parse_qq_attachment_markers("[image:/a.png]");
1507        assert_eq!(atts.len(), 1);
1508        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1509
1510        let (_, atts) = parse_qq_attachment_markers("[Image:/a.png]");
1511        assert_eq!(atts.len(), 1);
1512        assert_eq!(atts[0].kind, QQMediaFileType::Image);
1513    }
1514
1515    #[test]
1516    fn test_parse_qq_markers_invalid_preserved() {
1517        let (text, atts) = parse_qq_attachment_markers("Keep [UNKNOWN:foo] here");
1518        assert_eq!(text, "Keep [UNKNOWN:foo] here");
1519        assert!(atts.is_empty());
1520    }
1521
1522    #[test]
1523    fn test_parse_qq_markers_mixed_text_and_markers() {
1524        let (text, atts) =
1525            parse_qq_attachment_markers("Before [DOCUMENT:/doc.pdf] middle [PHOTO:/p.jpg] after");
1526        assert_eq!(text, "Before  middle  after");
1527        assert_eq!(atts.len(), 2);
1528        assert_eq!(atts[0].kind, QQMediaFileType::File);
1529        assert_eq!(atts[0].target, "/doc.pdf");
1530        assert_eq!(atts[1].kind, QQMediaFileType::Image);
1531        assert_eq!(atts[1].target, "/p.jpg");
1532    }
1533
1534    // --- marker_kind_to_qq_file_type tests ---
1535
1536    #[test]
1537    fn test_marker_kind_image() {
1538        assert_eq!(
1539            marker_kind_to_qq_file_type("IMAGE", "/a.png"),
1540            Some(QQMediaFileType::Image)
1541        );
1542        assert_eq!(
1543            marker_kind_to_qq_file_type("PHOTO", "/a.png"),
1544            Some(QQMediaFileType::Image)
1545        );
1546    }
1547
1548    #[test]
1549    fn test_marker_kind_document() {
1550        assert_eq!(
1551            marker_kind_to_qq_file_type("DOCUMENT", "/a.pdf"),
1552            Some(QQMediaFileType::File)
1553        );
1554        assert_eq!(
1555            marker_kind_to_qq_file_type("FILE", "/a.zip"),
1556            Some(QQMediaFileType::File)
1557        );
1558    }
1559
1560    #[test]
1561    fn test_marker_kind_video() {
1562        assert_eq!(
1563            marker_kind_to_qq_file_type("VIDEO", "/v.mp4"),
1564            Some(QQMediaFileType::Video)
1565        );
1566    }
1567
1568    #[test]
1569    fn test_marker_kind_voice_native() {
1570        assert_eq!(
1571            marker_kind_to_qq_file_type("VOICE", "/a.mp3"),
1572            Some(QQMediaFileType::Voice)
1573        );
1574        assert_eq!(
1575            marker_kind_to_qq_file_type("AUDIO", "/a.wav"),
1576            Some(QQMediaFileType::Voice)
1577        );
1578        assert_eq!(
1579            marker_kind_to_qq_file_type("VOICE", "/a.silk"),
1580            Some(QQMediaFileType::Voice)
1581        );
1582    }
1583
1584    #[test]
1585    fn test_marker_kind_voice_non_native_degrades() {
1586        // .ogg is not a natively supported QQ voice format — degrades to File
1587        assert_eq!(
1588            marker_kind_to_qq_file_type("VOICE", "/a.ogg"),
1589            Some(QQMediaFileType::File)
1590        );
1591        assert_eq!(
1592            marker_kind_to_qq_file_type("AUDIO", "/a.flac"),
1593            Some(QQMediaFileType::File)
1594        );
1595    }
1596
1597    // --- Upload/send body construction tests ---
1598
1599    #[test]
1600    fn test_upload_body_url() {
1601        let body = json!({
1602            "file_type": QQMediaFileType::Image as u8,
1603            "srv_send_msg": false,
1604            "url": "https://example.com/a.jpg",
1605        });
1606        assert_eq!(body["file_type"], 1);
1607        assert_eq!(body["srv_send_msg"], false);
1608        assert_eq!(body["url"], "https://example.com/a.jpg");
1609        assert!(body.get("file_data").is_none());
1610    }
1611
1612    #[test]
1613    fn test_upload_body_base64() {
1614        let body = json!({
1615            "file_type": QQMediaFileType::File as u8,
1616            "srv_send_msg": false,
1617            "file_data": "dGVzdA==",
1618        });
1619        assert_eq!(body["file_type"], 4);
1620        assert_eq!(body["file_data"], "dGVzdA==");
1621        assert!(body.get("url").is_none());
1622    }
1623
1624    #[test]
1625    fn test_send_media_body_msg_type_7() {
1626        let file_info = "some_file_info_string";
1627        let body = json!({
1628            "msg_type": 7,
1629            "media": {
1630                "file_info": file_info,
1631            },
1632            "msg_seq": 1,
1633        });
1634        assert_eq!(body["msg_type"], 7);
1635        assert_eq!(body["media"]["file_info"], file_info);
1636    }
1637
1638    // --- compose_message_content tests (now async) ---
1639
1640    #[tokio::test]
1641    async fn test_compose_message_content_text_only() {
1642        let ch = make_channel();
1643        let payload = json!({ "content": "  hello world  " });
1644        assert_eq!(
1645            ch.compose_message_content(&payload).await,
1646            Some("hello world".to_string())
1647        );
1648    }
1649
1650    #[tokio::test]
1651    async fn test_compose_message_content_image_attachment() {
1652        let ch = make_channel();
1653        let payload = json!({
1654            "content": "   ",
1655            "attachments": [{
1656                "content_type": "image/jpg",
1657                "url": "https://cdn.example.com/a.jpg"
1658            }]
1659        });
1660        assert_eq!(
1661            ch.compose_message_content(&payload).await,
1662            Some("[IMAGE:https://cdn.example.com/a.jpg]".to_string())
1663        );
1664    }
1665
1666    #[tokio::test]
1667    async fn test_compose_message_content_text_and_attachments() {
1668        let ch = make_channel();
1669        let payload = json!({
1670            "content": "Here is an image",
1671            "attachments": [
1672                { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1673                { "filename": "b.jpeg", "url": "https://cdn.example.com/b.jpeg" }
1674            ]
1675        });
1676        assert_eq!(
1677            ch.compose_message_content(&payload).await,
1678            Some(
1679                "Here is an image\n[IMAGE:https://cdn.example.com/a.png]\n[IMAGE:https://cdn.example.com/b.jpeg]"
1680                    .to_string()
1681            )
1682        );
1683    }
1684
1685    #[tokio::test]
1686    async fn test_compose_all_attachment_types() {
1687        let ch = make_channel();
1688        let payload = json!({
1689            "content": "",
1690            "attachments": [
1691                { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1692                { "content_type": "audio/mpeg", "url": "https://cdn.example.com/b.mp3" },
1693                { "content_type": "video/mp4", "url": "https://cdn.example.com/c.mp4" },
1694                { "content_type": "application/pdf", "url": "https://cdn.example.com/d.pdf" }
1695            ]
1696        });
1697        let result = ch.compose_message_content(&payload).await.unwrap();
1698        assert!(result.contains("[IMAGE:"));
1699        assert!(result.contains("[VOICE:"));
1700        assert!(result.contains("[VIDEO:"));
1701        assert!(result.contains("[DOCUMENT:"));
1702    }
1703
1704    #[tokio::test]
1705    async fn test_compose_fixes_double_slash_url() {
1706        let ch = make_channel();
1707        let payload = json!({
1708            "content": "",
1709            "attachments": [{
1710                "content_type": "image/png",
1711                "url": "//cdn.example.com/a.png"
1712            }]
1713        });
1714        let result = ch.compose_message_content(&payload).await.unwrap();
1715        assert!(result.contains("https://cdn.example.com/a.png"));
1716        // Ensure the raw `//` prefix was replaced with `https:`
1717        assert!(!result.starts_with("[IMAGE://"));
1718    }
1719
1720    #[tokio::test]
1721    async fn test_compose_fallback_no_workspace() {
1722        // Without workspace_dir, attachments use URLs directly
1723        let ch = make_channel();
1724        let payload = json!({
1725            "content": "text",
1726            "attachments": [{
1727                "content_type": "application/pdf",
1728                "filename": "report.pdf",
1729                "url": "https://cdn.example.com/report.pdf"
1730            }]
1731        });
1732        let result = ch.compose_message_content(&payload).await.unwrap();
1733        assert!(result.contains("[DOCUMENT:https://cdn.example.com/report.pdf]"));
1734    }
1735
1736    #[tokio::test]
1737    async fn test_compose_drops_empty_url() {
1738        let ch = make_channel();
1739        let payload = json!({
1740            "content": "   ",
1741            "attachments": [{
1742                "content_type": "image/png",
1743                "url": "   "
1744            }]
1745        });
1746        assert_eq!(ch.compose_message_content(&payload).await, None);
1747    }
1748
1749    // --- Markdown send body test ---
1750
1751    #[test]
1752    fn test_send_body_uses_markdown_msg_type() {
1753        let content = "**bold** and `code`";
1754        let body = json!({
1755            "markdown": { "content": content },
1756            "msg_type": 2,
1757        });
1758        assert_eq!(body["msg_type"], 2);
1759        assert_eq!(body["markdown"]["content"], content);
1760        assert!(
1761            body.get("content").is_none(),
1762            "top-level 'content' must not be present"
1763        );
1764    }
1765
1766    // --- Helper function tests ---
1767
1768    #[test]
1769    fn test_fix_qq_url() {
1770        assert_eq!(
1771            fix_qq_url("//cdn.example.com/a.png"),
1772            "https://cdn.example.com/a.png"
1773        );
1774        assert_eq!(
1775            fix_qq_url("https://cdn.example.com/a.png"),
1776            "https://cdn.example.com/a.png"
1777        );
1778    }
1779
1780    #[test]
1781    fn test_next_msg_seq_range() {
1782        for _ in 0..100 {
1783            let seq = next_msg_seq();
1784            assert!(seq < 65536);
1785        }
1786    }
1787
1788    #[test]
1789    fn test_resolve_recipient_group() {
1790        let (scope, id) = QQChannel::resolve_recipient("group:abc123");
1791        assert_eq!(scope, "groups");
1792        assert_eq!(id, "abc123");
1793    }
1794
1795    #[test]
1796    fn test_resolve_recipient_user() {
1797        let (scope, id) = QQChannel::resolve_recipient("user:xyz789");
1798        assert_eq!(scope, "users");
1799        assert_eq!(id, "xyz789");
1800    }
1801
1802    #[test]
1803    fn test_resolve_recipient_bare_id() {
1804        let (scope, id) = QQChannel::resolve_recipient("raw_id_123");
1805        assert_eq!(scope, "users");
1806        assert_eq!(id, "raw_id_123");
1807    }
1808
1809    #[test]
1810    fn test_infer_attachment_marker() {
1811        assert_eq!(infer_attachment_marker("image/png", "a.png"), "IMAGE");
1812        assert_eq!(infer_attachment_marker("audio/mpeg", "a.mp3"), "VOICE");
1813        assert_eq!(infer_attachment_marker("video/mp4", "a.mp4"), "VIDEO");
1814        assert_eq!(
1815            infer_attachment_marker("application/pdf", "doc.pdf"),
1816            "DOCUMENT"
1817        );
1818        assert_eq!(infer_attachment_marker("", "photo.jpg"), "IMAGE");
1819        assert_eq!(infer_attachment_marker("", "song.mp3"), "VOICE");
1820        assert_eq!(infer_attachment_marker("", "clip.mp4"), "VIDEO");
1821        assert_eq!(infer_attachment_marker("", "unknown.xyz"), "DOCUMENT");
1822    }
1823
1824    // --- Upload cache tests ---
1825
1826    #[tokio::test]
1827    async fn test_upload_cache_hit_and_miss() {
1828        let ch = make_channel();
1829        let key = QQChannel::upload_cache_key(b"test_data", "c2c", "user1", QQMediaFileType::Image);
1830
1831        // Miss
1832        assert!(ch.get_cached_upload(&key).await.is_none());
1833
1834        // Set with long TTL
1835        ch.set_cached_upload(key.clone(), "cached_file_info".into(), 3600)
1836            .await;
1837
1838        // Hit
1839        assert_eq!(
1840            ch.get_cached_upload(&key).await,
1841            Some("cached_file_info".to_string())
1842        );
1843    }
1844
1845    #[tokio::test]
1846    async fn test_upload_cache_expired() {
1847        let ch = make_channel();
1848        let key = QQChannel::upload_cache_key(b"test_data", "group", "g1", QQMediaFileType::Video);
1849
1850        // Set with 0 TTL (already expired considering 60s safety margin)
1851        ch.set_cached_upload(key.clone(), "old_info".into(), 0)
1852            .await;
1853
1854        // Should miss due to expiry
1855        assert!(ch.get_cached_upload(&key).await.is_none());
1856    }
1857
1858    // --- Reply tracker tests ---
1859
1860    #[tokio::test]
1861    async fn test_reply_tracker_allows_up_to_limit() {
1862        let ch = make_channel();
1863        for _ in 0..REPLY_LIMIT {
1864            assert!(ch.check_reply_allowed("msg1").await);
1865        }
1866        // 5th reply should be denied
1867        assert!(!ch.check_reply_allowed("msg1").await);
1868    }
1869
1870    #[tokio::test]
1871    async fn test_reply_tracker_independent_msg_ids() {
1872        let ch = make_channel();
1873        assert!(ch.check_reply_allowed("msg_a").await);
1874        assert!(ch.check_reply_allowed("msg_b").await);
1875    }
1876
1877    // --- Auth retry tests ---
1878
1879    #[test]
1880    fn test_auth_retry_constants_are_sensible() {
1881        const {
1882            assert!(AUTH_RETRY_MAX_ATTEMPTS >= 2, "should retry at least once");
1883            assert!(
1884                AUTH_RETRY_INITIAL_BACKOFF_MS > 0,
1885                "initial backoff must be positive"
1886            );
1887            assert!(
1888                AUTH_RETRY_MAX_BACKOFF_MS >= AUTH_RETRY_INITIAL_BACKOFF_MS,
1889                "max backoff must be >= initial"
1890            );
1891        }
1892    }
1893
1894    #[test]
1895    fn test_auth_retry_backoff_stays_within_bounds() {
1896        // Simulate the backoff progression and verify it caps at max
1897        let mut backoff = AUTH_RETRY_INITIAL_BACKOFF_MS;
1898        for _ in 1..AUTH_RETRY_MAX_ATTEMPTS {
1899            backoff = (backoff * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
1900        }
1901        assert!(
1902            backoff <= AUTH_RETRY_MAX_BACKOFF_MS,
1903            "backoff must never exceed the configured maximum"
1904        );
1905    }
1906
1907    #[tokio::test]
1908    async fn test_get_token_returns_cached_token_without_fetch() {
1909        let ch = make_channel();
1910        // Pre-populate the token cache with a token that expires far in the future
1911        let future_expiry = now_secs() + 3600;
1912        *ch.token_cache.write().await = Some(("cached_tok".to_string(), future_expiry));
1913
1914        // get_token should return the cached value without hitting the network
1915        let tok = ch.get_token().await.unwrap();
1916        assert_eq!(tok, "cached_tok");
1917    }
1918
1919    #[tokio::test]
1920    async fn test_get_token_refreshes_expired_cache() {
1921        let ch = make_channel();
1922        // Pre-populate with an already-expired token
1923        *ch.token_cache.write().await = Some(("old_tok".to_string(), 0));
1924
1925        // get_token should try to refresh -- will fail because there's no real
1926        // server, but the important thing is it doesn't return the stale token.
1927        let result = ch.get_token().await;
1928        assert!(
1929            result.is_err(),
1930            "should fail when token expired and no server available"
1931        );
1932    }
1933
1934    // --- Heartbeat stability tests ---
1935
1936    #[test]
1937    fn test_heartbeat_grace_period_calculation() {
1938        // The grace period is 10% of the server interval, capped at 5000ms.
1939        let cases: Vec<(u64, u64)> = vec![
1940            (41_250, 4_125),  // default QQ interval
1941            (30_000, 3_000),  // smaller interval
1942            (60_000, 5_000),  // larger interval, capped at 5s
1943            (100_000, 5_000), // very large, still capped
1944            (5_000, 500),     // small interval
1945            (0, 0),           // degenerate zero
1946        ];
1947        for (interval, expected_grace) in cases {
1948            let grace: u64 = (interval / 10).min(5_000);
1949            assert_eq!(
1950                grace, expected_grace,
1951                "grace for interval {interval} should be {expected_grace}"
1952            );
1953            let effective = interval.saturating_add(grace);
1954            assert!(effective >= interval);
1955        }
1956    }
1957
1958    #[test]
1959    fn test_missed_ack_counter_logic() {
1960        let max_missed: u32 = 3;
1961        let mut missed: u32 = 0;
1962
1963        // First tick: counter is 0, send heartbeat
1964        assert!(missed < max_missed);
1965        missed += 1;
1966        assert_eq!(missed, 1, "counter should be 1 after first heartbeat");
1967
1968        // ACK received: reset
1969        missed = 0;
1970        assert_eq!(missed, 0, "counter should reset on ACK");
1971
1972        // 3 consecutive misses without ACK
1973        for _ in 0..max_missed {
1974            assert!(
1975                missed < max_missed,
1976                "should not reach zombie state before {max_missed} misses"
1977            );
1978            missed += 1;
1979        }
1980        assert!(
1981            missed >= max_missed,
1982            "should declare zombie after {max_missed} missed ACKs"
1983        );
1984    }
1985
1986    #[test]
1987    fn test_missed_ack_counter_reset_on_ack() {
1988        let max_missed: u32 = 3;
1989        let mut missed: u32 = 0;
1990
1991        missed += 1;
1992        missed += 1;
1993        assert_eq!(missed, 2);
1994
1995        // ACK arrives: reset
1996        missed = 0;
1997        assert_eq!(missed, 0);
1998
1999        // One more miss, still under threshold
2000        missed += 1;
2001        assert!(missed < max_missed);
2002    }
2003
2004    #[test]
2005    fn test_effective_interval_never_overflows() {
2006        let interval = u64::MAX;
2007        let grace: u64 = (interval / 10).min(5_000);
2008        let effective = interval.saturating_add(grace);
2009        assert_eq!(effective, u64::MAX);
2010    }
2011}