http_service_lambda/
lib.rs1#![forbid(future_incompatible, rust_2018_idioms)]
24#![deny(missing_debug_implementations, nonstandard_style)]
25#![warn(missing_docs, missing_doc_code_examples)]
26#![cfg_attr(test, deny(warnings))]
27
28use futures::{FutureExt, TryFutureExt};
29use http_service::{Body as HttpBody, HttpService, Request as HttpRequest};
30use lambda_http::{Body as LambdaBody, Handler, Request as LambdaHttpRequest};
31use lambda_runtime::{error::HandlerError, Context};
32use std::future::Future;
33use std::sync::Arc;
34use tokio::runtime::Runtime as TokioRuntime;
35
36type LambdaResponse = lambda_http::Response<lambda_http::Body>;
37
38trait ResultExt<OK, ERR> {
39 fn handler_error(self, description: &str) -> Result<OK, HandlerError>;
40}
41
42impl<OK, ERR> ResultExt<OK, ERR> for Result<OK, ERR> {
43 fn handler_error(self, description: &str) -> Result<OK, HandlerError> {
44 self.map_err(|_| HandlerError::from(description))
45 }
46}
47
48trait CompatHttpBodyAsLambda {
49 fn into_lambda(self) -> LambdaBody;
50}
51
52impl CompatHttpBodyAsLambda for Vec<u8> {
53 fn into_lambda(self) -> LambdaBody {
54 if self.is_empty() {
55 return LambdaBody::Empty;
56 }
57 match String::from_utf8(self) {
58 Ok(s) => LambdaBody::from(s),
59 Err(e) => LambdaBody::from(e.into_bytes()),
60 }
61 }
62}
63
64struct Server<S> {
65 service: Arc<S>,
66 rt: TokioRuntime,
67}
68
69impl<S> Server<S>
70where
71 S: HttpService,
72{
73 fn new(s: S) -> Server<S> {
74 Server {
75 service: Arc::new(s),
76 rt: tokio::runtime::Runtime::new().expect("failed to start new Runtime"),
77 }
78 }
79
80 fn serve(
81 &self,
82 req: LambdaHttpRequest,
83 ) -> impl Future<Output = Result<LambdaResponse, HandlerError>> {
84 let service = self.service.clone();
85 async move {
86 let req: HttpRequest = req.map(|b| HttpBody::from(b.as_ref()));
87 let mut connection = service
88 .connect()
89 .into_future()
90 .await
91 .handler_error("connect")?;
92 let (parts, body) = service
93 .respond(&mut connection, req)
94 .into_future()
95 .await
96 .handler_error("respond")?
97 .into_parts();
98 let resp = LambdaResponse::from_parts(
99 parts,
100 body.into_vec().await.handler_error("body")?.into_lambda(),
101 );
102 Ok(resp)
103 }
104 }
105}
106
107impl<S> Handler<LambdaResponse> for Server<S>
108where
109 S: HttpService,
110{
111 fn run(
112 &mut self,
113 req: LambdaHttpRequest,
114 _ctx: Context,
115 ) -> Result<LambdaResponse, HandlerError> {
116 self.rt.block_on(self.serve(req).boxed().compat())
119 }
120}
121
122pub fn run<S: HttpService>(s: S) {
125 let server = Server::new(s);
126 lambda_http::start(server, None);
128}
129
130#[cfg(test)]
131mod tests {
132 use super::*;
133 use futures::future;
134
135 struct DummyService;
136
137 impl HttpService for DummyService {
138 type Connection = ();
139 type ConnectionFuture = future::Ready<Result<(), ()>>;
140 type ResponseFuture = future::BoxFuture<'static, Result<http_service::Response, ()>>;
141 fn connect(&self) -> Self::ConnectionFuture {
142 future::ok(())
143 }
144 fn respond(&self, _conn: &mut (), _req: http_service::Request) -> Self::ResponseFuture {
145 Box::pin(async move { Ok(http_service::Response::new(http_service::Body::empty())) })
146 }
147 }
148
149 #[test]
150 fn handle_apigw_request() {
151 let input = include_str!("../tests/data/apigw_proxy_request.json");
154 let request = lambda_http::request::from_str(input).unwrap();
155 let mut handler = Server::new(DummyService);
156 let result = handler.run(request, Context::default());
157 assert!(
158 result.is_ok(),
159 format!("event was not handled as expected {:?}", result)
160 );
161 }
162
163 #[test]
164 fn handle_alb_request() {
165 let input = include_str!("../tests/data/alb_request.json");
168 let request = lambda_http::request::from_str(input).unwrap();
169 let mut handler = Server::new(DummyService);
170 let result = handler.run(request, Context::default());
171 assert!(
172 result.is_ok(),
173 format!("event was not handled as expected {:?}", result)
174 );
175 }
176}