Skip to main content

wechat_ilink/
bot.rs

1//! Main WechatIlinkClient.
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use std::collections::{HashMap, VecDeque};
6use std::future::Future;
7use std::path::Path;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::{broadcast, oneshot, Mutex, Notify, RwLock};
13use tokio::time::sleep;
14use tracing::{error, info, warn};
15
16use crate::cdn::CdnClient;
17use crate::crypto;
18use crate::error::{Result, WechatIlinkError};
19use crate::protocol::{self, ILinkClient, ILinkClientOptions};
20use crate::types::*;
21use md5::{Digest, Md5};
22use rand::Rng;
23use serde_json::json;
24use uuid::Uuid;
25
/// Capacity passed to `broadcast::channel` when the client's event channel is
/// created. A subscriber that falls further behind than this surfaces a
/// "lagged" error on its event stream instead of the skipped events.
const EVENT_CHANNEL_CAPACITY: usize = 256;
27
/// Lifecycle events emitted by the WeChat iLink client.
///
/// Events are published on the client's broadcast channel and delivered to
/// every active [`WechatEventStream`]; the type is `Clone` so each subscriber
/// receives its own copy.
#[derive(Debug, Clone)]
pub enum WechatEvent {
    /// A new context token was observed from an incoming wire message.
    ///
    /// Emitted immediately before the corresponding [`WechatEvent::Message`].
    ContextObserved(WechatContext),
    /// An incoming parsed message.
    Message(IncomingMessage),
    /// The polling cursor advanced after processing a batch of messages.
    ///
    /// Only emitted when the server returned a non-empty cursor that differs
    /// from the previous one; `cursor` is the new value to persist.
    CursorAdvanced { account_key: String, cursor: String },
    /// The current auth session has expired and re-login is required.
    AuthSessionExpired { account_key: String },
    /// The SDK observed that user interaction would refresh WeChat state.
    ///
    /// The SDK does not send a reminder. Applications decide whether to notify
    /// users and what wording/channel to use.
    UserInteractionRequested {
        account_key: String,
        /// User the request concerns, when it is tied to a single user.
        user_id: Option<String>,
        /// Why the interaction is being suggested.
        reason: UserInteractionReason,
    },
}
49
/// Outcome of a send operation: the ids of the messages it produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SendReceipt {
    /// Ids of the messages produced by the send, oldest first as appended.
    pub message_ids: Vec<String>,
    // NOTE(review): presumably the text fragments as shown to the recipient —
    // confirm against the send path, which is not visible here.
    pub visible_texts: Vec<String>,
}

impl SendReceipt {
    /// Id of the most recently recorded message, or `None` for an empty receipt.
    pub fn last_message_id(&self) -> Option<&str> {
        let newest = self.message_ids.last()?;
        Some(newest.as_str())
    }

    /// Receipt holding exactly one message id and no visible texts.
    fn single(message_id: String) -> Self {
        Self {
            message_ids: Vec::from([message_id]),
            visible_texts: vec![],
        }
    }

    /// Receipt with no message ids and no visible texts.
    fn empty() -> Self {
        Self {
            message_ids: vec![],
            visible_texts: vec![],
        }
    }

    /// Move everything recorded in `other` onto the end of this receipt.
    fn append(&mut self, other: SendReceipt) {
        let SendReceipt {
            message_ids,
            visible_texts,
        } = other;
        self.message_ids.extend(message_ids);
        self.visible_texts.extend(visible_texts);
    }
}
81
/// Why the SDK suggests asking a WeChat user to interact.
///
/// Carried by [`WechatEvent::UserInteractionRequested`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UserInteractionReason {
    /// A stored context is nearing expiry.
    ContextExpiring {
        /// When the context was observed.
        observed_at: SystemTime,
        /// When the context is expected to expire.
        expires_at: SystemTime,
        /// Configured lead time before expiry at which the reminder fires.
        remind_before: Duration,
    },
    /// Proactive outbound sends are close to the bot-wide iLink rate limit.
    OutboundRateLimitApproaching {
        /// Number of sends counted in the rolling window.
        sent_count: usize,
        /// Length of the rolling window.
        window: Duration,
        /// Send count at which this event is raised.
        threshold: usize,
    },
}
98
/// A stream of WeChat lifecycle events.
///
/// Wraps a boxed, pinned stream of `Result<WechatEvent>` so the concrete
/// generator type stays private.
pub struct WechatEventStream<'a> {
    // Type-erased event source; must be `Send` so the stream can cross tasks.
    inner: Pin<Box<dyn Stream<Item = Result<WechatEvent>> + Send + 'a>>,
}
103
104impl<'a> WechatEventStream<'a> {
105    fn new(stream: impl Stream<Item = Result<WechatEvent>> + Send + 'a) -> Self {
106        Self {
107            inner: Box::pin(stream),
108        }
109    }
110
111    pub async fn next(&mut self) -> Option<Result<WechatEvent>> {
112        self.inner.next().await
113    }
114}
115
116impl Stream for WechatEventStream<'_> {
117    type Item = Result<WechatEvent>;
118
119    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120        self.inner.as_mut().poll_next(cx)
121    }
122}
123
/// Events emitted during QR login.
///
/// Not `Clone`: [`LoginQrEvent::NeedVerifyCode`] carries a one-shot responder
/// that can only be consumed once.
#[derive(Debug)]
pub enum LoginQrEvent {
    /// A new QR code should be displayed or rendered by the application.
    QrCode { content: String },
    /// Login status changed.
    ///
    /// `status` is the raw status string reported by the server.
    StatusChanged { status: String },
    /// WeChat requires a verification code before continuing.
    ///
    /// The login stream waits until `responder` is used or dropped.
    NeedVerifyCode {
        /// Human-readable reason for the request.
        prompt: String,
        /// Channel for supplying the code or cancelling the login.
        responder: VerifyCodeResponder,
    },
    /// Login completed and credentials were installed into the client.
    Confirmed { credentials: Credentials },
}
139
/// One-shot responder for [`LoginQrEvent::NeedVerifyCode`].
///
/// Consumed by either [`VerifyCodeResponder::send`] or
/// [`VerifyCodeResponder::cancel`].
#[derive(Debug)]
pub struct VerifyCodeResponder {
    // `Some(code)` supplies a verification code; `None` signals cancellation.
    tx: oneshot::Sender<Option<String>>,
}
145
146impl VerifyCodeResponder {
147    pub fn send(self, code: impl Into<String>) -> std::result::Result<(), Option<String>> {
148        let code = Some(code.into());
149        self.tx.send(code)
150    }
151
152    pub fn cancel(self) -> std::result::Result<(), Option<String>> {
153        self.tx.send(None)
154    }
155}
156
/// A stream of QR login events.
///
/// Wraps a boxed, pinned stream of `Result<LoginQrEvent>` so the concrete
/// generator type stays private.
pub struct LoginQrStream<'a> {
    // Type-erased event source; must be `Send` so the stream can cross tasks.
    inner: Pin<Box<dyn Stream<Item = Result<LoginQrEvent>> + Send + 'a>>,
}
161
162impl<'a> LoginQrStream<'a> {
163    fn new(stream: impl Stream<Item = Result<LoginQrEvent>> + Send + 'a) -> Self {
164        Self {
165            inner: Box::pin(stream),
166        }
167    }
168
169    pub async fn next(&mut self) -> Option<Result<LoginQrEvent>> {
170        self.inner.next().await
171    }
172}
173
174impl Stream for LoginQrStream<'_> {
175    type Item = Result<LoginQrEvent>;
176
177    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
178        self.inner.as_mut().poll_next(cx)
179    }
180}
181
/// Rate-limit and context-refresh policy for iLink requests.
///
/// See the `Default` implementation for the values used when nothing is
/// overridden.
#[derive(Debug, Clone)]
pub struct WechatRateLimitOptions {
    /// Delay used before retrying `ret=-2` / `errcode=-2`.
    pub retry_after: Duration,
    /// Number of retries after the initial rate-limited attempt.
    pub retry_attempts: usize,
    /// Rolling account window used to request proactive user interaction.
    pub interaction_window: Duration,
    /// Emit interaction event at this send count.
    pub interaction_threshold: usize,
    /// Expected context lifetime after a user WeChat message.
    pub context_ttl: Duration,
    /// Emit context-expiring interaction event this long before expiry.
    pub context_remind_before: Duration,
}
198
199impl Default for WechatRateLimitOptions {
200    fn default() -> Self {
201        Self {
202            retry_after: protocol::DEFAULT_RATE_LIMIT_RETRY_AFTER,
203            retry_attempts: 5,
204            interaction_window: Duration::from_secs(5 * 60),
205            interaction_threshold: 6,
206            context_ttl: Duration::from_secs(24 * 60 * 60),
207            context_remind_before: Duration::from_secs(30 * 60),
208        }
209    }
210}
211
/// Per-account rate-limit bookkeeping held in the client's `rate_limit_states` map.
#[derive(Default)]
struct AccountRateLimitState {
    // NOTE(review): presumably the instants of recent outbound sends inside the
    // rolling interaction window — confirm against the code that updates it.
    sent_at: VecDeque<Instant>,
    // NOTE(review): presumably the earliest instant at which the next request
    // may be attempted after a rate-limit response — confirm.
    next_allowed_at: Option<Instant>,
}
217
/// Expiry bookkeeping for one observed context, held in the client's
/// `context_refresh_states` map.
struct ContextRefreshState {
    user_id: String,
    // When the context was observed.
    observed_at: SystemTime,
    // When the context is expected to expire.
    expires_at: SystemTime,
    // NOTE(review): presumably set once the expiry reminder has been emitted so
    // it is not repeated — confirm against the code that updates it.
    reminded: bool,
}
224
/// Builder for [`WechatIlinkClient`].
///
/// Obtain one via [`WechatIlinkClient::builder`], chain setters, then call
/// `build`.
pub struct WechatIlinkClientBuilder {
    // API base URL; ignored when `credentials` are supplied, because the
    // credentials' own base URL takes precedence at build time.
    base_url: Option<String>,
    // Optional protocol-client settings, forwarded unchanged.
    bot_agent: Option<String>,
    ilink_app_id: Option<String>,
    route_tag: Option<String>,
    // Forwarded to the protocol client options; defaults to `true`.
    markdown_filter: bool,
    // Rate-limit and context-refresh policy.
    rate_limit: WechatRateLimitOptions,
    // Credentials to pre-install into the built client.
    credentials: Option<Credentials>,
    // Custom HTTP client shared by the protocol and CDN clients.
    http_client: Option<reqwest::Client>,
}
236
237impl Default for WechatIlinkClientBuilder {
238    fn default() -> Self {
239        Self {
240            base_url: None,
241            bot_agent: None,
242            ilink_app_id: None,
243            route_tag: None,
244            markdown_filter: true,
245            rate_limit: WechatRateLimitOptions::default(),
246            credentials: None,
247            http_client: None,
248        }
249    }
250}
251
252impl WechatIlinkClientBuilder {
253    pub fn base_url(mut self, base_url: impl Into<String>) -> Self {
254        self.base_url = Some(base_url.into());
255        self
256    }
257
258    pub fn bot_agent(mut self, bot_agent: impl Into<String>) -> Self {
259        self.bot_agent = Some(bot_agent.into());
260        self
261    }
262
263    pub fn ilink_app_id(mut self, ilink_app_id: impl Into<String>) -> Self {
264        self.ilink_app_id = Some(ilink_app_id.into());
265        self
266    }
267
268    pub fn route_tag(mut self, route_tag: impl Into<String>) -> Self {
269        self.route_tag = Some(route_tag.into());
270        self
271    }
272
273    pub fn markdown_filter(mut self, enabled: bool) -> Self {
274        self.markdown_filter = enabled;
275        self
276    }
277
278    pub fn rate_limit_options(mut self, options: WechatRateLimitOptions) -> Self {
279        self.rate_limit = options;
280        self
281    }
282
283    pub fn rate_limit_retry_after(mut self, retry_after: Duration) -> Self {
284        self.rate_limit.retry_after = if retry_after.is_zero() {
285            protocol::DEFAULT_RATE_LIMIT_RETRY_AFTER
286        } else {
287            retry_after
288        };
289        self
290    }
291
292    pub fn rate_limit_retry_attempts(mut self, retry_attempts: usize) -> Self {
293        self.rate_limit.retry_attempts = retry_attempts;
294        self
295    }
296
297    pub fn rate_limit_max_retries(mut self, max_retries: usize) -> Self {
298        self.rate_limit.retry_attempts = max_retries;
299        self
300    }
301
302    pub fn context_ttl(mut self, ttl: Duration) -> Self {
303        if !ttl.is_zero() {
304            self.rate_limit.context_ttl = ttl;
305        }
306        self
307    }
308
309    pub fn context_expiry_remind_before(mut self, remind_before: Duration) -> Self {
310        self.rate_limit.context_remind_before = remind_before;
311        self
312    }
313
314    pub fn rate_limit_interaction_window(mut self, window: Duration) -> Self {
315        if !window.is_zero() {
316            self.rate_limit.interaction_window = window;
317        }
318        self
319    }
320
321    pub fn rate_limit_interaction_threshold(mut self, threshold: usize) -> Self {
322        self.rate_limit.interaction_threshold = threshold;
323        self
324    }
325
326    pub fn credentials(mut self, credentials: Credentials) -> Self {
327        self.credentials = Some(credentials);
328        self
329    }
330
331    pub fn http_client(mut self, http_client: reqwest::Client) -> Self {
332        self.http_client = Some(http_client);
333        self
334    }
335
336    pub fn build(self) -> WechatIlinkClient {
337        WechatIlinkClient::from_builder(self)
338    }
339}
340
/// WechatIlinkClient is the main entry point for the WeChat iLink protocol.
///
/// Mutable state sits behind async locks so the client can be shared via
/// `Arc` between the polling task and callers that send messages.
pub struct WechatIlinkClient {
    // Protocol-level client used for all iLink API calls.
    client: Arc<ILinkClient>,
    // CDN client used for media download/upload.
    cdn: CdnClient,
    // Currently installed credentials, if any.
    credentials: RwLock<Option<Credentials>>,
    // Broadcast sender feeding every active event stream.
    event_tx: broadcast::Sender<WechatEvent>,
    // NOTE(review): presumably keyed by account key — confirm against the
    // rate-limit helpers, which are outside this view.
    rate_limit_states: Mutex<HashMap<String, AccountRateLimitState>>,
    // NOTE(review): presumably keyed by (account key, user id) — confirm.
    context_refresh_states: Mutex<HashMap<(String, String), ContextRefreshState>>,
    rate_limit_notify: Notify,
    // Current API base URL; replaced whenever credentials are installed.
    base_url: RwLock<String>,
    // Checked at the top of every poll iteration to end the long-poll loop.
    stopped: RwLock<bool>,
    // Rate-limit and context-refresh policy fixed at build time.
    rate_limit: WechatRateLimitOptions,
}
354
355impl WechatIlinkClient {
356    /// Create a default client instance.
357    pub fn new() -> Self {
358        Self::builder().build()
359    }
360
361    /// Create a client builder.
362    pub fn builder() -> WechatIlinkClientBuilder {
363        WechatIlinkClientBuilder::default()
364    }
365
366    fn from_builder(builder: WechatIlinkClientBuilder) -> Self {
367        let base_url = builder
368            .credentials
369            .as_ref()
370            .map(|creds| creds.base_url.clone())
371            .or(builder.base_url)
372            .unwrap_or_else(|| protocol::DEFAULT_BASE_URL.to_string());
373        let options = ILinkClientOptions {
374            bot_agent: builder.bot_agent,
375            route_tag: builder.route_tag,
376            ilink_app_id: builder.ilink_app_id,
377            markdown_filter: builder.markdown_filter,
378        };
379        let (client, cdn) = match builder.http_client {
380            Some(http_client) => (
381                ILinkClient::with_http_client_and_options(http_client.clone(), options),
382                CdnClient::with_client(http_client),
383            ),
384            None => (ILinkClient::with_options(options), CdnClient::new()),
385        };
386        Self {
387            client: Arc::new(client),
388            cdn,
389            credentials: RwLock::new(builder.credentials),
390            event_tx: broadcast::channel(EVENT_CHANNEL_CAPACITY).0,
391            rate_limit_states: Mutex::new(HashMap::new()),
392            context_refresh_states: Mutex::new(HashMap::new()),
393            rate_limit_notify: Notify::new(),
394            base_url: RwLock::new(base_url),
395            stopped: RwLock::new(false),
396            rate_limit: builder.rate_limit,
397        }
398    }
399
    /// Maximum number of QR codes requested during one login attempt; the
    /// login stream yields an auth error instead of requesting one more.
    const MAX_QR_REFRESH: u32 = 3;
    /// Fixed API base URL for QR code requests.
    ///
    /// QR code fetching and the initial status polling always start here,
    /// regardless of the client's configured base URL.
    const FIXED_QR_BASE_URL: &'static str = "https://ilinkai.weixin.qq.com";
404
405    /// Install externally loaded credentials into this client.
406    pub async fn set_credentials(&self, creds: Credentials) {
407        *self.base_url.write().await = creds.base_url.clone();
408        *self.credentials.write().await = Some(creds);
409    }
410
411    /// Return the currently installed credentials, if any.
412    pub async fn credentials(&self) -> Option<Credentials> {
413        self.credentials.read().await.clone()
414    }
415
    /// Start QR login and return a stream of login events.
    ///
    /// Applications display [`LoginQrEvent::QrCode`], optionally answer
    /// [`LoginQrEvent::NeedVerifyCode`], and persist credentials from
    /// [`LoginQrEvent::Confirmed`].
    ///
    /// The stream ends after `Confirmed` or after the first `Err` item. A new
    /// QR code is requested when the current one reports `expired` or
    /// `verify_code_blocked`, up to `MAX_QR_REFRESH` codes in total.
    pub fn login_qr(&self) -> LoginQrStream<'_> {
        LoginQrStream::new(async_stream::stream! {
            // Fallback base URL for the credentials if the server does not return one.
            let base_url = self.base_url.read().await.clone();
            let mut qr_refresh_count = 0u32;
            // Outer loop: one iteration per QR code.
            loop {
                qr_refresh_count += 1;
                if qr_refresh_count > Self::MAX_QR_REFRESH {
                    yield Err(WechatIlinkError::Auth(format!(
                        "QR code expired {} times — login aborted",
                        Self::MAX_QR_REFRESH
                    )));
                    return;
                }

                let qr = match self.client.get_qr_code(Self::FIXED_QR_BASE_URL).await {
                    Ok(qr) => qr,
                    Err(err) => {
                        yield Err(err);
                        return;
                    }
                };
                yield Ok(LoginQrEvent::QrCode {
                    content: qr.qrcode_img_content.clone(),
                });

                // Per-QR polling state; all of it resets when a new code is fetched.
                let mut last_status = String::new();
                let mut current_poll_base_url = Self::FIXED_QR_BASE_URL.to_string();
                let mut pending_verify_code: Option<String> = None;
                // Inner loop: poll the status of the current QR code.
                loop {
                    let status = match self
                        .client
                        .poll_qr_status_with_verify_code(
                            &current_poll_base_url,
                            &qr.qrcode,
                            pending_verify_code.as_deref(),
                        )
                        .await
                    {
                        Ok(status) => status,
                        Err(err) => {
                            // Any polling error ends the login stream.
                            yield Err(err);
                            return;
                        }
                    };

                    // Log and report only transitions, not every poll result.
                    if status.status != last_status {
                        last_status = status.status.clone();
                        match status.status.as_str() {
                            "scaned" => info!("QR scanned — confirm in WeChat"),
                            "expired" => warn!("QR expired — requesting new one"),
                            "confirmed" => info!("Login confirmed"),
                            "need_verifycode" => info!("QR verification code required"),
                            "verify_code_blocked" => warn!("QR verification code blocked"),
                            "binded_redirect" => warn!("QR already bound"),
                            _ => {}
                        }
                        yield Ok(LoginQrEvent::StatusChanged {
                            status: status.status.clone(),
                        });
                    }

                    if status.status == "wait" {
                        sleep(Duration::from_secs(1)).await;
                        continue;
                    }

                    if status.status == "need_verifycode" {
                        // A code was already submitted and the server still asks
                        // for one, so the previous code is reported as a mismatch.
                        let prompt = if pending_verify_code.is_some() {
                            "QR verification code mismatch"
                        } else {
                            "QR verification code required"
                        };
                        let (tx, rx) = oneshot::channel();
                        yield Ok(LoginQrEvent::NeedVerifyCode {
                            prompt: prompt.to_string(),
                            responder: VerifyCodeResponder { tx },
                        });
                        // A cancelled, dropped, or blank response all collapse to
                        // an empty code, which aborts the login below.
                        let code = match rx.await {
                            Ok(Some(value)) => value.trim().to_string(),
                            _ => String::new(),
                        };
                        if code.is_empty() {
                            yield Err(WechatIlinkError::Auth(
                                "QR verification code was not provided".into(),
                            ));
                            return;
                        }
                        // Re-poll immediately, now carrying the supplied code.
                        pending_verify_code = Some(code);
                        continue;
                    }

                    if status.status == "verify_code_blocked" {
                        // Leave the polling loop; the outer loop fetches a new QR code.
                        break;
                    }

                    if status.status == "binded_redirect" {
                        yield Err(WechatIlinkError::Auth(
                            "QR login is already bound to this app".into(),
                        ));
                        return;
                    }

                    if status.status == "confirmed" {
                        let token = match status.bot_token {
                            Some(token) => token,
                            None => {
                                yield Err(WechatIlinkError::Auth("missing bot_token".into()));
                                return;
                            }
                        };
                        let creds = Credentials {
                            token,
                            base_url: status.baseurl.unwrap_or_else(|| base_url.clone()),
                            account_id: status.ilink_bot_id.unwrap_or_default(),
                            user_id: status.ilink_user_id.unwrap_or_default(),
                            saved_at: Some(chrono_now()),
                        };
                        // Install the credentials and their base URL before
                        // reporting success to the caller.
                        *self.credentials.write().await = Some(creds.clone());
                        *self.base_url.write().await = creds.base_url.clone();
                        yield Ok(LoginQrEvent::Confirmed { credentials: creds });
                        return;
                    }

                    if status.status == "scaned_but_redirect" {
                        // Continue polling on the host named by the server, if any.
                        if let Some(ref host) = status.redirect_host {
                            current_poll_base_url = format!("https://{}", host);
                            info!("IDC redirect, switching polling host to {}", host);
                        } else {
                            warn!("Received scaned_but_redirect but redirect_host is missing");
                        }
                        sleep(Duration::from_secs(2)).await;
                        continue;
                    }

                    // Back at plain "scaned": stop sending the previously supplied code.
                    if status.status == "scaned" && pending_verify_code.is_some() {
                        pending_verify_code = None;
                    }

                    if status.status == "expired" {
                        // Leave the polling loop; the outer loop fetches a new QR code.
                        break;
                    }

                    // Any other status (including "scaned"): poll again after a pause.
                    sleep(Duration::from_secs(2)).await;
                }
            }
        })
    }
568
569    /// Start polling from an externally persisted cursor and stream resulting events.
570    pub fn events_from_cursor(
571        self: Arc<Self>,
572        cursor: Option<String>,
573    ) -> WechatEventStream<'static> {
574        let mut rx = self.event_tx.subscribe();
575        let runner = Arc::clone(&self);
576        let mut task =
577            tokio::spawn(async move { runner.run_loop(cursor.unwrap_or_default()).await });
578        let abort_on_drop = AbortOnDrop(task.abort_handle());
579        WechatEventStream::new(async_stream::stream! {
580            let _abort_on_drop = abort_on_drop;
581            loop {
582                tokio::select! {
583                    biased;
584                    result = &mut task => {
585                        match result {
586                            Ok(Ok(())) => return,
587                            Ok(Err(err)) => {
588                                yield Err(err);
589                                return;
590                            }
591                            Err(err) => {
592                                yield Err(WechatIlinkError::Other(format!(
593                                    "wechat poll task failed: {err}"
594                                )));
595                                return;
596                            }
597                        }
598                    }
599                    event = rx.recv() => match event {
600                        Ok(event) => yield Ok(event),
601                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
602                            yield Err(WechatIlinkError::Other(format!(
603                                "wechat event stream lagged by {skipped} events"
604                            )));
605                        }
606                        Err(broadcast::error::RecvError::Closed) => return,
607                    }
608                }
609            }
610        })
611    }
612
613    /// Emit an event to active streams.
614    fn emit_event(&self, event: WechatEvent) {
615        let _ = self.event_tx.send(event);
616    }
617
618    /// Reply to an incoming message using its observed context.
619    ///
620    /// Returns an error if the incoming message has no context (e.g. the
621    /// context_token was empty in the wire message).
622    pub async fn reply(&self, msg: &IncomingMessage, text: &str) -> Result<SendReceipt> {
623        let context = msg
624            .context
625            .as_ref()
626            .ok_or_else(|| WechatIlinkError::NoContext(msg.user_id.clone()))?;
627        self.send_text_with_context(context, text).await
628    }
629
630    /// Send text using an explicit [`WechatContext`].
631    ///
632    /// The caller is responsible for supplying a valid context. This is the
633    /// preferred way to send a reply when you already hold the context from an
634    /// [`IncomingMessage`].
635    pub async fn send_text_with_context(
636        &self,
637        context: &WechatContext,
638        text: &str,
639    ) -> Result<SendReceipt> {
640        self.send_text(&context.user_id, text, &context.context_token)
641            .await
642    }
643
644    /// Send media content using an explicit [`WechatContext`].
645    ///
646    /// The caller is responsible for supplying a valid context.
647    pub async fn send_media_with_context(
648        &self,
649        context: &WechatContext,
650        content: SendContent,
651    ) -> Result<SendReceipt> {
652        self.send_content(&context.user_id, &context.context_token, content)
653            .await
654    }
655
656    /// Show "typing..." indicator using an explicit [`WechatContext`].
657    ///
658    /// The caller is responsible for supplying a valid context.
659    pub async fn send_typing_with_context(&self, context: &WechatContext) -> Result<()> {
660        let (base_url, token) = self.get_auth().await?;
661        let config = self
662            .retry_rate_limited(|| {
663                self.client
664                    .get_config(&base_url, &token, &context.user_id, &context.context_token)
665            })
666            .await?;
667        if let Some(ticket) = config.typing_ticket {
668            self.retry_rate_limited(|| {
669                self.client
670                    .send_typing(&base_url, &token, &context.user_id, &ticket, 1)
671            })
672            .await?;
673        }
674        Ok(())
675    }
676
677    /// Reply with media content using the incoming message's observed context.
678    ///
679    /// Returns an error if the incoming message has no context.
680    pub async fn reply_media(
681        &self,
682        msg: &IncomingMessage,
683        content: SendContent,
684    ) -> Result<SendReceipt> {
685        let context = msg
686            .context
687            .as_ref()
688            .ok_or_else(|| WechatIlinkError::NoContext(msg.user_id.clone()))?;
689        self.send_media_with_context(context, content).await
690    }
691
692    /// Download media from an incoming message.
693    /// Returns None if the message has no media. Priority: image > file > video > voice.
694    pub async fn download(&self, msg: &IncomingMessage) -> Result<Option<DownloadedMedia>> {
695        if let Some(img) = msg.images.first() {
696            if let Some(ref media) = img.media {
697                let data = self.cdn.download(media, img.aes_key.as_deref()).await?;
698                return Ok(Some(DownloadedMedia {
699                    data,
700                    media_type: "image".into(),
701                    file_name: None,
702                    format: None,
703                }));
704            }
705        }
706        if let Some(file) = msg.files.first() {
707            if let Some(ref media) = file.media {
708                require_download_aes_key("file", media)?;
709                let data = self.cdn.download(media, None).await?;
710                return Ok(Some(DownloadedMedia {
711                    data,
712                    media_type: "file".into(),
713                    file_name: Some(file.file_name.clone().unwrap_or_else(|| "file.bin".into())),
714                    format: None,
715                }));
716            }
717        }
718        if let Some(video) = msg.videos.first() {
719            if let Some(ref media) = video.media {
720                require_download_aes_key("video", media)?;
721                let data = self.cdn.download(media, None).await?;
722                return Ok(Some(DownloadedMedia {
723                    data,
724                    media_type: "video".into(),
725                    file_name: None,
726                    format: None,
727                }));
728            }
729        }
730        if let Some(voice) = msg.voices.first() {
731            if let Some(ref media) = voice.media {
732                require_download_aes_key("voice", media)?;
733                let data = self.cdn.download(media, None).await?;
734                return Ok(Some(DownloadedMedia {
735                    data,
736                    media_type: "voice".into(),
737                    file_name: None,
738                    format: Some("silk".into()),
739                }));
740            }
741        }
742        Ok(None)
743    }
744
745    /// Download and decrypt a raw CDN media reference.
746    pub async fn download_raw(
747        &self,
748        media: &CDNMedia,
749        aeskey_override: Option<&str>,
750    ) -> Result<Vec<u8>> {
751        self.cdn.download(media, aeskey_override).await
752    }
753
754    /// Upload data to WeChat CDN without sending a message.
755    pub async fn upload(
756        &self,
757        data: &[u8],
758        user_id: &str,
759        media_type: i32,
760    ) -> Result<UploadResult> {
761        let (base_url, token) = self.get_auth().await?;
762        self.cdn_upload(&base_url, &token, data, user_id, media_type)
763            .await
764    }
765
766    async fn run_loop(&self, mut cursor: String) -> Result<()> {
767        *self.stopped.write().await = false;
768        info!("Long-poll loop started");
769        let mut retry_delay = Duration::from_secs(1);
770
771        if let Ok((base_url, token)) = self.get_auth().await {
772            match self.client.notify_start(&base_url, &token).await {
773                Ok(resp) if resp.ret.is_some_and(|ret| ret != 0) => warn!(
774                    "notify_start returned ret={:?} errmsg={:?}",
775                    resp.ret, resp.errmsg
776                ),
777                Ok(_) => {}
778                Err(err) => warn!("notify_start failed: {}", err),
779            }
780        }
781
782        loop {
783            if *self.stopped.read().await {
784                break;
785            }
786
787            let (base_url, token) = self.get_auth().await?;
788
789            match self.client.get_updates(&base_url, &token, &cursor).await {
790                Ok(updates) => {
791                    let account_key = self.account_key().await;
792                    let new_cursor = updates.get_updates_buf.clone();
793                    let mut cursor_changed = false;
794                    if !new_cursor.is_empty() && cursor.as_str() != new_cursor.as_str() {
795                        cursor_changed = true;
796                        cursor = new_cursor.clone();
797                    }
798                    retry_delay = Duration::from_secs(1);
799
800                    for wire in &updates.msgs {
801                        if let Some(incoming) =
802                            IncomingMessage::from_wire_for_account(wire, &account_key)
803                        {
804                            self.reset_account_rate_limit(&account_key).await;
805                            if let Some(context) = incoming.context.clone() {
806                                self.observe_context_for_interaction(&context).await;
807                                self.emit_event(WechatEvent::ContextObserved(context));
808                            }
809                            self.emit_event(WechatEvent::Message(incoming.clone()));
810                        }
811                    }
812                    self.emit_due_context_interaction_requests().await;
813                    if cursor_changed {
814                        self.emit_event(WechatEvent::CursorAdvanced {
815                            account_key: account_key.clone(),
816                            cursor: new_cursor.clone(),
817                        });
818                    }
819                }
820                Err(e) if e.is_session_expired() => {
821                    warn!("Session expired — re-login required");
822                    let account_key = self.account_key().await;
823                    self.emit_event(WechatEvent::AuthSessionExpired {
824                        account_key: account_key.clone(),
825                    });
826                    cursor.clear();
827                    break;
828                }
829                Err(e) if e.is_rate_limited() => {
830                    self.report_error(&e);
831                    sleep(self.rate_limit.retry_after).await;
832                    retry_delay = Duration::from_secs(1);
833                    continue;
834                }
835                Err(e) => {
836                    self.report_error(&e);
837                    sleep(retry_delay).await;
838                    retry_delay = std::cmp::min(retry_delay * 2, Duration::from_secs(10));
839                    continue;
840                }
841            }
842        }
843
844        info!("Long-poll loop stopped");
845        if let Ok((base_url, token)) = self.get_auth().await {
846            match self.client.notify_stop(&base_url, &token).await {
847                Ok(resp) if resp.ret.is_some_and(|ret| ret != 0) => warn!(
848                    "notify_stop returned ret={:?} errmsg={:?}",
849                    resp.ret, resp.errmsg
850                ),
851                Ok(_) => {}
852                Err(err) => warn!("notify_stop failed: {}", err),
853            }
854        }
855        Ok(())
856    }
857
858    async fn retry_rate_limited<F, Fut, T>(&self, mut operation: F) -> Result<T>
859    where
860        F: FnMut() -> Fut,
861        Fut: Future<Output = Result<T>>,
862    {
863        let account_key = self.account_key().await;
864        if let Some(wait) = self.active_rate_limit_backoff(&account_key).await {
865            return Err(rate_limited_error(wait));
866        }
867
868        let mut retries = 0usize;
869        loop {
870            match operation().await {
871                Ok(value) => return Ok(value),
872                Err(err) if err.is_rate_limited() => {
873                    let err = with_rate_limit_retry_after(err, self.rate_limit.retry_after);
874                    self.defer_account_rate_limit(&account_key, self.rate_limit.retry_after)
875                        .await;
876                    if retries >= self.rate_limit.retry_attempts {
877                        return Err(err);
878                    }
879                    retries += 1;
880                    self.wait_for_rate_limit_reset_or_timeout(
881                        &account_key,
882                        self.rate_limit.retry_after,
883                    )
884                    .await;
885                }
886                Err(err) => return Err(err),
887            }
888        }
889    }
890
891    async fn active_rate_limit_backoff(&self, account_key: &str) -> Option<Duration> {
892        if account_key.is_empty() {
893            return None;
894        }
895        let mut states = self.rate_limit_states.lock().await;
896        let state = states.entry(account_key.to_string()).or_default();
897        match state.next_allowed_at {
898            Some(next_allowed_at) => match next_allowed_at.checked_duration_since(Instant::now()) {
899                Some(wait) if !wait.is_zero() => Some(wait),
900                _ => {
901                    state.next_allowed_at = None;
902                    None
903                }
904            },
905            None => None,
906        }
907    }
908
909    async fn wait_for_rate_limit_reset_or_timeout(&self, account_key: &str, wait: Duration) {
910        if account_key.is_empty() || wait.is_zero() {
911            return;
912        }
913        let _ = tokio::time::timeout(wait, self.rate_limit_notify.notified()).await;
914    }
915
916    async fn defer_account_rate_limit(&self, account_key: &str, retry_after: Duration) {
917        if account_key.is_empty() {
918            return;
919        }
920        let next_allowed_at = Instant::now() + retry_after;
921        let mut states = self.rate_limit_states.lock().await;
922        let state = states.entry(account_key.to_string()).or_default();
923        if state
924            .next_allowed_at
925            .map_or(true, |current| current < next_allowed_at)
926        {
927            state.next_allowed_at = Some(next_allowed_at);
928        }
929    }
930
931    async fn reset_account_rate_limit(&self, account_key: &str) {
932        if account_key.is_empty() {
933            return;
934        }
935        let mut states = self.rate_limit_states.lock().await;
936        let state = states.entry(account_key.to_string()).or_default();
937        state.sent_at.clear();
938        state.next_allowed_at = None;
939        drop(states);
940        self.rate_limit_notify.notify_waiters();
941    }
942
943    async fn observe_context_for_interaction(&self, context: &WechatContext) {
944        let observed_at = SystemTime::now();
945        let expires_at = observed_at + self.rate_limit.context_ttl;
946        self.context_refresh_states.lock().await.insert(
947            (context.account_key.clone(), context.user_id.clone()),
948            ContextRefreshState {
949                user_id: context.user_id.clone(),
950                observed_at,
951                expires_at,
952                reminded: false,
953            },
954        );
955    }
956
957    async fn emit_due_context_interaction_requests(&self) {
958        if self.rate_limit.context_ttl.is_zero() {
959            return;
960        }
961        let now = SystemTime::now();
962        let mut events = Vec::new();
963        {
964            let mut contexts = self.context_refresh_states.lock().await;
965            contexts.retain(|_, state| !(state.reminded && now > state.expires_at));
966            for ((account_key, _), state) in contexts.iter_mut() {
967                if state.reminded {
968                    continue;
969                }
970                let remind_at = state
971                    .expires_at
972                    .checked_sub(self.rate_limit.context_remind_before)
973                    .unwrap_or(state.observed_at);
974                if now >= remind_at {
975                    state.reminded = true;
976                    events.push(WechatEvent::UserInteractionRequested {
977                        account_key: account_key.clone(),
978                        user_id: Some(state.user_id.clone()),
979                        reason: UserInteractionReason::ContextExpiring {
980                            observed_at: state.observed_at,
981                            expires_at: state.expires_at,
982                            remind_before: self.rate_limit.context_remind_before,
983                        },
984                    });
985                }
986            }
987        }
988        for event in events {
989            self.emit_event(event);
990        }
991    }
992
993    async fn record_successful_message_send(&self) {
994        let account_key = self.account_key().await;
995        if account_key.is_empty() || self.rate_limit.interaction_threshold == 0 {
996            return;
997        }
998
999        let now = Instant::now();
1000        let mut event = None;
1001        {
1002            let mut states = self.rate_limit_states.lock().await;
1003            let state = states.entry(account_key.clone()).or_default();
1004            while let Some(sent_at) = state.sent_at.front().copied() {
1005                if now.duration_since(sent_at) > self.rate_limit.interaction_window {
1006                    state.sent_at.pop_front();
1007                } else {
1008                    break;
1009                }
1010            }
1011            state.sent_at.push_back(now);
1012            let sent_count = state.sent_at.len();
1013            if sent_count == self.rate_limit.interaction_threshold {
1014                event = Some(WechatEvent::UserInteractionRequested {
1015                    account_key,
1016                    user_id: None,
1017                    reason: UserInteractionReason::OutboundRateLimitApproaching {
1018                        sent_count,
1019                        window: self.rate_limit.interaction_window,
1020                        threshold: self.rate_limit.interaction_threshold,
1021                    },
1022                });
1023            }
1024        }
1025
1026        if let Some(event) = event {
1027            self.emit_event(event);
1028        }
1029    }
1030
1031    /// Stop the bot.
1032    pub async fn stop(&self) {
1033        *self.stopped.write().await = true;
1034    }
1035
1036    // --- internal media ---
1037
1038    fn send_content<'a>(
1039        &'a self,
1040        user_id: &'a str,
1041        context_token: &'a str,
1042        content: SendContent,
1043    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<SendReceipt>> + Send + 'a>> {
1044        Box::pin(async move {
1045            let (base_url, token) = self.get_auth().await?;
1046            match content {
1047                SendContent::Text(text) => self.send_text(user_id, &text, context_token).await,
1048                SendContent::Image { data, caption } => {
1049                    let result = self
1050                        .cdn_upload(&base_url, &token, &data, user_id, 1)
1051                        .await?;
1052                    let mut receipt = SendReceipt::empty();
1053                    if let Some(cap) = caption {
1054                        receipt.append(self.send_text(user_id, &cap, context_token).await?);
1055                    }
1056                    let item = json!({"type": 2, "image_item": {
1057                        "media": cdn_media_json(&result.media),
1058                        "mid_size": result.encrypted_file_size,
1059                    }});
1060                    receipt.append(
1061                        self.send_media_item(&base_url, &token, user_id, context_token, item)
1062                            .await?,
1063                    );
1064                    Ok(receipt)
1065                }
1066                SendContent::Video { data, caption } => {
1067                    let result = self
1068                        .cdn_upload(&base_url, &token, &data, user_id, 2)
1069                        .await?;
1070                    let mut receipt = SendReceipt::empty();
1071                    if let Some(cap) = caption {
1072                        receipt.append(self.send_text(user_id, &cap, context_token).await?);
1073                    }
1074                    let item = json!({"type": 5, "video_item": {
1075                        "media": cdn_media_json(&result.media),
1076                        "video_size": result.encrypted_file_size,
1077                    }});
1078                    receipt.append(
1079                        self.send_media_item(&base_url, &token, user_id, context_token, item)
1080                            .await?,
1081                    );
1082                    Ok(receipt)
1083                }
1084                SendContent::File {
1085                    data,
1086                    file_name,
1087                    caption,
1088                } => {
1089                    let cat = categorize_by_extension(&file_name);
1090                    match cat {
1091                        "image" => {
1092                            self.send_content(
1093                                user_id,
1094                                context_token,
1095                                SendContent::Image { data, caption },
1096                            )
1097                            .await
1098                        }
1099                        "video" => {
1100                            self.send_content(
1101                                user_id,
1102                                context_token,
1103                                SendContent::Video { data, caption },
1104                            )
1105                            .await
1106                        }
1107                        _ => {
1108                            let mut receipt = SendReceipt::empty();
1109                            if let Some(cap) = caption {
1110                                receipt.append(self.send_text(user_id, &cap, context_token).await?);
1111                            }
1112                            let data_len = data.len();
1113                            let result = self
1114                                .cdn_upload(&base_url, &token, &data, user_id, 3)
1115                                .await?;
1116                            let item = json!({"type": 4, "file_item": {
1117                                "media": cdn_media_json(&result.media),
1118                                "file_name": file_name,
1119                                "len": data_len.to_string(),
1120                            }});
1121                            receipt.append(
1122                                self.send_media_item(
1123                                    &base_url,
1124                                    &token,
1125                                    user_id,
1126                                    context_token,
1127                                    item,
1128                                )
1129                                .await?,
1130                            );
1131                            Ok(receipt)
1132                        }
1133                    }
1134                }
1135            }
1136        })
1137    }
1138
1139    async fn send_media_item(
1140        &self,
1141        base_url: &str,
1142        token: &str,
1143        user_id: &str,
1144        context_token: &str,
1145        item: serde_json::Value,
1146    ) -> Result<SendReceipt> {
1147        let client_id = Uuid::new_v4().to_string();
1148        let msg = protocol::build_media_message_with_client_id(
1149            user_id,
1150            context_token,
1151            vec![item],
1152            &client_id,
1153        );
1154        self.retry_rate_limited(|| self.client.send_message(base_url, token, &msg))
1155            .await?;
1156        self.record_successful_message_send().await;
1157        Ok(SendReceipt::single(client_id))
1158    }
1159
1160    async fn cdn_upload(
1161        &self,
1162        base_url: &str,
1163        token: &str,
1164        data: &[u8],
1165        user_id: &str,
1166        media_type: i32,
1167    ) -> Result<UploadResult> {
1168        let aes_key = crypto::generate_aes_key();
1169        let ciphertext = crypto::encrypt_aes_ecb(data, &aes_key);
1170
1171        let mut filekey_buf = [0u8; 16];
1172        rand::rng().fill_bytes(&mut filekey_buf);
1173        let filekey = hex::encode(filekey_buf);
1174
1175        let raw_md5 = hex::encode(Md5::digest(data));
1176
1177        let params = protocol::GetUploadUrlParams {
1178            filekey: filekey.clone(),
1179            media_type,
1180            to_user_id: user_id.to_string(),
1181            rawsize: data.len(),
1182            rawfilemd5: raw_md5,
1183            filesize: ciphertext.len(),
1184            thumb_rawsize: None,
1185            thumb_rawfilemd5: None,
1186            thumb_filesize: None,
1187            no_need_thumb: true,
1188            aeskey: crypto::encode_aes_key_hex(&aes_key),
1189        };
1190
1191        let upload_resp = self.client.get_upload_url(base_url, token, &params).await?;
1192        let upload_url = resolve_cdn_upload_url(&upload_resp, &filekey)?;
1193
1194        let encrypted_file_size = ciphertext.len();
1195
1196        let encrypt_query_param = self.client.upload_to_cdn(&upload_url, &ciphertext).await?;
1197
1198        Ok(UploadResult {
1199            media: CDNMedia {
1200                encrypt_query_param: Some(encrypt_query_param),
1201                aes_key: Some(crypto::encode_aes_key_base64(&aes_key)),
1202                encrypt_type: Some(1),
1203                full_url: None,
1204            },
1205            aes_key,
1206            encrypted_file_size,
1207        })
1208    }
1209
1210    // --- internal text ---
1211
1212    async fn send_text(
1213        &self,
1214        user_id: &str,
1215        text: &str,
1216        context_token: &str,
1217    ) -> Result<SendReceipt> {
1218        let (base_url, token) = self.get_auth().await?;
1219        let mut message_ids = Vec::new();
1220        let mut visible_texts = Vec::new();
1221        for chunk in chunk_text(text, 4000) {
1222            let client_id = Uuid::new_v4().to_string();
1223            let msg = self.client.build_text_message_with_client_id(
1224                user_id,
1225                context_token,
1226                &chunk,
1227                &client_id,
1228            );
1229            self.retry_rate_limited(|| self.client.send_message(&base_url, &token, &msg))
1230                .await?;
1231            self.record_successful_message_send().await;
1232            message_ids.push(client_id);
1233            visible_texts.push(chunk);
1234        }
1235        Ok(SendReceipt {
1236            message_ids,
1237            visible_texts,
1238        })
1239    }
1240
1241    async fn get_auth(&self) -> Result<(String, String)> {
1242        let creds = self.credentials.read().await;
1243        let creds = creds
1244            .as_ref()
1245            .ok_or_else(|| WechatIlinkError::Auth("not logged in".into()))?;
1246        Ok((creds.base_url.clone(), creds.token.clone()))
1247    }
1248
1249    async fn account_key(&self) -> String {
1250        let creds = self.credentials.read().await;
1251        match creds.as_ref() {
1252            Some(c) => context_account_key(c).to_string(),
1253            None => String::new(),
1254        }
1255    }
1256
1257    fn report_error(&self, err: &WechatIlinkError) {
1258        error!("{}", err);
1259    }
1260}
1261
/// Test helper: adapts a broadcast receiver into a `WechatEventStream`.
///
/// Received events are yielded as `Ok`. A lagging receiver yields one `Err`
/// describing how many events were skipped and the stream continues. The
/// stream ends when the channel is closed.
#[cfg(test)]
fn event_stream_from_receiver(
    mut rx: broadcast::Receiver<WechatEvent>,
) -> WechatEventStream<'static> {
    let stream = async_stream::stream! {
        loop {
            let item = match rx.recv().await {
                Ok(event) => Ok(event),
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    Err(WechatIlinkError::Other(format!(
                        "wechat event stream lagged by {skipped} events"
                    )))
                }
                Err(broadcast::error::RecvError::Closed) => break,
            };
            yield item;
        }
    };
    WechatEventStream::new(stream)
}
1280
1281struct AbortOnDrop(tokio::task::AbortHandle);
1282
1283impl Drop for AbortOnDrop {
1284    fn drop(&mut self) {
1285        self.0.abort();
1286    }
1287}
1288
1289fn rate_limited_error(retry_after: Duration) -> WechatIlinkError {
1290    WechatIlinkError::RateLimited {
1291        retry_after,
1292        message: "ret=-2".to_string(),
1293        http_status: 200,
1294        errcode: -2,
1295    }
1296}
1297
1298fn with_rate_limit_retry_after(err: WechatIlinkError, retry_after: Duration) -> WechatIlinkError {
1299    match err {
1300        WechatIlinkError::RateLimited {
1301            message,
1302            http_status,
1303            errcode,
1304            ..
1305        } => WechatIlinkError::RateLimited {
1306            retry_after,
1307            message,
1308            http_status,
1309            errcode,
1310        },
1311        other => other,
1312    }
1313}
1314
/// Payload accepted by the media send entry points (`reply_media` /
/// `send_media_with_context`).
pub enum SendContent {
    /// Plain text message.
    Text(String),
    /// Image bytes with an optional caption sent as a separate text message.
    Image {
        data: Vec<u8>,
        caption: Option<String>,
    },
    /// Video bytes with an optional caption sent as a separate text message.
    Video {
        data: Vec<u8>,
        caption: Option<String>,
    },
    /// Arbitrary file bytes. `file_name` is sent along with the attachment and
    /// its extension decides whether the payload is handled as an image, a
    /// video, or a generic file.
    File {
        data: Vec<u8>,
        file_name: String,
        caption: Option<String>,
    },
}
1332
1333fn cdn_media_json(media: &CDNMedia) -> serde_json::Value {
1334    let mut v = json!({});
1335    if let Some(param) = &media.encrypt_query_param {
1336        v["encrypt_query_param"] = json!(param);
1337    }
1338    if let Some(key) = &media.aes_key {
1339        v["aes_key"] = json!(key);
1340    }
1341    if let Some(et) = media.encrypt_type {
1342        v["encrypt_type"] = json!(et);
1343    }
1344    if let Some(url) = &media.full_url {
1345        v["full_url"] = json!(url);
1346    }
1347    v
1348}
1349
1350fn resolve_cdn_upload_url(
1351    response: &protocol::GetUploadUrlResponse,
1352    filekey: &str,
1353) -> Result<String> {
1354    if let Some(upload_full_url) = response
1355        .upload_full_url
1356        .as_deref()
1357        .filter(|value| !value.is_empty())
1358    {
1359        return Ok(upload_full_url.to_string());
1360    }
1361    if let Some(upload_param) = response
1362        .upload_param
1363        .as_deref()
1364        .filter(|value| !value.is_empty())
1365    {
1366        return Ok(protocol::build_cdn_upload_url(
1367            protocol::CDN_BASE_URL,
1368            upload_param,
1369            filekey,
1370        ));
1371    }
1372    Err(WechatIlinkError::Media(
1373        "getuploadurl did not return upload_full_url or upload_param".into(),
1374    ))
1375}
1376
1377fn require_download_aes_key(media_type: &str, media: &CDNMedia) -> Result<()> {
1378    if media
1379        .aes_key
1380        .as_deref()
1381        .is_some_and(|value| !value.is_empty())
1382    {
1383        return Ok(());
1384    }
1385    Err(WechatIlinkError::Media(format!(
1386        "{} CDN media missing AES key",
1387        media_type
1388    )))
1389}
1390
/// Classifies a file name as `"image"`, `"video"` or `"file"` from its
/// extension, compared case-insensitively.
///
/// Names without an extension, or with an unrecognised one, are `"file"`.
fn categorize_by_extension(filename: &str) -> &'static str {
    const IMAGE_EXTENSIONS: [&str; 7] = ["png", "jpg", "jpeg", "gif", "webp", "bmp", "svg"];
    const VIDEO_EXTENSIONS: [&str; 5] = ["mp4", "mov", "webm", "mkv", "avi"];

    let extension = match Path::new(filename).extension().and_then(|e| e.to_str()) {
        Some(raw) => raw.to_lowercase(),
        None => String::new(),
    };

    if IMAGE_EXTENSIONS.contains(&extension.as_str()) {
        "image"
    } else if VIDEO_EXTENSIONS.contains(&extension.as_str()) {
        "video"
    } else {
        "file"
    }
}
1403
/// Splits `text` into consecutive chunks of at most `limit` bytes whose
/// concatenation equals `text`.
///
/// Text that already fits is returned as a single chunk, so an empty input
/// yields one empty chunk. Otherwise each chunk is cut, in order of
/// preference, after the last blank line (`"\n\n"`), the last newline, or the
/// last space inside the current window, provided that separator lies beyond
/// the first 30% of the window. If none qualifies the chunk is cut at the
/// window end. Cuts never fall inside a UTF-8 character.
///
/// When `limit` is smaller than the next character's encoded width (including
/// `limit == 0`), that character is emitted as a chunk of its own even though
/// it exceeds `limit`. This guarantees forward progress.
fn chunk_text(text: &str, limit: usize) -> Vec<String> {
    if text.len() <= limit {
        return vec![text.to_string()];
    }

    let mut chunks = Vec::new();
    let mut remaining = text;
    while remaining.len() > limit {
        let window_end = char_boundary_at_or_before(remaining, limit);
        let cut = if window_end == 0 {
            // Not even one character fits in `limit` bytes. Take the whole
            // character anyway; cutting at 0 would never advance.
            remaining
                .chars()
                .next()
                .map_or(remaining.len(), char::len_utf8)
        } else {
            let window = &remaining[..window_end];
            // Separators in the first 30% of the window are ignored so a
            // chunk is not cut needlessly short.
            let min_break = window_end * 3 / 10;
            let cut_after = |found: Option<usize>, separator_len: usize| {
                found
                    .filter(|&index| index > min_break)
                    .map(|index| index + separator_len)
            };
            cut_after(window.rfind("\n\n"), 2)
                .or_else(|| cut_after(window.rfind('\n'), 1))
                .or_else(|| cut_after(window.rfind(' '), 1))
                .unwrap_or(window_end)
        };
        let (head, tail) = remaining.split_at(cut);
        chunks.push(head.to_string());
        remaining = tail;
    }
    if !remaining.is_empty() {
        chunks.push(remaining.to_string());
    }
    chunks
}

/// Returns the largest index `<= limit` (clamped to `text.len()`) that lies on
/// a UTF-8 character boundary of `text`.
fn char_boundary_at_or_before(text: &str, limit: usize) -> usize {
    let mut end = limit.min(text.len());
    // Index 0 is always a boundary, so this loop cannot underflow.
    while !text.is_char_boundary(end) {
        end -= 1;
    }
    end
}
1451
1452fn context_account_key(creds: &Credentials) -> &str {
1453    if !creds.account_id.is_empty() {
1454        &creds.account_id
1455    } else {
1456        &creds.user_id
1457    }
1458}
1459
/// Returns the current time as whole seconds since the Unix epoch followed by
/// `Z` (for example `1700000000Z`), without pulling in a chrono dependency.
///
/// If the system clock reports a time before the Unix epoch the result is
/// `0Z` instead of a panic.
fn chrono_now() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    format!("{}Z", since_epoch.as_secs())
}
1465
1466#[cfg(test)]
1467mod tests {
1468    use super::*;
1469
1470    #[test]
1471    fn builder_accepts_caller_provided_reqwest_client() {
1472        let http_client = reqwest::Client::new();
1473        let _client = WechatIlinkClient::builder()
1474            .http_client(http_client)
1475            .build();
1476    }
1477
1478    #[test]
1479    fn chunk_text_short() {
1480        let chunks = chunk_text("hello", 100);
1481        assert_eq!(chunks, vec!["hello"]);
1482    }
1483
1484    #[test]
1485    fn chunk_text_empty() {
1486        let chunks = chunk_text("", 100);
1487        assert_eq!(chunks, vec![""]);
1488    }
1489
1490    #[test]
1491    fn chunk_text_splits_on_paragraph() {
1492        let text = "aaaa\n\nbbbb";
1493        let chunks = chunk_text(text, 7);
1494        assert_eq!(chunks, vec!["aaaa\n\n", "bbbb"]);
1495    }
1496
1497    #[test]
1498    fn chunk_text_splits_on_newline() {
1499        let text = "aaaa\nbbbb";
1500        let chunks = chunk_text(text, 7);
1501        assert_eq!(chunks, vec!["aaaa\n", "bbbb"]);
1502    }
1503
1504    #[test]
1505    fn chunk_text_exact_limit() {
1506        let text = "abcdef";
1507        let chunks = chunk_text(text, 6);
1508        assert_eq!(chunks, vec!["abcdef"]);
1509    }
1510
1511    #[test]
1512    fn chunk_text_does_not_split_utf8_char_boundary() {
1513        let mut text = "a".repeat(3999);
1514        text.push('启');
1515        text.push_str("tail");
1516
1517        let chunks = chunk_text(&text, 4000);
1518
1519        assert_eq!(chunks.concat(), text);
1520        assert!(chunks.iter().all(|chunk| chunk.len() <= 4000));
1521    }
1522
1523    #[test]
1524    fn categorize_image_extensions() {
1525        assert_eq!(categorize_by_extension("photo.png"), "image");
1526        assert_eq!(categorize_by_extension("photo.JPG"), "image");
1527        assert_eq!(categorize_by_extension("anim.gif"), "image");
1528        assert_eq!(categorize_by_extension("pic.webp"), "image");
1529    }
1530
1531    #[test]
1532    fn categorize_video_extensions() {
1533        assert_eq!(categorize_by_extension("clip.mp4"), "video");
1534        assert_eq!(categorize_by_extension("clip.MOV"), "video");
1535        assert_eq!(categorize_by_extension("movie.webm"), "video");
1536    }
1537
1538    #[test]
1539    fn categorize_file_extensions() {
1540        assert_eq!(categorize_by_extension("report.pdf"), "file");
1541        assert_eq!(categorize_by_extension("data.csv"), "file");
1542        assert_eq!(categorize_by_extension("noext"), "file");
1543    }
1544
1545    #[test]
1546    fn cdn_media_json_with_encrypt_type() {
1547        let media = CDNMedia {
1548            encrypt_query_param: Some("param=1".to_string()),
1549            aes_key: Some("key123".to_string()),
1550            encrypt_type: Some(1),
1551            full_url: None,
1552        };
1553        let j = cdn_media_json(&media);
1554        assert_eq!(j["encrypt_query_param"], "param=1");
1555        assert_eq!(j["aes_key"], "key123");
1556        assert_eq!(j["encrypt_type"], 1);
1557    }
1558
1559    #[test]
1560    fn cdn_media_json_without_encrypt_type() {
1561        let media = CDNMedia {
1562            encrypt_query_param: Some("p".to_string()),
1563            aes_key: Some("k".to_string()),
1564            encrypt_type: None,
1565            full_url: None,
1566        };
1567        let j = cdn_media_json(&media);
1568        assert!(j.get("encrypt_type").is_none());
1569    }
1570
1571    #[test]
1572    fn upload_url_prefers_full_url() {
1573        let response = protocol::GetUploadUrlResponse {
1574            upload_param: Some("param".to_string()),
1575            thumb_upload_param: None,
1576            upload_full_url: Some("https://cdn.example/upload".to_string()),
1577        };
1578
1579        let upload_url = resolve_cdn_upload_url(&response, "file-key").unwrap();
1580        assert_eq!(upload_url, "https://cdn.example/upload");
1581    }
1582
1583    #[test]
1584    fn upload_url_falls_back_to_upload_param() {
1585        let response = protocol::GetUploadUrlResponse {
1586            upload_param: Some("param value".to_string()),
1587            thumb_upload_param: None,
1588            upload_full_url: None,
1589        };
1590
1591        let upload_url = resolve_cdn_upload_url(&response, "file key").unwrap();
1592        assert_eq!(
1593            upload_url,
1594            format!(
1595                "{}/upload?encrypted_query_param=param%20value&filekey=file%20key",
1596                protocol::CDN_BASE_URL
1597            )
1598        );
1599    }
1600
1601    #[test]
1602    fn upload_url_requires_full_url_or_upload_param() {
1603        let response = protocol::GetUploadUrlResponse {
1604            upload_param: None,
1605            thumb_upload_param: None,
1606            upload_full_url: None,
1607        };
1608
1609        let err = resolve_cdn_upload_url(&response, "file-key").unwrap_err();
1610        assert!(matches!(err, WechatIlinkError::Media(_)));
1611    }
1612
1613    #[test]
1614    fn non_image_download_requires_media_aes_key() {
1615        let media = CDNMedia {
1616            encrypt_query_param: Some("param".to_string()),
1617            aes_key: None,
1618            encrypt_type: Some(1),
1619            full_url: None,
1620        };
1621
1622        let err = require_download_aes_key("voice", &media).unwrap_err();
1623        assert!(matches!(err, WechatIlinkError::Media(_)));
1624    }
1625
1626    #[test]
1627    fn non_image_download_accepts_media_aes_key() {
1628        let media = CDNMedia {
1629            encrypt_query_param: Some("param".to_string()),
1630            aes_key: Some("key".to_string()),
1631            encrypt_type: Some(1),
1632            full_url: None,
1633        };
1634
1635        require_download_aes_key("file", &media).unwrap();
1636    }
1637
1638    #[tokio::test]
1639    async fn events_from_cursor_reports_missing_auth_without_storing_cursor() {
1640        let client = Arc::new(WechatIlinkClient::new());
1641        let mut events = client.events_from_cursor(Some("external-cursor".to_string()));
1642
1643        let err = events
1644            .next()
1645            .await
1646            .expect("stream item")
1647            .expect_err("missing auth should be surfaced through stream");
1648
1649        assert!(matches!(err, WechatIlinkError::Auth(_)));
1650    }
1651
1652    #[test]
1653    fn rate_limit_builder_defaults_and_overrides_are_available() {
1654        let default_options = WechatRateLimitOptions::default();
1655        assert_eq!(default_options.retry_after, Duration::from_secs(90));
1656        assert_eq!(default_options.retry_attempts, 5);
1657        assert_eq!(default_options.interaction_window, Duration::from_secs(300));
1658        assert_eq!(default_options.interaction_threshold, 6);
1659        assert_eq!(
1660            default_options.context_ttl,
1661            Duration::from_secs(24 * 60 * 60)
1662        );
1663
1664        let client = WechatIlinkClient::builder()
1665            .rate_limit_retry_after(Duration::from_secs(60))
1666            .rate_limit_max_retries(2)
1667            .rate_limit_interaction_window(Duration::from_secs(120))
1668            .rate_limit_interaction_threshold(4)
1669            .context_ttl(Duration::from_secs(3600))
1670            .context_expiry_remind_before(Duration::from_secs(300))
1671            .build();
1672
1673        assert_eq!(client.rate_limit.retry_after, Duration::from_secs(60));
1674        assert_eq!(client.rate_limit.retry_attempts, 2);
1675        assert_eq!(
1676            client.rate_limit.interaction_window,
1677            Duration::from_secs(120)
1678        );
1679        assert_eq!(client.rate_limit.interaction_threshold, 4);
1680        assert_eq!(client.rate_limit.context_ttl, Duration::from_secs(3600));
1681        assert_eq!(
1682            client.rate_limit.context_remind_before,
1683            Duration::from_secs(300)
1684        );
1685    }
1686
1687    #[tokio::test]
1688    async fn outbound_threshold_emits_interaction_event_and_incoming_resets_window() {
1689        let client = WechatIlinkClient::builder()
1690            .rate_limit_interaction_threshold(3)
1691            .rate_limit_interaction_window(Duration::from_secs(300))
1692            .build();
1693        client.set_credentials(test_credentials()).await;
1694        let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1695
1696        client.record_successful_message_send().await;
1697        client.record_successful_message_send().await;
1698        assert!(
1699            tokio::time::timeout(Duration::from_millis(5), events.next())
1700                .await
1701                .is_err()
1702        );
1703        client.record_successful_message_send().await;
1704        assert_rate_limit_event(events.next().await, "account-1", 3, 3);
1705        client.record_successful_message_send().await;
1706        assert!(
1707            tokio::time::timeout(Duration::from_millis(5), events.next())
1708                .await
1709                .is_err()
1710        );
1711
1712        client.reset_account_rate_limit("account-1").await;
1713        client.record_successful_message_send().await;
1714        client.record_successful_message_send().await;
1715        client.record_successful_message_send().await;
1716        assert_rate_limit_event(events.next().await, "account-1", 3, 3);
1717    }
1718
1719    #[tokio::test]
1720    async fn context_expiry_emits_user_interaction_event_and_new_context_resets_it() {
1721        let client = WechatIlinkClient::builder()
1722            .context_ttl(Duration::from_millis(5))
1723            .context_expiry_remind_before(Duration::from_millis(5))
1724            .build();
1725        let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1726
1727        let context = WechatContext {
1728            account_key: "account-1".to_string(),
1729            user_id: "user-1".to_string(),
1730            context_token: "ctx-1".to_string(),
1731            observed_at_unix_ms: 1,
1732            source_message_id: Some("msg-1".to_string()),
1733        };
1734        client.observe_context_for_interaction(&context).await;
1735        tokio::time::sleep(Duration::from_millis(10)).await;
1736        client.emit_due_context_interaction_requests().await;
1737        assert_context_expiring_event(events.next().await, "account-1", "user-1");
1738
1739        client.observe_context_for_interaction(&context).await;
1740        tokio::time::sleep(Duration::from_millis(10)).await;
1741        client.emit_due_context_interaction_requests().await;
1742        assert_context_expiring_event(events.next().await, "account-1", "user-1");
1743    }
1744
1745    #[tokio::test]
1746    async fn retry_rate_limited_uses_configured_retry_count() {
1747        use std::sync::atomic::{AtomicUsize, Ordering};
1748
1749        let client = WechatIlinkClient::builder()
1750            .rate_limit_retry_after(Duration::from_millis(1))
1751            .rate_limit_max_retries(2)
1752            .build();
1753        client.set_credentials(test_credentials()).await;
1754        let attempts = Arc::new(AtomicUsize::new(0));
1755        let attempts_for_operation = Arc::clone(&attempts);
1756
1757        let err = client
1758            .retry_rate_limited(move || {
1759                let attempts_for_operation = Arc::clone(&attempts_for_operation);
1760                async move {
1761                    attempts_for_operation.fetch_add(1, Ordering::SeqCst);
1762                    Err::<(), _>(rate_limited_error(Duration::from_millis(1)))
1763                }
1764            })
1765            .await
1766            .expect_err("rate limit should remain after configured retries");
1767
1768        assert!(err.is_rate_limited());
1769        assert_eq!(attempts.load(Ordering::SeqCst), 3);
1770    }
1771
1772    #[tokio::test]
1773    async fn active_rate_limit_backoff_returns_error_without_queueing_new_request() {
1774        let client = WechatIlinkClient::builder()
1775            .rate_limit_retry_after(Duration::from_secs(90))
1776            .build();
1777        client.set_credentials(test_credentials()).await;
1778        client
1779            .defer_account_rate_limit("account-1", Duration::from_secs(90))
1780            .await;
1781
1782        let mut called = false;
1783        let err = client
1784            .retry_rate_limited(|| {
1785                called = true;
1786                async { Ok::<_, WechatIlinkError>(()) }
1787            })
1788            .await
1789            .expect_err("active backoff should return immediately");
1790
1791        assert!(!called);
1792        assert!(err.is_rate_limited());
1793    }
1794
1795    #[tokio::test]
1796    async fn event_stream_accepts_context_observed_and_cursor_events() {
1797        let client = WechatIlinkClient::new();
1798        let mut events = event_stream_from_receiver(client.event_tx.subscribe());
1799
1800        client.emit_event(WechatEvent::ContextObserved(WechatContext {
1801            account_key: "account-1".to_string(),
1802            user_id: "user-1".to_string(),
1803            context_token: "ctx-1".to_string(),
1804            observed_at_unix_ms: 1,
1805            source_message_id: Some("msg-1".to_string()),
1806        }));
1807        client.emit_event(WechatEvent::CursorAdvanced {
1808            account_key: "account-1".to_string(),
1809            cursor: "cursor-2".to_string(),
1810        });
1811
1812        assert_eq!(format_event(events.next().await), "context:user-1");
1813        assert_eq!(format_event(events.next().await), "cursor:cursor-2");
1814    }
1815
1816    fn assert_rate_limit_event(
1817        event: Option<Result<WechatEvent>>,
1818        expected_account: &str,
1819        expected_sent_count: usize,
1820        expected_threshold: usize,
1821    ) {
1822        match event.expect("event").expect("ok event") {
1823            WechatEvent::UserInteractionRequested {
1824                account_key,
1825                reason:
1826                    UserInteractionReason::OutboundRateLimitApproaching {
1827                        sent_count,
1828                        threshold,
1829                        ..
1830                    },
1831                ..
1832            } => {
1833                assert_eq!(account_key, expected_account);
1834                assert_eq!(sent_count, expected_sent_count);
1835                assert_eq!(threshold, expected_threshold);
1836            }
1837            other => panic!("expected rate-limit interaction event, got {other:?}"),
1838        }
1839    }
1840
1841    fn assert_context_expiring_event(
1842        event: Option<Result<WechatEvent>>,
1843        expected_account: &str,
1844        expected_user: &str,
1845    ) {
1846        match event.expect("event").expect("ok event") {
1847            WechatEvent::UserInteractionRequested {
1848                account_key,
1849                user_id,
1850                reason: UserInteractionReason::ContextExpiring { .. },
1851            } => {
1852                assert_eq!(account_key, expected_account);
1853                assert_eq!(user_id.as_deref(), Some(expected_user));
1854            }
1855            other => panic!("expected context-expiring interaction event, got {other:?}"),
1856        }
1857    }
1858
1859    fn format_event(event: Option<Result<WechatEvent>>) -> String {
1860        match event.expect("event").expect("ok event") {
1861            WechatEvent::ContextObserved(ctx) => format!("context:{}", ctx.user_id),
1862            WechatEvent::CursorAdvanced { cursor, .. } => format!("cursor:{cursor}"),
1863            WechatEvent::Message(msg) => format!("message:{}", msg.user_id),
1864            WechatEvent::AuthSessionExpired { account_key } => format!("auth:{account_key}"),
1865            WechatEvent::UserInteractionRequested { reason, .. } => match reason {
1866                UserInteractionReason::OutboundRateLimitApproaching { sent_count, .. } => {
1867                    format!("rate-limit:{sent_count}")
1868                }
1869                UserInteractionReason::ContextExpiring { .. } => "context-expiring".into(),
1870            },
1871        }
1872    }
1873
1874    fn test_credentials() -> Credentials {
1875        Credentials {
1876            token: "token-1".into(),
1877            base_url: "https://example.invalid".into(),
1878            account_id: "account-1".into(),
1879            user_id: "bot-1".into(),
1880            saved_at: None,
1881        }
1882    }
1883
1884    #[tokio::test]
1885    async fn credentials_are_supplied_by_caller() {
1886        let client = WechatIlinkClient::new();
1887        client.set_credentials(test_credentials()).await;
1888
1889        let creds = client.credentials().await.unwrap();
1890        assert_eq!(creds.account_id, "account-1");
1891    }
1892}