use std::time::Duration;
use rand::{RngExt, rngs::StdRng};
use reqwest::{
RequestBuilder, Response, StatusCode,
header::{self, AUTHORIZATION, HeaderMap},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use crate::error::{Error, ErrorResponse, LineLoginErrorResponse};
pub mod error;
pub mod line_login;
pub mod messaging_api;
pub mod option;
#[cfg(feature = "mock")]
pub mod mock;
pub use option::{
LineOptions, LineOptionsBuilder, LineRequestLog, LineResponseLog, OnRequest, OnResponse,
REDACTED_BODY_KEYS, ResponseBody,
};
pub(crate) use option::{CapturedRequest, run_log_callback, serialize_log_body};
/// Request identifiers taken from the headers of a LINE API response.
///
/// Instances are produced by `make_line_header` from the `X-Line-Request-Id`
/// and `X-Line-Accepted-Request-Id` response headers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LineResponseHeader {
    /// Value of the `X-Line-Request-Id` header. `make_line_header` stores an
    /// empty string when the header is missing or cannot be read as text.
    pub request_id: String,
    /// Value of the `X-Line-Accepted-Request-Id` header, if the response
    /// carried one. Left out of serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub accepted_request_id: Option<String>,
}
/// Builds the full request URL for an endpoint.
///
/// The result is the value of `options.resolve_prefix_url()` immediately
/// followed by `postfix_url`; no separator is inserted between the two.
pub(crate) fn make_url(postfix_url: &str, options: &LineOptions) -> String {
    let prefix = options.resolve_prefix_url();
    format!("{prefix}{postfix_url}")
}
/// Adds an `Authorization: Bearer <channel_access_token>` header to `builder`
/// and returns the updated builder.
pub(crate) fn apply_auth(builder: RequestBuilder, channel_access_token: &str) -> RequestBuilder {
    let bearer = format!("Bearer {channel_access_token}");
    builder.header(AUTHORIZATION, bearer)
}
/// Applies the timeout configured in `options` to `builder`.
///
/// A zero duration from `options.get_timeout_duration()` means "do not set a
/// per-request timeout": the builder is returned untouched in that case.
pub(crate) fn apply_timeout(builder: RequestBuilder, options: &LineOptions) -> RequestBuilder {
    match options.get_timeout_duration() {
        timeout if timeout.is_zero() => builder,
        timeout => builder.timeout(timeout),
    }
}
/// Default retry predicate: returns `true` for `429 Too Many Requests` and
/// for any status that `StatusCode::is_server_error` reports as a server
/// error, `false` otherwise.
pub(crate) fn is_standard_retry(status_code: StatusCode) -> bool {
    let rate_limited = status_code == StatusCode::TOO_MANY_REQUESTS;
    rate_limited || status_code.is_server_error()
}
/// Extracts the LINE request identifiers from the headers of `response`.
///
/// * `request_id` comes from `X-Line-Request-Id`; it is the empty string when
///   the header is absent.
/// * `accepted_request_id` comes from `X-Line-Accepted-Request-Id`; it is
///   `None` when the header is absent.
///
/// If either header is present but `HeaderValue::to_str` rejects it, a warning
/// is logged and an empty string is recorded for that header.
pub(crate) fn make_line_header(response: &Response) -> LineResponseHeader {
    let headers: &header::HeaderMap = response.headers();

    let request_id = match headers.get("X-Line-Request-Id") {
        Some(value) => match value.to_str() {
            Ok(text) => text.to_owned(),
            Err(_) => {
                tracing::warn!("X-Line-Request-Id present but not valid ASCII; recording empty");
                String::new()
            }
        },
        None => String::new(),
    };

    let accepted_request_id = headers
        .get("X-Line-Accepted-Request-Id")
        .map(|value| match value.to_str() {
            Ok(text) => text.to_string(),
            Err(_) => {
                tracing::warn!(
                    "X-Line-Accepted-Request-Id present but not valid ASCII; recording empty"
                );
                String::new()
            }
        });

    LineResponseHeader {
        request_id,
        accepted_request_id,
    }
}
/// Computes how long to wait before the next retry attempt.
///
/// The delay is `retry_duration * 2^try_count` plus a random jitter of
/// 0..100 milliseconds drawn from `rng`.
///
/// All arithmetic saturates instead of wrapping or panicking:
///
/// * `2^try_count` is capped at `u32::MAX` once it no longer fits in a `u32`.
///   The previous `2u64.pow(try_count) as u32` truncated to `0` for
///   `try_count` in `32..=63` (silently removing the backoff) and overflowed
///   for `try_count >= 64`.
/// * The multiplication and the jitter addition are capped at `Duration::MAX`
///   rather than panicking on overflow.
///
/// For every input where the original computation neither truncated nor
/// overflowed, the result is unchanged.
pub(crate) fn calc_retry_duration(
    retry_duration: Duration,
    try_count: u32,
    rng: &mut StdRng,
) -> Duration {
    // Draw the jitter first so the RNG is consumed exactly once per call.
    let jitter = Duration::from_millis(rng.random_range(0..100));
    // Exponential factor; `None` from `checked_pow` means it exceeded u32.
    let multiplier = 2u32.checked_pow(try_count).unwrap_or(u32::MAX);
    retry_duration
        .saturating_mul(multiplier)
        .saturating_add(jitter)
}
pub(crate) async fn execute_api_raw(
builder: RequestBuilder,
allow_conflict: bool,
options: &LineOptions,
request_value: &serde_json::Value,
) -> Result<(serde_json::Value, LineResponseHeader, StatusCode), Box<Error>> {
let need_log = options.on_request.is_some() || options.on_response.is_some();
let captured: Option<CapturedRequest> = if need_log {
builder
.try_clone()
.and_then(|b| b.build().ok())
.map(|req| CapturedRequest {
headers: req.headers().clone(),
method: req.method().clone(),
path: req.url().path().to_string(),
query: req.url().query().map(|q| q.to_string()),
})
} else {
None
};
if need_log && captured.is_none() {
tracing::debug!(
"request capture (try_clone/build) failed; headers/method/path/query will be None in logs"
);
}
let redacted_body_keys = options.get_redacted_body_keys();
if let Some(cb) = &options.on_request {
run_log_callback("on_request", || {
cb(&LineRequestLog::new(
captured.as_ref(),
request_value,
redacted_body_keys,
));
});
}
let response = builder
.send()
.await
.map_err(|err| Box::new(Error::Reqwest(err)))?;
let status_code = response.status();
let line_header = make_line_header(&response);
let response_headers = if options.on_response.is_some() {
response.headers().clone()
} else {
HeaderMap::new()
};
let text = response
.text()
.await
.map_err(|err| Box::new(Error::Reqwest(err)))?;
let json_result = serde_json::from_str::<serde_json::Value>(&text);
if let Some(cb) = &options.on_response {
let response_body = match json_result.as_ref() {
Ok(value) => ResponseBody::Json(value.clone()),
Err(_) => ResponseBody::Raw(text.clone()),
};
run_log_callback("on_response", || {
cb(
&LineRequestLog::new(captured.as_ref(), request_value, redacted_body_keys),
&LineResponseLog::new(
&response_headers,
response_body,
status_code,
redacted_body_keys,
),
);
});
}
let Ok(json) = json_result else {
return Err(Box::new(Error::OtherText(text, status_code, line_header)));
};
if status_code.is_success() || (allow_conflict && status_code == StatusCode::CONFLICT) {
Ok((json, line_header, status_code))
} else {
match serde_json::from_value::<ErrorResponse>(json.clone()) {
Ok(error_response) => Err(Box::new(Error::Line(
error_response,
status_code,
line_header,
))),
Err(_) => match serde_json::from_value::<LineLoginErrorResponse>(json.clone()) {
Ok(error_response) => Err(Box::new(Error::LineLogin(
error_response,
status_code,
line_header,
))),
Err(_) => Err(Box::new(Error::OtherJson(json, status_code, line_header))),
},
}
}
}
/// Name of the request header that `execute_api` sets to the caller-supplied
/// retry key.
const HEADER_RETRY_KEY: &str = "X-Line-Retry-Key";
pub(crate) async fn execute_api<T, F>(
f: impl Fn() -> RequestBuilder,
options: &LineOptions,
is_retry: F,
retry_key: Option<String>,
request_value_fn: impl FnOnce() -> serde_json::Value,
) -> Result<(T, LineResponseHeader), Box<Error>>
where
T: DeserializeOwned,
F: Fn(StatusCode) -> bool,
{
let mut res = Err(Error::Invalid("fail loop".to_string()));
let try_count = options.get_try_count();
let retry_duration: Duration = options.get_retry_duration();
let request_value = if options.on_request.is_some() || options.on_response.is_some() {
request_value_fn()
} else {
serde_json::Value::Null
};
let mut rng: StdRng = rand::make_rng();
for i in 0..try_count {
let mut builder = f();
if let Some(retry_key) = &retry_key
&& try_count > 1
{
builder = builder.header(HEADER_RETRY_KEY, retry_key);
}
match execute_api_raw(builder, retry_key.is_some(), options, &request_value).await {
Ok((json, line_header, status_code)) => {
res = match serde_json::from_value(json.clone()) {
Ok(data) => Ok((data, line_header)),
Err(_err) => match serde_json::from_value::<ErrorResponse>(json.clone()) {
Ok(error_response) => {
Err(Error::Line(error_response, status_code, line_header))
}
Err(_) => {
match serde_json::from_value::<LineLoginErrorResponse>(json.clone()) {
Ok(error_response) => {
Err(Error::LineLogin(error_response, status_code, line_header))
}
Err(_) => Err(Error::OtherJson(json, status_code, line_header)),
}
}
},
};
break;
}
Err(err) => {
tracing::debug!("error: {:?}", err);
if !is_retry(
err.status_code()
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
) {
res = Err(*err);
break;
}
if i + 1 >= try_count {
res = Err(*err);
} else if !retry_duration.is_zero() {
tokio::time::sleep(calc_retry_duration(retry_duration, i as u32, &mut rng))
.await;
}
}
}
}
res.map_err(Box::new)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Without logging callbacks, `execute_api` must not invoke the
    /// request-body closure (the closure panics if it is ever called).
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_no_callback_skips_request_value_fn() {
        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("GET", "/test")
            .with_status(200)
            .with_body("{}")
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::default();

        let outcome: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().get(&target),
            &options,
            is_standard_retry,
            None,
            || panic!("request_value_fn must not be called when no callback is set"),
        )
        .await;

        assert!(outcome.is_ok());
        endpoint.assert_async().await;
    }

    /// Panics raised inside the logging callbacks must not surface as an API
    /// failure.
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_callback_panic_does_not_fail_api() {
        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("GET", "/test")
            .with_status(200)
            .with_body("{}")
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::builder()
            .with_on_request(|_log| panic!("on_request callback panics"))
            .with_on_response(|_req, _res| panic!("on_response callback panics"))
            .build();

        let outcome: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().get(&target),
            &options,
            is_standard_retry,
            None,
            || serde_json::Value::Null,
        )
        .await;

        assert!(outcome.is_ok(), "callback panic must not fail the API call");
        endpoint.assert_async().await;
    }

    /// A 400 response carrying a LINE Login style error body is reported as
    /// `Error::LineLogin`.
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_error_status_invalid_grant_is_line_login() {
        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("POST", "/test")
            .with_status(400)
            .with_body(r#"{"error":"invalid_grant","error_description":"invalid grant"}"#)
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::default();

        let outcome: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().post(&target),
            &options,
            is_standard_retry,
            None,
            || serde_json::Value::Null,
        )
        .await;

        let Err(boxed) = outcome else {
            panic!("expected an error");
        };
        match *boxed {
            Error::LineLogin(resp, status_code, _) => {
                assert_eq!(resp.error, "invalid_grant");
                assert_eq!(status_code, StatusCode::BAD_REQUEST);
            }
            other => panic!("expected Error::LineLogin, got {other:?}"),
        }
        endpoint.assert_async().await;
    }

    /// A 200 response whose body is a LINE Login error (and therefore does
    /// not deserialize into the expected type) is still reported as
    /// `Error::LineLogin`, keeping the 200 status.
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_success_status_line_login_error_body() {
        #[derive(serde::Deserialize, Debug)]
        struct Dummy {
            #[allow(dead_code)]
            id: u64,
        }

        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("POST", "/test")
            .with_status(200)
            .with_body(r#"{"error":"invalid_grant","error_description":"invalid grant"}"#)
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::default();

        let outcome: Result<(Dummy, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().post(&target),
            &options,
            is_standard_retry,
            None,
            || serde_json::Value::Null,
        )
        .await;

        let Err(boxed) = outcome else {
            panic!("expected an error");
        };
        match *boxed {
            Error::LineLogin(resp, status_code, _) => {
                assert_eq!(resp.error, "invalid_grant");
                assert_eq!(status_code, StatusCode::OK);
            }
            other => panic!("expected Error::LineLogin, got {other:?}"),
        }
        endpoint.assert_async().await;
    }

    /// A 400 response with a `message` body is reported as `Error::Line`.
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_error_status_message_is_line() {
        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("POST", "/test")
            .with_status(400)
            .with_body(r#"{"message":"bad request"}"#)
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::default();

        let outcome: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().post(&target),
            &options,
            is_standard_retry,
            None,
            || serde_json::Value::Null,
        )
        .await;

        let Err(boxed) = outcome else {
            panic!("expected an error");
        };
        match *boxed {
            Error::Line(resp, status_code, _) => {
                assert_eq!(resp.message, "bad request");
                assert_eq!(status_code, StatusCode::BAD_REQUEST);
            }
            other => panic!("expected Error::Line, got {other:?}"),
        }
        endpoint.assert_async().await;
    }

    /// A 400 response with JSON matching neither known error shape is
    /// reported as `Error::OtherJson` carrying the original JSON.
    #[cfg(feature = "mock")]
    #[tokio::test]
    async fn test_error_status_unrecognized_is_other_json() {
        let mut server = mockito::Server::new_async().await;
        let endpoint = server
            .mock("POST", "/test")
            .with_status(400)
            .with_body(r#"{"foo":"bar"}"#)
            .create_async()
            .await;
        let target = format!("{}/test", server.url());
        let options = LineOptions::default();

        let outcome: Result<(serde_json::Value, LineResponseHeader), _> = execute_api(
            || reqwest::Client::new().post(&target),
            &options,
            is_standard_retry,
            None,
            || serde_json::Value::Null,
        )
        .await;

        let Err(boxed) = outcome else {
            panic!("expected an error");
        };
        match *boxed {
            Error::OtherJson(json, status_code, _) => {
                assert_eq!(json["foo"], "bar");
                assert_eq!(status_code, StatusCode::BAD_REQUEST);
            }
            other => panic!("expected Error::OtherJson, got {other:?}"),
        }
        endpoint.assert_async().await;
    }
}