Skip to main content

streamling_e2e/resources/
external_handler.rs

1//! External handler server resource for testing handler transforms.
2//!
3//! This module provides an HTTP server that can receive requests from external handler
4//! transforms and respond with modified data, similar to the test webhook in the unit tests.
5
6use crate::{E2eError, Result};
7use axum::body::Bytes;
8use axum::extract::State;
9use axum::http::StatusCode;
10use axum::routing::post;
11use axum::Router;
12use serde::{Deserialize, Serialize};
13use std::net::TcpListener;
14use std::path::PathBuf;
15use std::sync::{Arc, Mutex};
16use tokio::sync::oneshot;
17use tracing::info;
18
19/// A captured handler request
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct CapturedHandlerRequest {
22    /// The request body as string
23    pub body: String,
24    /// The endpoint that was called
25    pub endpoint: String,
26}
27
28/// Shared state for the handler server
29#[derive(Clone)]
30struct HandlerState {
31    requests: Arc<Mutex<Vec<CapturedHandlerRequest>>>,
32    capture_file: Option<PathBuf>,
33}
34
35impl HandlerState {
36    fn new(capture_file: Option<PathBuf>) -> Self {
37        Self {
38            requests: Arc::new(Mutex::new(Vec::new())),
39            capture_file,
40        }
41    }
42
43    fn add_request(&self, endpoint: &str, body: String) {
44        let request = CapturedHandlerRequest {
45            body: body.clone(),
46            endpoint: endpoint.to_string(),
47        };
48
49        let mut requests = self.requests.lock().unwrap();
50        requests.push(request.clone());
51
52        // Also write to file if configured
53        if let Some(ref path) = self.capture_file {
54            if let Ok(mut file) = std::fs::OpenOptions::new()
55                .create(true)
56                .append(true)
57                .open(path)
58            {
59                use std::io::Write;
60                let _ = writeln!(
61                    file,
62                    "{}",
63                    serde_json::to_string(&request).unwrap_or_default()
64                );
65            }
66        }
67    }
68
69    fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
70        let requests = self.requests.lock().unwrap();
71        requests.clone()
72    }
73
74    fn request_count(&self) -> usize {
75        let requests = self.requests.lock().unwrap();
76        requests.len()
77    }
78}
79
80/// External handler server resource for testing handler transforms
81pub struct ExternalHandlerResource {
82    /// The base URL of the handler server (e.g., "http://127.0.0.1:8080")
83    pub url: String,
84    /// The port the server is listening on
85    pub port: u16,
86    /// Shared state containing captured requests
87    state: HandlerState,
88    /// Path to the capture file (if any)
89    pub capture_file: Option<PathBuf>,
90    /// Shutdown signal sender
91    #[allow(dead_code)]
92    shutdown_tx: Option<oneshot::Sender<()>>,
93}
94
95impl ExternalHandlerResource {
96    /// Start a new external handler server on an available port
97    pub async fn new() -> Result<Self> {
98        Self::with_capture_file(None).await
99    }
100
101    /// Start a new external handler server with request capture to a file
102    pub async fn with_capture_file(capture_file: Option<PathBuf>) -> Result<Self> {
103        // Find an available port
104        let listener = TcpListener::bind("127.0.0.1:0").map_err(E2eError::Io)?;
105        let port = listener.local_addr().map_err(E2eError::Io)?.port();
106        drop(listener); // Release the port so axum can bind to it
107
108        let state = HandlerState::new(capture_file.clone());
109        let state_clone = state.clone();
110
111        // Create the router with all handler endpoints
112        let app = Router::new()
113            // Single row handlers (one request per row)
114            .route("/handler_slim", post(handle_slim_single))
115            .route("/handler_slim_envelope", post(handle_slim_single_envelope))
116            // Batch handlers (one request for all rows)
117            .route("/handler_slim_batch", post(handle_slim_batch))
118            .route(
119                "/handler_slim_batch_envelope",
120                post(handle_slim_batch_envelope),
121            )
122            // Generic passthrough handler (captures but doesn't modify)
123            .route("/handler_passthrough", post(handle_passthrough))
124            .with_state(state_clone);
125
126        // Create shutdown channel
127        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
128
129        // Bind and start the server
130        let addr = format!("127.0.0.1:{}", port);
131        let listener = tokio::net::TcpListener::bind(&addr)
132            .await
133            .map_err(E2eError::Io)?;
134
135        info!("Starting external handler server at: {}", addr);
136
137        // Spawn the server
138        tokio::spawn(async move {
139            axum::serve(listener, app)
140                .with_graceful_shutdown(async move {
141                    let _ = shutdown_rx.await;
142                })
143                .await
144                .ok();
145        });
146
147        // Give the server a moment to start
148        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
149
150        Ok(Self {
151            url: format!("http://127.0.0.1:{}", port),
152            port,
153            state,
154            capture_file,
155            shutdown_tx: Some(shutdown_tx),
156        })
157    }
158
159    /// Get the URL for the slim single-row handler
160    pub fn slim_handler_url(&self) -> String {
161        format!("{}/handler_slim", self.url)
162    }
163
164    /// Get the URL for the slim single-row handler with envelope
165    pub fn slim_handler_envelope_url(&self) -> String {
166        format!("{}/handler_slim_envelope", self.url)
167    }
168
169    /// Get the URL for the slim batch handler
170    pub fn slim_batch_handler_url(&self) -> String {
171        format!("{}/handler_slim_batch", self.url)
172    }
173
174    /// Get the URL for the slim batch handler with envelope
175    pub fn slim_batch_handler_envelope_url(&self) -> String {
176        format!("{}/handler_slim_batch_envelope", self.url)
177    }
178
179    /// Get the URL for the passthrough handler
180    pub fn passthrough_handler_url(&self) -> String {
181        format!("{}/handler_passthrough", self.url)
182    }
183
184    /// Get the number of requests received
185    pub fn request_count(&self) -> usize {
186        self.state.request_count()
187    }
188
189    /// Get all captured requests
190    pub fn get_requests(&self) -> Vec<CapturedHandlerRequest> {
191        self.state.get_requests()
192    }
193
194    /// Read captured requests from the file (if configured)
195    pub fn read_captured_file(&self) -> Result<Vec<CapturedHandlerRequest>> {
196        match &self.capture_file {
197            Some(path) => {
198                let content = std::fs::read_to_string(path).map_err(E2eError::Io)?;
199                let requests: Vec<CapturedHandlerRequest> = content
200                    .lines()
201                    .filter_map(|line| serde_json::from_str(line).ok())
202                    .collect();
203                Ok(requests)
204            }
205            None => Ok(Vec::new()),
206        }
207    }
208}
209
210// ============================================================================
211// Handler Implementations
212// ============================================================================
213
214/// Envelope format version 0 - flat JSON with _gs_op field
215#[derive(Debug, Deserialize, Serialize)]
216struct EnvelopeV0 {
217    id: String,
218    data: String,
219    #[serde(rename = "_gs_op")]
220    gs_op: String,
221}
222
223/// Metadata for envelope version 1
224#[derive(Debug, Deserialize, Serialize)]
225struct EnvelopeMetadata {
226    op: String,
227}
228
229/// Inner data for envelope version 1
230#[derive(Debug, Deserialize, Serialize)]
231struct EnvelopeV1Data {
232    id: String,
233    data: String,
234    #[serde(rename = "_gs_op")]
235    gs_op: String,
236}
237
238/// Envelope format version 1 - wrapped with metadata.op
239#[derive(Debug, Deserialize, Serialize)]
240struct EnvelopeV1 {
241    metadata: EnvelopeMetadata,
242    data: EnvelopeV1Data,
243}
244
245/// Handle single row requests without envelope (payload_version=0)
246/// Modifies the data field by adding "updated-" prefix
247async fn handle_slim_single(
248    State(state): State<HandlerState>,
249    body: Bytes,
250) -> (StatusCode, String) {
251    let body_str = String::from_utf8_lossy(&body).to_string();
252    state.add_request("handler_slim", body_str.clone());
253
254    // Parse and modify the data
255    match serde_json::from_slice::<EnvelopeV0>(&body) {
256        Ok(mut req) => {
257            req.data = format!("updated-{}", req.data);
258            let response = serde_json::to_string(&req).unwrap_or_default();
259            (StatusCode::OK, response)
260        }
261        Err(e) => {
262            tracing::error!("Failed to parse request: {} - body: {}", e, body_str);
263            (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
264        }
265    }
266}
267
268/// Handle single row requests with envelope (payload_version=1)
269async fn handle_slim_single_envelope(
270    State(state): State<HandlerState>,
271    body: Bytes,
272) -> (StatusCode, String) {
273    let body_str = String::from_utf8_lossy(&body).to_string();
274    state.add_request("handler_slim_envelope", body_str.clone());
275
276    match serde_json::from_slice::<EnvelopeV1>(&body) {
277        Ok(mut req) => {
278            req.data.data = format!("updated-{}", req.data.data);
279            let response = serde_json::to_string(&req).unwrap_or_default();
280            (StatusCode::OK, response)
281        }
282        Err(e) => {
283            tracing::error!(
284                "Failed to parse envelope request: {} - body: {}",
285                e,
286                body_str
287            );
288            (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
289        }
290    }
291}
292
293/// Handle batch requests without envelope (payload_version=0)
294async fn handle_slim_batch(State(state): State<HandlerState>, body: Bytes) -> (StatusCode, String) {
295    let body_str = String::from_utf8_lossy(&body).to_string();
296    state.add_request("handler_slim_batch", body_str.clone());
297
298    match serde_json::from_slice::<Vec<EnvelopeV0>>(&body) {
299        Ok(rows) => {
300            let updated: Vec<EnvelopeV0> = rows
301                .into_iter()
302                .map(|mut r| {
303                    r.data = format!("updated-{}", r.data);
304                    r
305                })
306                .collect();
307            let response = serde_json::to_string(&updated).unwrap_or_default();
308            (StatusCode::OK, response)
309        }
310        Err(e) => {
311            tracing::error!("Failed to parse batch request: {} - body: {}", e, body_str);
312            (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
313        }
314    }
315}
316
317/// Handle batch requests with envelope (payload_version=1)
318async fn handle_slim_batch_envelope(
319    State(state): State<HandlerState>,
320    body: Bytes,
321) -> (StatusCode, String) {
322    let body_str = String::from_utf8_lossy(&body).to_string();
323    state.add_request("handler_slim_batch_envelope", body_str.clone());
324
325    match serde_json::from_slice::<Vec<EnvelopeV1>>(&body) {
326        Ok(rows) => {
327            let updated: Vec<EnvelopeV1> = rows
328                .into_iter()
329                .map(|mut r| {
330                    r.data.data = format!("updated-{}", r.data.data);
331                    r
332                })
333                .collect();
334            let response = serde_json::to_string(&updated).unwrap_or_default();
335            (StatusCode::OK, response)
336        }
337        Err(e) => {
338            tracing::error!(
339                "Failed to parse batch envelope request: {} - body: {}",
340                e,
341                body_str
342            );
343            (StatusCode::BAD_REQUEST, format!("Invalid JSON: {}", e))
344        }
345    }
346}
347
348/// Passthrough handler - captures request but returns it unchanged
349async fn handle_passthrough(
350    State(state): State<HandlerState>,
351    body: Bytes,
352) -> (StatusCode, String) {
353    let body_str = String::from_utf8_lossy(&body).to_string();
354    state.add_request("handler_passthrough", body_str.clone());
355    (StatusCode::OK, body_str)
356}