1use bytes::Bytes;
2use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
3use hyper::server::conn::http1;
4use hyper_util::rt::TokioIo;
5use ranvier_core::transition::ResourceRequirement;
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use tokio::io::{AsyncReadExt, AsyncWriteExt};
9
10use crate::ingress::{HttpIngress, RawIngressService};
11
12#[derive(Debug, thiserror::Error)]
13pub enum TestHarnessError {
14 #[error("io error: {0}")]
15 Io(#[from] std::io::Error),
16 #[error("hyper error: {0}")]
17 Hyper(#[from] hyper::Error),
18 #[error("task join error: {0}")]
19 Join(#[from] tokio::task::JoinError),
20 #[error("invalid response: {0}")]
21 InvalidResponse(&'static str),
22 #[error("invalid utf-8 in response headers: {0}")]
23 Utf8(#[from] std::str::Utf8Error),
24 #[error("invalid status code text: {0}")]
25 InvalidStatus(#[from] std::num::ParseIntError),
26 #[error("invalid status code value: {0}")]
27 InvalidStatusCode(#[from] http::status::InvalidStatusCode),
28 #[error("invalid header name: {0}")]
29 InvalidHeaderName(#[from] http::header::InvalidHeaderName),
30 #[error("invalid header value: {0}")]
31 InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
32 #[error("json serialization error: {0}")]
33 Json(#[from] serde_json::Error),
34}
35
36#[derive(Clone)]
41pub struct TestApp<R> {
42 service: RawIngressService<R>,
43 host: String,
44}
45
46impl<R> TestApp<R>
47where
48 R: ResourceRequirement + Clone + Send + Sync + 'static,
49{
50 pub fn new(ingress: HttpIngress<R>, resources: R) -> Self {
51 Self {
52 service: ingress.into_raw_service(resources),
53 host: "test.local".to_string(),
54 }
55 }
56
57 pub fn with_host(mut self, host: impl Into<String>) -> Self {
58 self.host = host.into();
59 self
60 }
61
62 pub async fn send(&self, request: TestRequest) -> Result<TestResponse, TestHarnessError> {
63 let mut request_bytes = request.to_http1_bytes(&self.host);
64 let capacity = request_bytes.len().saturating_mul(2).max(16 * 1024);
65 let (mut client_io, server_io) = tokio::io::duplex(capacity);
66
67 let service = self.service.clone();
68 let server_task = tokio::spawn(async move {
69 http1::Builder::new()
70 .keep_alive(false)
71 .serve_connection(TokioIo::new(server_io), service)
72 .await
73 });
74
75 client_io.write_all(&request_bytes).await?;
76
77 let mut raw_response = Vec::new();
78 client_io.read_to_end(&mut raw_response).await?;
79
80 let response = TestResponse::from_http1_bytes(&raw_response)?;
81
82 let server_result = server_task.await?;
86 if let Err(error) = server_result {
87 if !error.is_incomplete_message() {
88 return Err(TestHarnessError::Hyper(error));
89 }
90 }
91
92 request_bytes.clear();
94
95 Ok(response)
96 }
97}
98
99#[derive(Clone, Debug)]
100pub struct TestRequest {
101 method: Method,
102 path: String,
103 headers: Vec<(String, String)>,
104 body: Bytes,
105}
106
107impl TestRequest {
108 pub fn new(method: Method, path: impl Into<String>) -> Self {
109 Self {
110 method,
111 path: path.into(),
112 headers: Vec::new(),
113 body: Bytes::new(),
114 }
115 }
116
117 pub fn get(path: impl Into<String>) -> Self {
118 Self::new(Method::GET, path)
119 }
120
121 pub fn post(path: impl Into<String>) -> Self {
122 Self::new(Method::POST, path)
123 }
124
125 pub fn put(path: impl Into<String>) -> Self {
126 Self::new(Method::PUT, path)
127 }
128
129 pub fn delete(path: impl Into<String>) -> Self {
130 Self::new(Method::DELETE, path)
131 }
132
133 pub fn patch(path: impl Into<String>) -> Self {
134 Self::new(Method::PATCH, path)
135 }
136
137 pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
138 self.headers.push((name.into(), value.into()));
139 self
140 }
141
142 pub fn body(mut self, body: impl Into<Bytes>) -> Self {
143 self.body = body.into();
144 self
145 }
146
147 pub fn text(mut self, body: impl Into<String>) -> Self {
148 self.body = Bytes::from(body.into());
149 self
150 }
151
152 pub fn json<T: Serialize>(mut self, payload: &T) -> Result<Self, TestHarnessError> {
153 self.body = Bytes::from(serde_json::to_vec(payload)?);
154 self.headers
155 .push(("content-type".to_string(), "application/json".to_string()));
156 Ok(self)
157 }
158
159 fn to_http1_bytes(&self, host: &str) -> Vec<u8> {
160 let path = if self.path.is_empty() {
161 "/"
162 } else {
163 &self.path
164 };
165
166 let mut has_host = false;
167 let mut has_connection = false;
168 let mut has_content_length = false;
169
170 for (name, _) in &self.headers {
171 let lower = name.to_ascii_lowercase();
172 if lower == "host" {
173 has_host = true;
174 } else if lower == "connection" {
175 has_connection = true;
176 } else if lower == "content-length" {
177 has_content_length = true;
178 }
179 }
180
181 let mut output = format!("{} {} HTTP/1.1\r\n", self.method, path);
182
183 if !has_host {
184 output.push_str(&format!("Host: {host}\r\n"));
185 }
186 if !has_connection {
187 output.push_str("Connection: close\r\n");
188 }
189 if !has_content_length {
190 output.push_str(&format!("Content-Length: {}\r\n", self.body.len()));
191 }
192
193 for (name, value) in &self.headers {
194 output.push_str(name);
195 output.push_str(": ");
196 output.push_str(value);
197 output.push_str("\r\n");
198 }
199
200 output.push_str("\r\n");
201
202 let mut bytes = output.into_bytes();
203 bytes.extend_from_slice(&self.body);
204 bytes
205 }
206}
207
208#[derive(Clone, Debug)]
209pub struct TestResponse {
210 status: StatusCode,
211 headers: HeaderMap,
212 body: Bytes,
213}
214
215impl TestResponse {
216 fn from_http1_bytes(raw: &[u8]) -> Result<Self, TestHarnessError> {
217 let delimiter = b"\r\n\r\n";
218 let header_end = raw
219 .windows(delimiter.len())
220 .position(|window| window == delimiter)
221 .ok_or(TestHarnessError::InvalidResponse(
222 "missing HTTP header delimiter",
223 ))?;
224
225 let header_text = std::str::from_utf8(&raw[..header_end])?;
226 let mut lines = header_text.split("\r\n");
227
228 let status_line = lines
229 .next()
230 .ok_or(TestHarnessError::InvalidResponse("missing status line"))?;
231 let mut status_parts = status_line.split_whitespace();
232 let _http_version = status_parts
233 .next()
234 .ok_or(TestHarnessError::InvalidResponse("missing HTTP version"))?;
235 let status_code = status_parts
236 .next()
237 .ok_or(TestHarnessError::InvalidResponse("missing status code"))?
238 .parse::<u16>()?;
239 let status = StatusCode::from_u16(status_code)?;
240
241 let mut headers = HeaderMap::new();
242 for line in lines {
243 if line.is_empty() {
244 continue;
245 }
246 let (name, value) = line
247 .split_once(':')
248 .ok_or(TestHarnessError::InvalidResponse("malformed header line"))?;
249 let name = HeaderName::from_bytes(name.trim().as_bytes())?;
250 let value = HeaderValue::from_str(value.trim())?;
251 headers.append(name, value);
252 }
253
254 let body = Bytes::copy_from_slice(&raw[(header_end + delimiter.len())..]);
255
256 Ok(Self {
257 status,
258 headers,
259 body,
260 })
261 }
262
263 pub fn status(&self) -> StatusCode {
264 self.status
265 }
266
267 pub fn headers(&self) -> &HeaderMap {
268 &self.headers
269 }
270
271 pub fn header(&self, name: &str) -> Option<&HeaderValue> {
272 self.headers.get(name)
273 }
274
275 pub fn body(&self) -> &[u8] {
276 &self.body
277 }
278
279 pub fn text(&self) -> Result<&str, std::str::Utf8Error> {
280 std::str::from_utf8(&self.body)
281 }
282
283 pub fn json<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
284 serde_json::from_slice(&self.body)
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291 use ranvier_core::{Outcome, Transition};
292 use ranvier_runtime::Axon;
293
294 #[derive(Clone)]
295 struct Ping;
296
297 #[async_trait::async_trait]
298 impl Transition<(), String> for Ping {
299 type Error = String;
300 type Resources = ();
301
302 async fn run(
303 &self,
304 _state: (),
305 _resources: &Self::Resources,
306 _bus: &mut ranvier_core::Bus,
307 ) -> Outcome<String, Self::Error> {
308 Outcome::next("pong".to_string())
309 }
310 }
311
312 #[tokio::test]
313 async fn test_app_executes_route_without_network_socket() {
314 let ingress = crate::Ranvier::http::<()>().get(
315 "/ping",
316 Axon::<(), (), String, ()>::new("Ping").then(Ping),
317 );
318 let app = TestApp::new(ingress, ());
319
320 let response = app
321 .send(TestRequest::get("/ping"))
322 .await
323 .expect("test request should succeed");
324
325 assert_eq!(response.status(), StatusCode::OK);
326 assert_eq!(response.text().expect("utf8 body"), "pong");
327 }
328}