streamling_e2e/resources/
webhook.rs1use crate::{E2eError, Result};
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use std::net::TcpListener;
13use std::sync::{Arc, Mutex};
14use tokio::sync::oneshot;
15use tracing::info;
16
17#[derive(Debug, Clone)]
19pub struct CapturedRequest {
20 pub body: Vec<u8>,
22}
23
24#[derive(Clone)]
26struct WebhookState {
27 requests: Arc<Mutex<Vec<CapturedRequest>>>,
28 response_plan: Arc<Mutex<Vec<StatusCode>>>,
29 next_response_index: Arc<Mutex<usize>>,
30}
31
32impl WebhookState {
33 fn with_response_plan(response_plan: Vec<StatusCode>) -> Self {
34 Self {
35 requests: Arc::new(Mutex::new(Vec::new())),
36 response_plan: Arc::new(Mutex::new(response_plan)),
37 next_response_index: Arc::new(Mutex::new(0)),
38 }
39 }
40
41 fn add_request(&self, body: Vec<u8>) {
42 let mut requests = self.requests.lock().unwrap();
43 requests.push(CapturedRequest { body });
44 }
45
46 fn get_requests(&self) -> Vec<CapturedRequest> {
47 let requests = self.requests.lock().unwrap();
48 requests.clone()
49 }
50
51 fn request_count(&self) -> usize {
52 let requests = self.requests.lock().unwrap();
53 requests.len()
54 }
55
56 fn next_status(&self) -> StatusCode {
57 let response_plan = self.response_plan.lock().unwrap();
58 let mut next_response_index = self.next_response_index.lock().unwrap();
59 let status = response_plan
60 .get(*next_response_index)
61 .copied()
62 .unwrap_or(StatusCode::OK);
63 *next_response_index += 1;
64 status
65 }
66}
67
68pub struct WebhookResource {
70 pub url: String,
72 pub port: u16,
74 state: WebhookState,
76 #[allow(dead_code)]
78 shutdown_tx: Option<oneshot::Sender<()>>,
79}
80
81impl WebhookResource {
82 pub async fn new() -> Result<Self> {
84 Self::new_with_response_plan(Vec::new()).await
85 }
86
87 pub async fn new_with_response_plan(response_plan: Vec<StatusCode>) -> Result<Self> {
89 let listener = TcpListener::bind("127.0.0.1:0").map_err(E2eError::Io)?;
91 let port = listener.local_addr().map_err(E2eError::Io)?.port();
92 drop(listener); let state = WebhookState::with_response_plan(response_plan);
95 let state_clone = state.clone();
96
97 let app = Router::new()
99 .route("/webhook", post(handle_webhook))
100 .route("/webhook/{path}", post(handle_webhook))
101 .with_state(state_clone);
102
103 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
105
106 let addr = format!("127.0.0.1:{}", port);
108 let listener = tokio::net::TcpListener::bind(&addr)
109 .await
110 .map_err(E2eError::Io)?;
111
112 info!("Starting webhook server at: {}", addr);
113
114 tokio::spawn(async move {
116 axum::serve(listener, app)
117 .with_graceful_shutdown(async move {
118 let _ = shutdown_rx.await;
119 })
120 .await
121 .ok();
122 });
123
124 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
126
127 Ok(Self {
128 url: format!("http://127.0.0.1:{}", port),
129 port,
130 state,
131 shutdown_tx: Some(shutdown_tx),
132 })
133 }
134
135 pub fn webhook_url(&self) -> String {
137 format!("{}/webhook", self.url)
138 }
139
140 pub fn request_count(&self) -> usize {
142 self.state.request_count()
143 }
144
145 pub fn get_requests(&self) -> Vec<CapturedRequest> {
147 self.state.get_requests()
148 }
149
150 pub fn get_request_bodies_as_string(&self) -> Vec<String> {
152 self.state
153 .get_requests()
154 .into_iter()
155 .filter_map(|r| String::from_utf8(r.body).ok())
156 .collect()
157 }
158
159 pub fn get_request_bodies_as_json(&self) -> Vec<serde_json::Value> {
161 self.get_request_bodies_as_string()
162 .into_iter()
163 .filter_map(|s| serde_json::from_str(&s).ok())
164 .collect()
165 }
166
167 pub async fn wait_for_requests(&self, count: usize, timeout: std::time::Duration) -> bool {
169 let start = std::time::Instant::now();
170 while start.elapsed() < timeout {
171 if self.request_count() >= count {
172 return true;
173 }
174 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
175 }
176 false
177 }
178}
179
180async fn handle_webhook(State(state): State<WebhookState>, body: Bytes) -> StatusCode {
182 state.add_request(body.to_vec());
183 state.next_status()
184}