1use super::traits::{Channel, ChannelMessage, SendMessage};
2use async_trait::async_trait;
3use base64::Engine as _;
4use futures_util::{SinkExt, StreamExt};
5use serde::Deserialize;
6use serde_json::json;
7use sha2::{Digest, Sha256};
8use std::collections::{HashMap, HashSet};
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tokio_tungstenite::tungstenite::Message;
13use uuid::Uuid;
14
15const QQ_API_BASE: &str = "https://api.sgroup.qq.com";
16const QQ_AUTH_URL: &str = "https://bots.qq.com/app/getAppAccessToken";
17
18const QQ_MAX_UPLOAD_BYTES: u64 = 10 * 1024 * 1024;
20
21const UPLOAD_CACHE_CAPACITY: usize = 500;
23
24const REPLY_LIMIT: u32 = 4;
26
27const REPLY_TTL_SECS: u64 = 3600;
29
30const REPLY_TRACKER_CAPACITY: usize = 10_000;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35enum QQMediaFileType {
36 Image = 1,
38 Video = 2,
40 Voice = 3,
46 File = 4,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52struct QQMediaAttachment {
53 kind: QQMediaFileType,
54 target: String,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
59enum QQSendSegment {
60 Text(String),
61 Media(QQMediaAttachment),
62}
63
64#[derive(Debug, Deserialize)]
66struct QQUploadResponse {
67 file_info: String,
68 #[allow(dead_code)]
69 file_uuid: Option<String>,
70 ttl: Option<u64>,
71}
72
73struct UploadCacheEntry {
75 file_info: String,
76 expires_at: u64,
77}
78
79struct ReplyRecord {
81 count: u32,
82 first_reply_at: u64,
83}
84
85fn ensure_https(url: &str) -> anyhow::Result<()> {
86 if !url.starts_with("https://") {
87 anyhow::bail!(
88 "Refusing to transmit sensitive data over non-HTTPS URL: URL scheme must be https"
89 );
90 }
91 Ok(())
92}
93
94fn is_native_voice_ext(ext: &str) -> bool {
96 matches!(ext.to_ascii_lowercase().as_str(), "wav" | "mp3" | "silk")
97}
98
99fn marker_kind_to_qq_file_type(marker: &str, target: &str) -> Option<QQMediaFileType> {
104 match marker.trim().to_ascii_uppercase().as_str() {
105 "IMAGE" | "PHOTO" => Some(QQMediaFileType::Image),
106 "DOCUMENT" | "FILE" => Some(QQMediaFileType::File),
107 "VIDEO" => Some(QQMediaFileType::Video),
108 "AUDIO" | "VOICE" => {
109 let ext = Path::new(target.split('?').next().unwrap_or(target))
110 .extension()
111 .and_then(|e| e.to_str())
112 .unwrap_or("");
113 if is_native_voice_ext(ext) {
114 Some(QQMediaFileType::Voice)
115 } else {
116 Some(QQMediaFileType::File)
117 }
118 }
119 _ => None,
120 }
121}
122
123fn find_matching_close(s: &str) -> Option<usize> {
125 let mut depth = 1usize;
126 for (i, ch) in s.char_indices() {
127 match ch {
128 '[' => depth += 1,
129 ']' => {
130 depth -= 1;
131 if depth == 0 {
132 return Some(i);
133 }
134 }
135 _ => {}
136 }
137 }
138 None
139}
140
141fn parse_qq_attachment_markers(content: &str) -> (String, Vec<QQMediaAttachment>) {
146 let mut cleaned = String::with_capacity(content.len());
147 let mut attachments = Vec::new();
148 let mut cursor = 0;
149
150 while cursor < content.len() {
151 let Some(open_rel) = content[cursor..].find('[') else {
152 cleaned.push_str(&content[cursor..]);
153 break;
154 };
155
156 let open = cursor + open_rel;
157 cleaned.push_str(&content[cursor..open]);
158
159 let Some(close_rel) = find_matching_close(&content[open + 1..]) else {
160 cleaned.push_str(&content[open..]);
161 break;
162 };
163
164 let close = open + 1 + close_rel;
165 let marker = &content[open + 1..close];
166
167 let parsed = marker.split_once(':').and_then(|(kind, target)| {
168 let target = target.trim();
169 if target.is_empty() {
170 return None;
171 }
172 let file_type = marker_kind_to_qq_file_type(kind, target)?;
173 Some(QQMediaAttachment {
174 kind: file_type,
175 target: target.to_string(),
176 })
177 });
178
179 if let Some(attachment) = parsed {
180 attachments.push(attachment);
181 } else {
182 cleaned.push_str(&content[open..=close]);
183 }
184
185 cursor = close + 1;
186 }
187
188 (cleaned.trim().to_string(), attachments)
189}
190
191fn infer_attachment_marker(content_type: &str, filename: &str) -> &'static str {
193 let ct = content_type.to_ascii_lowercase();
194 if ct.starts_with("image/") {
195 return "IMAGE";
196 }
197 if ct.starts_with("audio/") || ct.contains("voice") {
198 return "VOICE";
199 }
200 if ct.starts_with("video/") {
201 return "VIDEO";
202 }
203
204 let lower = filename.to_ascii_lowercase();
206 if lower.ends_with(".png")
207 || lower.ends_with(".jpg")
208 || lower.ends_with(".jpeg")
209 || lower.ends_with(".gif")
210 || lower.ends_with(".webp")
211 || lower.ends_with(".bmp")
212 || lower.ends_with(".heic")
213 || lower.ends_with(".heif")
214 || lower.ends_with(".svg")
215 {
216 return "IMAGE";
217 }
218 if lower.ends_with(".mp3")
219 || lower.ends_with(".wav")
220 || lower.ends_with(".silk")
221 || lower.ends_with(".ogg")
222 || lower.ends_with(".flac")
223 || lower.ends_with(".m4a")
224 {
225 return "VOICE";
226 }
227 if lower.ends_with(".mp4")
228 || lower.ends_with(".mov")
229 || lower.ends_with(".mkv")
230 || lower.ends_with(".avi")
231 || lower.ends_with(".webm")
232 {
233 return "VIDEO";
234 }
235 "DOCUMENT"
236}
237
238fn fix_qq_url(url: &str) -> String {
240 let trimmed = url.trim();
241 if trimmed.starts_with("//") {
242 format!("https:{trimmed}")
243 } else {
244 trimmed.to_string()
245 }
246}
247
248fn next_msg_seq() -> u32 {
251 #[allow(clippy::cast_possible_truncation)]
252 let time_part = (std::time::SystemTime::now()
253 .duration_since(std::time::UNIX_EPOCH)
254 .unwrap_or_default()
255 .as_millis() as u32)
256 % 100_000_000;
257 let random = u32::from(rand::random::<u16>());
258 (time_part ^ random) % 65536
259}
260
261fn now_secs() -> u64 {
263 std::time::SystemTime::now()
264 .duration_since(std::time::UNIX_EPOCH)
265 .unwrap_or_default()
266 .as_secs()
267}
268
269const DEDUP_CAPACITY: usize = 10_000;
271
272const AUTH_RETRY_MAX_ATTEMPTS: u32 = 4;
274
275const AUTH_RETRY_INITIAL_BACKOFF_MS: u64 = 500;
277
278const AUTH_RETRY_MAX_BACKOFF_MS: u64 = 8_000;
280
281pub struct QQChannel {
284 app_id: String,
285 app_secret: String,
286 allowed_users: Vec<String>,
287 token_cache: Arc<RwLock<Option<(String, u64)>>>,
289 dedup: Arc<RwLock<HashSet<String>>>,
291 workspace_dir: Option<PathBuf>,
293 upload_cache: Arc<RwLock<HashMap<String, UploadCacheEntry>>>,
295 reply_tracker: Arc<RwLock<HashMap<String, ReplyRecord>>>,
297 proxy_url: Option<String>,
299 session_id: Arc<RwLock<Option<String>>>,
301 last_sequence: Arc<RwLock<Option<i64>>>,
303}
304
305impl QQChannel {
306 pub fn new(app_id: String, app_secret: String, allowed_users: Vec<String>) -> Self {
307 Self {
308 app_id,
309 app_secret,
310 allowed_users,
311 token_cache: Arc::new(RwLock::new(None)),
312 dedup: Arc::new(RwLock::new(HashSet::new())),
313 workspace_dir: None,
314 upload_cache: Arc::new(RwLock::new(HashMap::new())),
315 reply_tracker: Arc::new(RwLock::new(HashMap::new())),
316 proxy_url: None,
317 session_id: Arc::new(RwLock::new(None)),
318 last_sequence: Arc::new(RwLock::new(None)),
319 }
320 }
321
322 pub fn with_workspace_dir(mut self, dir: PathBuf) -> Self {
324 self.workspace_dir = Some(dir);
325 self
326 }
327
328 pub fn with_proxy_url(mut self, proxy_url: Option<String>) -> Self {
330 self.proxy_url = proxy_url;
331 self
332 }
333
334 fn http_client(&self) -> reqwest::Client {
335 crate::config::build_channel_proxy_client("channel.qq", self.proxy_url.as_deref())
336 }
337
338 fn is_user_allowed(&self, user_id: &str) -> bool {
339 self.allowed_users.iter().any(|u| u == "*" || u == user_id)
340 }
341
342 async fn fetch_access_token(&self) -> anyhow::Result<(String, u64)> {
344 let body = json!({
345 "appId": self.app_id,
346 "clientSecret": self.app_secret,
347 });
348
349 let resp = self
350 .http_client()
351 .post(QQ_AUTH_URL)
352 .json(&body)
353 .send()
354 .await?;
355
356 if !resp.status().is_success() {
357 let status = resp.status();
358 let err = resp.text().await.unwrap_or_default();
359 anyhow::bail!("QQ token request failed ({status}): {err}");
360 }
361
362 let data: serde_json::Value = resp.json().await?;
363 let token = data
364 .get("access_token")
365 .and_then(|t| t.as_str())
366 .ok_or_else(|| anyhow::anyhow!("Missing access_token in QQ response"))?
367 .to_string();
368
369 let expires_in = data
370 .get("expires_in")
371 .and_then(|e| e.as_str())
372 .and_then(|e| e.parse::<u64>().ok())
373 .unwrap_or(7200);
374
375 let now = std::time::SystemTime::now()
376 .duration_since(std::time::UNIX_EPOCH)
377 .unwrap_or_default()
378 .as_secs();
379
380 let expiry = now + expires_in.saturating_sub(60);
382
383 Ok((token, expiry))
384 }
385
386 async fn fetch_access_token_with_retry(&self) -> anyhow::Result<(String, u64)> {
394 let mut backoff_ms = AUTH_RETRY_INITIAL_BACKOFF_MS;
395 let mut last_err = None;
396
397 for attempt in 1..=AUTH_RETRY_MAX_ATTEMPTS {
398 match self.fetch_access_token().await {
399 Ok(result) => {
400 if attempt > 1 {
401 tracing::info!(
402 "QQ: getAppAccessToken succeeded on attempt {attempt}/{AUTH_RETRY_MAX_ATTEMPTS}"
403 );
404 }
405 return Ok(result);
406 }
407 Err(e) => {
408 tracing::warn!(
409 "QQ: getAppAccessToken failed (attempt {attempt}/{AUTH_RETRY_MAX_ATTEMPTS}): {e}"
410 );
411 last_err = Some(e);
412
413 if attempt < AUTH_RETRY_MAX_ATTEMPTS {
414 let jitter_factor = 0.75 + (rand::random::<f64>() * 0.5);
416 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
417 let sleep_ms = (backoff_ms as f64 * jitter_factor) as u64;
418 tokio::time::sleep(std::time::Duration::from_millis(sleep_ms)).await;
419 backoff_ms = (backoff_ms * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
420 }
421 }
422 }
423 }
424
425 Err(last_err.unwrap_or_else(|| {
426 anyhow::anyhow!("QQ: getAppAccessToken failed after {AUTH_RETRY_MAX_ATTEMPTS} attempts")
427 }))
428 }
429
430 async fn get_token(&self) -> anyhow::Result<String> {
432 let now = std::time::SystemTime::now()
433 .duration_since(std::time::UNIX_EPOCH)
434 .unwrap_or_default()
435 .as_secs();
436
437 {
438 let cache = self.token_cache.read().await;
439 if let Some((ref token, expiry)) = *cache {
440 if now < expiry {
441 return Ok(token.clone());
442 }
443 }
444 }
445
446 let (token, expiry) = self.fetch_access_token_with_retry().await?;
447 {
448 let mut cache = self.token_cache.write().await;
449 *cache = Some((token.clone(), expiry));
450 }
451 Ok(token)
452 }
453
454 async fn get_gateway_url(&self, token: &str) -> anyhow::Result<String> {
456 let resp = self
457 .http_client()
458 .get(format!("{QQ_API_BASE}/gateway"))
459 .header("Authorization", format!("QQBot {token}"))
460 .send()
461 .await?;
462
463 if !resp.status().is_success() {
464 let status = resp.status();
465 let err = resp.text().await.unwrap_or_default();
466 anyhow::bail!("QQ gateway request failed ({status}): {err}");
467 }
468
469 let data: serde_json::Value = resp.json().await?;
470 let url = data
471 .get("url")
472 .and_then(|u| u.as_str())
473 .ok_or_else(|| anyhow::anyhow!("Missing gateway URL in QQ response"))?
474 .to_string();
475
476 Ok(url)
477 }
478
479 async fn is_duplicate(&self, msg_id: &str) -> bool {
481 if msg_id.is_empty() {
482 return false;
483 }
484
485 let mut dedup = self.dedup.write().await;
486
487 if dedup.contains(msg_id) {
488 return true;
489 }
490
491 if dedup.len() >= DEDUP_CAPACITY {
493 let to_remove: Vec<String> = dedup.iter().take(DEDUP_CAPACITY / 2).cloned().collect();
494 for key in to_remove {
495 dedup.remove(&key);
496 }
497 }
498
499 dedup.insert(msg_id.to_string());
500 false
501 }
502
503 fn upload_cache_key(
505 file_data: &[u8],
506 scope: &str,
507 target_id: &str,
508 file_type: QQMediaFileType,
509 ) -> String {
510 let mut hasher = Sha256::new();
511 hasher.update(file_data);
512 let hash = format!("{:x}", hasher.finalize());
513 format!("{hash}:{scope}:{target_id}:{}", file_type as u8)
514 }
515
516 async fn get_cached_upload(&self, cache_key: &str) -> Option<String> {
518 let cache = self.upload_cache.read().await;
519 if let Some(entry) = cache.get(cache_key) {
520 if now_secs() + 60 < entry.expires_at {
522 return Some(entry.file_info.clone());
523 }
524 }
525 None
526 }
527
528 async fn set_cached_upload(&self, cache_key: String, file_info: String, ttl: u64) {
530 let mut cache = self.upload_cache.write().await;
531
532 if cache.len() >= UPLOAD_CACHE_CAPACITY {
534 let now = now_secs();
535 cache.retain(|_, v| v.expires_at > now);
536
537 if cache.len() >= UPLOAD_CACHE_CAPACITY {
539 let keys_to_remove: Vec<String> = cache
540 .keys()
541 .take(UPLOAD_CACHE_CAPACITY / 2)
542 .cloned()
543 .collect();
544 for key in keys_to_remove {
545 cache.remove(&key);
546 }
547 }
548 }
549
550 cache.insert(
551 cache_key,
552 UploadCacheEntry {
553 file_info,
554 expires_at: now_secs() + ttl,
555 },
556 );
557 }
558
559 async fn check_reply_allowed(&self, msg_id: &str) -> bool {
561 let now = now_secs();
562 let mut tracker = self.reply_tracker.write().await;
563
564 if tracker.len() >= REPLY_TRACKER_CAPACITY {
566 tracker.retain(|_, v| now - v.first_reply_at < REPLY_TTL_SECS);
567 }
568
569 if let Some(record) = tracker.get_mut(msg_id) {
570 if now - record.first_reply_at >= REPLY_TTL_SECS {
571 return false;
573 }
574 if record.count >= REPLY_LIMIT {
575 return false;
576 }
577 record.count += 1;
578 true
579 } else {
580 tracker.insert(
581 msg_id.to_string(),
582 ReplyRecord {
583 count: 1,
584 first_reply_at: now,
585 },
586 );
587 true
588 }
589 }
590
591 fn resolve_recipient(recipient: &str) -> (&str, String) {
594 if let Some(group_id) = recipient.strip_prefix("group:") {
595 ("groups", group_id.to_string())
596 } else {
597 let raw_uid = recipient.strip_prefix("user:").unwrap_or(recipient);
598 let user_id: String = raw_uid
599 .chars()
600 .filter(|c| c.is_alphanumeric() || *c == '_')
601 .collect();
602 ("users", user_id)
603 }
604 }
605
606 async fn upload_media(
612 &self,
613 recipient: &str,
614 file_type: QQMediaFileType,
615 url: Option<&str>,
616 file_data: Option<&str>,
617 file_name: Option<&str>,
618 ) -> anyhow::Result<(String, Option<u64>)> {
619 let token = self.get_token().await?;
620 let (scope, id) = Self::resolve_recipient(recipient);
621
622 let api_url = format!("{QQ_API_BASE}/v2/{scope}/{id}/files");
623 ensure_https(&api_url)?;
624
625 let mut body = json!({
626 "file_type": file_type as u8,
627 "srv_send_msg": false,
628 });
629
630 if let Some(u) = url {
631 body["url"] = json!(u);
632 }
633 if let Some(d) = file_data {
634 body["file_data"] = json!(d);
635 }
636 if file_type == QQMediaFileType::File {
638 if let Some(name) = file_name {
639 body["file_name"] = json!(name);
640 }
641 }
642
643 let resp = self
644 .http_client()
645 .post(&api_url)
646 .header("Authorization", format!("QQBot {token}"))
647 .json(&body)
648 .send()
649 .await?;
650
651 if !resp.status().is_success() {
652 let status = resp.status();
653 let err = resp.text().await.unwrap_or_default();
654 anyhow::bail!("QQ upload media failed ({status}): {err}");
655 }
656
657 let upload_resp: QQUploadResponse = resp.json().await?;
658 Ok((upload_resp.file_info, upload_resp.ttl))
659 }
660
661 async fn send_media_message(&self, recipient: &str, file_info: &str) -> anyhow::Result<()> {
663 let token = self.get_token().await?;
664 let (scope, id) = Self::resolve_recipient(recipient);
665
666 let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
667 ensure_https(&url)?;
668
669 let body = json!({
670 "msg_type": 7,
671 "media": {
672 "file_info": file_info,
673 },
674 "msg_seq": next_msg_seq(),
675 });
676
677 let resp = self
678 .http_client()
679 .post(&url)
680 .header("Authorization", format!("QQBot {token}"))
681 .json(&body)
682 .send()
683 .await?;
684
685 if !resp.status().is_success() {
686 let status = resp.status();
687 let err = resp.text().await.unwrap_or_default();
688 anyhow::bail!("QQ send media message failed ({status}): {err}");
689 }
690
691 Ok(())
692 }
693
694 async fn send_attachment(
696 &self,
697 recipient: &str,
698 attachment: &QQMediaAttachment,
699 ) -> anyhow::Result<()> {
700 let target = attachment.target.trim();
701
702 let file_name = Path::new(target.split('?').next().unwrap_or(target))
704 .file_name()
705 .and_then(|n| n.to_str())
706 .map(|s| s.to_string());
707
708 if target.starts_with("http://") || target.starts_with("https://") {
709 let (file_info, _ttl) = self
711 .upload_media(
712 recipient,
713 attachment.kind,
714 Some(target),
715 None,
716 file_name.as_deref(),
717 )
718 .await?;
719 self.send_media_message(recipient, &file_info).await?;
720 } else {
721 let path = Path::new(target);
723 if !path.exists() {
724 anyhow::bail!("QQ attachment path not found: {target}");
725 }
726
727 let metadata = tokio::fs::metadata(path).await?;
728 if metadata.len() > QQ_MAX_UPLOAD_BYTES {
729 anyhow::bail!(
730 "QQ attachment too large ({} bytes, max {}): {target}",
731 metadata.len(),
732 QQ_MAX_UPLOAD_BYTES
733 );
734 }
735
736 let file_bytes = tokio::fs::read(path).await?;
737 let (scope_label, target_id) = Self::resolve_recipient(recipient);
738 let scope = if scope_label == "groups" {
739 "group"
740 } else {
741 "c2c"
742 };
743 let cache_key = Self::upload_cache_key(&file_bytes, scope, &target_id, attachment.kind);
744
745 if let Some(cached_file_info) = self.get_cached_upload(&cache_key).await {
747 tracing::debug!("QQ: using cached upload for {target}");
748 self.send_media_message(recipient, &cached_file_info)
749 .await?;
750 return Ok(());
751 }
752
753 let b64 = base64::engine::general_purpose::STANDARD.encode(&file_bytes);
754 let (file_info, ttl) = self
755 .upload_media(
756 recipient,
757 attachment.kind,
758 None,
759 Some(&b64),
760 file_name.as_deref(),
761 )
762 .await?;
763
764 if let Some(ttl_secs) = ttl {
766 self.set_cached_upload(cache_key, file_info.clone(), ttl_secs)
767 .await;
768 }
769
770 self.send_media_message(recipient, &file_info).await?;
771 }
772
773 Ok(())
774 }
775
776 async fn compose_message_content(&self, payload: &serde_json::Value) -> Option<String> {
781 let text = payload
782 .get("content")
783 .and_then(|c| c.as_str())
784 .unwrap_or("")
785 .trim();
786
787 let mut markers: Vec<String> = Vec::new();
788 let mut voice_transcripts: Vec<String> = Vec::new();
789
790 if let Some(attachments) = payload.get("attachments").and_then(|a| a.as_array()) {
791 for att in attachments {
792 let url = match att.get("url").and_then(|u| u.as_str()) {
793 Some(u) if !u.trim().is_empty() => fix_qq_url(u),
794 _ => continue,
795 };
796
797 let content_type = att
798 .get("content_type")
799 .and_then(|ct| ct.as_str())
800 .unwrap_or("");
801 let filename = att
802 .get("filename")
803 .and_then(|f| f.as_str())
804 .unwrap_or("attachment");
805
806 let marker_type = infer_attachment_marker(content_type, filename);
807
808 let is_voice = content_type == "voice"
812 || content_type.starts_with("audio/")
813 || marker_type == "VOICE";
814 let (download_url, save_filename) = if is_voice {
815 if let Some(wav_url) = att
816 .get("voice_wav_url")
817 .and_then(|u| u.as_str())
818 .filter(|u| !u.trim().is_empty())
819 {
820 let fixed = fix_qq_url(wav_url);
821 let wav_name = Path::new(fixed.split('?').next().unwrap_or(&fixed))
823 .file_name()
824 .and_then(|n| n.to_str())
825 .unwrap_or("voice.wav")
826 .to_string();
827 (fixed, wav_name)
828 } else {
829 (url.clone(), filename.to_string())
830 }
831 } else {
832 (url.clone(), filename.to_string())
833 };
834
835 let location = if let Some(ref ws) = self.workspace_dir {
837 let dir = ws.join("qq_files");
838 match self
839 .download_attachment(&download_url, &dir, &save_filename)
840 .await
841 {
842 Ok(local_path) => local_path.display().to_string(),
843 Err(e) => {
844 tracing::warn!("QQ: failed to download attachment: {e}");
845 url.clone()
846 }
847 }
848 } else {
849 url.clone()
850 };
851
852 if is_voice {
853 markers.push(format!("[{marker_type}:{location}]"));
857 if let Some(asr_text) = att
858 .get("asr_refer_text")
859 .and_then(|t| t.as_str())
860 .map(|t| t.trim())
861 .filter(|t| !t.is_empty())
862 {
863 voice_transcripts.push(asr_text.to_string());
864 }
865 } else {
866 markers.push(format!("[{marker_type}:{location}]"));
867 }
868 }
869 }
870
871 let voice_text = match voice_transcripts.len() {
874 0 => String::new(),
875 1 => format!(
876 "<VOICE_TRANSCRIPTION>{}</VOICE_TRANSCRIPTION>",
877 voice_transcripts[0]
878 ),
879 _ => voice_transcripts
880 .iter()
881 .enumerate()
882 .map(|(i, t)| format!("<VOICE_TRANSCRIPTION_{i}>{t}</VOICE_TRANSCRIPTION_{i}>"))
883 .collect::<Vec<_>>()
884 .join("\n"),
885 };
886
887 let mut parts: Vec<&str> = Vec::new();
888 if !text.is_empty() {
889 parts.push(text);
890 }
891 if !voice_text.is_empty() {
892 parts.push(&voice_text);
893 }
894 let markers_joined = markers.join("\n");
895 if !markers_joined.is_empty() {
896 parts.push(&markers_joined);
897 }
898
899 if parts.is_empty() {
900 return None;
901 }
902
903 Some(parts.join("\n"))
904 }
905
906 async fn download_attachment(
908 &self,
909 url: &str,
910 dir: &Path,
911 filename: &str,
912 ) -> anyhow::Result<PathBuf> {
913 tokio::fs::create_dir_all(dir).await?;
914
915 let stem = Path::new(filename)
917 .file_stem()
918 .and_then(|s| s.to_str())
919 .unwrap_or("file");
920 let ext = Path::new(filename)
921 .extension()
922 .and_then(|e| e.to_str())
923 .unwrap_or("");
924 let unique = &Uuid::new_v4().to_string()[..8];
925 let safe_name = if ext.is_empty() {
926 format!("{stem}_{unique}")
927 } else {
928 format!("{stem}_{unique}.{ext}")
929 };
930
931 let dest = dir.join(&safe_name);
932
933 let resp = self.http_client().get(url).send().await?;
936 if !resp.status().is_success() {
937 anyhow::bail!("Download failed ({}): {url}", resp.status());
938 }
939
940 let bytes = resp.bytes().await?;
941 tokio::fs::write(&dest, &bytes).await?;
942
943 Ok(dest)
944 }
945
946 async fn send_text_markdown(&self, recipient: &str, content: &str) -> anyhow::Result<()> {
948 let token = self.get_token().await?;
949 let (scope, id) = Self::resolve_recipient(recipient);
950
951 let url = format!("{QQ_API_BASE}/v2/{scope}/{id}/messages");
952 ensure_https(&url)?;
953
954 let body = json!({
955 "markdown": {
956 "content": content,
957 },
958 "msg_type": 2,
959 "msg_seq": next_msg_seq(),
960 });
961
962 let resp = self
963 .http_client()
964 .post(&url)
965 .header("Authorization", format!("QQBot {token}"))
966 .json(&body)
967 .send()
968 .await?;
969
970 if !resp.status().is_success() {
971 let status = resp.status();
972 let err = resp.text().await.unwrap_or_default();
973 anyhow::bail!("QQ send message failed ({status}): {err}");
974 }
975
976 Ok(())
977 }
978}
979
980#[async_trait]
981impl Channel for QQChannel {
982 fn name(&self) -> &str {
983 "qq"
984 }
985
986 async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
987 let (cleaned_text, attachments) = parse_qq_attachment_markers(&message.content);
988
989 if attachments.is_empty() {
990 return self
992 .send_text_markdown(&message.recipient, &message.content)
993 .await;
994 }
995
996 if !cleaned_text.is_empty() {
998 self.send_text_markdown(&message.recipient, &cleaned_text)
999 .await?;
1000 }
1001
1002 for attachment in &attachments {
1004 if let Err(e) = self.send_attachment(&message.recipient, attachment).await {
1005 tracing::warn!(
1006 target = attachment.target,
1007 error = %e,
1008 "QQ: failed to send media attachment; falling back to text"
1009 );
1010 let fallback = format!(
1012 "{}: {}",
1013 match attachment.kind {
1014 QQMediaFileType::Image => "Image",
1015 QQMediaFileType::Video => "Video",
1016 QQMediaFileType::Voice => "Voice",
1017 QQMediaFileType::File => "File",
1018 },
1019 attachment.target
1020 );
1021 self.send_text_markdown(&message.recipient, &fallback)
1022 .await?;
1023 }
1024 }
1025
1026 Ok(())
1027 }
1028
1029 #[allow(clippy::too_many_lines)]
1030 async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
1031 tracing::info!("QQ: authenticating...");
1032 let token = self.get_token().await?;
1033
1034 tracing::info!("QQ: fetching gateway URL...");
1035 let gw_url = self.get_gateway_url(&token).await?;
1036
1037 tracing::info!("QQ: connecting to gateway WebSocket...");
1038 let (ws_stream, _) =
1039 crate::config::ws_connect_with_proxy(&gw_url, "channel.qq", self.proxy_url.as_deref())
1040 .await?;
1041 let (mut write, mut read) = ws_stream.split();
1042
1043 let hello = read
1045 .next()
1046 .await
1047 .ok_or(anyhow::anyhow!("QQ: no hello frame"))??;
1048 let hello_data: serde_json::Value = serde_json::from_str(&hello.to_string())?;
1049 let heartbeat_interval = hello_data
1050 .get("d")
1051 .and_then(|d| d.get("heartbeat_interval"))
1052 .and_then(serde_json::Value::as_u64)
1053 .unwrap_or(41250);
1054
1055 let stored_session = self.session_id.read().await.clone();
1057 let stored_seq = *self.last_sequence.read().await;
1058
1059 if let (Some(sid), Some(seq)) = (&stored_session, stored_seq) {
1060 tracing::info!("QQ: attempting session resume (session_id={sid}, seq={seq})");
1062 let resume = json!({
1063 "op": 6,
1064 "d": {
1065 "token": format!("QQBot {token}"),
1066 "session_id": sid,
1067 "seq": seq,
1068 }
1069 });
1070 write.send(Message::Text(resume.to_string().into())).await?;
1071 } else {
1072 let intents: u64 = (1 << 25) | (1 << 30);
1075 let identify = json!({
1076 "op": 2,
1077 "d": {
1078 "token": format!("QQBot {token}"),
1079 "intents": intents,
1080 "properties": {
1081 "os": "linux",
1082 "browser": "construct",
1083 "device": "construct",
1084 }
1085 }
1086 });
1087 write
1088 .send(Message::Text(identify.to_string().into()))
1089 .await?;
1090 tracing::info!("QQ: connected and sent Identify");
1091 }
1092
1093 let mut sequence: i64 = stored_seq.unwrap_or(-1);
1094
1095 const MAX_MISSED_ACKS: u32 = 3;
1102 let mut missed_ack_count: u32 = 0;
1103
1104 let hb_interval = heartbeat_interval;
1110 let grace_ms: u64 = (hb_interval / 10).min(5_000);
1111 let effective_interval = hb_interval.saturating_add(grace_ms);
1112
1113 let (hb_tx, mut hb_rx) = tokio::sync::mpsc::channel::<()>(1);
1114 tokio::spawn(async move {
1115 let mut interval =
1116 tokio::time::interval(std::time::Duration::from_millis(effective_interval));
1117 loop {
1118 interval.tick().await;
1119 if hb_tx.send(()).await.is_err() {
1120 break;
1121 }
1122 }
1123 });
1124
1125 enum ExitReason {
1127 Reconnect,
1128 InvalidSession,
1129 Close(Option<tokio_tungstenite::tungstenite::protocol::CloseFrame>),
1130 StreamEnded,
1131 HeartbeatTimeout,
1132 WriteFailed,
1133 ChannelClosed,
1134 }
1135
1136 let exit_reason;
1137
1138 'outer: loop {
1139 tokio::select! {
1140 _ = hb_rx.recv() => {
1141 if missed_ack_count > 0 {
1145 if missed_ack_count >= MAX_MISSED_ACKS {
1146 tracing::warn!(
1147 "QQ: {missed_ack_count} consecutive heartbeat ACKs missed \
1148 (interval {hb_interval}ms + {grace_ms}ms grace); \
1149 connection appears zombied"
1150 );
1151 exit_reason = ExitReason::HeartbeatTimeout;
1152 break;
1153 }
1154 tracing::info!(
1155 "QQ: heartbeat ACK missed ({missed_ack_count}/{MAX_MISSED_ACKS}); \
1156 tolerating transient delay"
1157 );
1158 }
1159 let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1160 let hb = json!({"op": 1, "d": d});
1161 if write
1162 .send(Message::Text(hb.to_string().into()))
1163 .await
1164 .is_err()
1165 {
1166 exit_reason = ExitReason::WriteFailed;
1167 break;
1168 }
1169 missed_ack_count += 1;
1170 }
1171 msg = read.next() => {
1172 let msg = match msg {
1173 Some(Ok(Message::Text(t))) => t,
1174 Some(Ok(Message::Ping(payload))) => {
1175 if write.send(Message::Pong(payload)).await.is_err() {
1176 exit_reason = ExitReason::WriteFailed;
1177 break;
1178 }
1179 continue;
1180 }
1181 Some(Ok(Message::Close(frame))) => {
1182 exit_reason = ExitReason::Close(frame);
1183 break;
1184 }
1185 None => {
1186 exit_reason = ExitReason::StreamEnded;
1187 break;
1188 }
1189 _ => continue,
1190 };
1191
1192 let event: serde_json::Value = match serde_json::from_str(msg.as_ref()) {
1193 Ok(e) => e,
1194 Err(_) => continue,
1195 };
1196
1197 if let Some(s) = event.get("s").and_then(serde_json::Value::as_i64) {
1199 sequence = s;
1200 }
1201
1202 let op = event.get("op").and_then(serde_json::Value::as_u64).unwrap_or(0);
1203
1204 match op {
1205 1 => {
1207 let d = if sequence >= 0 { json!(sequence) } else { json!(null) };
1208 let hb = json!({"op": 1, "d": d});
1209 if write
1210 .send(Message::Text(hb.to_string().into()))
1211 .await
1212 .is_err()
1213 {
1214 exit_reason = ExitReason::WriteFailed;
1215 break;
1216 }
1217 missed_ack_count += 1;
1218 continue;
1219 }
1220 7 => {
1222 tracing::warn!("QQ: received Reconnect (op 7); will resume");
1223 exit_reason = ExitReason::Reconnect;
1224 break;
1225 }
1226 9 => {
1228 tracing::warn!("QQ: received Invalid Session (op 9); clearing session for fresh auth");
1229 exit_reason = ExitReason::InvalidSession;
1230 break;
1231 }
1232 11 => {
1234 missed_ack_count = 0;
1235 continue;
1236 }
1237 _ => {}
1238 }
1239
1240 if op != 0 {
1242 continue;
1243 }
1244
1245 let event_type = event.get("t").and_then(|t| t.as_str()).unwrap_or("");
1246 let d = match event.get("d") {
1247 Some(d) => d,
1248 None => continue,
1249 };
1250
1251 if event_type == "READY" || event_type == "RESUMED" {
1253 if let Some(sid) = d.get("session_id").and_then(|s| s.as_str()) {
1254 *self.session_id.write().await = Some(sid.to_string());
1255 tracing::info!("QQ: session established (session_id={sid}, event={event_type})");
1256 }
1257 continue;
1258 }
1259
1260 tracing::debug!("QQ: event_type={event_type} payload={d}");
1261
1262 match event_type {
1263 "C2C_MESSAGE_CREATE" => {
1264 let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1265 if self.is_duplicate(msg_id).await {
1266 continue;
1267 }
1268
1269 let Some(content) = self.compose_message_content(d).await else {
1270 continue;
1271 };
1272
1273 let author_id = d.get("author").and_then(|a| a.get("id")).and_then(|i| i.as_str()).unwrap_or("unknown");
1274 let user_openid = d.get("author").and_then(|a| a.get("user_openid")).and_then(|u| u.as_str()).unwrap_or(author_id);
1276
1277 if !self.is_user_allowed(user_openid) {
1278 tracing::warn!("QQ: ignoring C2C message from unauthorized user: {user_openid}");
1279 continue;
1280 }
1281
1282 let chat_id = format!("user:{user_openid}");
1283
1284 let channel_msg = ChannelMessage {
1285 id: Uuid::new_v4().to_string(),
1286 sender: user_openid.to_string(),
1287 reply_target: chat_id,
1288 content,
1289 channel: "qq".to_string(),
1290 timestamp: std::time::SystemTime::now()
1291 .duration_since(std::time::UNIX_EPOCH)
1292 .unwrap_or_default()
1293 .as_secs(),
1294 thread_ts: None,
1295 interruption_scope_id: None,
1296 attachments: vec![],
1297 };
1298
1299 if tx.send(channel_msg).await.is_err() {
1300 tracing::warn!("QQ: message channel closed");
1301 exit_reason = ExitReason::ChannelClosed;
1302 break 'outer;
1303 }
1304 }
1305 "GROUP_AT_MESSAGE_CREATE" => {
1306 let msg_id = d.get("id").and_then(|i| i.as_str()).unwrap_or("");
1307 if self.is_duplicate(msg_id).await {
1308 continue;
1309 }
1310
1311 let Some(content) = self.compose_message_content(d).await else {
1312 continue;
1313 };
1314
1315 let author_id = d.get("author").and_then(|a| a.get("member_openid")).and_then(|m| m.as_str()).unwrap_or("unknown");
1316
1317 if !self.is_user_allowed(author_id) {
1318 tracing::warn!("QQ: ignoring group message from unauthorized user: {author_id}");
1319 continue;
1320 }
1321
1322 let group_openid = d.get("group_openid").and_then(|g| g.as_str()).unwrap_or("unknown");
1323 let chat_id = format!("group:{group_openid}");
1324
1325 let channel_msg = ChannelMessage {
1326 id: Uuid::new_v4().to_string(),
1327 sender: author_id.to_string(),
1328 reply_target: chat_id,
1329 content,
1330 channel: "qq".to_string(),
1331 timestamp: std::time::SystemTime::now()
1332 .duration_since(std::time::UNIX_EPOCH)
1333 .unwrap_or_default()
1334 .as_secs(),
1335 thread_ts: None,
1336 interruption_scope_id: None,
1337 attachments: vec![],
1338 };
1339
1340 if tx.send(channel_msg).await.is_err() {
1341 tracing::warn!("QQ: message channel closed");
1342 exit_reason = ExitReason::ChannelClosed;
1343 break 'outer;
1344 }
1345 }
1346 _ => {}
1347 }
1348 }
1349 }
1350 }
1351
1352 *self.last_sequence.write().await = if sequence >= 0 { Some(sequence) } else { None };
1354
1355 match exit_reason {
1356 ExitReason::InvalidSession => {
1357 *self.session_id.write().await = None;
1359 *self.last_sequence.write().await = None;
1360 anyhow::bail!(
1361 "QQ WebSocket connection closed: invalid session (fresh auth required)"
1362 )
1363 }
1364 ExitReason::Reconnect => {
1365 anyhow::bail!(
1367 "QQ WebSocket connection closed: server requested reconnect (resume will be attempted)"
1368 )
1369 }
1370 ExitReason::Close(ref frame) => {
1371 let (code, reason) = frame
1372 .as_ref()
1373 .map(|f| (f.code.to_string(), f.reason.to_string()))
1374 .unwrap_or_else(|| ("unknown".into(), "none".into()));
1375 tracing::warn!(
1376 "QQ: WebSocket closed with code={code}, reason=\"{reason}\"; \
1377 resume will be attempted on reconnect"
1378 );
1379 anyhow::bail!(
1380 "QQ WebSocket connection closed: close_code={code}, reason=\"{reason}\""
1381 )
1382 }
1383 ExitReason::StreamEnded => {
1384 tracing::warn!(
1385 "QQ: WebSocket stream ended unexpectedly; resume will be attempted on reconnect"
1386 );
1387 anyhow::bail!("QQ WebSocket connection closed: stream ended unexpectedly")
1388 }
1389 ExitReason::HeartbeatTimeout => {
1390 tracing::warn!(
1391 "QQ: heartbeat timeout after {MAX_MISSED_ACKS} consecutive missed ACKs; \
1392 resume will be attempted on reconnect"
1393 );
1394 anyhow::bail!(
1395 "QQ WebSocket connection closed: heartbeat ACK timeout \
1396 ({MAX_MISSED_ACKS} consecutive missed ACKs)"
1397 )
1398 }
1399 ExitReason::WriteFailed => {
1400 tracing::warn!("QQ: WebSocket write failed; resume will be attempted on reconnect");
1401 anyhow::bail!("QQ WebSocket connection closed: write failed")
1402 }
1403 ExitReason::ChannelClosed => {
1404 anyhow::bail!("QQ WebSocket connection closed: internal message channel closed")
1405 }
1406 }
1407 }
1408
1409 async fn health_check(&self) -> bool {
1410 self.fetch_access_token_with_retry().await.is_ok()
1411 }
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416 use super::*;
1417 use serde_json::json;
1418
1419 fn make_channel() -> QQChannel {
1420 QQChannel::new("id".into(), "secret".into(), vec![])
1421 }
1422
1423 #[test]
1424 fn test_name() {
1425 let ch = make_channel();
1426 assert_eq!(ch.name(), "qq");
1427 }
1428
1429 #[test]
1430 fn test_user_allowed_wildcard() {
1431 let ch = QQChannel::new("id".into(), "secret".into(), vec!["*".into()]);
1432 assert!(ch.is_user_allowed("anyone"));
1433 }
1434
1435 #[test]
1436 fn test_user_allowed_specific() {
1437 let ch = QQChannel::new("id".into(), "secret".into(), vec!["user123".into()]);
1438 assert!(ch.is_user_allowed("user123"));
1439 assert!(!ch.is_user_allowed("other"));
1440 }
1441
1442 #[test]
1443 fn test_user_denied_empty() {
1444 let ch = make_channel();
1445 assert!(!ch.is_user_allowed("anyone"));
1446 }
1447
1448 #[tokio::test]
1449 async fn test_dedup() {
1450 let ch = make_channel();
1451 assert!(!ch.is_duplicate("msg1").await);
1452 assert!(ch.is_duplicate("msg1").await);
1453 assert!(!ch.is_duplicate("msg2").await);
1454 }
1455
1456 #[tokio::test]
1457 async fn test_dedup_empty_id() {
1458 let ch = make_channel();
1459 assert!(!ch.is_duplicate("").await);
1460 assert!(!ch.is_duplicate("").await);
1461 }
1462
1463 #[test]
1464 fn test_config_serde() {
1465 let toml_str = r#"
1466app_id = "12345"
1467app_secret = "secret_abc"
1468allowed_users = ["user1"]
1469"#;
1470 let config: crate::config::schema::QQConfig = toml::from_str(toml_str).unwrap();
1471 assert_eq!(config.app_id, "12345");
1472 assert_eq!(config.app_secret, "secret_abc");
1473 assert_eq!(config.allowed_users, vec!["user1"]);
1474 }
1475
1476 #[test]
1479 fn test_parse_qq_markers_single_image() {
1480 let (text, atts) = parse_qq_attachment_markers("Hello [IMAGE:/tmp/a.png] world");
1481 assert_eq!(text, "Hello world");
1482 assert_eq!(atts.len(), 1);
1483 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1484 assert_eq!(atts[0].target, "/tmp/a.png");
1485 }
1486
1487 #[test]
1488 fn test_parse_qq_markers_multiple() {
1489 let (text, atts) =
1490 parse_qq_attachment_markers("[IMAGE:/a.png] text [VIDEO:https://example.com/v.mp4]");
1491 assert_eq!(text, "text");
1492 assert_eq!(atts.len(), 2);
1493 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1494 assert_eq!(atts[1].kind, QQMediaFileType::Video);
1495 }
1496
1497 #[test]
1498 fn test_parse_qq_markers_no_markers() {
1499 let (text, atts) = parse_qq_attachment_markers("Just plain text");
1500 assert_eq!(text, "Just plain text");
1501 assert!(atts.is_empty());
1502 }
1503
1504 #[test]
1505 fn test_parse_qq_markers_case_insensitive() {
1506 let (_, atts) = parse_qq_attachment_markers("[image:/a.png]");
1507 assert_eq!(atts.len(), 1);
1508 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1509
1510 let (_, atts) = parse_qq_attachment_markers("[Image:/a.png]");
1511 assert_eq!(atts.len(), 1);
1512 assert_eq!(atts[0].kind, QQMediaFileType::Image);
1513 }
1514
1515 #[test]
1516 fn test_parse_qq_markers_invalid_preserved() {
1517 let (text, atts) = parse_qq_attachment_markers("Keep [UNKNOWN:foo] here");
1518 assert_eq!(text, "Keep [UNKNOWN:foo] here");
1519 assert!(atts.is_empty());
1520 }
1521
1522 #[test]
1523 fn test_parse_qq_markers_mixed_text_and_markers() {
1524 let (text, atts) =
1525 parse_qq_attachment_markers("Before [DOCUMENT:/doc.pdf] middle [PHOTO:/p.jpg] after");
1526 assert_eq!(text, "Before middle after");
1527 assert_eq!(atts.len(), 2);
1528 assert_eq!(atts[0].kind, QQMediaFileType::File);
1529 assert_eq!(atts[0].target, "/doc.pdf");
1530 assert_eq!(atts[1].kind, QQMediaFileType::Image);
1531 assert_eq!(atts[1].target, "/p.jpg");
1532 }
1533
1534 #[test]
1537 fn test_marker_kind_image() {
1538 assert_eq!(
1539 marker_kind_to_qq_file_type("IMAGE", "/a.png"),
1540 Some(QQMediaFileType::Image)
1541 );
1542 assert_eq!(
1543 marker_kind_to_qq_file_type("PHOTO", "/a.png"),
1544 Some(QQMediaFileType::Image)
1545 );
1546 }
1547
1548 #[test]
1549 fn test_marker_kind_document() {
1550 assert_eq!(
1551 marker_kind_to_qq_file_type("DOCUMENT", "/a.pdf"),
1552 Some(QQMediaFileType::File)
1553 );
1554 assert_eq!(
1555 marker_kind_to_qq_file_type("FILE", "/a.zip"),
1556 Some(QQMediaFileType::File)
1557 );
1558 }
1559
1560 #[test]
1561 fn test_marker_kind_video() {
1562 assert_eq!(
1563 marker_kind_to_qq_file_type("VIDEO", "/v.mp4"),
1564 Some(QQMediaFileType::Video)
1565 );
1566 }
1567
1568 #[test]
1569 fn test_marker_kind_voice_native() {
1570 assert_eq!(
1571 marker_kind_to_qq_file_type("VOICE", "/a.mp3"),
1572 Some(QQMediaFileType::Voice)
1573 );
1574 assert_eq!(
1575 marker_kind_to_qq_file_type("AUDIO", "/a.wav"),
1576 Some(QQMediaFileType::Voice)
1577 );
1578 assert_eq!(
1579 marker_kind_to_qq_file_type("VOICE", "/a.silk"),
1580 Some(QQMediaFileType::Voice)
1581 );
1582 }
1583
1584 #[test]
1585 fn test_marker_kind_voice_non_native_degrades() {
1586 assert_eq!(
1588 marker_kind_to_qq_file_type("VOICE", "/a.ogg"),
1589 Some(QQMediaFileType::File)
1590 );
1591 assert_eq!(
1592 marker_kind_to_qq_file_type("AUDIO", "/a.flac"),
1593 Some(QQMediaFileType::File)
1594 );
1595 }
1596
1597 #[test]
1600 fn test_upload_body_url() {
1601 let body = json!({
1602 "file_type": QQMediaFileType::Image as u8,
1603 "srv_send_msg": false,
1604 "url": "https://example.com/a.jpg",
1605 });
1606 assert_eq!(body["file_type"], 1);
1607 assert_eq!(body["srv_send_msg"], false);
1608 assert_eq!(body["url"], "https://example.com/a.jpg");
1609 assert!(body.get("file_data").is_none());
1610 }
1611
1612 #[test]
1613 fn test_upload_body_base64() {
1614 let body = json!({
1615 "file_type": QQMediaFileType::File as u8,
1616 "srv_send_msg": false,
1617 "file_data": "dGVzdA==",
1618 });
1619 assert_eq!(body["file_type"], 4);
1620 assert_eq!(body["file_data"], "dGVzdA==");
1621 assert!(body.get("url").is_none());
1622 }
1623
1624 #[test]
1625 fn test_send_media_body_msg_type_7() {
1626 let file_info = "some_file_info_string";
1627 let body = json!({
1628 "msg_type": 7,
1629 "media": {
1630 "file_info": file_info,
1631 },
1632 "msg_seq": 1,
1633 });
1634 assert_eq!(body["msg_type"], 7);
1635 assert_eq!(body["media"]["file_info"], file_info);
1636 }
1637
1638 #[tokio::test]
1641 async fn test_compose_message_content_text_only() {
1642 let ch = make_channel();
1643 let payload = json!({ "content": " hello world " });
1644 assert_eq!(
1645 ch.compose_message_content(&payload).await,
1646 Some("hello world".to_string())
1647 );
1648 }
1649
1650 #[tokio::test]
1651 async fn test_compose_message_content_image_attachment() {
1652 let ch = make_channel();
1653 let payload = json!({
1654 "content": " ",
1655 "attachments": [{
1656 "content_type": "image/jpg",
1657 "url": "https://cdn.example.com/a.jpg"
1658 }]
1659 });
1660 assert_eq!(
1661 ch.compose_message_content(&payload).await,
1662 Some("[IMAGE:https://cdn.example.com/a.jpg]".to_string())
1663 );
1664 }
1665
1666 #[tokio::test]
1667 async fn test_compose_message_content_text_and_attachments() {
1668 let ch = make_channel();
1669 let payload = json!({
1670 "content": "Here is an image",
1671 "attachments": [
1672 { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1673 { "filename": "b.jpeg", "url": "https://cdn.example.com/b.jpeg" }
1674 ]
1675 });
1676 assert_eq!(
1677 ch.compose_message_content(&payload).await,
1678 Some(
1679 "Here is an image\n[IMAGE:https://cdn.example.com/a.png]\n[IMAGE:https://cdn.example.com/b.jpeg]"
1680 .to_string()
1681 )
1682 );
1683 }
1684
1685 #[tokio::test]
1686 async fn test_compose_all_attachment_types() {
1687 let ch = make_channel();
1688 let payload = json!({
1689 "content": "",
1690 "attachments": [
1691 { "content_type": "image/png", "url": "https://cdn.example.com/a.png" },
1692 { "content_type": "audio/mpeg", "url": "https://cdn.example.com/b.mp3" },
1693 { "content_type": "video/mp4", "url": "https://cdn.example.com/c.mp4" },
1694 { "content_type": "application/pdf", "url": "https://cdn.example.com/d.pdf" }
1695 ]
1696 });
1697 let result = ch.compose_message_content(&payload).await.unwrap();
1698 assert!(result.contains("[IMAGE:"));
1699 assert!(result.contains("[VOICE:"));
1700 assert!(result.contains("[VIDEO:"));
1701 assert!(result.contains("[DOCUMENT:"));
1702 }
1703
1704 #[tokio::test]
1705 async fn test_compose_fixes_double_slash_url() {
1706 let ch = make_channel();
1707 let payload = json!({
1708 "content": "",
1709 "attachments": [{
1710 "content_type": "image/png",
1711 "url": "//cdn.example.com/a.png"
1712 }]
1713 });
1714 let result = ch.compose_message_content(&payload).await.unwrap();
1715 assert!(result.contains("https://cdn.example.com/a.png"));
1716 assert!(!result.starts_with("[IMAGE://"));
1718 }
1719
1720 #[tokio::test]
1721 async fn test_compose_fallback_no_workspace() {
1722 let ch = make_channel();
1724 let payload = json!({
1725 "content": "text",
1726 "attachments": [{
1727 "content_type": "application/pdf",
1728 "filename": "report.pdf",
1729 "url": "https://cdn.example.com/report.pdf"
1730 }]
1731 });
1732 let result = ch.compose_message_content(&payload).await.unwrap();
1733 assert!(result.contains("[DOCUMENT:https://cdn.example.com/report.pdf]"));
1734 }
1735
1736 #[tokio::test]
1737 async fn test_compose_drops_empty_url() {
1738 let ch = make_channel();
1739 let payload = json!({
1740 "content": " ",
1741 "attachments": [{
1742 "content_type": "image/png",
1743 "url": " "
1744 }]
1745 });
1746 assert_eq!(ch.compose_message_content(&payload).await, None);
1747 }
1748
1749 #[test]
1752 fn test_send_body_uses_markdown_msg_type() {
1753 let content = "**bold** and `code`";
1754 let body = json!({
1755 "markdown": { "content": content },
1756 "msg_type": 2,
1757 });
1758 assert_eq!(body["msg_type"], 2);
1759 assert_eq!(body["markdown"]["content"], content);
1760 assert!(
1761 body.get("content").is_none(),
1762 "top-level 'content' must not be present"
1763 );
1764 }
1765
1766 #[test]
1769 fn test_fix_qq_url() {
1770 assert_eq!(
1771 fix_qq_url("//cdn.example.com/a.png"),
1772 "https://cdn.example.com/a.png"
1773 );
1774 assert_eq!(
1775 fix_qq_url("https://cdn.example.com/a.png"),
1776 "https://cdn.example.com/a.png"
1777 );
1778 }
1779
1780 #[test]
1781 fn test_next_msg_seq_range() {
1782 for _ in 0..100 {
1783 let seq = next_msg_seq();
1784 assert!(seq < 65536);
1785 }
1786 }
1787
1788 #[test]
1789 fn test_resolve_recipient_group() {
1790 let (scope, id) = QQChannel::resolve_recipient("group:abc123");
1791 assert_eq!(scope, "groups");
1792 assert_eq!(id, "abc123");
1793 }
1794
1795 #[test]
1796 fn test_resolve_recipient_user() {
1797 let (scope, id) = QQChannel::resolve_recipient("user:xyz789");
1798 assert_eq!(scope, "users");
1799 assert_eq!(id, "xyz789");
1800 }
1801
1802 #[test]
1803 fn test_resolve_recipient_bare_id() {
1804 let (scope, id) = QQChannel::resolve_recipient("raw_id_123");
1805 assert_eq!(scope, "users");
1806 assert_eq!(id, "raw_id_123");
1807 }
1808
1809 #[test]
1810 fn test_infer_attachment_marker() {
1811 assert_eq!(infer_attachment_marker("image/png", "a.png"), "IMAGE");
1812 assert_eq!(infer_attachment_marker("audio/mpeg", "a.mp3"), "VOICE");
1813 assert_eq!(infer_attachment_marker("video/mp4", "a.mp4"), "VIDEO");
1814 assert_eq!(
1815 infer_attachment_marker("application/pdf", "doc.pdf"),
1816 "DOCUMENT"
1817 );
1818 assert_eq!(infer_attachment_marker("", "photo.jpg"), "IMAGE");
1819 assert_eq!(infer_attachment_marker("", "song.mp3"), "VOICE");
1820 assert_eq!(infer_attachment_marker("", "clip.mp4"), "VIDEO");
1821 assert_eq!(infer_attachment_marker("", "unknown.xyz"), "DOCUMENT");
1822 }
1823
1824 #[tokio::test]
1827 async fn test_upload_cache_hit_and_miss() {
1828 let ch = make_channel();
1829 let key = QQChannel::upload_cache_key(b"test_data", "c2c", "user1", QQMediaFileType::Image);
1830
1831 assert!(ch.get_cached_upload(&key).await.is_none());
1833
1834 ch.set_cached_upload(key.clone(), "cached_file_info".into(), 3600)
1836 .await;
1837
1838 assert_eq!(
1840 ch.get_cached_upload(&key).await,
1841 Some("cached_file_info".to_string())
1842 );
1843 }
1844
1845 #[tokio::test]
1846 async fn test_upload_cache_expired() {
1847 let ch = make_channel();
1848 let key = QQChannel::upload_cache_key(b"test_data", "group", "g1", QQMediaFileType::Video);
1849
1850 ch.set_cached_upload(key.clone(), "old_info".into(), 0)
1852 .await;
1853
1854 assert!(ch.get_cached_upload(&key).await.is_none());
1856 }
1857
1858 #[tokio::test]
1861 async fn test_reply_tracker_allows_up_to_limit() {
1862 let ch = make_channel();
1863 for _ in 0..REPLY_LIMIT {
1864 assert!(ch.check_reply_allowed("msg1").await);
1865 }
1866 assert!(!ch.check_reply_allowed("msg1").await);
1868 }
1869
1870 #[tokio::test]
1871 async fn test_reply_tracker_independent_msg_ids() {
1872 let ch = make_channel();
1873 assert!(ch.check_reply_allowed("msg_a").await);
1874 assert!(ch.check_reply_allowed("msg_b").await);
1875 }
1876
1877 #[test]
1880 fn test_auth_retry_constants_are_sensible() {
1881 const {
1882 assert!(AUTH_RETRY_MAX_ATTEMPTS >= 2, "should retry at least once");
1883 assert!(
1884 AUTH_RETRY_INITIAL_BACKOFF_MS > 0,
1885 "initial backoff must be positive"
1886 );
1887 assert!(
1888 AUTH_RETRY_MAX_BACKOFF_MS >= AUTH_RETRY_INITIAL_BACKOFF_MS,
1889 "max backoff must be >= initial"
1890 );
1891 }
1892 }
1893
1894 #[test]
1895 fn test_auth_retry_backoff_stays_within_bounds() {
1896 let mut backoff = AUTH_RETRY_INITIAL_BACKOFF_MS;
1898 for _ in 1..AUTH_RETRY_MAX_ATTEMPTS {
1899 backoff = (backoff * 2).min(AUTH_RETRY_MAX_BACKOFF_MS);
1900 }
1901 assert!(
1902 backoff <= AUTH_RETRY_MAX_BACKOFF_MS,
1903 "backoff must never exceed the configured maximum"
1904 );
1905 }
1906
1907 #[tokio::test]
1908 async fn test_get_token_returns_cached_token_without_fetch() {
1909 let ch = make_channel();
1910 let future_expiry = now_secs() + 3600;
1912 *ch.token_cache.write().await = Some(("cached_tok".to_string(), future_expiry));
1913
1914 let tok = ch.get_token().await.unwrap();
1916 assert_eq!(tok, "cached_tok");
1917 }
1918
1919 #[tokio::test]
1920 async fn test_get_token_refreshes_expired_cache() {
1921 let ch = make_channel();
1922 *ch.token_cache.write().await = Some(("old_tok".to_string(), 0));
1924
1925 let result = ch.get_token().await;
1928 assert!(
1929 result.is_err(),
1930 "should fail when token expired and no server available"
1931 );
1932 }
1933
1934 #[test]
1937 fn test_heartbeat_grace_period_calculation() {
1938 let cases: Vec<(u64, u64)> = vec![
1940 (41_250, 4_125), (30_000, 3_000), (60_000, 5_000), (100_000, 5_000), (5_000, 500), (0, 0), ];
1947 for (interval, expected_grace) in cases {
1948 let grace: u64 = (interval / 10).min(5_000);
1949 assert_eq!(
1950 grace, expected_grace,
1951 "grace for interval {interval} should be {expected_grace}"
1952 );
1953 let effective = interval.saturating_add(grace);
1954 assert!(effective >= interval);
1955 }
1956 }
1957
1958 #[test]
1959 fn test_missed_ack_counter_logic() {
1960 let max_missed: u32 = 3;
1961 let mut missed: u32 = 0;
1962
1963 assert!(missed < max_missed);
1965 missed += 1;
1966 assert_eq!(missed, 1, "counter should be 1 after first heartbeat");
1967
1968 missed = 0;
1970 assert_eq!(missed, 0, "counter should reset on ACK");
1971
1972 for _ in 0..max_missed {
1974 assert!(
1975 missed < max_missed,
1976 "should not reach zombie state before {max_missed} misses"
1977 );
1978 missed += 1;
1979 }
1980 assert!(
1981 missed >= max_missed,
1982 "should declare zombie after {max_missed} missed ACKs"
1983 );
1984 }
1985
1986 #[test]
1987 fn test_missed_ack_counter_reset_on_ack() {
1988 let max_missed: u32 = 3;
1989 let mut missed: u32 = 0;
1990
1991 missed += 1;
1992 missed += 1;
1993 assert_eq!(missed, 2);
1994
1995 missed = 0;
1997 assert_eq!(missed, 0);
1998
1999 missed += 1;
2001 assert!(missed < max_missed);
2002 }
2003
2004 #[test]
2005 fn test_effective_interval_never_overflows() {
2006 let interval = u64::MAX;
2007 let grace: u64 = (interval / 10).min(5_000);
2008 let effective = interval.saturating_add(grace);
2009 assert_eq!(effective, u64::MAX);
2010 }
2011}