streamling_e2e/resources/
external_handler.rs1use crate::{E2eError, Result};
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use serde::{Deserialize, Serialize};
13use std::net::TcpListener;
14use std::path::PathBuf;
15use std::sync::{Arc, Mutex};
16use tokio::sync::oneshot;
17use tracing::info;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct CapturedHandlerRequest {
22 pub body: String,
24 pub endpoint: String,
26}
27
28#[derive(Clone)]
30struct HandlerState {
31 requests: Arc<Mutex<Vec<CapturedHandlerRequest>>>,
32 capture_file: Option<PathBuf>,
33}
34
35impl HandlerState {
36 fn new(capture_file: Option<PathBuf>) -> Self {
37 Self {
38 requests: Arc::new(Mutex::new(Vec::new())),
39 capture_file,
40 }
41 }
42
43 fn add_request(&self, endpoint: &str, body: String) {
44 let request = CapturedHandlerRequest {
45 body: body.clone(),
46 endpoint: endpoint.to_string(),
47 };
48
49 let mut requests = self.requests.lock().unwrap();
50 requests.push(request.clone());
51
52 if let Some(ref path) = self.capture_file {
54 if let Ok(mut file) = std::fs::OpenOptions::new()
55 .create(true)
56 .append(true)
57 .open(path)
58 {
59 use std::io::Write;
60 let _ = writeln!(
61 file,
62 "{}",
63 serde_json::to_string(&request).unwrap_or_default()
64 );
65 }
66 }
67 }
68
69 fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
70 let requests = self.requests.lock().unwrap();
71 requests.clone()
72 }
73
74 fn request_count(&self) -> usize {
75 let requests = self.requests.lock().unwrap();
76 requests.len()
77 }
78}
79
80pub struct ExternalHandlerResource {
82 pub url: String,
84 pub port: u16,
86 state: HandlerState,
88 pub capture_file: Option<PathBuf>,
90 #[allow(dead_code)]
92 shutdown_tx: Option<oneshot::Sender<()>>,
93}
94
95impl ExternalHandlerResource {
96 pub async fn new() -> Result<Self> {
98 Self::with_capture_file(None).await
99 }
100
101 pub async fn with_capture_file(capture_file: Option<PathBuf>) -> Result<Self> {
103 let listener = TcpListener::bind("127.0.0.1:0").map_err(E2eError::Io)?;
105 let port = listener.local_addr().map_err(E2eError::Io)?.port();
106 drop(listener); let state = HandlerState::new(capture_file.clone());
109 let state_clone = state.clone();
110
111 let app = Router::new()
113 .route("/handler_slim", post(handle_slim_single))
115 .route("/handler_slim_envelope", post(handle_slim_single_envelope))
116 .route("/handler_slim_batch", post(handle_slim_batch))
118 .route(
119 "/handler_slim_batch_envelope",
120 post(handle_slim_batch_envelope),
121 )
122 .route("/handler_passthrough", post(handle_passthrough))
124 .with_state(state_clone);
125
126 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
128
129 let addr = format!("127.0.0.1:{}", port);
131 let listener = tokio::net::TcpListener::bind(&addr)
132 .await
133 .map_err(E2eError::Io)?;
134
135 info!("Starting external handler server at: {}", addr);
136
137 tokio::spawn(async move {
139 axum::serve(listener, app)
140 .with_graceful_shutdown(async move {
141 let _ = shutdown_rx.await;
142 })
143 .await
144 .ok();
145 });
146
147 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
149
150 Ok(Self {
151 url: format!("http://127.0.0.1:{}", port),
152 port,
153 state,
154 capture_file,
155 shutdown_tx: Some(shutdown_tx),
156 })
157 }
158
159 pub fn slim_handler_url(&self) -> String {
161 format!("{}/handler_slim", self.url)
162 }
163
164 pub fn slim_handler_envelope_url(&self) -> String {
166 format!("{}/handler_slim_envelope", self.url)
167 }
168
169 pub fn slim_batch_handler_url(&self) -> String {
171 format!("{}/handler_slim_batch", self.url)
172 }
173
174 pub fn slim_batch_handler_envelope_url(&self) -> String {
176 format!("{}/handler_slim_batch_envelope", self.url)
177 }
178
179 pub fn passthrough_handler_url(&self) -> String {
181 format!("{}/handler_passthrough", self.url)
182 }
183
184 pub fn request_count(&self) -> usize {
186 self.state.request_count()
187 }
188
189 pub fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
191 self.state.get_requests()
192 }
193
194 pub fn read_captured_file(&self) -> Result<Vec<CapturedHandlerRequest>> {
196 match &self.capture_file {
197 Some(path) => {
198 let content = std::fs::read_to_string(path).map_err(E2eError::Io)?;
199 let requests: Vec<CapturedHandlerRequest> = content
200 .lines()
201 .filter_map(|line| serde_json::from_str(line).ok())
202 .collect();
203 Ok(requests)
204 }
205 None => Ok(Vec::new()),
206 }
207 }
208}
209
210#[derive(Debug, Deserialize, Serialize)]
216struct EnvelopeV0 {
217 id: String,
218 data: String,
219 #[serde(rename = "_gs_op")]
220 gs_op: String,
221}
222
223#[derive(Debug, Deserialize, Serialize)]
225struct EnvelopeMetadata {
226 op: String,
227}
228
229#[derive(Debug, Deserialize, Serialize)]
231struct EnvelopeV1Data {
232 id: String,
233 data: String,
234 #[serde(rename = "_gs_op")]
235 gs_op: String,
236}
237
238#[derive(Debug, Deserialize, Serialize)]
240struct EnvelopeV1 {
241 metadata: EnvelopeMetadata,
242 data: EnvelopeV1Data,
243}
244
245async fn handle_slim_single(
248 State(state): State<HandlerState>,
249 body: Bytes,
250) -> (StatusCode, String) {
251 let body_str = String::from_utf8_lossy(&body).to_string();
252 state.add_request("handler_slim", body_str.clone());
253
254 match serde_json::from_slice::<EnvelopeV0>(&body) {
256 Ok(mut req) => {
257 req.data = format!("updated-{}", req.data);
258 let response = serde_json::to_string(&req).unwrap_or_default();
259 (StatusCode::OK, response)
260 }
261 Err(e) => {
262 tracing::error!("Failed to parse request: {} - body: {}", e, body_str);
263 (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
264 }
265 }
266}
267
268async fn handle_slim_single_envelope(
270 State(state): State<HandlerState>,
271 body: Bytes,
272) -> (StatusCode, String) {
273 let body_str = String::from_utf8_lossy(&body).to_string();
274 state.add_request("handler_slim_envelope", body_str.clone());
275
276 match serde_json::from_slice::<EnvelopeV1>(&body) {
277 Ok(mut req) => {
278 req.data.data = format!("updated-{}", req.data.data);
279 let response = serde_json::to_string(&req).unwrap_or_default();
280 (StatusCode::OK, response)
281 }
282 Err(e) => {
283 tracing::error!(
284 "Failed to parse envelope request: {} - body: {}",
285 e,
286 body_str
287 );
288 (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
289 }
290 }
291}
292
293async fn handle_slim_batch(State(state): State<HandlerState>, body: Bytes) -> (StatusCode, String) {
295 let body_str = String::from_utf8_lossy(&body).to_string();
296 state.add_request("handler_slim_batch", body_str.clone());
297
298 match serde_json::from_slice::<Vec<EnvelopeV0>>(&body) {
299 Ok(rows) => {
300 let updated: Vec<EnvelopeV0> = rows
301 .into_iter()
302 .map(|mut r| {
303 r.data = format!("updated-{}", r.data);
304 r
305 })
306 .collect();
307 let response = serde_json::to_string(&updated).unwrap_or_default();
308 (StatusCode::OK, response)
309 }
310 Err(e) => {
311 tracing::error!("Failed to parse batch request: {} - body: {}", e, body_str);
312 (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
313 }
314 }
315}
316
317async fn handle_slim_batch_envelope(
319 State(state): State<HandlerState>,
320 body: Bytes,
321) -> (StatusCode, String) {
322 let body_str = String::from_utf8_lossy(&body).to_string();
323 state.add_request("handler_slim_batch_envelope", body_str.clone());
324
325 match serde_json::from_slice::<Vec<EnvelopeV1>>(&body) {
326 Ok(rows) => {
327 let updated: Vec<EnvelopeV1> = rows
328 .into_iter()
329 .map(|mut r| {
330 r.data.data = format!("updated-{}", r.data.data);
331 r
332 })
333 .collect();
334 let response = serde_json::to_string(&updated).unwrap_or_default();
335 (StatusCode::OK, response)
336 }
337 Err(e) => {
338 tracing::error!(
339 "Failed to parse batch envelope request: {} - body: {}",
340 e,
341 body_str
342 );
343 (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
344 }
345 }
346}
347
348async fn handle_passthrough(
350 State(state): State<HandlerState>,
351 body: Bytes,
352) -> (StatusCode, String) {
353 let body_str = String::from_utf8_lossy(&body).to_string();
354 state.add_request("handler_passthrough", body_str.clone());
355 (StatusCode::OK, body_str)
356}