gewe_webhook/
lib.rs

1use axum::{
2    body::Bytes,
3    extract::State,
4    http::{HeaderMap, StatusCode},
5    response::IntoResponse,
6    routing::post,
7    Router,
8};
9use gewe_core::{AppId, BotContext};
10use gewe_session::SessionStore;
11use hmac::{Hmac, Mac};
12use serde::Deserialize;
13use sha2::Sha256;
14use std::sync::{Arc, OnceLock};
15use std::time::{SystemTime, UNIX_EPOCH};
16use tokio::fs;
17use tokio::sync::mpsc;
18use tracing::instrument;
19
20#[derive(Clone)]
21pub struct WebhookState<S> {
22    pub store: Arc<S>,
23    pub tx: mpsc::Sender<WebhookEvent>,
24}
25
26// tokio mpsc Sender is Send + Sync when message is Send; expose bounds on state type for axum.
27unsafe impl<S: Send + Sync> Send for WebhookState<S> {}
28unsafe impl<S: Send + Sync> Sync for WebhookState<S> {}
29
/// Options controlling how the webhook router is built.
#[derive(Debug, Clone)]
pub struct WebhookBuilderOptions {
    /// Capacity of the bounded channel that carries accepted webhook events.
    pub queue_size: usize,
}

impl Default for WebhookBuilderOptions {
    /// Returns options with a queue capacity of 1024 events.
    fn default() -> Self {
        Self { queue_size: 1024 }
    }
}
39
/// Event pushed onto the channel once a webhook request has been accepted.
#[derive(Debug, Clone)]
pub struct WebhookEvent {
    /// Application id taken from the `Appid` field of the request body.
    pub app_id: AppId,
    /// Value of the optional `TypeName` field of the request body.
    pub type_name: Option<String>,
    /// Raw JSON found under the `Data` field of the request body.
    pub data: serde_json::Value,
}
46
47pub fn router_with_channel<S>(opts: WebhookBuilderOptions) -> (Router, mpsc::Receiver<WebhookEvent>)
48where
49    S: SessionStore + Default + Send + Sync + Clone + 'static,
50{
51    router_with_channel_and_store::<S>(opts, Arc::new(Default::default()))
52}
53
54/// 返回 Router、事件接收端及可写的 SessionStore 句柄,方便在应用启动时注册机器人上下文。
55pub fn router_with_channel_and_state<S>(
56    opts: WebhookBuilderOptions,
57) -> (Router, mpsc::Receiver<WebhookEvent>, Arc<S>)
58where
59    S: SessionStore + Default + Send + Sync + Clone + 'static,
60{
61    let store = Arc::new(Default::default());
62    let (router, rx) = router_with_channel_and_store(opts, Arc::clone(&store));
63    (router, rx, store)
64}
65
66/// 构建携带自定义 `SessionStore` 的 webhook,便于外部提前写入 BotContext。
67pub fn router_with_channel_and_store<S>(
68    opts: WebhookBuilderOptions,
69    store: Arc<S>,
70) -> (Router, mpsc::Receiver<WebhookEvent>)
71where
72    S: SessionStore + Send + Sync + Clone + 'static,
73{
74    let (tx, rx) = mpsc::channel(opts.queue_size);
75    let state = WebhookState { store, tx };
76    let router: Router<()> = Router::new()
77        .route(
78            "/webhook",
79            post(
80                |State(state): State<WebhookState<S>>, headers: HeaderMap, body: Bytes| async move {
81                    handle_webhook::<S>(state, headers, body).await
82                },
83            ),
84        )
85        .with_state(state);
86    (router, rx)
87}
88
89// 兼容旧接口:默认队列大小 1024,丢弃接收端。
90pub fn router<S>() -> Router
91where
92    S: SessionStore + Default + Send + Sync + Clone + 'static,
93{
94    let (router, _rx) = router_with_channel::<S>(WebhookBuilderOptions::default());
95    router
96}
97
/// JSON payload of a webhook request, as deserialized by the handler.
#[derive(Debug, Deserialize)]
struct WebhookBody {
    /// Application id; read from the JSON key `Appid`.
    #[serde(rename = "Appid")]
    appid: String,
    /// Arbitrary event payload; read from the JSON key `Data`.
    #[serde(rename = "Data")]
    data: serde_json::Value,
    /// Optional event type; read from the JSON key `TypeName`.
    #[serde(rename = "TypeName")]
    type_name: Option<String>,
}
107
108#[instrument(skip(state, headers, raw_body))]
109async fn handle_webhook<S>(
110    state: WebhookState<S>,
111    headers: HeaderMap,
112    raw_body: Bytes,
113) -> impl IntoResponse
114where
115    S: SessionStore + Send + Sync + 'static,
116{
117    log_request_pre_parse(&headers, &raw_body);
118    if capture_only() {
119        return StatusCode::OK;
120    }
121
122    if is_ping(&raw_body) {
123        tracing::info!("webhook ping: {}", String::from_utf8_lossy(&raw_body));
124        return StatusCode::OK;
125    }
126
127    let body: WebhookBody = match serde_json::from_slice(&raw_body) {
128        Ok(v) => v,
129        Err(err) => {
130            log_raw_invalid_body(&raw_body);
131            tracing::warn!(?err, "invalid webhook body");
132            return StatusCode::BAD_REQUEST;
133        }
134    };
135    maybe_dump_raw(&body.appid, &raw_body).await;
136
137    let app_id = AppId(body.appid.clone());
138    let Some(ctx) = state.store.get_session(&app_id).await else {
139        tracing::warn!("unknown app_id for webhook");
140        return StatusCode::UNAUTHORIZED;
141    };
142
143    if require_signature() {
144        if let Err(err) = verify_signature(&headers, &ctx, &raw_body) {
145            log_headers_on_verify_fail(&headers);
146            log_raw_on_verify_fail(&raw_body);
147            tracing::warn!(?err, "webhook signature verify failed");
148            return StatusCode::UNAUTHORIZED;
149        }
150    }
151
152    if let Some(mid) = extract_new_msg_id(&body.data) {
153        if !state.store.mark_message_seen(&app_id, mid).await {
154            return StatusCode::OK;
155        }
156    }
157
158    // 投递到异步队列,避免阻塞 3s SLA
159    if let Err(err) = state.tx.try_send(WebhookEvent {
160        app_id,
161        type_name: body.type_name,
162        data: body.data,
163    }) {
164        tracing::warn!(?err, "webhook queue full; dropping event");
165    }
166
167    StatusCode::OK
168}
169
/// Returns the directory raw webhook bodies should be dumped into, if any.
///
/// The value comes from `GEWE_WEBHOOK_DUMP_DIR`; it is read once and cached for
/// the lifetime of the process. An unset, unreadable or blank variable yields
/// `None`.
fn dump_dir() -> Option<String> {
    static DUMP_DIR: OnceLock<Option<String>> = OnceLock::new();
    let cached = DUMP_DIR.get_or_init(|| {
        std::env::var("GEWE_WEBHOOK_DUMP_DIR")
            .ok()
            .filter(|value| !value.trim().is_empty())
    });
    cached.clone()
}
179
180async fn maybe_dump_raw(appid: &str, raw_body: &[u8]) {
181    let Some(dir) = dump_dir() else {
182        return;
183    };
184    if let Err(err) = fs::create_dir_all(&dir).await {
185        tracing::warn!(?err, %dir, "create dump dir failed");
186        return;
187    }
188    let ts = SystemTime::now()
189        .duration_since(UNIX_EPOCH)
190        .unwrap_or_default()
191        .as_millis();
192    let path = format!("{}/{}_{}.json", dir.trim_end_matches('/'), ts, appid);
193    if let Err(err) = fs::write(&path, raw_body).await {
194        tracing::warn!(?err, %path, "write webhook dump failed");
195    } else {
196        tracing::info!(%path, %appid, "webhook raw dumped");
197    }
198}
199
200fn log_raw_invalid_body(raw_body: &[u8]) {
201    if debug_raw_enabled() {
202        let body_str = String::from_utf8_lossy(raw_body);
203        tracing::warn!(%body_str, "webhook raw body (invalid)");
204    }
205}
206
207fn log_raw_on_verify_fail(raw_body: &[u8]) {
208    if debug_raw_enabled() {
209        let body_str = String::from_utf8_lossy(raw_body);
210        tracing::warn!(%body_str, "webhook raw body (signature failed)");
211    }
212}
213
214fn is_ping(raw_body: &[u8]) -> bool {
215    if !raw_body.contains(&b't') {
216        return false;
217    }
218    match serde_json::from_slice::<serde_json::Value>(raw_body) {
219        Ok(val) => val.get("testMsg").is_some(),
220        Err(_) => false,
221    }
222}
223
/// Reports whether raw request bodies may be logged.
///
/// Controlled by `GEWE_WEBHOOK_DEBUG_RAW`, read once and cached. The flag is on
/// when the value, ignoring surrounding whitespace, is `1` or `true` in any
/// letter case; anything else (including an unset variable) turns it off.
fn debug_raw_enabled() -> bool {
    static LOG_RAW: OnceLock<bool> = OnceLock::new();
    *LOG_RAW.get_or_init(|| {
        std::env::var("GEWE_WEBHOOK_DEBUG_RAW")
            .map(|raw| {
                // Tolerate stray whitespace/casing so e.g. "true\n" is not silently ignored.
                let value = raw.trim();
                value == "1" || value.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false)
    })
}
231
/// Reports whether the handler runs in capture-only mode.
///
/// Controlled by `GEWE_WEBHOOK_CAPTURE_ONLY`, read once and cached. The flag is
/// on when the value, ignoring surrounding whitespace, is `1` or `true` in any
/// letter case; anything else (including an unset variable) turns it off.
fn capture_only() -> bool {
    static CAPTURE_ONLY: OnceLock<bool> = OnceLock::new();
    *CAPTURE_ONLY.get_or_init(|| {
        std::env::var("GEWE_WEBHOOK_CAPTURE_ONLY")
            .map(|raw| {
                // Tolerate stray whitespace/casing so e.g. "True " is not silently ignored.
                let value = raw.trim();
                value == "1" || value.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false)
    })
}
239
/// Reports whether webhook signatures must be verified.
///
/// Controlled by `GEWE_WEBHOOK_REQUIRE_SIGNATURE`, read once and cached. The
/// flag is on when the value, ignoring surrounding whitespace, is `1` or `true`
/// in any letter case; anything else (including an unset variable) turns it off.
fn require_signature() -> bool {
    static REQUIRE: OnceLock<bool> = OnceLock::new();
    *REQUIRE.get_or_init(|| {
        std::env::var("GEWE_WEBHOOK_REQUIRE_SIGNATURE")
            .map(|raw| {
                // A value such as "true\n" must not silently disable verification.
                let value = raw.trim();
                value == "1" || value.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false)
    })
}
247
248fn log_request_pre_parse(headers: &HeaderMap, raw_body: &[u8]) {
249    if !(debug_raw_enabled() || capture_only()) {
250        return;
251    }
252    let body_str = String::from_utf8_lossy(raw_body);
253    tracing::info!(?headers, %body_str, "webhook request (pre-parse)");
254}
255
256fn log_headers_on_verify_fail(headers: &HeaderMap) {
257    static LOG_HEADERS: OnceLock<bool> = OnceLock::new();
258    let enabled = *LOG_HEADERS.get_or_init(|| match std::env::var("GEWE_WEBHOOK_DEBUG_HEADERS") {
259        Ok(v) => matches!(v.as_str(), "1" | "true" | "TRUE" | "True"),
260        Err(_) => false,
261    });
262    if !enabled {
263        return;
264    }
265    let token = headers
266        .get("X-GEWE-TOKEN")
267        .and_then(|v| v.to_str().ok())
268        .unwrap_or("<missing>");
269    let timestamp = headers
270        .get("X-GEWE-TIMESTAMP")
271        .and_then(|v| v.to_str().ok())
272        .unwrap_or("<missing>");
273    let sign_present = headers.contains_key("X-GEWE-SIGN");
274    tracing::warn!(
275        %token,
276        %timestamp,
277        sign_present,
278        "webhook signature headers (debug; no body logged)"
279    );
280}
281
/// Reasons a webhook signature check can fail.
#[derive(Debug)]
enum SignatureError {
    /// One of `X-GEWE-TIMESTAMP`, `X-GEWE-SIGN` or `X-GEWE-TOKEN` is absent.
    MissingHeader,
    /// The timestamp header is not readable text or not an integer, or the
    /// system clock could not be read relative to the Unix epoch.
    InvalidTimestamp,
    /// The timestamp differs from the current time by more than the allowed skew.
    Stale,
    /// The signature header is not valid hexadecimal.
    InvalidHex,
    /// The token does not match, a header is not readable text, or the
    /// signature does not match the expected MAC.
    VerifyFailed,
}
290
291fn verify_signature(
292    headers: &HeaderMap,
293    ctx: &BotContext,
294    body: &[u8],
295) -> Result<(), SignatureError> {
296    let ts_header = headers
297        .get("X-GEWE-TIMESTAMP")
298        .ok_or(SignatureError::MissingHeader)?
299        .to_str()
300        .map_err(|_| SignatureError::InvalidTimestamp)?;
301    let ts: i64 = ts_header
302        .parse()
303        .map_err(|_| SignatureError::InvalidTimestamp)?;
304    let now = SystemTime::now()
305        .duration_since(UNIX_EPOCH)
306        .map_err(|_| SignatureError::InvalidTimestamp)?
307        .as_secs() as i64;
308    const MAX_SKEW: i64 = 300;
309    if (now - ts).abs() > MAX_SKEW {
310        return Err(SignatureError::Stale);
311    }
312
313    let provided = headers
314        .get("X-GEWE-SIGN")
315        .ok_or(SignatureError::MissingHeader)?
316        .to_str()
317        .map_err(|_| SignatureError::VerifyFailed)?;
318
319    let token_header = headers
320        .get("X-GEWE-TOKEN")
321        .ok_or(SignatureError::MissingHeader)?
322        .to_str()
323        .map_err(|_| SignatureError::VerifyFailed)?;
324    if token_header != ctx.token {
325        return Err(SignatureError::VerifyFailed);
326    }
327
328    let secret = ctx.webhook_secret.as_ref().unwrap_or(&ctx.token);
329
330    let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes())
331        .map_err(|_| SignatureError::VerifyFailed)?;
332    mac.update(ts_header.as_bytes());
333    mac.update(b":");
334    mac.update(body);
335    let expected_bytes = mac.finalize().into_bytes();
336    let provided_bytes = hex::decode(provided).map_err(|_| SignatureError::InvalidHex)?;
337    if expected_bytes.as_slice() == provided_bytes.as_slice() {
338        Ok(())
339    } else {
340        Err(SignatureError::VerifyFailed)
341    }
342}
343
344fn extract_new_msg_id(data: &serde_json::Value) -> Option<i64> {
345    data.get("NewMsgId").and_then(|v| v.as_i64()).or_else(|| {
346        data.get("Data")
347            .and_then(|inner| inner.get("NewMsgId"))
348            .and_then(|v| v.as_i64())
349    })
350}
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355    use gewe_session::InMemorySessionStore;
356
357    fn create_test_context(app_id: &str, token: &str) -> BotContext {
358        BotContext {
359            app_id: AppId(app_id.to_string()),
360            token: token.to_string(),
361            webhook_secret: None,
362            description: None,
363        }
364    }
365
366    fn create_test_context_with_secret(app_id: &str, token: &str, secret: &str) -> BotContext {
367        BotContext {
368            app_id: AppId(app_id.to_string()),
369            token: token.to_string(),
370            webhook_secret: Some(secret.to_string()),
371            description: None,
372        }
373    }
374
375    // ===== WebhookBuilderOptions tests =====
376    #[test]
377    fn test_webhook_builder_options_default() {
378        let opts = WebhookBuilderOptions::default();
379        assert_eq!(opts.queue_size, 1024);
380    }
381
382    #[test]
383    fn test_webhook_builder_options_custom() {
384        let opts = WebhookBuilderOptions { queue_size: 512 };
385        assert_eq!(opts.queue_size, 512);
386    }
387
388    // ===== WebhookEvent tests =====
389    #[test]
390    fn test_webhook_event_debug() {
391        let event = WebhookEvent {
392            app_id: AppId("app123".to_string()),
393            type_name: Some("message".to_string()),
394            data: serde_json::json!({"test": "data"}),
395        };
396        let debug_str = format!("{:?}", event);
397        assert!(debug_str.contains("app123"));
398        assert!(debug_str.contains("message"));
399    }
400
401    #[test]
402    fn test_webhook_event_clone() {
403        let event = WebhookEvent {
404            app_id: AppId("app123".to_string()),
405            type_name: Some("message".to_string()),
406            data: serde_json::json!({"key": "value"}),
407        };
408        let cloned = event.clone();
409        assert_eq!(event.app_id.0, cloned.app_id.0);
410        assert_eq!(event.type_name, cloned.type_name);
411    }
412
413    // ===== WebhookBody tests =====
414    #[test]
415    fn test_webhook_body_deserialize() {
416        let json = r#"{"Appid":"app123","Data":{"test":"data"},"TypeName":"message"}"#;
417        let body: WebhookBody = serde_json::from_str(json).unwrap();
418        assert_eq!(body.appid, "app123");
419        assert_eq!(body.type_name, Some("message".to_string()));
420        assert!(body.data.get("test").is_some());
421    }
422
423    #[test]
424    fn test_webhook_body_deserialize_without_typename() {
425        let json = r#"{"Appid":"app123","Data":{"test":"data"}}"#;
426        let body: WebhookBody = serde_json::from_str(json).unwrap();
427        assert_eq!(body.appid, "app123");
428        assert_eq!(body.type_name, None);
429    }
430
431    // ===== is_ping tests =====
432    #[test]
433    fn test_is_ping_with_test_msg() {
434        let body = br#"{"testMsg":"hello"}"#;
435        assert!(is_ping(body));
436    }
437
438    #[test]
439    fn test_is_ping_without_test_msg() {
440        let body = br#"{"Appid":"app123","Data":{}}"#;
441        assert!(!is_ping(body));
442    }
443
444    #[test]
445    fn test_is_ping_invalid_json() {
446        let body = br#"not json"#;
447        assert!(!is_ping(body));
448    }
449
450    #[test]
451    fn test_is_ping_no_t_character() {
452        // Optimization: if no 't' character, skip JSON parsing
453        let body = br#"{"Appid":"app123"}"#;
454        assert!(!is_ping(body));
455    }
456
457    // ===== extract_new_msg_id tests =====
458    #[test]
459    fn test_extract_new_msg_id_direct() {
460        let data = serde_json::json!({"NewMsgId": 12345});
461        assert_eq!(extract_new_msg_id(&data), Some(12345));
462    }
463
464    #[test]
465    fn test_extract_new_msg_id_nested() {
466        let data = serde_json::json!({"Data": {"NewMsgId": 67890}});
467        assert_eq!(extract_new_msg_id(&data), Some(67890));
468    }
469
470    #[test]
471    fn test_extract_new_msg_id_none() {
472        let data = serde_json::json!({"other": "field"});
473        assert_eq!(extract_new_msg_id(&data), None);
474    }
475
476    #[test]
477    fn test_extract_new_msg_id_prefers_direct() {
478        let data = serde_json::json!({
479            "NewMsgId": 111,
480            "Data": {"NewMsgId": 222}
481        });
482        assert_eq!(extract_new_msg_id(&data), Some(111));
483    }
484
485    // ===== verify_signature tests =====
486    #[test]
487    fn test_verify_signature_missing_timestamp() {
488        let headers = HeaderMap::new();
489        let ctx = create_test_context("app123", "token123");
490        let body = b"test body";
491
492        let result = verify_signature(&headers, &ctx, body);
493        assert!(matches!(result, Err(SignatureError::MissingHeader)));
494    }
495
496    #[test]
497    fn test_verify_signature_invalid_timestamp() {
498        let mut headers = HeaderMap::new();
499        headers.insert("X-GEWE-TIMESTAMP", "not_a_number".parse().unwrap());
500        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
501        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
502
503        let ctx = create_test_context("app123", "token123");
504        let body = b"test body";
505
506        let result = verify_signature(&headers, &ctx, body);
507        assert!(matches!(result, Err(SignatureError::InvalidTimestamp)));
508    }
509
510    #[test]
511    fn test_verify_signature_stale_timestamp() {
512        let mut headers = HeaderMap::new();
513        // Very old timestamp
514        headers.insert("X-GEWE-TIMESTAMP", "1000000000".parse().unwrap());
515        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
516        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
517
518        let ctx = create_test_context("app123", "token123");
519        let body = b"test body";
520
521        let result = verify_signature(&headers, &ctx, body);
522        assert!(matches!(result, Err(SignatureError::Stale)));
523    }
524
525    #[test]
526    fn test_verify_signature_missing_sign() {
527        let now = SystemTime::now()
528            .duration_since(UNIX_EPOCH)
529            .unwrap()
530            .as_secs()
531            .to_string();
532
533        let mut headers = HeaderMap::new();
534        headers.insert("X-GEWE-TIMESTAMP", now.parse().unwrap());
535        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
536
537        let ctx = create_test_context("app123", "token123");
538        let body = b"test body";
539
540        let result = verify_signature(&headers, &ctx, body);
541        assert!(matches!(result, Err(SignatureError::MissingHeader)));
542    }
543
544    #[test]
545    fn test_verify_signature_token_mismatch() {
546        let now = SystemTime::now()
547            .duration_since(UNIX_EPOCH)
548            .unwrap()
549            .as_secs()
550            .to_string();
551
552        let mut headers = HeaderMap::new();
553        headers.insert("X-GEWE-TIMESTAMP", now.parse().unwrap());
554        headers.insert("X-GEWE-TOKEN", "wrong_token".parse().unwrap());
555        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
556
557        let ctx = create_test_context("app123", "token123");
558        let body = b"test body";
559
560        let result = verify_signature(&headers, &ctx, body);
561        assert!(matches!(result, Err(SignatureError::VerifyFailed)));
562    }
563
564    #[test]
565    fn test_verify_signature_invalid_hex() {
566        let now = SystemTime::now()
567            .duration_since(UNIX_EPOCH)
568            .unwrap()
569            .as_secs()
570            .to_string();
571
572        let mut headers = HeaderMap::new();
573        headers.insert("X-GEWE-TIMESTAMP", now.parse().unwrap());
574        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
575        headers.insert("X-GEWE-SIGN", "not_valid_hex_zzz".parse().unwrap());
576
577        let ctx = create_test_context("app123", "token123");
578        let body = b"test body";
579
580        let result = verify_signature(&headers, &ctx, body);
581        assert!(matches!(result, Err(SignatureError::InvalidHex)));
582    }
583
584    #[test]
585    fn test_verify_signature_valid() {
586        let now = SystemTime::now()
587            .duration_since(UNIX_EPOCH)
588            .unwrap()
589            .as_secs();
590        let now_str = now.to_string();
591        let body = b"test body";
592        let token = "token123";
593
594        // Calculate expected signature
595        let mut mac = Hmac::<Sha256>::new_from_slice(token.as_bytes()).unwrap();
596        mac.update(now_str.as_bytes());
597        mac.update(b":");
598        mac.update(body);
599        let signature = hex::encode(mac.finalize().into_bytes());
600
601        let mut headers = HeaderMap::new();
602        headers.insert("X-GEWE-TIMESTAMP", now_str.parse().unwrap());
603        headers.insert("X-GEWE-TOKEN", token.parse().unwrap());
604        headers.insert("X-GEWE-SIGN", signature.parse().unwrap());
605
606        let ctx = create_test_context("app123", token);
607
608        let result = verify_signature(&headers, &ctx, body);
609        assert!(result.is_ok());
610    }
611
612    #[test]
613    fn test_verify_signature_with_webhook_secret() {
614        let now = SystemTime::now()
615            .duration_since(UNIX_EPOCH)
616            .unwrap()
617            .as_secs();
618        let now_str = now.to_string();
619        let body = b"test body";
620        let token = "token123";
621        let secret = "webhook_secret";
622
623        // Calculate expected signature using webhook_secret
624        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
625        mac.update(now_str.as_bytes());
626        mac.update(b":");
627        mac.update(body);
628        let signature = hex::encode(mac.finalize().into_bytes());
629
630        let mut headers = HeaderMap::new();
631        headers.insert("X-GEWE-TIMESTAMP", now_str.parse().unwrap());
632        headers.insert("X-GEWE-TOKEN", token.parse().unwrap());
633        headers.insert("X-GEWE-SIGN", signature.parse().unwrap());
634
635        let ctx = create_test_context_with_secret("app123", token, secret);
636
637        let result = verify_signature(&headers, &ctx, body);
638        assert!(result.is_ok());
639    }
640
641    #[test]
642    fn test_verify_signature_wrong_signature() {
643        let now = SystemTime::now()
644            .duration_since(UNIX_EPOCH)
645            .unwrap()
646            .as_secs();
647        let now_str = now.to_string();
648        let body = b"test body";
649        let token = "token123";
650
651        // Wrong signature (valid hex but incorrect value)
652        let wrong_signature = hex::encode([0u8; 32]);
653
654        let mut headers = HeaderMap::new();
655        headers.insert("X-GEWE-TIMESTAMP", now_str.parse().unwrap());
656        headers.insert("X-GEWE-TOKEN", token.parse().unwrap());
657        headers.insert("X-GEWE-SIGN", wrong_signature.parse().unwrap());
658
659        let ctx = create_test_context("app123", token);
660
661        let result = verify_signature(&headers, &ctx, body);
662        assert!(matches!(result, Err(SignatureError::VerifyFailed)));
663    }
664
665    // ===== SignatureError Debug test =====
666    #[test]
667    fn test_signature_error_debug() {
668        let err = SignatureError::MissingHeader;
669        let debug_str = format!("{:?}", err);
670        assert!(debug_str.contains("MissingHeader"));
671
672        let err2 = SignatureError::Stale;
673        let debug_str2 = format!("{:?}", err2);
674        assert!(debug_str2.contains("Stale"));
675    }
676
677    // ===== Router construction tests =====
678    #[tokio::test]
679    async fn test_router_with_channel() {
680        let opts = WebhookBuilderOptions { queue_size: 100 };
681        let (_router, _rx) = router_with_channel::<InMemorySessionStore>(opts);
682        // Just verify it compiles and creates without panic
683    }
684
685    #[tokio::test]
686    async fn test_router_with_channel_and_state() {
687        let opts = WebhookBuilderOptions { queue_size: 100 };
688        let (_router, _rx, store) = router_with_channel_and_state::<InMemorySessionStore>(opts);
689
690        // Verify store is accessible and works
691        let ctx = create_test_context("app123", "token123");
692        store.put_session(ctx.clone()).await;
693
694        let retrieved = store.get_session(&ctx.app_id).await;
695        assert!(retrieved.is_some());
696    }
697
698    #[tokio::test]
699    async fn test_router_with_channel_and_store() {
700        let store = Arc::new(InMemorySessionStore::default());
701        let ctx = create_test_context("app123", "token123");
702        store.put_session(ctx).await;
703
704        let opts = WebhookBuilderOptions { queue_size: 100 };
705        let (_router, _rx) = router_with_channel_and_store(opts, store);
706    }
707
708    #[test]
709    fn test_router_default() {
710        let _router = router::<InMemorySessionStore>();
711    }
712
713    // ===== WebhookState Clone test =====
714    #[tokio::test]
715    async fn test_webhook_state_clone() {
716        let store = Arc::new(InMemorySessionStore::default());
717        let (tx, _rx) = mpsc::channel(100);
718        let state1 = WebhookState {
719            store: Arc::clone(&store),
720            tx: tx.clone(),
721        };
722        let state2 = state1.clone();
723
724        // Both states should share the same store
725        let ctx = create_test_context("app123", "token123");
726        state1.store.put_session(ctx.clone()).await;
727
728        let retrieved = state2.store.get_session(&ctx.app_id).await;
729        assert!(retrieved.is_some());
730    }
731
732    // ===== Environment variable tests =====
733    #[test]
734    fn test_dump_dir_not_set() {
735        // When GEWE_WEBHOOK_DUMP_DIR is not set, should return None
736        std::env::remove_var("GEWE_WEBHOOK_DUMP_DIR");
737        // Since OnceLock caches, we can't retest in same process, but we verify the function exists
738        let _ = dump_dir();
739    }
740
741    #[test]
742    fn test_debug_raw_enabled_default() {
743        std::env::remove_var("GEWE_WEBHOOK_DEBUG_RAW");
744        let _ = debug_raw_enabled();
745    }
746
747    #[test]
748    fn test_capture_only_default() {
749        std::env::remove_var("GEWE_WEBHOOK_CAPTURE_ONLY");
750        let _ = capture_only();
751    }
752
753    #[test]
754    fn test_require_signature_default() {
755        std::env::remove_var("GEWE_WEBHOOK_REQUIRE_SIGNATURE");
756        let _ = require_signature();
757    }
758
759    // ===== Logging functions tests =====
760    #[test]
761    fn test_log_raw_invalid_body_with_debug() {
762        let body = b"invalid json body";
763        // Should not panic
764        log_raw_invalid_body(body);
765    }
766
767    #[test]
768    fn test_log_raw_on_verify_fail_with_debug() {
769        let body = b"test body";
770        // Should not panic
771        log_raw_on_verify_fail(body);
772    }
773
774    #[test]
775    fn test_log_request_pre_parse() {
776        let headers = HeaderMap::new();
777        let body = b"test body";
778        // Should not panic
779        log_request_pre_parse(&headers, body);
780    }
781
782    #[test]
783    fn test_log_headers_on_verify_fail_missing_headers() {
784        let headers = HeaderMap::new();
785        // Should not panic when headers are missing
786        log_headers_on_verify_fail(&headers);
787    }
788
789    #[test]
790    fn test_log_headers_on_verify_fail_with_headers() {
791        let mut headers = HeaderMap::new();
792        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
793        headers.insert("X-GEWE-TIMESTAMP", "1234567890".parse().unwrap());
794        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
795        // Should not panic
796        log_headers_on_verify_fail(&headers);
797    }
798
799    #[test]
800    fn test_log_headers_on_verify_fail_partial_headers() {
801        let mut headers = HeaderMap::new();
802        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
803        // Missing timestamp and sign
804        log_headers_on_verify_fail(&headers);
805    }
806
807    // ===== Integration tests for handle_webhook =====
808    use axum::body::Body;
809    use axum::http::Request;
810    use tower::util::ServiceExt;
811
812    #[tokio::test]
813    async fn test_handle_webhook_ping() {
814        let (router, _rx) =
815            router_with_channel::<InMemorySessionStore>(WebhookBuilderOptions::default());
816
817        let ping_body = r#"{"testMsg":"hello"}"#;
818        let request = Request::builder()
819            .uri("/webhook")
820            .method("POST")
821            .header("content-type", "application/json")
822            .body(Body::from(ping_body))
823            .unwrap();
824
825        let response = router.oneshot(request).await.unwrap();
826        assert_eq!(response.status(), StatusCode::OK);
827    }
828
829    #[tokio::test]
830    async fn test_handle_webhook_invalid_json() {
831        let (router, _rx) =
832            router_with_channel::<InMemorySessionStore>(WebhookBuilderOptions::default());
833
834        let invalid_body = "not valid json";
835        let request = Request::builder()
836            .uri("/webhook")
837            .method("POST")
838            .header("content-type", "application/json")
839            .body(Body::from(invalid_body))
840            .unwrap();
841
842        let response = router.oneshot(request).await.unwrap();
843        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
844    }
845
846    #[tokio::test]
847    async fn test_handle_webhook_unknown_app_id() {
848        let (router, _rx) =
849            router_with_channel::<InMemorySessionStore>(WebhookBuilderOptions::default());
850
851        let body = r#"{"Appid":"unknown_app","Data":{"test":"data"}}"#;
852        let request = Request::builder()
853            .uri("/webhook")
854            .method("POST")
855            .header("content-type", "application/json")
856            .body(Body::from(body))
857            .unwrap();
858
859        let response = router.oneshot(request).await.unwrap();
860        assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
861    }
862
863    #[tokio::test]
864    async fn test_handle_webhook_success() {
865        let (router, mut rx, store) =
866            router_with_channel_and_state::<InMemorySessionStore>(WebhookBuilderOptions {
867                queue_size: 10,
868            });
869
870        let ctx = create_test_context("app123", "token123");
871        store.put_session(ctx).await;
872
873        let body = r#"{"Appid":"app123","Data":{"test":"data"},"TypeName":"message"}"#;
874        let request = Request::builder()
875            .uri("/webhook")
876            .method("POST")
877            .header("content-type", "application/json")
878            .body(Body::from(body))
879            .unwrap();
880
881        let response = router.oneshot(request).await.unwrap();
882        assert_eq!(response.status(), StatusCode::OK);
883
884        // Verify event was sent to channel
885        let event = rx.try_recv().ok();
886        assert!(event.is_some());
887        let event = event.unwrap();
888        assert_eq!(event.app_id.0, "app123");
889        assert_eq!(event.type_name, Some("message".to_string()));
890    }
891
892    #[tokio::test]
893    async fn test_handle_webhook_with_new_msg_id() {
894        let (router, mut rx, store) =
895            router_with_channel_and_state::<InMemorySessionStore>(WebhookBuilderOptions {
896                queue_size: 10,
897            });
898
899        let ctx = create_test_context("app123", "token123");
900        store.put_session(ctx).await;
901
902        // First message with NewMsgId
903        let body = r#"{"Appid":"app123","Data":{"NewMsgId":12345}}"#;
904        let request = Request::builder()
905            .uri("/webhook")
906            .method("POST")
907            .header("content-type", "application/json")
908            .body(Body::from(body))
909            .unwrap();
910
911        let response = router.clone().oneshot(request).await.unwrap();
912        assert_eq!(response.status(), StatusCode::OK);
913        assert!(rx.try_recv().is_ok());
914
915        // Duplicate message should be filtered
916        let body = r#"{"Appid":"app123","Data":{"NewMsgId":12345}}"#;
917        let request = Request::builder()
918            .uri("/webhook")
919            .method("POST")
920            .header("content-type", "application/json")
921            .body(Body::from(body))
922            .unwrap();
923
924        let response = router.oneshot(request).await.unwrap();
925        assert_eq!(response.status(), StatusCode::OK);
926        // Channel should be empty (message filtered)
927        assert!(rx.try_recv().is_err());
928    }
929
930    #[tokio::test]
931    async fn test_handle_webhook_nested_new_msg_id() {
932        let (router, mut rx, store) =
933            router_with_channel_and_state::<InMemorySessionStore>(WebhookBuilderOptions {
934                queue_size: 10,
935            });
936
937        let ctx = create_test_context("app123", "token123");
938        store.put_session(ctx).await;
939
940        // Message with nested NewMsgId in Data.Data
941        let body = r#"{"Appid":"app123","Data":{"Data":{"NewMsgId":67890}}}"#;
942        let request = Request::builder()
943            .uri("/webhook")
944            .method("POST")
945            .header("content-type", "application/json")
946            .body(Body::from(body))
947            .unwrap();
948
949        let response = router.oneshot(request).await.unwrap();
950        assert_eq!(response.status(), StatusCode::OK);
951        assert!(rx.try_recv().is_ok());
952    }
953
954    #[tokio::test]
955    async fn test_handle_webhook_queue_full() {
956        // Very small queue to test full scenario
957        let (router, _rx, store) =
958            router_with_channel_and_state::<InMemorySessionStore>(WebhookBuilderOptions {
959                queue_size: 1,
960            });
961
962        let ctx = create_test_context("app123", "token123");
963        store.put_session(ctx).await;
964
965        // Fill the queue
966        let body = r#"{"Appid":"app123","Data":{"test":"data1"}}"#;
967        let request = Request::builder()
968            .uri("/webhook")
969            .method("POST")
970            .header("content-type", "application/json")
971            .body(Body::from(body))
972            .unwrap();
973        let response = router.clone().oneshot(request).await.unwrap();
974        assert_eq!(response.status(), StatusCode::OK);
975
976        // Second message should overflow (but still return OK)
977        let body = r#"{"Appid":"app123","Data":{"test":"data2"}}"#;
978        let request = Request::builder()
979            .uri("/webhook")
980            .method("POST")
981            .header("content-type", "application/json")
982            .body(Body::from(body))
983            .unwrap();
984        let response = router.oneshot(request).await.unwrap();
985        assert_eq!(response.status(), StatusCode::OK);
986        // Note: _rx is not consumed, so queue should be full
987    }
988
989    // ===== Edge cases for extract_new_msg_id =====
990    #[test]
991    fn test_extract_new_msg_id_wrong_type() {
992        // NewMsgId is a string, not a number
993        let data = serde_json::json!({"NewMsgId": "not_a_number"});
994        assert_eq!(extract_new_msg_id(&data), None);
995    }
996
997    #[test]
998    fn test_extract_new_msg_id_nested_wrong_type() {
999        let data = serde_json::json!({"Data": {"NewMsgId": "not_a_number"}});
1000        assert_eq!(extract_new_msg_id(&data), None);
1001    }
1002
1003    #[test]
1004    fn test_extract_new_msg_id_empty_data() {
1005        let data = serde_json::json!({});
1006        assert_eq!(extract_new_msg_id(&data), None);
1007    }
1008
1009    #[test]
1010    fn test_extract_new_msg_id_data_not_object() {
1011        let data = serde_json::json!({"Data": "not_an_object"});
1012        assert_eq!(extract_new_msg_id(&data), None);
1013    }
1014
1015    // ===== Edge cases for is_ping =====
1016    #[test]
1017    fn test_is_ping_empty_body() {
1018        let body = b"";
1019        assert!(!is_ping(body));
1020    }
1021
1022    #[test]
1023    fn test_is_ping_body_with_t_but_no_testmsg() {
1024        let body = br#"{"otherField":"test"}"#;
1025        assert!(!is_ping(body));
1026    }
1027
1028    #[test]
1029    fn test_is_ping_testmsg_null() {
1030        let body = br#"{"testMsg":null}"#;
1031        // Should still be considered ping if testMsg key exists
1032        assert!(is_ping(body));
1033    }
1034
1035    // ===== Additional verify_signature edge cases =====
1036    #[test]
1037    fn test_verify_signature_missing_token() {
1038        let now = SystemTime::now()
1039            .duration_since(UNIX_EPOCH)
1040            .unwrap()
1041            .as_secs()
1042            .to_string();
1043
1044        let mut headers = HeaderMap::new();
1045        headers.insert("X-GEWE-TIMESTAMP", now.parse().unwrap());
1046        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
1047        // Missing X-GEWE-TOKEN
1048
1049        let ctx = create_test_context("app123", "token123");
1050        let body = b"test body";
1051
1052        let result = verify_signature(&headers, &ctx, body);
1053        assert!(matches!(result, Err(SignatureError::MissingHeader)));
1054    }
1055
1056    #[test]
1057    fn test_verify_signature_future_timestamp() {
1058        let now = SystemTime::now()
1059            .duration_since(UNIX_EPOCH)
1060            .unwrap()
1061            .as_secs();
1062        // Timestamp 400 seconds in the future (beyond MAX_SKEW of 300)
1063        let future_ts = (now + 400).to_string();
1064
1065        let mut headers = HeaderMap::new();
1066        headers.insert("X-GEWE-TIMESTAMP", future_ts.parse().unwrap());
1067        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
1068        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
1069
1070        let ctx = create_test_context("app123", "token123");
1071        let body = b"test body";
1072
1073        let result = verify_signature(&headers, &ctx, body);
1074        assert!(matches!(result, Err(SignatureError::Stale)));
1075    }
1076
1077    #[test]
1078    fn test_verify_signature_within_max_skew() {
1079        let now = SystemTime::now()
1080            .duration_since(UNIX_EPOCH)
1081            .unwrap()
1082            .as_secs();
1083        // Timestamp 200 seconds in the past (within MAX_SKEW of 300)
1084        let ts = now - 200;
1085        let ts_str = ts.to_string();
1086        let body = b"test body";
1087        let token = "token123";
1088
1089        // Calculate correct signature
1090        let mut mac = Hmac::<Sha256>::new_from_slice(token.as_bytes()).unwrap();
1091        mac.update(ts_str.as_bytes());
1092        mac.update(b":");
1093        mac.update(body);
1094        let signature = hex::encode(mac.finalize().into_bytes());
1095
1096        let mut headers = HeaderMap::new();
1097        headers.insert("X-GEWE-TIMESTAMP", ts_str.parse().unwrap());
1098        headers.insert("X-GEWE-TOKEN", token.parse().unwrap());
1099        headers.insert("X-GEWE-SIGN", signature.parse().unwrap());
1100
1101        let ctx = create_test_context("app123", token);
1102
1103        let result = verify_signature(&headers, &ctx, body);
1104        assert!(result.is_ok());
1105    }
1106
1107    #[test]
1108    fn test_verify_signature_non_utf8_header_values() {
1109        use axum::http::HeaderValue;
1110
1111        let now = SystemTime::now()
1112            .duration_since(UNIX_EPOCH)
1113            .unwrap()
1114            .as_secs()
1115            .to_string();
1116
1117        let mut headers = HeaderMap::new();
1118        headers.insert("X-GEWE-TIMESTAMP", now.parse().unwrap());
1119        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
1120        // Insert a header value that is valid HTTP but we'll test error paths
1121        headers.insert(
1122            "X-GEWE-SIGN",
1123            HeaderValue::from_bytes(&[0xFF, 0xFE]).unwrap(),
1124        );
1125
1126        let ctx = create_test_context("app123", "token123");
1127        let body = b"test body";
1128
1129        let result = verify_signature(&headers, &ctx, body);
1130        // Should fail when trying to convert to str
1131        assert!(matches!(result, Err(SignatureError::VerifyFailed)));
1132    }
1133
1134    // ===== Test WebhookBody edge cases =====
1135    #[test]
1136    fn test_webhook_body_deserialize_minimal() {
1137        // Minimal valid webhook body
1138        let json = r#"{"Appid":"app123","Data":{}}"#;
1139        let body: WebhookBody = serde_json::from_str(json).unwrap();
1140        assert_eq!(body.appid, "app123");
1141        assert_eq!(body.type_name, None);
1142        assert!(body.data.is_object());
1143    }
1144
1145    #[test]
1146    fn test_webhook_body_deserialize_with_null_typename() {
1147        let json = r#"{"Appid":"app123","Data":{},"TypeName":null}"#;
1148        let body: WebhookBody = serde_json::from_str(json).unwrap();
1149        assert_eq!(body.type_name, None);
1150    }
1151
1152    #[test]
1153    fn test_webhook_body_deserialize_complex_data() {
1154        let json = r#"{"Appid":"app123","Data":{"nested":{"deep":"value"}},"TypeName":"complex"}"#;
1155        let body: WebhookBody = serde_json::from_str(json).unwrap();
1156        assert_eq!(body.appid, "app123");
1157        assert!(body.data.get("nested").is_some());
1158    }
1159
1160    #[test]
1161    fn test_webhook_body_missing_required_fields() {
1162        // Missing Appid
1163        let json = r#"{"Data":{}}"#;
1164        let result: Result<WebhookBody, _> = serde_json::from_str(json);
1165        assert!(result.is_err());
1166
1167        // Missing Data
1168        let json = r#"{"Appid":"app123"}"#;
1169        let result: Result<WebhookBody, _> = serde_json::from_str(json);
1170        assert!(result.is_err());
1171    }
1172
1173    // ===== Test unsafe Send/Sync impls =====
1174    #[test]
1175    fn test_webhook_state_is_send_sync() {
1176        fn assert_send<T: Send>() {}
1177        fn assert_sync<T: Sync>() {}
1178
1179        assert_send::<WebhookState<InMemorySessionStore>>();
1180        assert_sync::<WebhookState<InMemorySessionStore>>();
1181    }
1182
1183    // ===== Test all SignatureError variants =====
1184    #[test]
1185    fn test_signature_error_all_variants() {
1186        let errors = vec![
1187            SignatureError::MissingHeader,
1188            SignatureError::InvalidTimestamp,
1189            SignatureError::Stale,
1190            SignatureError::InvalidHex,
1191            SignatureError::VerifyFailed,
1192        ];
1193
1194        for err in errors {
1195            let debug_str = format!("{:?}", err);
1196            assert!(!debug_str.is_empty());
1197        }
1198    }
1199
1200    // ===== Test maybe_dump_raw (error paths) =====
1201    #[tokio::test]
1202    async fn test_maybe_dump_raw_no_dump_dir() {
1203        std::env::remove_var("GEWE_WEBHOOK_DUMP_DIR");
1204        // Should return early without error
1205        maybe_dump_raw("app123", b"test body").await;
1206    }
1207
1208    // ===== Test empty and whitespace values for environment variables =====
1209    #[test]
1210    fn test_dump_dir_empty_string() {
1211        // Empty string should be treated as not set
1212        std::env::set_var("GEWE_WEBHOOK_DUMP_DIR", "");
1213        // OnceLock caches the value, so we just verify function doesn't panic
1214        let _ = dump_dir();
1215        std::env::remove_var("GEWE_WEBHOOK_DUMP_DIR");
1216    }
1217
1218    #[test]
1219    fn test_dump_dir_whitespace_only() {
1220        std::env::set_var("GEWE_WEBHOOK_DUMP_DIR", "   ");
1221        let _ = dump_dir();
1222        std::env::remove_var("GEWE_WEBHOOK_DUMP_DIR");
1223    }
1224
1225    // ===== Additional tests for maybe_dump_raw error paths =====
1226    #[tokio::test]
1227    async fn test_maybe_dump_raw_create_dir_error() {
1228        // Test with an invalid path that can't be created
1229        std::env::set_var("GEWE_WEBHOOK_DUMP_DIR", "/invalid/nonexistent/path");
1230        // Should handle error gracefully
1231        maybe_dump_raw("app123", b"test body").await;
1232        std::env::remove_var("GEWE_WEBHOOK_DUMP_DIR");
1233    }
1234
1235    // ===== Test timestamp edge cases =====
1236    #[test]
1237    fn test_verify_signature_timestamp_exactly_at_max_skew() {
1238        let now = SystemTime::now()
1239            .duration_since(UNIX_EPOCH)
1240            .unwrap()
1241            .as_secs();
1242        // Timestamp exactly 300 seconds in the past (at MAX_SKEW boundary)
1243        let ts = now - 300;
1244        let ts_str = ts.to_string();
1245        let body = b"test body";
1246        let token = "token123";
1247
1248        // Calculate correct signature
1249        let mut mac = Hmac::<Sha256>::new_from_slice(token.as_bytes()).unwrap();
1250        mac.update(ts_str.as_bytes());
1251        mac.update(b":");
1252        mac.update(body);
1253        let signature = hex::encode(mac.finalize().into_bytes());
1254
1255        let mut headers = HeaderMap::new();
1256        headers.insert("X-GEWE-TIMESTAMP", ts_str.parse().unwrap());
1257        headers.insert("X-GEWE-TOKEN", token.parse().unwrap());
1258        headers.insert("X-GEWE-SIGN", signature.parse().unwrap());
1259
1260        let ctx = create_test_context("app123", token);
1261
1262        let result = verify_signature(&headers, &ctx, body);
1263        assert!(result.is_ok());
1264    }
1265
1266    #[test]
1267    fn test_verify_signature_timestamp_just_beyond_max_skew() {
1268        let now = SystemTime::now()
1269            .duration_since(UNIX_EPOCH)
1270            .unwrap()
1271            .as_secs();
1272        // Timestamp 301 seconds in the past (just beyond MAX_SKEW)
1273        let ts_str = (now - 301).to_string();
1274
1275        let mut headers = HeaderMap::new();
1276        headers.insert("X-GEWE-TIMESTAMP", ts_str.parse().unwrap());
1277        headers.insert("X-GEWE-TOKEN", "token123".parse().unwrap());
1278        headers.insert("X-GEWE-SIGN", "somesign".parse().unwrap());
1279
1280        let ctx = create_test_context("app123", "token123");
1281        let body = b"test body";
1282
1283        let result = verify_signature(&headers, &ctx, body);
1284        assert!(matches!(result, Err(SignatureError::Stale)));
1285    }
1286}