1use futures_core::Stream;
4use futures_util::StreamExt;
5use std::collections::{HashMap, VecDeque};
6use std::future::Future;
7use std::path::Path;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{broadcast, oneshot, Mutex, Notify, RwLock};
13use tokio::time::sleep;
14use tracing::{error, info, warn};
15
16use crate::cdn::CdnClient;
17use crate::crypto;
18use crate::error::{Result, WechatIlinkError};
19use crate::protocol::{self, ILinkClient, ILinkClientOptions};
20use crate::types::*;
21use md5::{Digest, Md5};
22use rand::Rng;
23use serde_json::json;
24use uuid::Uuid;
25
26const EVENT_CHANNEL_CAPACITY: usize = 256;
27
28#[derive(Debug, Clone)]
30pub enum WechatEvent {
31 ContextObserved(WechatContext),
33 Message(IncomingMessage),
35 CursorAdvanced { account_key: String, cursor: String },
37 AuthSessionExpired { account_key: String },
39 UserInteractionRequested {
44 account_key: String,
45 user_id: Option<String>,
46 reason: UserInteractionReason,
47 },
48}
49
50#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct SendReceipt {
53 pub message_ids: Vec<String>,
54 pub visible_texts: Vec<String>,
55}
56
57impl SendReceipt {
58 pub fn last_message_id(&self) -> Option<&str> {
59 self.message_ids.last().map(String::as_str)
60 }
61
62 fn single(message_id: String) -> Self {
63 Self {
64 message_ids: vec![message_id],
65 visible_texts: Vec::new(),
66 }
67 }
68
69 fn empty() -> Self {
70 Self {
71 message_ids: Vec::new(),
72 visible_texts: Vec::new(),
73 }
74 }
75
76 fn append(&mut self, mut other: SendReceipt) {
77 self.message_ids.append(&mut other.message_ids);
78 self.visible_texts.append(&mut other.visible_texts);
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq)]
84pub enum UserInteractionReason {
85 ContextExpiring {
87 observed_at: SystemTime,
88 expires_at: SystemTime,
89 remind_before: Duration,
90 },
91 OutboundRateLimitApproaching {
93 sent_count: usize,
94 window: Duration,
95 threshold: usize,
96 },
97}
98
99pub struct WechatEventStream<'a> {
101 inner: Pin<Box<dyn Stream<Item = Result<WechatEvent>> + Send + 'a>>,
102}
103
104impl<'a> WechatEventStream<'a> {
105 fn new(stream: impl Stream<Item = Result<WechatEvent>> + Send + 'a) -> Self {
106 Self {
107 inner: Box::pin(stream),
108 }
109 }
110
111 pub async fn next(&mut self) -> Option<Result<WechatEvent>> {
112 self.inner.next().await
113 }
114}
115
116impl Stream for WechatEventStream<'_> {
117 type Item = Result<WechatEvent>;
118
119 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120 self.inner.as_mut().poll_next(cx)
121 }
122}
123
124#[derive(Debug)]
126pub enum LoginQrEvent {
127 QrCode { content: String },
129 StatusChanged { status: String },
131 NeedVerifyCode {
133 prompt: String,
134 responder: VerifyCodeResponder,
135 },
136 Confirmed { credentials: Credentials },
138}
139
140#[derive(Debug)]
142pub struct VerifyCodeResponder {
143 tx: oneshot::Sender<Option<String>>,
144}
145
146impl VerifyCodeResponder {
147 pub fn send(self, code: impl Into<String>) -> std::result::Result<(), Option<String>> {
148 let code = Some(code.into());
149 self.tx.send(code)
150 }
151
152 pub fn cancel(self) -> std::result::Result<(), Option<String>> {
153 self.tx.send(None)
154 }
155}
156
157pub struct LoginQrStream<'a> {
159 inner: Pin<Box<dyn Stream<Item = Result<LoginQrEvent>> + Send + 'a>>,
160}
161
162impl<'a> LoginQrStream<'a> {
163 fn new(stream: impl Stream<Item = Result<LoginQrEvent>> + Send + 'a) -> Self {
164 Self {
165 inner: Box::pin(stream),
166 }
167 }
168
169 pub async fn next(&mut self) -> Option<Result<LoginQrEvent>> {
170 self.inner.next().await
171 }
172}
173
174impl Stream for LoginQrStream<'_> {
175 type Item = Result<LoginQrEvent>;
176
177 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
178 self.inner.as_mut().poll_next(cx)
179 }
180}
181
182#[derive(Debug, Clone)]
184pub struct WechatRateLimitOptions {
185 pub retry_after: Duration,
187 pub retry_attempts: usize,
189 pub interaction_window: Duration,
191 pub interaction_threshold: usize,
193 pub context_ttl: Duration,
195 pub context_remind_before: Duration,
197}
198
199impl Default for WechatRateLimitOptions {
200 fn default() -> Self {
201 Self {
202 retry_after: protocol::DEFAULT_RATE_LIMIT_RETRY_AFTER,
203 retry_attempts: 5,
204 interaction_window: Duration::from_secs(5 * 60),
205 interaction_threshold: 6,
206 context_ttl: Duration::from_secs(24 * 60 * 60),
207 context_remind_before: Duration::from_secs(30 * 60),
208 }
209 }
210}
211
212#[derive(Default)]
213struct AccountRateLimitState {
214 sent_at: VecDeque<Instant>,
215 next_allowed_at: Option<Instant>,
216}
217
218struct ContextRefreshState {
219 user_id: String,
220 observed_at: SystemTime,
221 expires_at: SystemTime,
222 reminded: bool,
223}
224
225pub struct WechatIlinkClientBuilder {
227 base_url: Option<String>,
228 bot_agent: Option<String>,
229 ilink_app_id: Option<String>,
230 route_tag: Option<String>,
231 markdown_filter: bool,
232 rate_limit: WechatRateLimitOptions,
233 credentials: Option<Credentials>,
234 http_client: Option<reqwest::Client>,
235}
236
237impl Default for WechatIlinkClientBuilder {
238 fn default() -> Self {
239 Self {
240 base_url: None,
241 bot_agent: None,
242 ilink_app_id: None,
243 route_tag: None,
244 markdown_filter: true,
245 rate_limit: WechatRateLimitOptions::default(),
246 credentials: None,
247 http_client: None,
248 }
249 }
250}
251
252impl WechatIlinkClientBuilder {
253 pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
254 self.base_url = Some(base_url.into());
255 self
256 }
257
258 pub fn bot_agent(mut self, bot_agent: impl Into<String>) -> Self {
259 self.bot_agent = Some(bot_agent.into());
260 self
261 }
262
263 pub fn ilink_app_id(mut self, ilink_app_id: impl Into<String>) -> Self {
264 self.ilink_app_id = Some(ilink_app_id.into());
265 self
266 }
267
268 pub fn route_tag(mut self, route_tag: impl Into<String>) -> Self {
269 self.route_tag = Some(route_tag.into());
270 self
271 }
272
273 pub fn markdown_filter(mut self, enabled: bool) -> Self {
274 self.markdown_filter = enabled;
275 self
276 }
277
278 pub fn rate_limit_options(mut self, options: WechatRateLimitOptions) -> Self {
279 self.rate_limit = options;
280 self
281 }
282
283 pub fn rate_limit_retry_after(mut self, retry_after: Duration) -> Self {
284 self.rate_limit.retry_after = if retry_after.is_zero() {
285 protocol::DEFAULT_RATE_LIMIT_RETRY_AFTER
286 } else {
287 retry_after
288 };
289 self
290 }
291
292 pub fn rate_limit_retry_attempts(mut self, retry_attempts: usize) -> Self {
293 self.rate_limit.retry_attempts = retry_attempts;
294 self
295 }
296
297 pub fn rate_limit_max_retries(mut self, max_retries: usize) -> Self {
298 self.rate_limit.retry_attempts = max_retries;
299 self
300 }
301
302 pub fn context_ttl(mut self, ttl: Duration) -> Self {
303 if !ttl.is_zero() {
304 self.rate_limit.context_ttl = ttl;
305 }
306 self
307 }
308
309 pub fn context_expiry_remind_before(mut self, remind_before: Duration) -> Self {
310 self.rate_limit.context_remind_before = remind_before;
311 self
312 }
313
314 pub fn rate_limit_interaction_window(mut self, window: Duration) -> Self {
315 if !window.is_zero() {
316 self.rate_limit.interaction_window = window;
317 }
318 self
319 }
320
321 pub fn rate_limit_interaction_threshold(mut self, threshold: usize) -> Self {
322 self.rate_limit.interaction_threshold = threshold;
323 self
324 }
325
326 pub fn credentials(mut self, credentials: Credentials) -> Self {
327 self.credentials = Some(credentials);
328 self
329 }
330
331 pub fn http_client(mut self, http_client: reqwest::Client) -> Self {
332 self.http_client = Some(http_client);
333 self
334 }
335
336 pub fn build(self) -> WechatIlinkClient {
337 WechatIlinkClient::from_builder(self)
338 }
339}
340
341pub struct WechatIlinkClient {
343 client: Arc<ILinkClient>,
344 cdn: CdnClient,
345 credentials: RwLock<Option<Credentials>>,
346 event_tx: broadcast::Sender<WechatEvent>,
347 rate_limit_states: Mutex<HashMap<String, AccountRateLimitState>>,
348 context_refresh_states: Mutex<HashMap<(String, String), ContextRefreshState>>,
349 rate_limit_notify: Notify,
350 base_url: RwLock<String>,
351 stopped: RwLock<bool>,
352 rate_limit: WechatRateLimitOptions,
353}
354
355impl WechatIlinkClient {
356 pub fn new() -> Self {
358 Self::builder().build()
359 }
360
361 pub fn builder() -> WechatIlinkClientBuilder {
363 WechatIlinkClientBuilder::default()
364 }
365
366 fn from_builder(builder: WechatIlinkClientBuilder) -> Self {
367 let base_url = builder
368 .credentials
369 .as_ref()
370 .map(|creds| creds.base_url.clone())
371 .or(builder.base_url)
372 .unwrap_or_else(|| protocol::DEFAULT_BASE_URL.to_string());
373 let options = ILinkClientOptions {
374 bot_agent: builder.bot_agent,
375 route_tag: builder.route_tag,
376 ilink_app_id: builder.ilink_app_id,
377 markdown_filter: builder.markdown_filter,
378 };
379 let (client, cdn) = match builder.http_client {
380 Some(http_client) => (
381 ILinkClient::with_http_client_and_options(http_client.clone(), options),
382 CdnClient::with_client(http_client),
383 ),
384 None => (ILinkClient::with_options(options), CdnClient::new()),
385 };
386 Self {
387 client: Arc::new(client),
388 cdn,
389 credentials: RwLock::new(builder.credentials),
390 event_tx: broadcast::channel(EVENT_CHANNEL_CAPACITY).0,
391 rate_limit_states: Mutex::new(HashMap::new()),
392 context_refresh_states: Mutex::new(HashMap::new()),
393 rate_limit_notify: Notify::new(),
394 base_url: RwLock::new(base_url),
395 stopped: RwLock::new(false),
396 rate_limit: builder.rate_limit,
397 }
398 }
399
400 const MAX_QR_REFRESH: u32 = 3;
402 const FIXED_QR_BASE_URL: &'static str = "https://ilinkai.weixin.qq.com";
404
405 pub async fn set_credentials(&self, creds: Credentials) {
407 *self.base_url.write().await = creds.base_url.clone();
408 *self.credentials.write().await = Some(creds);
409 }
410
411 pub async fn credentials(&self) -> Option<Credentials> {
413 self.credentials.read().await.clone()
414 }
415
416 pub fn login_qr(&self) -> LoginQrStream<'_> {
422 LoginQrStream::new(async_stream::stream! {
423 let base_url = self.base_url.read().await.clone();
424 let mut qr_refresh_count = 0u32;
425 loop {
426 qr_refresh_count += 1;
427 if qr_refresh_count > Self::MAX_QR_REFRESH {
428 yield Err(WechatIlinkError::Auth(format!(
429 "QR code expired {} times — login aborted",
430 Self::MAX_QR_REFRESH
431 )));
432 return;
433 }
434
435 let qr = match self.client.get_qr_code(Self::FIXED_QR_BASE_URL).await {
436 Ok(qr) => qr,
437 Err(err) => {
438 yield Err(err);
439 return;
440 }
441 };
442 yield Ok(LoginQrEvent::QrCode {
443 content: qr.qrcode_img_content.clone(),
444 });
445
446 let mut last_status = String::new();
447 let mut current_poll_base_url = Self::FIXED_QR_BASE_URL.to_string();
448 let mut pending_verify_code: Option<String> = None;
449 loop {
450 let status = match self
451 .client
452 .poll_qr_status_with_verify_code(
453 ¤t_poll_base_url,
454 &qr.qrcode,
455 pending_verify_code.as_deref(),
456 )
457 .await
458 {
459 Ok(status) => status,
460 Err(err) => {
461 yield Err(err);
462 return;
463 }
464 };
465
466 if status.status != last_status {
467 last_status = status.status.clone();
468 match status.status.as_str() {
469 "scaned" => info!("QR scanned — confirm in WeChat"),
470 "expired" => warn!("QR expired — requesting new one"),
471 "confirmed" => info!("Login confirmed"),
472 "need_verifycode" => info!("QR verification code required"),
473 "verify_code_blocked" => warn!("QR verification code blocked"),
474 "binded_redirect" => warn!("QR already bound"),
475 _ => {}
476 }
477 yield Ok(LoginQrEvent::StatusChanged {
478 status: status.status.clone(),
479 });
480 }
481
482 if status.status == "wait" {
483 sleep(Duration::from_secs(1)).await;
484 continue;
485 }
486
487 if status.status == "need_verifycode" {
488 let prompt = if pending_verify_code.is_some() {
489 "QR verification code mismatch"
490 } else {
491 "QR verification code required"
492 };
493 let (tx, rx) = oneshot::channel();
494 yield Ok(LoginQrEvent::NeedVerifyCode {
495 prompt: prompt.to_string(),
496 responder: VerifyCodeResponder { tx },
497 });
498 let code = match rx.await {
499 Ok(Some(value)) => value.trim().to_string(),
500 _ => String::new(),
501 };
502 if code.is_empty() {
503 yield Err(WechatIlinkError::Auth(
504 "QR verification code was not provided".into(),
505 ));
506 return;
507 }
508 pending_verify_code = Some(code);
509 continue;
510 }
511
512 if status.status == "verify_code_blocked" {
513 break;
514 }
515
516 if status.status == "binded_redirect" {
517 yield Err(WechatIlinkError::Auth(
518 "QR login is already bound to this app".into(),
519 ));
520 return;
521 }
522
523 if status.status == "confirmed" {
524 let token = match status.bot_token {
525 Some(token) => token,
526 None => {
527 yield Err(WechatIlinkError::Auth("missing bot_token".into()));
528 return;
529 }
530 };
531 let creds = Credentials {
532 token,
533 base_url: status.baseurl.unwrap_or_else(|| base_url.clone()),
534 account_id: status.ilink_bot_id.unwrap_or_default(),
535 user_id: status.ilink_user_id.unwrap_or_default(),
536 saved_at: Some(chrono_now()),
537 };
538 *self.credentials.write().await = Some(creds.clone());
539 *self.base_url.write().await = creds.base_url.clone();
540 yield Ok(LoginQrEvent::Confirmed { credentials: creds });
541 return;
542 }
543
544 if status.status == "scaned_but_redirect" {
545 if let Some(ref host) = status.redirect_host {
546 current_poll_base_url = format!("https://{}", host);
547 info!("IDC redirect, switching polling host to {}", host);
548 } else {
549 warn!("Received scaned_but_redirect but redirect_host is missing");
550 }
551 sleep(Duration::from_secs(2)).await;
552 continue;
553 }
554
555 if status.status == "scaned" && pending_verify_code.is_some() {
556 pending_verify_code = None;
557 }
558
559 if status.status == "expired" {
560 break;
561 }
562
563 sleep(Duration::from_secs(2)).await;
564 }
565 }
566 })
567 }
568
569 pub fn events_from_cursor(
571 self: Arc<Self>,
572 cursor: Option<String>,
573 ) -> WechatEventStream<'static> {
574 let mut rx = self.event_tx.subscribe();
575 let runner = Arc::clone(&self);
576 let mut task =
577 tokio::spawn(async move { runner.run_loop(cursor.unwrap_or_default()).await });
578 let abort_on_drop = AbortOnDrop(task.abort_handle());
579 WechatEventStream::new(async_stream::stream! {
580 let _abort_on_drop = abort_on_drop;
581 loop {
582 tokio::select! {
583 biased;
584 result = &mut task => {
585 match result {
586 Ok(Ok(())) => return,
587 Ok(Err(err)) => {
588 yield Err(err);
589 return;
590 }
591 Err(err) => {
592 yield Err(WechatIlinkError::Other(format!(
593 "wechat poll task failed: {err}"
594 )));
595 return;
596 }
597 }
598 }
599 event = rx.recv() => match event {
600 Ok(event) => yield Ok(event),
601 Err(broadcast::error::RecvError::Lagged(skipped)) => {
602 yield Err(WechatIlinkError::Other(format!(
603 "wechat event stream lagged by {skipped} events"
604 )));
605 }
606 Err(broadcast::error::RecvError::Closed) => return,
607 }
608 }
609 }
610 })
611 }
612
613 fn emit_event(&self, event: WechatEvent) {
615 let _ = self.event_tx.send(event);
616 }
617
618 pub async fn reply(&self, msg: &IncomingMessage, text: &str) -> Result<SendReceipt> {
623 let context = msg
624 .context
625 .as_ref()
626 .ok_or_else(|| WechatIlinkError::NoContext(msg.user_id.clone()))?;
627 self.send_text_with_context(context, text).await
628 }
629
630 pub async fn send_text_with_context(
636 &self,
637 context: &WechatContext,
638 text: &str,
639 ) -> Result<SendReceipt> {
640 self.send_text(&context.user_id, text, &context.context_token)
641 .await
642 }
643
644 pub async fn send_media_with_context(
648 &self,
649 context: &WechatContext,
650 content: SendContent,
651 ) -> Result<SendReceipt> {
652 self.send_content(&context.user_id, &context.context_token, content)
653 .await
654 }
655
656 pub async fn send_typing_with_context(&self, context: &WechatContext) -> Result<()> {
660 let (base_url, token) = self.get_auth().await?;
661 let config = self
662 .retry_rate_limited(|| {
663 self.client
664 .get_config(&base_url, &token, &context.user_id, &context.context_token)
665 })
666 .await?;
667 if let Some(ticket) = config.typing_ticket {
668 self.retry_rate_limited(|| {
669 self.client
670 .send_typing(&base_url, &token, &context.user_id, &ticket, 1)
671 })
672 .await?;
673 }
674 Ok(())
675 }
676
677 pub async fn reply_media(
681 &self,
682 msg: &IncomingMessage,
683 content: SendContent,
684 ) -> Result<SendReceipt> {
685 let context = msg
686 .context
687 .as_ref()
688 .ok_or_else(|| WechatIlinkError::NoContext(msg.user_id.clone()))?;
689 self.send_media_with_context(context, content).await
690 }
691
692 pub async fn download(&self, msg: &IncomingMessage) -> Result<Option<DownloadedMedia>> {
695 if let Some(img) = msg.images.first() {
696 if let Some(ref media) = img.media {
697 let data = self.cdn.download(media, img.aes_key.as_deref()).await?;
698 return Ok(Some(DownloadedMedia {
699 data,
700 media_type: "image".into(),
701 file_name: None,
702 format: None,
703 }));
704 }
705 }
706 if let Some(file) = msg.files.first() {
707 if let Some(ref media) = file.media {
708 require_download_aes_key("file", media)?;
709 let data = self.cdn.download(media, None).await?;
710 return Ok(Some(DownloadedMedia {
711 data,
712 media_type: "file".into(),
713 file_name: Some(file.file_name.clone().unwrap_or_else(|| "file.bin".into())),
714 format: None,
715 }));
716 }
717 }
718 if let Some(video) = msg.videos.first() {
719 if let Some(ref media) = video.media {
720 require_download_aes_key("video", media)?;
721 let data = self.cdn.download(media, None).await?;
722 return Ok(Some(DownloadedMedia {
723 data,
724 media_type: "video".into(),
725 file_name: None,
726 format: None,
727 }));
728 }
729 }
730 if let Some(voice) = msg.voices.first() {
731 if let Some(ref media) = voice.media {
732 require_download_aes_key("voice", media)?;
733 let data = self.cdn.download(media, None).await?;
734 return Ok(Some(DownloadedMedia {
735 data,
736 media_type: "voice".into(),
737 file_name: None,
738 format: Some("silk".into()),
739 }));
740 }
741 }
742 Ok(None)
743 }
744
745 pub async fn download_raw(
747 &self,
748 media: &CDNMedia,
749 aeskey_override: Option<&str>,
750 ) -> Result<Vec<u8>> {
751 self.cdn.download(media, aeskey_override).await
752 }
753
754 pub async fn upload(
756 &self,
757 data: &[u8],
758 user_id: &str,
759 media_type: i32,
760 ) -> Result<UploadResult> {
761 let (base_url, token) = self.get_auth().await?;
762 self.cdn_upload(&base_url, &token, data, user_id, media_type)
763 .await
764 }
765
766 async fn run_loop(&self, mut cursor: String) -> Result<()> {
767 *self.stopped.write().await = false;
768 info!("Long-poll loop started");
769 let mut retry_delay = Duration::from_secs(1);
770
771 if let Ok((base_url, token)) = self.get_auth().await {
772 match self.client.notify_start(&base_url, &token).await {
773 Ok(resp) if resp.ret.is_some_and(|ret| ret != 0) => warn!(
774 "notify_start returned ret={:?} errmsg={:?}",
775 resp.ret, resp.errmsg
776 ),
777 Ok(_) => {}
778 Err(err) => warn!("notify_start failed: {}", err),
779 }
780 }
781
782 loop {
783 if *self.stopped.read().await {
784 break;
785 }
786
787 let (base_url, token) = self.get_auth().await?;
788
789 match self.client.get_updates(&base_url, &token, &cursor).await {
790 Ok(updates) => {
791 let account_key = self.account_key().await;
792 let new_cursor = updates.get_updates_buf.clone();
793 let mut cursor_changed = false;
794 if !new_cursor.is_empty() && cursor.as_str() != new_cursor.as_str() {
795 cursor_changed = true;
796 cursor = new_cursor.clone();
797 }
798 retry_delay = Duration::from_secs(1);
799
800 for wire in &updates.msgs {
801 if let Some(incoming) =
802 IncomingMessage::from_wire_for_account(wire, &account_key)
803 {
804 self.reset_account_rate_limit(&account_key).await;
805 if let Some(context) = incoming.context.clone() {
806 self.observe_context_for_interaction(&context).await;
807 self.emit_event(WechatEvent::ContextObserved(context));
808 }
809 self.emit_event(WechatEvent::Message(incoming.clone()));
810 }
811 }
812 self.emit_due_context_interaction_requests().await;
813 if cursor_changed {
814 self.emit_event(WechatEvent::CursorAdvanced {
815 account_key: account_key.clone(),
816 cursor: new_cursor.clone(),
817 });
818 }
819 }
820 Err(e) if e.is_session_expired() => {
821 warn!("Session expired — re-login required");
822 let account_key = self.account_key().await;
823 self.emit_event(WechatEvent::AuthSessionExpired {
824 account_key: account_key.clone(),
825 });
826 cursor.clear();
827 break;
828 }
829 Err(e) if e.is_rate_limited() => {
830 self.report_error(&e);
831 sleep(self.rate_limit.retry_after).await;
832 retry_delay = Duration::from_secs(1);
833 continue;
834 }
835 Err(e) => {
836 self.report_error(&e);
837 sleep(retry_delay).await;
838 retry_delay = std::cmp::min(retry_delay * 2, Duration::from_secs(10));
839 continue;
840 }
841 }
842 }
843
844 info!("Long-poll loop stopped");
845 if let Ok((base_url, token)) = self.get_auth().await {
846 match self.client.notify_stop(&base_url, &token).await {
847 Ok(resp) if resp.ret.is_some_and(|ret| ret != 0) => warn!(
848 "notify_stop returned ret={:?} errmsg={:?}",
849 resp.ret, resp.errmsg
850 ),
851 Ok(_) => {}
852 Err(err) => warn!("notify_stop failed: {}", err),
853 }
854 }
855 Ok(())
856 }
857
858 async fn retry_rate_limited<F, Fut, T>(&self, mut operation: F) -> Result<T>
859 where
860 F: FnMut() -> Fut,
861 Fut: Future<Output = Result<T>>,
862 {
863 let account_key = self.account_key().await;
864 if let Some(wait) = self.active_rate_limit_backoff(&account_key).await {
865 return Err(rate_limited_error(wait));
866 }
867
868 let mut retries = 0usize;
869 loop {
870 match operation().await {
871 Ok(value) => return Ok(value),
872 Err(err) if err.is_rate_limited() => {
873 let err = with_rate_limit_retry_after(err, self.rate_limit.retry_after);
874 self.defer_account_rate_limit(&account_key, self.rate_limit.retry_after)
875 .await;
876 if retries >= self.rate_limit.retry_attempts {
877 return Err(err);
878 }
879 retries += 1;
880 self.wait_for_rate_limit_reset_or_timeout(
881 &account_key,
882 self.rate_limit.retry_after,
883 )
884 .await;
885 }
886 Err(err) => return Err(err),
887 }
888 }
889 }
890
891 async fn active_rate_limit_backoff(&self, account_key: &str) -> Option<Duration> {
892 if account_key.is_empty() {
893 return None;
894 }
895 let mut states = self.rate_limit_states.lock().await;
896 let state = states.entry(account_key.to_string()).or_default();
897 match state.next_allowed_at {
898 Some(next_allowed_at) => match next_allowed_at.checked_duration_since(Instant::now()) {
899 Some(wait) if !wait.is_zero() => Some(wait),
900 _ => {
901 state.next_allowed_at = None;
902 None
903 }
904 },
905 None => None,
906 }
907 }
908
909 async fn wait_for_rate_limit_reset_or_timeout(&self, account_key: &str, wait: Duration) {
910 if account_key.is_empty() || wait.is_zero() {
911 return;
912 }
913 let _ = tokio::time::timeout(wait, self.rate_limit_notify.notified()).await;
914 }
915
916 async fn defer_account_rate_limit(&self, account_key: &str, retry_after: Duration) {
917 if account_key.is_empty() {
918 return;
919 }
920 let next_allowed_at = Instant::now() + retry_after;
921 let mut states = self.rate_limit_states.lock().await;
922 let state = states.entry(account_key.to_string()).or_default();
923 if state
924 .next_allowed_at
925 .map_or(true, |current| current < next_allowed_at)
926 {
927 state.next_allowed_at = Some(next_allowed_at);
928 }
929 }
930
931 async fn reset_account_rate_limit(&self, account_key: &str) {
932 if account_key.is_empty() {
933 return;
934 }
935 let mut states = self.rate_limit_states.lock().await;
936 let state = states.entry(account_key.to_string()).or_default();
937 state.sent_at.clear();
938 state.next_allowed_at = None;
939 drop(states);
940 self.rate_limit_notify.notify_waiters();
941 }
942
943 async fn observe_context_for_interaction(&self, context: &WechatContext) {
944 let observed_at = SystemTime::now();
945 let expires_at = observed_at + self.rate_limit.context_ttl;
946 self.context_refresh_states.lock().await.insert(
947 (context.account_key.clone(), context.user_id.clone()),
948 ContextRefreshState {
949 user_id: context.user_id.clone(),
950 observed_at,
951 expires_at,
952 reminded: false,
953 },
954 );
955 }
956
957 async fn emit_due_context_interaction_requests(&self) {
958 if self.rate_limit.context_ttl.is_zero() {
959 return;
960 }
961 let now = SystemTime::now();
962 let mut events = Vec::new();
963 {
964 let mut contexts = self.context_refresh_states.lock().await;
965 contexts.retain(|_, state| !(state.reminded && now > state.expires_at));
966 for ((account_key, _), state) in contexts.iter_mut() {
967 if state.reminded {
968 continue;
969 }
970 let remind_at = state
971 .expires_at
972 .checked_sub(self.rate_limit.context_remind_before)
973 .unwrap_or(state.observed_at);
974 if now >= remind_at {
975 state.reminded = true;
976 events.push(WechatEvent::UserInteractionRequested {
977 account_key: account_key.clone(),
978 user_id: Some(state.user_id.clone()),
979 reason: UserInteractionReason::ContextExpiring {
980 observed_at: state.observed_at,
981 expires_at: state.expires_at,
982 remind_before: self.rate_limit.context_remind_before,
983 },
984 });
985 }
986 }
987 }
988 for event in events {
989 self.emit_event(event);
990 }
991 }
992
993 async fn record_successful_message_send(&self) {
994 let account_key = self.account_key().await;
995 if account_key.is_empty() || self.rate_limit.interaction_threshold == 0 {
996 return;
997 }
998
999 let now = Instant::now();
1000 let mut event = None;
1001 {
1002 let mut states = self.rate_limit_states.lock().await;
1003 let state = states.entry(account_key.clone()).or_default();
1004 while let Some(sent_at) = state.sent_at.front().copied() {
1005 if now.duration_since(sent_at) > self.rate_limit.interaction_window {
1006 state.sent_at.pop_front();
1007 } else {
1008 break;
1009 }
1010 }
1011 state.sent_at.push_back(now);
1012 let sent_count = state.sent_at.len();
1013 if sent_count == self.rate_limit.interaction_threshold {
1014 event = Some(WechatEvent::UserInteractionRequested {
1015 account_key,
1016 user_id: None,
1017 reason: UserInteractionReason::OutboundRateLimitApproaching {
1018 sent_count,
1019 window: self.rate_limit.interaction_window,
1020 threshold: self.rate_limit.interaction_threshold,
1021 },
1022 });
1023 }
1024 }
1025
1026 if let Some(event) = event {
1027 self.emit_event(event);
1028 }
1029 }
1030
1031 pub async fn stop(&self) {
1033 *self.stopped.write().await = true;
1034 }
1035
1036 fn send_content<'a>(
1039 &'a self,
1040 user_id: &'a str,
1041 context_token: &'a str,
1042 content: SendContent,
1043 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<SendReceipt>> + Send + 'a>> {
1044 Box::pin(async move {
1045 let (base_url, token) = self.get_auth().await?;
1046 match content {
1047 SendContent::Text(text) => self.send_text(user_id, &text, context_token).await,
1048 SendContent::Image { data, caption } => {
1049 let result = self
1050 .cdn_upload(&base_url, &token, &data, user_id, 1)
1051 .await?;
1052 let mut receipt = SendReceipt::empty();
1053 if let Some(cap) = caption {
1054 receipt.append(self.send_text(user_id, &cap, context_token).await?);
1055 }
1056 let item = json!({"type": 2, "image_item": {
1057 "media": cdn_media_json(&result.media),
1058 "mid_size": result.encrypted_file_size,
1059 }});
1060 receipt.append(
1061 self.send_media_item(&base_url, &token, user_id, context_token, item)
1062 .await?,
1063 );
1064 Ok(receipt)
1065 }
1066 SendContent::Video { data, caption } => {
1067 let result = self
1068 .cdn_upload(&base_url, &token, &data, user_id, 2)
1069 .await?;
1070 let mut receipt = SendReceipt::empty();
1071 if let Some(cap) = caption {
1072 receipt.append(self.send_text(user_id, &cap, context_token).await?);
1073 }
1074 let item = json!({"type": 5, "video_item": {
1075 "media": cdn_media_json(&result.media),
1076 "video_size": result.encrypted_file_size,
1077 }});
1078 receipt.append(
1079 self.send_media_item(&base_url, &token, user_id, context_token, item)
1080 .await?,
1081 );
1082 Ok(receipt)
1083 }
1084 SendContent::File {
1085 data,
1086 file_name,
1087 caption,
1088 } => {
1089 let cat = categorize_by_extension(&file_name);
1090 match cat {
1091 "image" => {
1092 self.send_content(
1093 user_id,
1094 context_token,
1095 SendContent::Image { data, caption },
1096 )
1097 .await
1098 }
1099 "video" => {
1100 self.send_content(
1101 user_id,
1102 context_token,
1103 SendContent::Video { data, caption },
1104 )
1105 .await
1106 }
1107 _ => {
1108 let mut receipt = SendReceipt::empty();
1109 if let Some(cap) = caption {
1110 receipt.append(self.send_text(user_id, &cap, context_token).await?);
1111 }
1112 let data_len = data.len();
1113 let result = self
1114 .cdn_upload(&base_url, &token, &data, user_id, 3)
1115 .await?;
1116 let item = json!({"type": 4, "file_item": {
1117 "media": cdn_media_json(&result.media),
1118 "file_name": file_name,
1119 "len": data_len.to_string(),
1120 }});
1121 receipt.append(
1122 self.send_media_item(
1123 &base_url,
1124 &token,
1125 user_id,
1126 context_token,
1127 item,
1128 )
1129 .await?,
1130 );
1131 Ok(receipt)
1132 }
1133 }
1134 }
1135 }
1136 })
1137 }
1138
1139 async fn send_media_item(
1140 &self,
1141 base_url: &str,
1142 token: &str,
1143 user_id: &str,
1144 context_token: &str,
1145 item: serde_json::Value,
1146 ) -> Result<SendReceipt> {
1147 let client_id = Uuid::new_v4().to_string();
1148 let msg = protocol::build_media_message_with_client_id(
1149 user_id,
1150 context_token,
1151 vec![item],
1152 &client_id,
1153 );
1154 self.retry_rate_limited(|| self.client.send_message(base_url, token, &msg))
1155 .await?;
1156 self.record_successful_message_send().await;
1157 Ok(SendReceipt::single(client_id))
1158 }
1159
1160 async fn cdn_upload(
1161 &self,
1162 base_url: &str,
1163 token: &str,
1164 data: &[u8],
1165 user_id: &str,
1166 media_type: i32,
1167 ) -> Result<UploadResult> {
1168 let aes_key = crypto::generate_aes_key();
1169 let ciphertext = crypto::encrypt_aes_ecb(data, &aes_key);
1170
1171 let mut filekey_buf = [0u8; 16];
1172 rand::rng().fill_bytes(&mut filekey_buf);
1173 let filekey = hex::encode(filekey_buf);
1174
1175 let raw_md5 = hex::encode(Md5::digest(data));
1176
1177 let params = protocol::GetUploadUrlParams {
1178 filekey: filekey.clone(),
1179 media_type,
1180 to_user_id: user_id.to_string(),
1181 rawsize: data.len(),
1182 rawfilemd5: raw_md5,
1183 filesize: ciphertext.len(),
1184 thumb_rawsize: None,
1185 thumb_rawfilemd5: None,
1186 thumb_filesize: None,
1187 no_need_thumb: true,
1188 aeskey: crypto::encode_aes_key_hex(&aes_key),
1189 };
1190
1191 let upload_resp = self.client.get_upload_url(base_url, token, ¶ms).await?;
1192 let upload_url = resolve_cdn_upload_url(&upload_resp, &filekey)?;
1193
1194 let encrypted_file_size = ciphertext.len();
1195
1196 let encrypt_query_param = self.client.upload_to_cdn(&upload_url, &ciphertext).await?;
1197
1198 Ok(UploadResult {
1199 media: CDNMedia {
1200 encrypt_query_param: Some(encrypt_query_param),
1201 aes_key: Some(crypto::encode_aes_key_base64(&aes_key)),
1202 encrypt_type: Some(1),
1203 full_url: None,
1204 },
1205 aes_key,
1206 encrypted_file_size,
1207 })
1208 }
1209
1210 async fn send_text(
1213 &self,
1214 user_id: &str,
1215 text: &str,
1216 context_token: &str,
1217 ) -> Result<SendReceipt> {
1218 let (base_url, token) = self.get_auth().await?;
1219 let mut message_ids = Vec::new();
1220 let mut visible_texts = Vec::new();
1221 for chunk in chunk_text(text, 4000) {
1222 let client_id = Uuid::new_v4().to_string();
1223 let msg = self.client.build_text_message_with_client_id(
1224 user_id,
1225 context_token,
1226 &chunk,
1227 &client_id,
1228 );
1229 self.retry_rate_limited(|| self.client.send_message(&base_url, &token, &msg))
1230 .await?;
1231 self.record_successful_message_send().await;
1232 message_ids.push(client_id);
1233 visible_texts.push(chunk);
1234 }
1235 Ok(SendReceipt {
1236 message_ids,
1237 visible_texts,
1238 })
1239 }
1240
1241 async fn get_auth(&self) -> Result<(String, String)> {
1242 let creds = self.credentials.read().await;
1243 let creds = creds
1244 .as_ref()
1245 .ok_or_else(|| WechatIlinkError::Auth("not logged in".into()))?;
1246 Ok((creds.base_url.clone(), creds.token.clone()))
1247 }
1248
1249 async fn account_key(&self) -> String {
1250 let creds = self.credentials.read().await;
1251 match creds.as_ref() {
1252 Some(c) => context_account_key(c).to_string(),
1253 None => String::new(),
1254 }
1255 }
1256
1257 fn report_error(&self, err: &WechatIlinkError) {
1258 error!("{}", err);
1259 }
1260}
1261
1262#[cfg(test)]
1263fn event_stream_from_receiver(
1264 mut rx: broadcast::Receiver<WechatEvent>,
1265) -> WechatEventStream<'static> {
1266 WechatEventStream::new(async_stream::stream! {
1267 loop {
1268 match rx.recv().await {
1269 Ok(event) => yield Ok(event),
1270 Err(broadcast::error::RecvError::Lagged(skipped)) => {
1271 yield Err(WechatIlinkError::Other(format!(
1272 "wechat event stream lagged by {skipped} events"
1273 )));
1274 }
1275 Err(broadcast::error::RecvError::Closed) => return,
1276 }
1277 }
1278 })
1279}
1280
1281struct AbortOnDrop(tokio::task::AbortHandle);
1282
1283impl Drop for AbortOnDrop {
1284 fn drop(&mut self) {
1285 self.0.abort();
1286 }
1287}
1288
1289fn rate_limited_error(retry_after: Duration) -> WechatIlinkError {
1290 WechatIlinkError::RateLimited {
1291 retry_after,
1292 message: "ret=-2".to_string(),
1293 http_status: 200,
1294 errcode: -2,
1295 }
1296}
1297
1298fn with_rate_limit_retry_after(err: WechatIlinkError, retry_after: Duration) -> WechatIlinkError {
1299 match err {
1300 WechatIlinkError::RateLimited {
1301 message,
1302 http_status,
1303 errcode,
1304 ..
1305 } => WechatIlinkError::RateLimited {
1306 retry_after,
1307 message,
1308 http_status,
1309 errcode,
1310 },
1311 other => other,
1312 }
1313}
1314
1315pub enum SendContent {
1317 Text(String),
1318 Image {
1319 data: Vec<u8>,
1320 caption: Option<String>,
1321 },
1322 Video {
1323 data: Vec<u8>,
1324 caption: Option<String>,
1325 },
1326 File {
1327 data: Vec<u8>,
1328 file_name: String,
1329 caption: Option<String>,
1330 },
1331}
1332
1333fn cdn_media_json(media: &CDNMedia) -> serde_json::Value {
1334 let mut v = json!({});
1335 if let Some(param) = &media.encrypt_query_param {
1336 v["encrypt_query_param"] = json!(param);
1337 }
1338 if let Some(key) = &media.aes_key {
1339 v["aes_key"] = json!(key);
1340 }
1341 if let Some(et) = media.encrypt_type {
1342 v["encrypt_type"] = json!(et);
1343 }
1344 if let Some(url) = &media.full_url {
1345 v["full_url"] = json!(url);
1346 }
1347 v
1348}
1349
1350fn resolve_cdn_upload_url(
1351 response: &protocol::GetUploadUrlResponse,
1352 filekey: &str,
1353) -> Result<String> {
1354 if let Some(upload_full_url) = response
1355 .upload_full_url
1356 .as_deref()
1357 .filter(|value| !value.is_empty())
1358 {
1359 return Ok(upload_full_url.to_string());
1360 }
1361 if let Some(upload_param) = response
1362 .upload_param
1363 .as_deref()
1364 .filter(|value| !value.is_empty())
1365 {
1366 return Ok(protocol::build_cdn_upload_url(
1367 protocol::CDN_BASE_URL,
1368 upload_param,
1369 filekey,
1370 ));
1371 }
1372 Err(WechatIlinkError::Media(
1373 "getuploadurl did not return upload_full_url or upload_param".into(),
1374 ))
1375}
1376
1377fn require_download_aes_key(media_type: &str, media: &CDNMedia) -> Result<()> {
1378 if media
1379 .aes_key
1380 .as_deref()
1381 .is_some_and(|value| !value.is_empty())
1382 {
1383 return Ok(());
1384 }
1385 Err(WechatIlinkError::Media(format!(
1386 "{} CDN media missing AES key",
1387 media_type
1388 )))
1389}
1390
1391fn categorize_by_extension(filename: &str) -> &'static str {
1392 let ext = Path::new(filename)
1393 .extension()
1394 .and_then(|e| e.to_str())
1395 .unwrap_or("")
1396 .to_lowercase();
1397 match ext.as_str() {
1398 "png" | "jpg" | "jpeg" | "gif" | "webp" | "bmp" | "svg" => "image",
1399 "mp4" | "mov" | "webm" | "mkv" | "avi" => "video",
1400 _ => "file",
1401 }
1402}
1403
1404fn chunk_text(text: &str, limit: usize) -> Vec<String> {
1405 if text.len() <= limit {
1406 return vec![text.to_string()];
1407 }
1408 let mut chunks = Vec::new();
1409 let mut remaining = text;
1410 while !remaining.is_empty() {
1411 if remaining.len() <= limit {
1412 chunks.push(remaining.to_string());
1413 break;
1414 }
1415 let window_end = char_boundary_at_or_before(remaining, limit);
1416 let window = &remaining[..window_end];
1417 let cut = window
1418 .rfind("\n\n")
1419 .filter(|&i| i > window_end * 3 / 10)
1420 .map(|i| i + 2)
1421 .or_else(|| {
1422 window
1423 .rfind('\n')
1424 .filter(|&i| i > window_end * 3 / 10)
1425 .map(|i| i + 1)
1426 })
1427 .or_else(|| {
1428 window
1429 .rfind(' ')
1430 .filter(|&i| i > window_end * 3 / 10)
1431 .map(|i| i + 1)
1432 })
1433 .unwrap_or(window_end);
1434 chunks.push(remaining[..cut].to_string());
1435 remaining = &remaining[cut..];
1436 }
1437 if chunks.is_empty() {
1438 vec![String::new()]
1439 } else {
1440 chunks
1441 }
1442}
1443
1444fn char_boundary_at_or_before(text: &str, limit: usize) -> usize {
1445 let mut end = limit.min(text.len());
1446 while !text.is_char_boundary(end) {
1447 end -= 1;
1448 }
1449 end
1450}
1451
1452fn context_account_key(creds: &Credentials) -> &str {
1453 if !creds.account_id.is_empty() {
1454 &creds.account_id
1455 } else {
1456 &creds.user_id
1457 }
1458}
1459
1460fn chrono_now() -> String {
1461 let dur = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
1463 format!("{}Z", dur.as_secs())
1464}
1465
1466#[cfg(test)]
1467mod tests {
1468 use super::*;
1469
1470 #[test]
1471 fn builder_accepts_caller_provided_reqwest_client() {
1472 let http_client = reqwest::Client::new();
1473 let _client = WechatIlinkClient::builder()
1474 .http_client(http_client)
1475 .build();
1476 }
1477
1478 #[test]
1479 fn chunk_text_short() {
1480 let chunks = chunk_text("hello", 100);
1481 assert_eq!(chunks, vec!["hello"]);
1482 }
1483
1484 #[test]
1485 fn chunk_text_empty() {
1486 let chunks = chunk_text("", 100);
1487 assert_eq!(chunks, vec![""]);
1488 }
1489
1490 #[test]
1491 fn chunk_text_splits_on_paragraph() {
1492 let text = "aaaa\n\nbbbb";
1493 let chunks = chunk_text(text, 7);
1494 assert_eq!(chunks, vec!["aaaa\n\n", "bbbb"]);
1495 }
1496
1497 #[test]
1498 fn chunk_text_splits_on_newline() {
1499 let text = "aaaa\nbbbb";
1500 let chunks = chunk_text(text, 7);
1501 assert_eq!(chunks, vec!["aaaa\n", "bbbb"]);
1502 }
1503
1504 #[test]
1505 fn chunk_text_exact_limit() {
1506 let text = "abcdef";
1507 let chunks = chunk_text(text, 6);
1508 assert_eq!(chunks, vec!["abcdef"]);
1509 }
1510
1511 #[test]
1512 fn chunk_text_does_not_split_utf8_char_boundary() {
1513 let mut text = "a".repeat(3999);
1514 text.push('启');
1515 text.push_str("tail");
1516
1517 let chunks = chunk_text(&text, 4000);
1518
1519 assert_eq!(chunks.concat(), text);
1520 assert!(chunks.iter().all(|chunk| chunk.len() <= 4000));
1521 }
1522
1523 #[test]
1524 fn categorize_image_extensions() {
1525 assert_eq!(categorize_by_extension("photo.png"), "image");
1526 assert_eq!(categorize_by_extension("photo.JPG"), "image");
1527 assert_eq!(categorize_by_extension("anim.gif"), "image");
1528 assert_eq!(categorize_by_extension("pic.webp"), "image");
1529 }
1530
1531 #[test]
1532 fn categorize_video_extensions() {
1533 assert_eq!(categorize_by_extension("clip.mp4"), "video");
1534 assert_eq!(categorize_by_extension("clip.MOV"), "video");
1535 assert_eq!(categorize_by_extension("movie.webm"), "video");
1536 }
1537
1538 #[test]
1539 fn categorize_file_extensions() {
1540 assert_eq!(categorize_by_extension("report.pdf"), "file");
1541 assert_eq!(categorize_by_extension("data.csv"), "file");
1542 assert_eq!(categorize_by_extension("noext"), "file");
1543 }
1544
1545 #[test]
1546 fn cdn_media_json_with_encrypt_type() {
1547 let media = CDNMedia {
1548 encrypt_query_param: Some("param=1".to_string()),
1549 aes_key: Some("key123".to_string()),
1550 encrypt_type: Some(1),
1551 full_url: None,
1552 };
1553 let j = cdn_media_json(&media);
1554 assert_eq!(j["encrypt_query_param"], "param=1");
1555 assert_eq!(j["aes_key"], "key123");
1556 assert_eq!(j["encrypt_type"], 1);
1557 }
1558
1559 #[test]
1560 fn cdn_media_json_without_encrypt_type() {
1561 let media = CDNMedia {
1562 encrypt_query_param: Some("p".to_string()),
1563 aes_key: Some("k".to_string()),
1564 encrypt_type: None,
1565 full_url: None,
1566 };
1567 let j = cdn_media_json(&media);
1568 assert!(j.get("encrypt_type").is_none());
1569 }
1570
1571 #[test]
1572 fn upload_url_prefers_full_url() {
1573 let response = protocol::GetUploadUrlResponse {
1574 upload_param: Some("param".to_string()),
1575 thumb_upload_param: None,
1576 upload_full_url: Some("https://cdn.example/upload".to_string()),
1577 };
1578
1579 let upload_url = resolve_cdn_upload_url(&response, "file-key").unwrap();
1580 assert_eq!(upload_url, "https://cdn.example/upload");
1581 }
1582
1583 #[test]
1584 fn upload_url_falls_back_to_upload_param() {
1585 let response = protocol::GetUploadUrlResponse {
1586 upload_param: Some("param value".to_string()),
1587 thumb_upload_param: None,
1588 upload_full_url: None,
1589 };
1590
1591 let upload_url = resolve_cdn_upload_url(&response, "file key").unwrap();
1592 assert_eq!(
1593 upload_url,
1594 format!(
1595 "{}/upload?encrypted_query_param=param%20value&filekey=file%20key",
1596 protocol::CDN_BASE_URL
1597 )
1598 );
1599 }
1600
1601 #[test]
1602 fn upload_url_requires_full_url_or_upload_param() {
1603 let response = protocol::GetUploadUrlResponse {
1604 upload_param: None,
1605 thumb_upload_param: None,
1606 upload_full_url: None,
1607 };
1608
1609 let err = resolve_cdn_upload_url(&response, "file-key").unwrap_err();
1610 assert!(matches!(err, WechatIlinkError::Media(_)));
1611 }
1612
1613 #[test]
1614 fn non_image_download_requires_media_aes_key() {
1615 let media = CDNMedia {
1616 encrypt_query_param: Some("param".to_string()),
1617 aes_key: None,
1618 encrypt_type: Some(1),
1619 full_url: None,
1620 };
1621
1622 let err = require_download_aes_key("voice", &media).unwrap_err();
1623 assert!(matches!(err, WechatIlinkError::Media(_)));
1624 }
1625
1626 #[test]
1627 fn non_image_download_accepts_media_aes_key() {
1628 let media = CDNMedia {
1629 encrypt_query_param: Some("param".to_string()),
1630 aes_key: Some("key".to_string()),
1631 encrypt_type: Some(1),
1632 full_url: None,
1633 };
1634
1635 require_download_aes_key("file", &media).unwrap();
1636 }
1637
1638 #[tokio::test]
1639 async fn events_from_cursor_reports_missing_auth_without_storing_cursor() {
1640 let client = Arc::new(WechatIlinkClient::new());
1641 let mut events = client.events_from_cursor(Some("external-cursor".to_string()));
1642
1643 let err = events
1644 .next()
1645 .await
1646 .expect("stream item")
1647 .expect_err("missing auth should be surfaced through stream");
1648
1649 assert!(matches!(err, WechatIlinkError::Auth(_)));
1650 }
1651
1652 #[test]
1653 fn rate_limit_builder_defaults_and_overrides_are_available() {
1654 let default_options = WechatRateLimitOptions::default();
1655 assert_eq!(default_options.retry_after, Duration::from_secs(90));
1656 assert_eq!(default_options.retry_attempts, 5);
1657 assert_eq!(default_options.interaction_window, Duration::from_secs(300));
1658 assert_eq!(default_options.interaction_threshold, 6);
1659 assert_eq!(
1660 default_options.context_ttl,
1661 Duration::from_secs(24 * 60 * 60)
1662 );
1663
1664 let client = WechatIlinkClient::builder()
1665 .rate_limit_retry_after(Duration::from_secs(60))
1666 .rate_limit_max_retries(2)
1667 .rate_limit_interaction_window(Duration::from_secs(120))
1668 .rate_limit_interaction_threshold(4)
1669 .context_ttl(Duration::from_secs(3600))
1670 .context_expiry_remind_before(Duration::from_secs(300))
1671 .build();
1672
1673 assert_eq!(client.rate_limit.retry_after, Duration::from_secs(60));
1674 assert_eq!(client.rate_limit.retry_attempts, 2);
1675 assert_eq!(
1676 client.rate_limit.interaction_window,
1677 Duration::from_secs(120)
1678 );
1679 assert_eq!(client.rate_limit.interaction_threshold, 4);
1680 assert_eq!(client.rate_limit.context_ttl, Duration::from_secs(3600));
1681 assert_eq!(
1682 client.rate_limit.context_remind_before,
1683 Duration::from_secs(300)
1684 );
1685 }
1686
1687 #[tokio::test]
1688 async fn outbound_threshold_emits_interaction_event_and_incoming_resets_window() {
1689 let client = WechatIlinkClient::builder()
1690 .rate_limit_interaction_threshold(3)
1691 .rate_limit_interaction_window(Duration::from_secs(300))
1692 .build();
1693 client.set_credentials(test_credentials()).await;
1694 let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1695
1696 client.record_successful_message_send().await;
1697 client.record_successful_message_send().await;
1698 assert!(
1699 tokio::time::timeout(Duration::from_millis(5), events.next())
1700 .await
1701 .is_err()
1702 );
1703 client.record_successful_message_send().await;
1704 assert_rate_limit_event(events.next().await, "account-1", 3, 3);
1705 client.record_successful_message_send().await;
1706 assert!(
1707 tokio::time::timeout(Duration::from_millis(5), events.next())
1708 .await
1709 .is_err()
1710 );
1711
1712 client.reset_account_rate_limit("account-1").await;
1713 client.record_successful_message_send().await;
1714 client.record_successful_message_send().await;
1715 client.record_successful_message_send().await;
1716 assert_rate_limit_event(events.next().await, "account-1", 3, 3);
1717 }
1718
1719 #[tokio::test]
1720 async fn context_expiry_emits_user_interaction_event_and_new_context_resets_it() {
1721 let client = WechatIlinkClient::builder()
1722 .context_ttl(Duration::from_millis(5))
1723 .context_expiry_remind_before(Duration::from_millis(5))
1724 .build();
1725 let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1726
1727 let context = WechatContext {
1728 account_key: "account-1".to_string(),
1729 user_id: "user-1".to_string(),
1730 context_token: "ctx-1".to_string(),
1731 observed_at_unix_ms: 1,
1732 source_message_id: Some("msg-1".to_string()),
1733 };
1734 client.observe_context_for_interaction(&context).await;
1735 tokio::time::sleep(Duration::from_millis(10)).await;
1736 client.emit_due_context_interaction_requests().await;
1737 assert_context_expiring_event(events.next().await, "account-1", "user-1");
1738
1739 client.observe_context_for_interaction(&context).await;
1740 tokio::time::sleep(Duration::from_millis(10)).await;
1741 client.emit_due_context_interaction_requests().await;
1742 assert_context_expiring_event(events.next().await, "account-1", "user-1");
1743 }
1744
1745 #[tokio::test]
1746 async fn retry_rate_limited_uses_configured_retry_count() {
1747 use std::sync::atomic::{AtomicUsize, Ordering};
1748
1749 let client = WechatIlinkClient::builder()
1750 .rate_limit_retry_after(Duration::from_millis(1))
1751 .rate_limit_max_retries(2)
1752 .build();
1753 client.set_credentials(test_credentials()).await;
1754 let attempts = Arc::new(AtomicUsize::new(0));
1755 let attempts_for_operation = Arc::clone(&attempts);
1756
1757 let err = client
1758 .retry_rate_limited(move || {
1759 let attempts_for_operation = Arc::clone(&attempts_for_operation);
1760 async move {
1761 attempts_for_operation.fetch_add(1, Ordering::SeqCst);
1762 Err::<(), _>(rate_limited_error(Duration::from_millis(1)))
1763 }
1764 })
1765 .await
1766 .expect_err("rate limit should remain after configured retries");
1767
1768 assert!(err.is_rate_limited());
1769 assert_eq!(attempts.load(Ordering::SeqCst), 3);
1770 }
1771
1772 #[tokio::test]
1773 async fn active_rate_limit_backoff_returns_error_without_queueing_new_request() {
1774 let client = WechatIlinkClient::builder()
1775 .rate_limit_retry_after(Duration::from_secs(90))
1776 .build();
1777 client.set_credentials(test_credentials()).await;
1778 client
1779 .defer_account_rate_limit("account-1", Duration::from_secs(90))
1780 .await;
1781
1782 let mut called = false;
1783 let err = client
1784 .retry_rate_limited(|| {
1785 called = true;
1786 async { Ok::<_, WechatIlinkError>(()) }
1787 })
1788 .await
1789 .expect_err("active backoff should return immediately");
1790
1791 assert!(!called);
1792 assert!(err.is_rate_limited());
1793 }
1794
1795 #[tokio::test]
1796 async fn event_stream_accepts_context_observed_and_cursor_events() {
1797 let client = WechatIlinkClient::new();
1798 let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1799
1800 client.emit_event(WechatEvent::ContextObserved(WechatContext {
1801 account_key: "account-1".to_string(),
1802 user_id: "user-1".to_string(),
1803 context_token: "ctx-1".to_string(),
1804 observed_at_unix_ms: 1,
1805 source_message_id: Some("msg-1".to_string()),
1806 }));
1807 client.emit_event(WechatEvent::CursorAdvanced {
1808 account_key: "account-1".to_string(),
1809 cursor: "cursor-2".to_string(),
1810 });
1811
1812 assert_eq!(format_event(events.next().await), "context:user-1");
1813 assert_eq!(format_event(events.next().await), "cursor:cursor-2");
1814 }
1815
1816 fn assert_rate_limit_event(
1817 event: Option<Result<WechatEvent>>,
1818 expected_account: &str,
1819 expected_sent_count: usize,
1820 expected_threshold: usize,
1821 ) {
1822 match event.expect("event").expect("ok event") {
1823 WechatEvent::UserInteractionRequested {
1824 account_key,
1825 reason:
1826 UserInteractionReason::OutboundRateLimitApproaching {
1827 sent_count,
1828 threshold,
1829 ..
1830 },
1831 ..
1832 } => {
1833 assert_eq!(account_key, expected_account);
1834 assert_eq!(sent_count, expected_sent_count);
1835 assert_eq!(threshold, expected_threshold);
1836 }
1837 other => panic!("expected rate-limit interaction event, got {other:?}"),
1838 }
1839 }
1840
1841 fn assert_context_expiring_event(
1842 event: Option<Result<WechatEvent>>,
1843 expected_account: &str,
1844 expected_user: &str,
1845 ) {
1846 match event.expect("event").expect("ok event") {
1847 WechatEvent::UserInteractionRequested {
1848 account_key,
1849 user_id,
1850 reason: UserInteractionReason::ContextExpiring { .. },
1851 } => {
1852 assert_eq!(account_key, expected_account);
1853 assert_eq!(user_id.as_deref(), Some(expected_user));
1854 }
1855 other => panic!("expected context-expiring interaction event, got {other:?}"),
1856 }
1857 }
1858
1859 fn format_event(event: Option<Result<WechatEvent>>) -> String {
1860 match event.expect("event").expect("ok event") {
1861 WechatEvent::ContextObserved(ctx) => format!("context:{}", ctx.user_id),
1862 WechatEvent::CursorAdvanced { cursor, .. } => format!("cursor:{cursor}"),
1863 WechatEvent::Message(msg) => format!("message:{}", msg.user_id),
1864 WechatEvent::AuthSessionExpired { account_key } => format!("auth:{account_key}"),
1865 WechatEvent::UserInteractionRequested { reason, .. } => match reason {
1866 UserInteractionReason::OutboundRateLimitApproaching { sent_count, .. } => {
1867 format!("rate-limit:{sent_count}")
1868 }
1869 UserInteractionReason::ContextExpiring { .. } => "context-expiring".into(),
1870 },
1871 }
1872 }
1873
1874 fn test_credentials() -> Credentials {
1875 Credentials {
1876 token: "token-1".into(),
1877 base_url: "https://example.invalid".into(),
1878 account_id: "account-1".into(),
1879 user_id: "bot-1".into(),
1880 saved_at: None,
1881 }
1882 }
1883
1884 #[tokio::test]
1885 async fn credentials_are_supplied_by_caller() {
1886 let client = WechatIlinkClient::new();
1887 client.set_credentials(test_credentials()).await;
1888
1889 let creds = client.credentials().await.unwrap();
1890 assert_eq!(creds.account_id, "account-1");
1891 }
1892}