Skip to main content

ranvier_http/
test_harness.rs

1use bytes::Bytes;
2use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
3use hyper::server::conn::http1;
4use hyper_util::rt::TokioIo;
5use ranvier_core::transition::ResourceRequirement;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9
10use crate::ingress::{HttpIngress, RawIngressService};
11
12#[derive(Debug, thiserror::Error)]
13pub enum TestHarnessError {
14    #[error("io error: {0}")]
15    Io(#[from] std::io::Error),
16    #[error("hyper error: {0}")]
17    Hyper(#[from] hyper::Error),
18    #[error("task join error: {0}")]
19    Join(#[from] tokio::task::JoinError),
20    #[error("invalid response: {0}")]
21    InvalidResponse(&'static str),
22    #[error("invalid utf-8 in response headers: {0}")]
23    Utf8(#[from] std::str::Utf8Error),
24    #[error("invalid status code text: {0}")]
25    InvalidStatus(#[from] std::num::ParseIntError),
26    #[error("invalid status code value: {0}")]
27    InvalidStatusCode(#[from] http::status::InvalidStatusCode),
28    #[error("invalid header name: {0}")]
29    InvalidHeaderName(#[from] http::header::InvalidHeaderName),
30    #[error("invalid header value: {0}")]
31    InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
32    #[error("json serialization error: {0}")]
33    Json(#[from] serde_json::Error),
34}
35
36/// In-process HTTP test harness for `HttpIngress`.
37///
38/// Uses an in-memory duplex stream and Hyper HTTP/1.1 server connection,
39/// so no TCP socket/network bind is required.
40#[derive(Clone)]
41pub struct TestApp<R> {
42    service: RawIngressService<R>,
43    host: String,
44}
45
46impl<R> TestApp<R>
47where
48    R: ResourceRequirement + Clone + Send + Sync + 'static,
49{
50    pub fn new(ingress: HttpIngress<R>, resources: R) -> Self {
51        Self {
52            service: ingress.into_raw_service(resources),
53            host: "test.local".to_string(),
54        }
55    }
56
57    pub fn with_host(mut self, host: impl Into<String>) -> Self {
58        self.host = host.into();
59        self
60    }
61
62    pub async fn send(&self, request: TestRequest) -> Result<TestResponse, TestHarnessError> {
63        let mut request_bytes = request.to_http1_bytes(&self.host);
64        let capacity = request_bytes.len().saturating_mul(2).max(16 * 1024);
65        let (mut client_io, server_io) = tokio::io::duplex(capacity);
66
67        let service = self.service.clone();
68        let server_task = tokio::spawn(async move {
69            http1::Builder::new()
70                .keep_alive(false)
71                .serve_connection(TokioIo::new(server_io), service)
72                .await
73        });
74
75        client_io.write_all(&request_bytes).await?;
76
77        let mut raw_response = Vec::new();
78        client_io.read_to_end(&mut raw_response).await?;
79
80        let response = TestResponse::from_http1_bytes(&raw_response)?;
81
82        // Connection close races in in-memory duplex mode can surface as
83        // IncompleteMessage after a valid response is already produced.
84        // Treat that specific case as non-fatal for test harness usage.
85        let server_result = server_task.await?;
86        if let Err(error) = server_result {
87            if !error.is_incomplete_message() {
88                return Err(TestHarnessError::Hyper(error));
89            }
90        }
91
92        // Avoid keeping oversized request buffer alive longer than needed.
93        request_bytes.clear();
94
95        Ok(response)
96    }
97}
98
99#[derive(Clone, Debug)]
100pub struct TestRequest {
101    method: Method,
102    path: String,
103    headers: Vec<(String, String)>,
104    body: Bytes,
105}
106
107impl TestRequest {
108    pub fn new(method: Method, path: impl Into<String>) -> Self {
109        Self {
110            method,
111            path: path.into(),
112            headers: Vec::new(),
113            body: Bytes::new(),
114        }
115    }
116
117    pub fn get(path: impl Into<String>) -> Self {
118        Self::new(Method::GET, path)
119    }
120
121    pub fn post(path: impl Into<String>) -> Self {
122        Self::new(Method::POST, path)
123    }
124
125    pub fn put(path: impl Into<String>) -> Self {
126        Self::new(Method::PUT, path)
127    }
128
129    pub fn delete(path: impl Into<String>) -> Self {
130        Self::new(Method::DELETE, path)
131    }
132
133    pub fn patch(path: impl Into<String>) -> Self {
134        Self::new(Method::PATCH, path)
135    }
136
137    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
138        self.headers.push((name.into(), value.into()));
139        self
140    }
141
142    pub fn body(mut self, body: impl Into<Bytes>) -> Self {
143        self.body = body.into();
144        self
145    }
146
147    pub fn text(mut self, body: impl Into<String>) -> Self {
148        self.body = Bytes::from(body.into());
149        self
150    }
151
152    pub fn json<T: Serialize>(mut self, payload: &T) -> Result<Self, TestHarnessError> {
153        self.body = Bytes::from(serde_json::to_vec(payload)?);
154        self.headers
155            .push(("content-type".to_string(), "application/json".to_string()));
156        Ok(self)
157    }
158
159    fn to_http1_bytes(&self, host: &str) -> Vec<u8> {
160        let path = if self.path.is_empty() {
161            "/"
162        } else {
163            &self.path
164        };
165
166        let mut has_host = false;
167        let mut has_connection = false;
168        let mut has_content_length = false;
169
170        for (name, _) in &self.headers {
171            let lower = name.to_ascii_lowercase();
172            if lower == "host" {
173                has_host = true;
174            } else if lower == "connection" {
175                has_connection = true;
176            } else if lower == "content-length" {
177                has_content_length = true;
178            }
179        }
180
181        let mut output = format!("{} {} HTTP/1.1\r\n", self.method, path);
182
183        if !has_host {
184            output.push_str(&format!("Host: {host}\r\n"));
185        }
186        if !has_connection {
187            output.push_str("Connection: close\r\n");
188        }
189        if !has_content_length {
190            output.push_str(&format!("Content-Length: {}\r\n", self.body.len()));
191        }
192
193        for (name, value) in &self.headers {
194            output.push_str(name);
195            output.push_str(": ");
196            output.push_str(value);
197            output.push_str("\r\n");
198        }
199
200        output.push_str("\r\n");
201
202        let mut bytes = output.into_bytes();
203        bytes.extend_from_slice(&self.body);
204        bytes
205    }
206}
207
208#[derive(Clone, Debug)]
209pub struct TestResponse {
210    status: StatusCode,
211    headers: HeaderMap,
212    body: Bytes,
213}
214
215impl TestResponse {
216    fn from_http1_bytes(raw: &[u8]) -> Result<Self, TestHarnessError> {
217        let delimiter = b"\r\n\r\n";
218        let header_end = raw
219            .windows(delimiter.len())
220            .position(|window| window == delimiter)
221            .ok_or(TestHarnessError::InvalidResponse(
222                "missing HTTP header delimiter",
223            ))?;
224
225        let header_text = std::str::from_utf8(&raw[..header_end])?;
226        let mut lines = header_text.split("\r\n");
227
228        let status_line = lines
229            .next()
230            .ok_or(TestHarnessError::InvalidResponse("missing status line"))?;
231        let mut status_parts = status_line.split_whitespace();
232        let _http_version = status_parts
233            .next()
234            .ok_or(TestHarnessError::InvalidResponse("missing HTTP version"))?;
235        let status_code = status_parts
236            .next()
237            .ok_or(TestHarnessError::InvalidResponse("missing status code"))?
238            .parse::<u16>()?;
239        let status = StatusCode::from_u16(status_code)?;
240
241        let mut headers = HeaderMap::new();
242        for line in lines {
243            if line.is_empty() {
244                continue;
245            }
246            let (name, value) = line
247                .split_once(':')
248                .ok_or(TestHarnessError::InvalidResponse("malformed header line"))?;
249            let name = HeaderName::from_bytes(name.trim().as_bytes())?;
250            let value = HeaderValue::from_str(value.trim())?;
251            headers.append(name, value);
252        }
253
254        let body = Bytes::copy_from_slice(&raw[(header_end + delimiter.len())..]);
255
256        Ok(Self {
257            status,
258            headers,
259            body,
260        })
261    }
262
263    pub fn status(&self) -> StatusCode {
264        self.status
265    }
266
267    pub fn headers(&self) -> &HeaderMap {
268        &self.headers
269    }
270
271    pub fn header(&self, name: &str) -> Option<&HeaderValue> {
272        self.headers.get(name)
273    }
274
275    pub fn body(&self) -> &[u8] {
276        &self.body
277    }
278
279    pub fn text(&self) -> Result<&str, std::str::Utf8Error> {
280        std::str::from_utf8(&self.body)
281    }
282
283    pub fn json<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
284        serde_json::from_slice(&self.body)
285    }
286}
287
#[cfg(test)]
mod tests {
    use super::*;
    use ranvier_core::{Outcome, Transition};
    use ranvier_runtime::Axon;

    /// Transition that ignores its input and always yields `"pong"`.
    #[derive(Clone)]
    struct Ping;

    #[async_trait::async_trait]
    impl Transition<(), String> for Ping {
        type Error = String;
        type Resources = ();

        async fn run(
            &self,
            _state: (),
            _resources: &Self::Resources,
            _bus: &mut ranvier_core::Bus,
        ) -> Outcome<String, Self::Error> {
            Outcome::next(String::from("pong"))
        }
    }

    /// Sends a `GET /ping` through the harness and checks the status and
    /// body of the response produced by the `Ping` route.
    #[tokio::test]
    async fn test_app_executes_route_without_network_socket() {
        let pipeline = Axon::<(), (), String, ()>::new("Ping").then(Ping);
        let routes = crate::Ranvier::http::<()>().get("/ping", pipeline);
        let app = TestApp::new(routes, ());

        let outcome = app.send(TestRequest::get("/ping")).await;
        let response = outcome.expect("test request should succeed");

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(response.text().expect("utf8 body"), "pong");
    }
}