wx-bot-sdk 0.1.3

Standalone Weixin Bot SDK in Rust
Documentation
use tokio::time::{Duration, sleep};

use crate::{
    api::{
        GetUpdatesReq, SESSION_EXPIRED_ERRCODE, WeixinApiOptions, get_remaining_pause_ms,
        get_updates, pause_session,
    },
    messaging::process_message::{MessageHandler, ProcessMessageDeps, process_one_message},
    storage::{
        get_sync_buf_file_path, get_sync_buf_file_path_candidates, load_get_updates_buf,
        save_get_updates_buf,
    },
};

const DEFAULT_LONG_POLL_TIMEOUT_MS: u64 = 35_000;
const MAX_CONSECUTIVE_FAILURES: u32 = 3;
const BACKOFF_DELAY_MS: u64 = 30_000;
const RETRY_DELAY_MS: u64 = 2_000;

#[derive(Clone)]
pub struct MonitorWeixinOpts {
    pub base_url: String,
    pub cdn_base_url: String,
    pub token: Option<String>,
    pub account_id: String,
    pub long_poll_timeout_ms: Option<u64>,
    pub on_message: MessageHandler,
}

pub async fn monitor_weixin_provider(
    opts: MonitorWeixinOpts,
    mut stop: tokio::sync::watch::Receiver<bool>,
) -> crate::Result<()> {
    let log = crate::util::logger().with_account(&opts.account_id);
    log.info(format!(
        "monitor started: baseUrl={} account={}",
        opts.base_url, opts.account_id
    ));
    let sync_path = get_sync_buf_file_path(&opts.account_id);
    let mut get_updates_buf = get_sync_buf_file_path_candidates(&opts.account_id)
        .into_iter()
        .find_map(load_get_updates_buf)
        .unwrap_or_default();
    let mut next_timeout = opts
        .long_poll_timeout_ms
        .unwrap_or(DEFAULT_LONG_POLL_TIMEOUT_MS);
    let mut failures = 0_u32;

    loop {
        if *stop.borrow() {
            break;
        }
        let api_opts = WeixinApiOptions {
            base_url: opts.base_url.clone(),
            token: opts.token.clone(),
            timeout_ms: None,
            long_poll_timeout_ms: Some(next_timeout),
        };
        let fut = get_updates(
            GetUpdatesReq {
                get_updates_buf: Some(get_updates_buf.clone()),
            },
            &api_opts,
        );
        let resp = tokio::select! {
            _ = stop.changed() => break,
            r = fut => r,
        };
        match resp {
            Ok(resp) => {
                if let Some(ms) = resp.longpolling_timeout_ms.filter(|ms| *ms > 0) {
                    next_timeout = ms;
                }
                let is_error = resp.ret.map(|r| r != 0).unwrap_or(false)
                    || resp.errcode.map(|e| e != 0).unwrap_or(false);
                if is_error {
                    let expired = resp.errcode == Some(SESSION_EXPIRED_ERRCODE)
                        || resp.ret == Some(SESSION_EXPIRED_ERRCODE);
                    if expired {
                        pause_session(&opts.account_id);
                        let pause_ms = get_remaining_pause_ms(&opts.account_id);
                        log.error(format!(
                            "session expired, pausing {} min",
                            pause_ms.div_ceil(60_000)
                        ));
                        failures = 0;
                        tokio::select! { _ = stop.changed() => break, _ = sleep(Duration::from_millis(pause_ms)) => {} }
                    } else {
                        failures += 1;
                        log.error(format!(
                            "getUpdates failed: ret={:?} errcode={:?} ({}/{})",
                            resp.ret, resp.errcode, failures, MAX_CONSECUTIVE_FAILURES
                        ));
                        let delay = if failures >= MAX_CONSECUTIVE_FAILURES {
                            failures = 0;
                            BACKOFF_DELAY_MS
                        } else {
                            RETRY_DELAY_MS
                        };
                        tokio::select! { _ = stop.changed() => break, _ = sleep(Duration::from_millis(delay)) => {} }
                    }
                    continue;
                }
                failures = 0;
                if let Some(buf) = resp.get_updates_buf.filter(|b| !b.is_empty()) {
                    save_get_updates_buf(&sync_path, &buf)?;
                    get_updates_buf = buf;
                }
                for msg in resp.msgs.unwrap_or_default() {
                    let deps = ProcessMessageDeps {
                        account_id: opts.account_id.clone(),
                        base_url: opts.base_url.clone(),
                        cdn_base_url: opts.cdn_base_url.clone(),
                        token: opts.token.clone(),
                        on_message: opts.on_message.clone(),
                    };
                    if let Err(err) = process_one_message(msg, &deps).await {
                        log.error(format!("process message failed: {err:?}"));
                    }
                }
            }
            Err(err) => {
                failures += 1;
                log.error(format!(
                    "getUpdates error ({}/{}): {err:?}",
                    failures, MAX_CONSECUTIVE_FAILURES
                ));
                let delay = if failures >= MAX_CONSECUTIVE_FAILURES {
                    failures = 0;
                    BACKOFF_DELAY_MS
                } else {
                    RETRY_DELAY_MS
                };
                tokio::select! { _ = stop.changed() => break, _ = sleep(Duration::from_millis(delay)) => {} }
            }
        }
    }
    log.info("monitor ended");
    Ok(())
}