#![doc = include_str!("../README.md")]
#![doc(
test(attr(deny(warnings))),
html_favicon_url = "https://raw.githubusercontent.com/helsing-ai/twurst/main/docs/img/twurst.png",
html_logo_url = "https://raw.githubusercontent.com/helsing-ai/twurst/main/docs/img/twurst.png"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
use http::header::CONTENT_TYPE;
use http::{HeaderMap, HeaderName, HeaderValue, Method, Request, Response, StatusCode};
use http_body::{Body, Frame, SizeHint};
use http_body_util::BodyExt;
use prost_reflect::bytes::{Buf, Bytes, BytesMut};
use prost_reflect::{DeserializeOptions, DynamicMessage, ReflectMessage};
use serde::Serialize;
use std::convert::Infallible;
use std::error::Error;
use std::future::poll_fn;
use std::mem::take;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower_service::Service;
pub use twurst_error::{TwirpError, TwirpErrorCode};
/// `Content-Type` header value used for JSON-encoded requests and recognized on JSON responses.
const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json");
/// `Content-Type` header value used for binary protobuf requests and recognized on protobuf responses.
const APPLICATION_PROTOBUF: HeaderValue = HeaderValue::from_static("application/protobuf");
/// Client that sends Twirp calls through an underlying [`TwirpHttpService`].
///
/// Requests are encoded as binary protobuf unless JSON is selected with
/// [`use_json`](Self::use_json).
#[derive(Clone)]
pub struct TwirpHttpClient<S: TwirpHttpService> {
/// Transport used to send the HTTP requests.
service: S,
/// Prefix put in front of every call path; `None` means the path is used as the URI as-is.
base_url: Option<String>,
/// When `true`, request bodies are JSON; otherwise binary protobuf.
use_json: bool,
}
#[cfg(feature = "reqwest-012")]
impl TwirpHttpClient<Reqwest012Service> {
    /// Builds a client targeting `base_url`, backed by a client obtained from
    /// `reqwest_012::Client::new()`.
    pub fn new_using_reqwest_012(base_url: impl Into<String>) -> Self {
        let client = reqwest_012::Client::new();
        Self::new_with_reqwest_012_client(client, base_url)
    }

    /// Builds a client targeting `base_url` that sends its requests through the
    /// provided `reqwest_012::Client`.
    pub fn new_with_reqwest_012_client(
        client: reqwest_012::Client,
        base_url: impl Into<String>,
    ) -> Self {
        let service = Reqwest012Service(client);
        Self::new_with_base(service, base_url)
    }
}
#[cfg(feature = "reqwest-013")]
impl TwirpHttpClient<Reqwest013Service> {
    /// Builds a client targeting `base_url`, backed by a client obtained from
    /// `reqwest_013::Client::new()`.
    pub fn new_using_reqwest_013(base_url: impl Into<String>) -> Self {
        let client = reqwest_013::Client::new();
        Self::new_with_reqwest_013_client(client, base_url)
    }

    /// Builds a client targeting `base_url` that sends its requests through the
    /// provided `reqwest_013::Client`.
    pub fn new_with_reqwest_013_client(
        client: reqwest_013::Client,
        base_url: impl Into<String>,
    ) -> Self {
        let service = Reqwest013Service(client);
        Self::new_with_base(service, base_url)
    }
}
impl<S: TwirpHttpService> TwirpHttpClient<S> {
    /// Creates a client whose request URIs are `base_url` followed by the call path.
    ///
    /// One trailing `/` is removed from `base_url`. The call path is appended verbatim,
    /// so it is expected to start with `/`.
    pub fn new_with_base(service: S, base_url: impl Into<String>) -> Self {
        let mut base_url = base_url.into();
        if base_url.ends_with('/') {
            base_url.pop();
        }
        Self {
            service,
            base_url: Some(base_url),
            use_json: false,
        }
    }

    /// Creates a client without a base URL: the call path is used verbatim as the request URI.
    pub fn new(service: S) -> Self {
        Self {
            service,
            base_url: None,
            use_json: false,
        }
    }

    /// Makes subsequent calls encode their request body as JSON.
    pub fn use_json(&mut self) {
        self.use_json = true;
    }

    /// Makes subsequent calls encode their request body as binary protobuf (the initial setting).
    pub fn use_binary_protobuf(&mut self) {
        self.use_json = false;
    }

    /// Sends `request` as a `POST` to `path` and decodes the response into `O`.
    ///
    /// Shorthand for [`call_builder`](Self::call_builder) followed by
    /// [`send`](TwirpCallBuilder::send).
    ///
    /// # Errors
    ///
    /// Returns a [`TwirpError`] if the service is not ready, the request cannot be encoded
    /// or built, the transport fails, the response status is not `200 OK`, or the response
    /// body cannot be decoded.
    pub async fn call<I: ReflectMessage, O: ReflectMessage + Default>(
        &self,
        path: &str,
        request: &I,
    ) -> Result<O, TwirpError> {
        self.call_builder(path, request).send().await
    }

    /// Prepares a `POST` call to `path` that can be customized (e.g. with extra headers)
    /// before being sent with [`TwirpCallBuilder::send`].
    pub fn call_builder<'a, I: ReflectMessage>(
        &'a self,
        path: &'a str,
        request: &'a I,
    ) -> TwirpCallBuilder<'a, S, I> {
        let uri = match &self.base_url {
            Some(base) => format!("{base}{path}"),
            None => path.to_string(),
        };
        TwirpCallBuilder {
            client: self,
            request,
            builder: Request::builder().method(Method::POST).uri(uri),
        }
    }

    /// Serializes `message` using the encoding currently selected on the client.
    fn encode_body<T: ReflectMessage>(&self, message: &T) -> Result<TwirpRequestBody, TwirpError> {
        if self.use_json {
            Ok(json_encode(message)?.into())
        } else {
            // Pre-size the buffer so the protobuf encoder does not have to grow it.
            let mut buffer = BytesMut::with_capacity(message.encoded_len());
            message.encode(&mut buffer).map_err(|e| {
                TwirpError::wrap(
                    TwirpErrorCode::Internal,
                    format!("Failed to serialize to protobuf: {e}"),
                    e,
                )
            })?;
            Ok(Bytes::from(buffer).into())
        }
    }

    /// Returns the `Content-Type` header value matching the selected request encoding.
    fn content_type(&self) -> HeaderValue {
        if self.use_json {
            APPLICATION_JSON
        } else {
            APPLICATION_PROTOBUF
        }
    }

    /// Reads the whole response body and turns the response into either a decoded `T`
    /// or a [`TwirpError`].
    ///
    /// Any status other than `200 OK` is converted into a [`TwirpError`] from the buffered
    /// response. Otherwise the body is decoded according to the response `Content-Type`,
    /// independently of the encoding used for the request.
    async fn extract_response<T: ReflectMessage + Default>(
        &self,
        response: Response<S::ResponseBody>,
    ) -> Result<T, TwirpError> {
        let (parts, body) = response.into_parts();
        // Buffer the full body up front: both the error and the success paths need it.
        let body = body.collect().await.map_err(|e| {
            TwirpError::wrap(
                TwirpErrorCode::Internal,
                format!("Failed to load response body: {e}"),
                e,
            )
        })?;
        let response = Response::from_parts(parts, body);
        if response.status() != StatusCode::OK {
            return Err(response.map(|b| b.to_bytes()).into());
        }
        let content_type = response.headers().get(CONTENT_TYPE).cloned();
        let body = response.into_body();
        if content_type == Some(APPLICATION_PROTOBUF) {
            T::decode(body.aggregate()).map_err(|e| {
                TwirpError::wrap(
                    TwirpErrorCode::Malformed,
                    format!("Bad response binary protobuf encoding: {e}"),
                    e,
                )
            })
        } else if content_type == Some(APPLICATION_JSON) {
            json_decode(&body.to_bytes())
        } else if let Some(content_type) = content_type {
            Err(TwirpError::malformed(format!(
                "Unsupported response content-type: {}",
                String::from_utf8_lossy(content_type.as_bytes())
            )))
        } else {
            Err(TwirpError::malformed("No content-type in the response"))
        }
    }
}
/// A prepared Twirp call that can be customized before being sent with `send()`.
///
/// Created by [`TwirpHttpClient::call_builder`].
#[must_use = "TwirpCallBuilder does nothing until `.send()` is awaited"]
pub struct TwirpCallBuilder<'a, S: TwirpHttpService, I> {
/// Client providing the transport and the encoding settings.
client: &'a TwirpHttpClient<S>,
/// Message to encode as the request body.
request: &'a I,
/// HTTP request under construction (method, URI and headers).
builder: http::request::Builder,
}
impl<'a, S: TwirpHttpService, I: ReflectMessage> TwirpCallBuilder<'a, S, I> {
    /// Adds a header to the request being built.
    ///
    /// The name and value are handed to the underlying `http` request builder; an invalid
    /// one is reported when the request is built in [`send`](Self::send).
    pub fn header<K, V>(self, name: K, value: V) -> Self
    where
        HeaderName: TryFrom<K>,
        <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
        HeaderValue: TryFrom<V>,
        <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
    {
        let Self {
            client,
            request,
            builder,
        } = self;
        Self {
            client,
            request,
            builder: builder.header(name, value),
        }
    }

    /// Gives mutable access to the request headers, as exposed by the underlying `http`
    /// request builder (which returns `None` when it holds an error).
    pub fn headers_mut(&mut self) -> Option<&mut HeaderMap> {
        self.builder.headers_mut()
    }

    /// Sends the call and decodes the response into `O`.
    ///
    /// Steps, in order: wait for the service to be ready, encode the message, set the
    /// `Content-Type` header, build the HTTP request, send it, and decode the response.
    ///
    /// # Errors
    ///
    /// Returns a [`TwirpError`] describing whichever of these steps failed.
    pub async fn send<O: ReflectMessage + Default>(self) -> Result<O, TwirpError> {
        let Self {
            client,
            request: message,
            builder: mut http_builder,
        } = self;

        if let Err(e) = client.service.ready().await {
            let description = format!("Service is not ready: {e}");
            return Err(TwirpError::wrap(TwirpErrorCode::Unknown, description, e));
        }

        let payload = client.encode_body(message)?;
        // The content type always reflects the client's encoding, replacing any value
        // set through the builder.
        if let Some(headers) = http_builder.headers_mut() {
            headers.insert(CONTENT_TYPE, client.content_type());
        }

        let http_request = match http_builder.body(payload) {
            Ok(http_request) => http_request,
            Err(e) => {
                let description = format!("Failed to construct request: {e}");
                return Err(TwirpError::wrap(TwirpErrorCode::Malformed, description, e));
            }
        };

        let http_response = match client.service.call(http_request).await {
            Ok(http_response) => http_response,
            Err(e) => {
                let description = format!("Transport error during the request: {e}");
                return Err(TwirpError::wrap(TwirpErrorCode::Unknown, description, e));
            }
        };

        client.extract_response(http_response).await
    }
}
/// Asynchronous HTTP transport used by [`TwirpHttpClient`] to send requests.
///
/// This module provides a blanket implementation for cloneable `Service`s that accept
/// `Request<TwirpRequestBody>`; see that implementation for the exact bounds.
#[trait_variant::make(Send)]
pub trait TwirpHttpService: 'static {
/// Body type of the HTTP responses returned by [`call`](Self::call).
type ResponseBody: Body<Error: Error + Send + Sync>;
/// Error reported by [`ready`](Self::ready) and [`call`](Self::call).
type Error: Error + Send + Sync + 'static;
/// Resolves once the transport reports that it can accept a request.
async fn ready(&self) -> Result<(), Self::Error>;
/// Sends `request` and resolves to the HTTP response.
async fn call(
&self,
request: Request<TwirpRequestBody>,
) -> Result<Response<Self::ResponseBody>, Self::Error>;
}
/// Blanket implementation for any cloneable `Service` that takes `Request<TwirpRequestBody>`.
///
/// `Service` methods need `&mut self` while this trait only has `&self`, so every
/// operation works on its own clone of the service.
impl<
    S: Service<
            Request<TwirpRequestBody>,
            Error: Error + Send + Sync + 'static,
            Response = Response<RespBody>,
            Future: Send,
        > + Clone
        + Send
        + Sync
        + 'static,
    RespBody: Body<Error: Error + Send + Sync + 'static>,
> TwirpHttpService for S
{
    type ResponseBody = RespBody;
    type Error = S::Error;

    /// Polls a clone of the service until it reports readiness.
    async fn ready(&self) -> Result<(), Self::Error> {
        // Clone once and keep polling the same instance: a fresh clone per poll would
        // discard whatever progress or waker registration the previous poll made.
        let mut service = self.clone();
        poll_fn(|cx| <S as Service<Request<TwirpRequestBody>>>::poll_ready(&mut service, cx)).await
    }

    /// Sends `request` through a clone of the service.
    async fn call(
        &self,
        request: Request<TwirpRequestBody>,
    ) -> Result<Response<RespBody>, S::Error> {
        let mut service = self.clone();
        // `Service::call` may only be invoked on an instance whose `poll_ready` returned
        // `Ready(Ok(()))`, and readiness does not carry over to clones, so drive this
        // exact clone to readiness before calling it.
        poll_fn(|cx| <S as Service<Request<TwirpRequestBody>>>::poll_ready(&mut service, cx))
            .await?;
        <S as Service<Request<TwirpRequestBody>>>::call(&mut service, request).await
    }
}
/// HTTP request body holding an already-encoded Twirp message as a single `Bytes` buffer.
pub struct TwirpRequestBody(Bytes);
impl From<Bytes> for TwirpRequestBody {
#[inline]
fn from(body: Bytes) -> Self {
Self(body)
}
}
impl From<TwirpRequestBody> for Bytes {
#[inline]
fn from(body: TwirpRequestBody) -> Self {
body.0
}
}
impl Body for TwirpRequestBody {
    type Data = Bytes;
    type Error = Infallible;

    /// Yields the whole buffer as a single data frame, then signals the end of the body.
    #[inline]
    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        // Taking the buffer leaves an empty one behind, so the next poll ends the stream.
        let chunk = take(&mut self.0);
        if chunk.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Ready(Some(Ok(Frame::data(chunk))))
        }
    }

    /// The stream is finished once no bytes are left to yield.
    #[inline]
    fn is_end_stream(&self) -> bool {
        self.0.is_empty()
    }

    /// The exact number of bytes still to be yielded.
    #[inline]
    fn size_hint(&self) -> SizeHint {
        let pending = self.0.len() as u64;
        SizeHint::with_exact(pending)
    }
}
fn json_encode<T: ReflectMessage>(message: &T) -> Result<Bytes, TwirpError> {
let mut serializer = serde_json::Serializer::new(Vec::new());
message
.transcode_to_dynamic()
.serialize(&mut serializer)
.map_err(|e| {
TwirpError::wrap(
TwirpErrorCode::Malformed,
format!("Failed to serialize request to JSON: {e}"),
e,
)
})?;
Ok(serializer.into_inner().into())
}
/// Parses a JSON payload into a `T`.
///
/// The payload is parsed into a dynamic message first (failure: `Malformed` error) and
/// then transcoded to `T` (failure: internal error).
fn json_decode<T: ReflectMessage + Default>(message: &[u8]) -> Result<T, TwirpError> {
    match dynamic_json_decode::<T>(message) {
        Ok(dynamic) => dynamic.transcode_to().map_err(|e| {
            TwirpError::internal(format!(
                "Internal error while parsing the JSON response: {e}"
            ))
        }),
        Err(e) => {
            let description = format!("Failed to parse JSON response: {e}");
            Err(TwirpError::wrap(TwirpErrorCode::Malformed, description, e))
        }
    }
}
/// Parses a JSON payload into a dynamic message using the descriptor of `T`.
///
/// Unknown fields are accepted; anything other than whitespace after the JSON value
/// is an error.
fn dynamic_json_decode<T: ReflectMessage + Default>(
    message: &[u8],
) -> Result<DynamicMessage, serde_json::Error> {
    let descriptor = T::default().descriptor();
    let options = DeserializeOptions::new().deny_unknown_fields(false);
    let mut reader = serde_json::Deserializer::from_slice(message);
    let decoded = DynamicMessage::deserialize_with_options(descriptor, &mut reader, &options)?;
    // Make sure the whole input was consumed.
    reader.end()?;
    Ok(decoded)
}
/// Wrapper around a `reqwest_012::Client` that implements `Service` for `http` requests.
#[cfg(feature = "reqwest-012")]
#[derive(Clone, Default)]
pub struct Reqwest012Service(reqwest_012::Client);
#[cfg(feature = "reqwest-012")]
impl Reqwest012Service {
    /// Creates a service wrapping a client obtained from `reqwest_012::Client::new()`.
    #[inline]
    pub fn new() -> Self {
        Self::from(reqwest_012::Client::new())
    }
}
#[cfg(feature = "reqwest-012")]
impl From<reqwest_012::Client> for Reqwest012Service {
    /// Wraps an existing client.
    #[inline]
    fn from(inner: reqwest_012::Client) -> Self {
        Reqwest012Service(inner)
    }
}
/// Lets the wrapped client be driven with `http` requests whose body converts into a
/// `reqwest_012::Body`.
#[cfg(feature = "reqwest-012")]
impl<B: Into<reqwest_012::Body>> Service<Request<B>> for Reqwest012Service {
type Response = Response<reqwest_012::Body>;
type Error = reqwest_012::Error;
// Boxed because `call` returns an `async` block, whose type cannot be named.
type Future = Pin<
Box<dyn Future<Output = Result<Response<reqwest_012::Body>, reqwest_012::Error>> + Send>,
>;
// Readiness is delegated to the wrapped client.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: Request<B>) -> Self::Future {
// Convert the `http` request into the request type taken by the wrapped client;
// a failed conversion is returned as an already-failed future.
let req = match req.try_into() {
Ok(req) => req,
Err(e) => return Box::pin(async move { Err(e) }),
};
let future = self.0.call(req);
// Convert the client's response into an `http` response once it arrives.
Box::pin(async move { Ok(future.await?.into()) })
}
}
#[cfg(feature = "reqwest-012")]
impl From<TwirpRequestBody> for reqwest_012::Body {
    /// Hands the buffered request bytes over to a `reqwest_012` body.
    #[inline]
    fn from(TwirpRequestBody(bytes): TwirpRequestBody) -> Self {
        bytes.into()
    }
}
/// Wrapper around a `reqwest_013::Client` that implements `Service` for `http` requests.
#[cfg(feature = "reqwest-013")]
#[derive(Clone, Default)]
pub struct Reqwest013Service(reqwest_013::Client);
#[cfg(feature = "reqwest-013")]
impl Reqwest013Service {
    /// Creates a service wrapping a client obtained from `reqwest_013::Client::new()`.
    #[inline]
    pub fn new() -> Self {
        Self::from(reqwest_013::Client::new())
    }
}
#[cfg(feature = "reqwest-013")]
impl From<reqwest_013::Client> for Reqwest013Service {
    /// Wraps an existing client.
    #[inline]
    fn from(inner: reqwest_013::Client) -> Self {
        Reqwest013Service(inner)
    }
}
/// Lets the wrapped client be driven with `http` requests whose body converts into a
/// `reqwest_013::Body`.
#[cfg(feature = "reqwest-013")]
impl<B: Into<reqwest_013::Body>> Service<Request<B>> for Reqwest013Service {
type Response = Response<reqwest_013::Body>;
type Error = reqwest_013::Error;
// Boxed because `call` returns an `async` block, whose type cannot be named.
type Future = Pin<
Box<dyn Future<Output = Result<Response<reqwest_013::Body>, reqwest_013::Error>> + Send>,
>;
// Readiness is delegated to the wrapped client.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: Request<B>) -> Self::Future {
// Convert the `http` request into the request type taken by the wrapped client;
// a failed conversion is returned as an already-failed future.
let req = match req.try_into() {
Ok(req) => req,
Err(e) => return Box::pin(async move { Err(e) }),
};
let future = self.0.call(req);
// Convert the client's response into an `http` response once it arrives.
Box::pin(async move { Ok(future.await?.into()) })
}
}
#[cfg(feature = "reqwest-013")]
impl From<TwirpRequestBody> for reqwest_013::Body {
    /// Hands the buffered request bytes over to a `reqwest_013` body.
    #[inline]
    fn from(TwirpRequestBody(bytes): TwirpRequestBody) -> Self {
        bytes.into()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use prost_reflect::ReflectMessage;
use prost_reflect::prost::Message;
use prost_reflect::prost_types::Timestamp;
use std::future::Ready;
use std::io;
use std::task::{Context, Poll};
use tower::service_fn;
// Serialized protobuf file descriptor set backing `MyMessage` below. The embedded strings
// spell out file `example_service.proto`, package `package`, message `MyMessage` and
// syntax `proto3`.
const FILE_DESCRIPTOR_SET_BYTES: &[u8] = &[
10, 107, 10, 21, 101, 120, 97, 109, 112, 108, 101, 95, 115, 101, 114, 118, 105, 99, 101,
46, 112, 114, 111, 116, 111, 18, 7, 112, 97, 99, 107, 97, 103, 101, 34, 11, 10, 9, 77, 121,
77, 101, 115, 115, 97, 103, 101, 74, 52, 10, 6, 18, 4, 0, 0, 5, 1, 10, 8, 10, 1, 12, 18, 3,
0, 0, 18, 10, 8, 10, 1, 2, 18, 3, 2, 0, 16, 10, 10, 10, 2, 4, 0, 18, 4, 4, 0, 5, 1, 10, 10,
10, 3, 4, 0, 1, 18, 3, 4, 8, 17, 98, 6, 112, 114, 111, 116, 111, 51,
];
/// Field-less test message, described by `FILE_DESCRIPTOR_SET_BYTES` under the name
/// `package.MyMessage`; used to check how JSON with unknown fields is decoded.
#[derive(Message, ReflectMessage, PartialEq)]
#[prost_reflect(
file_descriptor_set_bytes = "crate::tests::FILE_DESCRIPTOR_SET_BYTES",
message_name = "package.MyMessage"
)]
pub struct MyMessage {}
// A service whose `poll_ready` fails must surface as an `Unknown` "Service is not ready"
// error, without the service ever being called.
#[tokio::test]
async fn not_ready_service() -> Result<(), Box<dyn Error>> {
// Service that always reports a readiness error; `call` must never be reached.
#[derive(Clone)]
struct NotReadyService;
impl<S> Service<S> for NotReadyService {
type Response = Response<String>;
type Error = TwirpError;
type Future = Ready<Result<Response<String>, TwirpError>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(TwirpError::internal("foo")))
}
fn call(&mut self, _: S) -> Self::Future {
unimplemented!()
}
}
let client = TwirpHttpClient::new(NotReadyService);
assert_eq!(
client
.call::<_, Timestamp>("", &Timestamp::default())
.await
.unwrap_err()
.to_string(),
"Twirp Unknown error: Service is not ready: Twirp Internal error: foo"
);
Ok(())
}
// Without a base URL the path is used as the URI, and a JSON response is decoded.
#[tokio::test]
async fn json_request_without_base_ok() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .body("\"1970-01-01T00:00:10Z\"".to_string())
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let ten_seconds = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let mut client = TwirpHttpClient::new(service);
    client.use_json();
    let response = client.call::<_, Timestamp>("/foo", &ten_seconds).await?;

    assert_eq!(response, ten_seconds);
    Ok(())
}
// Headers added through the builder reach the service, next to the protobuf content type.
#[tokio::test]
async fn call_builder_with_header_ok() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");

        let headers = request.headers();
        assert_eq!(
            headers.get(http::header::AUTHORIZATION),
            Some(&HeaderValue::from_static("Bearer token"))
        );
        assert_eq!(
            headers.get("x-request-id"),
            Some(&HeaderValue::from_static("abc-123"))
        );
        assert_eq!(headers.get(CONTENT_TYPE), Some(&APPLICATION_PROTOBUF));

        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .body("\"1970-01-01T00:00:10Z\"".to_string())
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let ten_seconds = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let client = TwirpHttpClient::new(service);
    let call = client
        .call_builder("/foo", &ten_seconds)
        .header(
            http::header::AUTHORIZATION,
            HeaderValue::from_static("Bearer token"),
        )
        .header("x-request-id", "abc-123");
    let response: Timestamp = call.send().await?;

    assert_eq!(response, ten_seconds);
    Ok(())
}
// Headers merged in through `headers_mut` reach the service.
#[tokio::test]
async fn call_builder_with_headers_map_ok() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(
            request.headers().get(http::header::AUTHORIZATION),
            Some(&HeaderValue::from_static("Bearer token"))
        );
        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .body("\"1970-01-01T00:00:10Z\"".to_string())
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let mut extra_headers = HeaderMap::new();
    extra_headers.insert(
        http::header::AUTHORIZATION,
        HeaderValue::from_static("Bearer token"),
    );

    let client = TwirpHttpClient::new(service);
    let message = Timestamp::default();
    let mut call = client.call_builder("/foo", &message);
    call.headers_mut().unwrap().extend(extra_headers);
    let _response: Timestamp = call.send().await?;
    Ok(())
}
// An invalid header name given to the builder is reported by `send` as a `Malformed`
// error, and the service is never called.
#[tokio::test]
async fn call_builder_invalid_header_name_surfaces_on_send() -> Result<(), Box<dyn Error>> {
let service = service_fn(|_: Request<TwirpRequestBody>| async move {
panic!("service must not be called when builder has a captured error");
// Unreachable, but gives the closure a concrete return type.
#[allow(unreachable_code)]
Ok::<Response<String>, TwirpError>(Response::new(String::new()))
});
let client = TwirpHttpClient::new(service);
let err = client
.call_builder("/foo", &Timestamp::default())
.header("invalid header", "value")
.send::<Timestamp>()
.await
.unwrap_err();
assert_eq!(err.code(), TwirpErrorCode::Malformed);
assert!(
err.message().starts_with("Failed to construct request"),
"unexpected error message: {}",
err.message()
);
Ok(())
}
// Unknown fields in a JSON response are ignored rather than rejected.
#[tokio::test]
async fn json_request_with_unknown_fields_ok() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .body("{\"unknown_field\":\"ignored\"}".to_string())
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let mut client = TwirpHttpClient::new(service);
    client.use_json();
    let empty = MyMessage::default();
    let response = client.call::<_, MyMessage>("/foo", &empty).await?;

    assert_eq!(response, MyMessage::default());
    Ok(())
}
// A binary protobuf response carried in a `reqwest_012::Body` is decoded.
#[cfg(feature = "reqwest-012")]
#[tokio::test]
async fn binary_request_without_base_ok_012() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let encoded = Timestamp {
            seconds: 10,
            nanos: 0,
        }
        .encode_to_vec();
        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_PROTOBUF)
            .body(reqwest_012::Body::from(encoded))
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let ten_seconds = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let client = TwirpHttpClient::new(service);
    let response = client.call::<_, Timestamp>("/foo", &ten_seconds).await?;

    assert_eq!(response, ten_seconds);
    Ok(())
}
// A binary protobuf response carried in a `reqwest_013::Body` is decoded.
#[cfg(feature = "reqwest-013")]
#[tokio::test]
async fn binary_request_without_base_ok_013() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let encoded = Timestamp {
            seconds: 10,
            nanos: 0,
        }
        .encode_to_vec();
        let reply = Response::builder()
            .header(CONTENT_TYPE, APPLICATION_PROTOBUF)
            .body(reqwest_013::Body::from(encoded))
            .unwrap();
        Ok::<_, TwirpError>(reply)
    });

    let ten_seconds = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let client = TwirpHttpClient::new(service);
    let response = client.call::<_, Timestamp>("/foo", &ten_seconds).await?;

    assert_eq!(response, ten_seconds);
    Ok(())
}
// The base URL is prepended to the path, and a Twirp error response comes back as the
// same `TwirpError`.
#[tokio::test]
async fn request_with_base_twirp_error() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "http://example.com/twirp/foo");
        Ok::<Response<String>, TwirpError>(TwirpError::not_found("not found").into())
    });

    let client = TwirpHttpClient::new_with_base(service, "http://example.com/twirp");
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    assert_eq!(response_error, TwirpError::not_found("not found"));
    Ok(())
}
// A trailing `/` on the base URL is dropped, and a plain `401` response maps to an
// `unauthenticated` error carrying the body as message.
#[tokio::test]
async fn request_with_base_other_error() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "http://example.com/twirp/foo");
        let reply = Response::builder()
            .status(StatusCode::UNAUTHORIZED)
            .body("foo".to_string())
            .unwrap();
        Ok::<Response<String>, TwirpError>(reply)
    });

    let client = TwirpHttpClient::new_with_base(service, "http://example.com/twirp/");
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    assert_eq!(response_error, TwirpError::unauthenticated("foo"));
    Ok(())
}
// An error returned by the service itself is reported as an `Unknown` transport error.
#[tokio::test]
async fn request_transport_error() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        Err::<Response<String>, _>(io::Error::other("Transport error"))
    });

    let client = TwirpHttpClient::new(service);
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    let expected = TwirpError::new(
        TwirpErrorCode::Unknown,
        "Transport error during the request: Transport error",
    );
    assert_eq!(response_error, expected);
    Ok(())
}
// A `200 OK` response with an unsupported content type is rejected as malformed.
#[tokio::test]
async fn wrong_content_type_response() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let reply = Response::builder()
            .status(StatusCode::OK)
            .header(CONTENT_TYPE, "foo/bar")
            .body("foo".into())
            .unwrap();
        Ok::<Response<String>, TwirpError>(reply)
    });

    let client = TwirpHttpClient::new(service);
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    let expected = TwirpError::malformed("Unsupported response content-type: foo/bar");
    assert_eq!(response_error, expected);
    Ok(())
}
// A protobuf-typed response whose body is not valid protobuf is rejected as malformed,
// even when the client itself sends JSON.
#[tokio::test]
async fn invalid_protobuf_response() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let reply = Response::builder()
            .status(StatusCode::OK)
            .header(CONTENT_TYPE, APPLICATION_PROTOBUF)
            .body("azerty".into())
            .unwrap();
        Ok::<Response<String>, TwirpError>(reply)
    });

    let mut client = TwirpHttpClient::new(service);
    client.use_json();
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    let expected = TwirpError::malformed(
        "Bad response binary protobuf encoding: failed to decode Protobuf message: buffer underflow",
    );
    assert_eq!(response_error, expected);
    Ok(())
}
// A JSON-typed response whose body is not valid JSON is rejected as malformed.
#[tokio::test]
async fn invalid_json_response() -> Result<(), Box<dyn Error>> {
    let service = service_fn(|request: Request<TwirpRequestBody>| async move {
        assert_eq!(request.method(), Method::POST);
        assert_eq!(request.uri(), "/foo");
        let reply = Response::builder()
            .status(StatusCode::OK)
            .header(CONTENT_TYPE, APPLICATION_JSON)
            .body("foo".into())
            .unwrap();
        Ok::<Response<String>, TwirpError>(reply)
    });

    let mut client = TwirpHttpClient::new(service);
    client.use_json();
    let message = Timestamp {
        seconds: 10,
        nanos: 0,
    };
    let response_error = client
        .call::<_, Timestamp>("/foo", &message)
        .await
        .unwrap_err();

    let expected =
        TwirpError::malformed("Failed to parse JSON response: expected ident at line 1 column 2");
    assert_eq!(response_error, expected);
    Ok(())
}
// Compile-time check: the future returned by `call` must be `Send`.
#[tokio::test]
async fn response_future_is_send() {
    fn assert_send<T: Send>(_: T) {}

    let service = service_fn(|_: Request<TwirpRequestBody>| async move {
        Ok::<_, TwirpError>(Response::new(String::new()))
    });
    let client = TwirpHttpClient::new(service);
    let message = Timestamp::default();
    let pending_call = client.call::<_, Timestamp>("/foo", &message);
    assert_send(pending_call);
}
}