Skip to main content

streamling_e2e/resources/
webhook.rs

1//! Webhook server resource for capturing HTTP requests from streamling.
2//!
3//! This module provides a simple HTTP server that can be used to test webhook sinks.
4//! The server captures all incoming requests and stores them for verification.
5
6use crate::{E2eError, Result};
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use std::net::TcpListener;
13use std::sync::{Arc, Mutex};
14use tokio::sync::oneshot;
15use tracing::info;
16
17/// A captured webhook request
18#[derive(Debug, Clone)]
19pub struct CapturedRequest {
20    /// The request body as bytes
21    pub body: Vec<u8>,
22}
23
24/// Shared state for the webhook server
25#[derive(Clone)]
26struct WebhookState {
27    requests: Arc<Mutex<Vec<CapturedRequest>>>,
28    response_plan: Arc<Mutex<Vec<StatusCode>>>,
29    next_response_index: Arc<Mutex<usize>>,
30}
31
32impl WebhookState {
33    fn with_response_plan(response_plan: Vec<StatusCode>) -> Self {
34        Self {
35            requests: Arc::new(Mutex::new(Vec::new())),
36            response_plan: Arc::new(Mutex::new(response_plan)),
37            next_response_index: Arc::new(Mutex::new(0)),
38        }
39    }
40
41    fn add_request(&self, body: Vec<u8>) {
42        let mut requests = self.requests.lock().unwrap();
43        requests.push(CapturedRequest { body });
44    }
45
46    fn get_requests(&self) -> Vec<CapturedRequest> {
47        let requests = self.requests.lock().unwrap();
48        requests.clone()
49    }
50
51    fn request_count(&self) -> usize {
52        let requests = self.requests.lock().unwrap();
53        requests.len()
54    }
55
56    fn next_status(&self) -> StatusCode {
57        let response_plan = self.response_plan.lock().unwrap();
58        let mut next_response_index = self.next_response_index.lock().unwrap();
59        let status = response_plan
60            .get(*next_response_index)
61            .copied()
62            .unwrap_or(StatusCode::OK);
63        *next_response_index += 1;
64        status
65    }
66}
67
68/// Webhook server resource for capturing HTTP requests
69pub struct WebhookResource {
70    /// The base URL of the webhook server (e.g., "http://127.0.0.1:8080")
71    pub url: String,
72    /// The port the server is listening on
73    pub port: u16,
74    /// Shared state containing captured requests
75    state: WebhookState,
76    /// Shutdown signal sender
77    #[allow(dead_code)]
78    shutdown_tx: Option<oneshot::Sender<()>>,
79}
80
81impl WebhookResource {
82    /// Start a new webhook server on an available port
83    pub async fn new() -> Result<Self> {
84        Self::new_with_response_plan(Vec::new()).await
85    }
86
87    /// Start a new webhook server with a scripted sequence of response statuses.
88    pub async fn new_with_response_plan(response_plan: Vec<StatusCode>) -> Result<Self> {
89        // Find an available port
90        let listener = TcpListener::bind("127.0.0.1:0").map_err(E2eError::Io)?;
91        let port = listener.local_addr().map_err(E2eError::Io)?.port();
92        drop(listener); // Release the port so axum can bind to it
93
94        let state = WebhookState::with_response_plan(response_plan);
95        let state_clone = state.clone();
96
97        // Create the router
98        let app = Router::new()
99            .route("/webhook", post(handle_webhook))
100            .route("/webhook/{path}", post(handle_webhook))
101            .with_state(state_clone);
102
103        // Create shutdown channel
104        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
105
106        // Bind and start the server
107        let addr = format!("127.0.0.1:{}", port);
108        let listener = tokio::net::TcpListener::bind(&addr)
109            .await
110            .map_err(E2eError::Io)?;
111
112        info!("Starting webhook server at: {}", addr);
113
114        // Spawn the server
115        tokio::spawn(async move {
116            axum::serve(listener, app)
117                .with_graceful_shutdown(async move {
118                    let _ = shutdown_rx.await;
119                })
120                .await
121                .ok();
122        });
123
124        // Give the server a moment to start
125        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
126
127        Ok(Self {
128            url: format!("http://127.0.0.1:{}", port),
129            port,
130            state,
131            shutdown_tx: Some(shutdown_tx),
132        })
133    }
134
135    /// Get the URL for the webhook endpoint
136    pub fn webhook_url(&self) -> String {
137        format!("{}/webhook", self.url)
138    }
139
140    /// Get the number of requests received
141    pub fn request_count(&self) -> usize {
142        self.state.request_count()
143    }
144
145    /// Get all captured requests
146    pub fn get_requests(&self) -> Vec<CapturedRequest> {
147        self.state.get_requests()
148    }
149
150    /// Get all captured request bodies as strings (for JSON verification)
151    pub fn get_request_bodies_as_string(&self) -> Vec<String> {
152        self.state
153            .get_requests()
154            .into_iter()
155            .filter_map(|r| String::from_utf8(r.body).ok())
156            .collect()
157    }
158
159    /// Parse all captured request bodies as JSON
160    pub fn get_request_bodies_as_json(&self) -> Vec<serde_json::Value> {
161        self.get_request_bodies_as_string()
162            .into_iter()
163            .filter_map(|s| serde_json::from_str(&s).ok())
164            .collect()
165    }
166
167    /// Wait for at least N requests with a timeout
168    pub async fn wait_for_requests(&self, count: usize, timeout: std::time::Duration) -> bool {
169        let start = std::time::Instant::now();
170        while start.elapsed() < timeout {
171            if self.request_count() >= count {
172                return true;
173            }
174            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
175        }
176        false
177    }
178}
179
180/// Handler for webhook requests - captures the body and returns 200 OK
181async fn handle_webhook(State(state): State<WebhookState>, body: Bytes) -> StatusCode {
182    state.add_request(body.to_vec());
183    state.next_status()
184}